EpcTools
An event based multi-threaded C++ development framework.
etevent.h
Go to the documentation of this file.
1 /*
2 * Copyright (c) 2019 Sprint
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17 #ifndef __ETEVENT_H
18 #define __ETEVENT_H
19 
20 #include <unistd.h>
21 #include <sys/syscall.h>
22 
23 #include "ebase.h"
24 #include "etbasic.h"
25 #include "eerror.h"
26 #include "egetopt.h"
27 #include "eshmem.h"
28 #include "esynch.h"
29 #include "esynch2.h"
30 #include "etime.h"
31 #include "etimer.h"
32 
35 
37 
38 DECLARE_ERROR(EThreadQueueBaseError_NotOpenForWriting);
39 DECLARE_ERROR(EThreadQueueBaseError_NotOpenForReading);
40 DECLARE_ERROR(EThreadQueueBaseError_MultipleReadersNotAllowed);
41 DECLARE_ERROR(EThreadQueuePublicError_UnInitialized);
42 
43 DECLARE_ERROR_ADVANCED(EThreadTimerError_UnableToInitialize);
44 DECLARE_ERROR_ADVANCED(EThreadTimerError_NotInitialized);
45 DECLARE_ERROR_ADVANCED(EThreadTimerError_AlreadyInitialized);
46 DECLARE_ERROR_ADVANCED(EThreadTimerError_UnableToStart);
47 DECLARE_ERROR_ADVANCED(EThreadTimerError_UnableToRegisterTimerHandler);
48 
50 
53 
59 {
60 public:
62  EThreadEventMessageDataBase() : m_msgid() {}
65  EThreadEventMessageDataBase(UInt msgid) : m_msgid(msgid) {}
68 
71  UInt getMessageId() { return m_msgid; }
75  EThreadEventMessageDataBase &setMessageId(UInt msgid) { m_msgid = msgid; return *this; }
79  virtual pVoid getVoidPtr() = 0;
83  virtual Void setVoidPtr(pVoid p) = 0;
84 
89  ETimer &getTimer() { return m_timer; }
90 
91 private:
92  ETimer m_timer;
93  UInt m_msgid;
94 };
95 
98 {
99 public:
101  typedef union {
102  pVoid voidptr;
103  LongLong int64;
104  Long int32[sizeof(pVoid) / sizeof(Long)];
105  Short int16[sizeof(pVoid) / sizeof(Short)];
106  Char int8[sizeof(pVoid) / sizeof(Char)];
107  ULongLong uint64;
108  ULong uint32[sizeof(pVoid) / sizeof(ULong)];
109  UShort uint16[sizeof(pVoid) / sizeof(UShort)];
110  UChar uint8[sizeof(pVoid) / sizeof(UChar)];
111  } DataUnion;
113 
118  EThreadEventMessageData(UInt msgid) : EThreadEventMessageDataBase(msgid), m_data() {}
122  EThreadEventMessageData(UInt msgid, EThreadEventMessageData &data) : EThreadEventMessageDataBase(msgid), m_data(data.data()) {}
126  EThreadEventMessageData(UInt msgid, pVoid v) : EThreadEventMessageDataBase(msgid), m_data{.voidptr=v} {}
130  EThreadEventMessageData(UInt msgid, LongLong v) : EThreadEventMessageDataBase(msgid), m_data{.int64=v} {}
134  EThreadEventMessageData(UInt msgid, ULongLong v) : EThreadEventMessageDataBase(msgid), m_data{.uint64=v} {}
139  EThreadEventMessageData(UInt msgid, Long v1, Long v2) : EThreadEventMessageDataBase(msgid), m_data{.int32={v1,v2}} {}
144  EThreadEventMessageData(UInt msgid, ULong v1, ULong v2) : EThreadEventMessageDataBase(msgid), m_data{.uint32={v1,v2}} {}
151  EThreadEventMessageData(UInt msgid, Short v1, Short v2, Short v3, Short v4) : EThreadEventMessageDataBase(msgid), m_data{.int16={v1,v2,v3,v4}} {}
158  EThreadEventMessageData(UInt msgid, UShort v1, UShort v2, UShort v3, UShort v4) : EThreadEventMessageDataBase(msgid), m_data{.uint16={v1,v2,v3,v4}} {}
169  EThreadEventMessageData(UInt msgid, Char v1, Char v2, Char v3, Char v4, Char v5, Char v6, Char v7, Char v8) : EThreadEventMessageDataBase(msgid), m_data{.int8={v1,v2,v3,v4,v5,v6,v7,v8}} {}
180  EThreadEventMessageData(UInt msgid, UChar v1, UChar v2, UChar v3, UChar v4, UChar v5, UChar v6, UChar v7, UChar v8) : EThreadEventMessageDataBase(msgid), m_data{.uint8={v1,v2,v3,v4,v5,v6,v7,v8}} {}
183 
186  DataUnion &data() { return m_data; }
187 
190  pVoid getVoidPtr() { return m_data.voidptr; }
194  Void setVoidPtr(pVoid p) { m_data.voidptr = p; }
195 
196 private:
197  DataUnion m_data;
198 };
199 
204 {
205 public:
208 };
209 
211 template <class T>
213 {
214 public:
216  EThreadEventMessageBase() : m_data() {}
219  EThreadEventMessageBase(const T &data) : m_data( data ) {}
224  T &operator=(const T &data)
225  {
226  m_data = data;
227  return *this;
228  }
231  T &data()
232  {
233  return m_data;
234  }
237  UInt getMessageId() { return m_data.getMessageId(); }
241  EThreadEventMessageBase &setMessageId(UInt msgid) { m_data.setMessageid( msgid ); return *this; }
244  pVoid getVoidPtr() { return m_data.getVoidPtr(); }
248  Void setVoidPtr(pVoid p) { m_data.setVoidPtr(p); }
253  ETimer &getTimer() { return m_data.getTimer(); }
254 
255 private:
256  T m_data;
257 };
258 
260 
265 {
266 public:
279  EThreadMessage(UInt msgid, pVoid v) : _EThreadMessage(EThreadEventMessageData(msgid, v)) {}
283  EThreadMessage(UInt msgid, LongLong v) : _EThreadMessage(EThreadEventMessageData(msgid, v)) {}
287  EThreadMessage(UInt msgid, ULongLong v) : _EThreadMessage(EThreadEventMessageData(msgid, v)) {}
292  EThreadMessage(UInt msgid, Long v1, Long v2=0) : _EThreadMessage(EThreadEventMessageData(msgid, v1, v2)) {}
297  EThreadMessage(UInt msgid, ULong v1, ULong v2=0) : _EThreadMessage(EThreadEventMessageData(msgid, v1, v2)) {}
304  EThreadMessage(UInt msgid, Short v1, Short v2=0, Short v3=0, Short v4=0) : _EThreadMessage(EThreadEventMessageData(msgid, v1, v2, v3, v4)) {}
311  EThreadMessage(UInt msgid, UShort v1, UShort v2=0, UShort v3=0, UShort v4=0) : _EThreadMessage(EThreadEventMessageData(msgid, v1, v2, v3, v4)) {}
322  EThreadMessage(UInt msgid, Char v1, Char v2=0, Char v3=0, Char v4=0, Char v5=0, Char v6=0, Char v7=0, Char v8=0)
323  : _EThreadMessage(EThreadEventMessageData(msgid, v1, v2, v3, v4, v5, v6, v7, v8)) {}
334  EThreadMessage(UInt msgid, UChar v1, UChar v2=0, UChar v3=0, UChar v4=0, UChar v5=0, UChar v6=0, UChar v7=0, UChar v8=0)
335  : _EThreadMessage(EThreadEventMessageData(msgid, v1, v2, v3, v4, v5, v6, v7, v8)) {}
337  virtual ~EThreadMessage() {}
338 };
339 
342 
345 {
347  ReadOnly,
349  WriteOnly,
351  ReadWrite
352 };
353 
358 template <class T>
360 {
361 public:
364  Int queueSize() const { return msgCnt(); }
365 
372  Bool push(const T &msg, Bool wait = True)
373  {
374  if (m_mode == EThreadQueueMode::ReadOnly)
375  throw EThreadQueueBaseError_NotOpenForWriting();
376 
377  if (!semFree().Decrement(wait))
378  return False;
379 
380  {
381  EMutexLock l(mutex());
382 
383  data()[msgHead()] = msg;
384  data()[msgHead()].data().getTimer().Start();
385 
386  msgHead()++;
387 
388  if (msgHead() >= msgCnt())
389  msgHead() = 0;
390  }
391 
392  semMsgs().Increment();
393 
394  return True;
395  }
401  Bool pop(T &msg, Bool wait = True)
402  {
403  if (m_mode == EThreadQueueMode::WriteOnly)
404  throw EThreadQueueBaseError_NotOpenForReading();
405 
406  if (!semMsgs().Decrement(wait))
407  return False;
408 
409  {
410  EMutexLock l(mutex(),False);
411 
412  if (multipleReaders())
413  l.acquire();
414 
415  msg = data()[msgTail()++];
416 
417  if (msgTail() >= msgCnt())
418  msgTail() = 0;
419 
420  semFree().Increment();
421  }
422 
423  return True;
424  }
425 
428  Bool isInitialized() { return m_initialized; }
431  EThreadQueueMode mode() { return m_mode; }
432 
433 protected:
435 
436  virtual Bool isPublic() = 0;
437  virtual Int &msgCnt() = 0;
438  virtual Int &msgHead() = 0;
439  virtual Int &msgTail() = 0;
440  virtual Bool &multipleReaders() = 0;
441  virtual Bool &multipleWriters() = 0;
442  virtual Int &numReaders() = 0;
443  virtual Int &numWriters() = 0;
444  virtual Int &refCnt() = 0;
445  virtual T *data() = 0;
446  virtual Void allocDataSpace(cpStr sFile, Char cId, Int nSize) = 0;
447  virtual Void initMutex() = 0;
448  virtual Void initSemFree(UInt initialCount) = 0;
449  virtual Void initSemMsgs(UInt initialCount) = 0;
450 
451  virtual EMutexData &mutex() = 0;
452  virtual ESemaphoreData &semMsgs() = 0;
453  virtual ESemaphoreData &semFree() = 0;
454 
455  virtual int *getBumpPipe() = 0;
456 
458  {
459  m_initialized = False;
460  }
462  {
463  }
464 
465  Void init(Int nMsgCnt, Int threadId, Bool bMultipleWriters, EThreadQueueMode eMode, Bool bMultipleReaders = False)
466  {
467  m_mode = eMode;
468 
469  // construct the shared memory name
470  Char szName[EPC_FILENAME_MAX];
471  epc_sprintf_s(szName, sizeof(szName), "%d", threadId);
472 
473  // calcuate the space required
474  int nSize = sizeof(T) * nMsgCnt;
475 
476  // initialize the shared memory
477  allocDataSpace(szName, 'A', nSize);
478 
479  // initialize the control block values
480  if (refCnt() == 0)
481  {
482  refCnt() = 0;
483  numReaders() = 0;
484  numWriters() = 0;
485  multipleReaders() = bMultipleReaders;
486  multipleWriters() = bMultipleWriters;
487  msgCnt() = nMsgCnt;
488  msgHead() = 0;
489  msgTail() = 0;
490 
491  // initialize the control mutex and semaphores
492  initMutex();
493  initSemFree(msgCnt());
494  initSemMsgs(0);
495  }
496 
497  attach(eMode);
498 
499  m_initialized = True;
500  }
501 
502  Void attach(EThreadQueueMode eMode)
503  {
504  EMutexLock l(mutex());
505 
506  if (!multipleReaders() && numReaders() > 0 &&
508  {
509  throw EThreadQueueBaseError_MultipleReadersNotAllowed();
510  }
511 
512  refCnt()++;
513  numReaders() += (eMode == EThreadQueueMode::ReadOnly || eMode == EThreadQueueMode::ReadWrite) ? 1 : 0;
514  numWriters() += (eMode == EThreadQueueMode::WriteOnly || eMode == EThreadQueueMode::ReadWrite) ? 1 : 0;
515  }
516 
517  Void destroy()
518  {
519  Bool destroyMutex = False;
520 
521  if (m_initialized)
522  {
523  EMutexLock l(mutex());
524 
525  if (refCnt() == 1)
526  {
527  semFree().destroy();
528  semMsgs().destroy();
529 
530  destroyMutex = True;
531 
532  m_initialized = False;
533  }
534  else
535  {
536  refCnt()--;
537  numReaders() -= (m_mode == EThreadQueueMode::ReadOnly || m_mode == EThreadQueueMode::ReadWrite) ? 1 : 0;
538  numWriters() -= (m_mode == EThreadQueueMode::WriteOnly || m_mode == EThreadQueueMode::ReadWrite) ? 1 : 0;
539  }
540  }
541 
542  if (destroyMutex)
543  mutex().destroy();
544  }
546 
547 private:
548  Bool _push(const _EThreadEventMessageBase &msg, Bool wait)
549  {
550  return push( (const T &)msg, wait );
551  }
552 
553  Bool m_initialized;
554  EThreadQueueMode m_mode;
555 };
556 
559 
565 template <class T>
567 {
568  template <class TQueue, class TMessage> friend class EThreadEvent;
569  template <class TQueue, class TMessage> friend class EThreadEventWorker;
570  template <class TQueue, class TMessage, class TWorker> friend class EThreadEventWorkGroup;
571 public:
574  {
575  m_pCtrl = NULL;
576  m_pData = NULL;
577  }
580  {
582  }
591  Void init(Int nMsgCnt, Int threadId, Bool bMultipleWriters,
592  EThreadQueueMode eMode)
593  {
594  EThreadQueueBase<T>::init(nMsgCnt, threadId, bMultipleWriters, eMode);
595  }
596 
597 protected:
599  Void init(Int nMsgCnt, Int threadId, Bool bMultipleWriters,
600  EThreadQueueMode eMode, Bool bMultipleReaders)
601  {
602  EThreadQueueBase<T>::init(nMsgCnt, threadId, bMultipleWriters, eMode, bMultipleReaders);
603  }
604 
605  Bool isPublic() { return True; }
606  Int &msgCnt() { return m_pCtrl->m_msgCnt; }
607  Int &msgHead() { return m_pCtrl->m_head; }
608  Int &msgTail() { return m_pCtrl->m_tail; }
609  Bool &multipleReaders() { return m_pCtrl->m_multipleReaders; }
610  Bool &multipleWriters() { return m_pCtrl->m_multipleWriters; }
611  Int &numReaders() { return m_pCtrl->m_numReaders; }
612  Int &numWriters() { return m_pCtrl->m_numWriters; }
613  Int &refCnt() { return m_pCtrl->m_refCnt; }
614  T *data() { return m_pData; }
615  Void allocDataSpace(cpStr sFile, Char cId, Int nSize)
616  {
617  m_sharedmem.init(sFile, cId, nSize + sizeof(ethreadmessagequeue_ctrl_t));
618  m_pCtrl = (ethreadmessagequeue_ctrl_t *)m_sharedmem.getDataPtr();
619  m_pData = (T *)(((pChar)m_sharedmem.getDataPtr()) + sizeof(ethreadmessagequeue_ctrl_t));
620  }
621  Void initMutex()
622  {
623  EMutexPublic m;
624  m_pCtrl->m_mutexid = m.mutexId();
625  m.detach();
626  }
627  Void initSemFree(UInt initialCount)
628  {
629  ESemaphorePublic s(initialCount);
630  m_pCtrl->m_freeSemId = s.semIndex();
631  s.detach();
632  }
633  Void initSemMsgs(UInt initialCount)
634  {
635  ESemaphorePublic s(initialCount);
636  m_pCtrl->m_msgsSemId = s.semIndex();
637  s.detach();
638  }
639 
640  EMutexData &mutex() { return ESynchObjects::getMutex(m_pCtrl->m_mutexid); }
641  ESemaphoreData &semFree() { return ESynchObjects::getSemaphore(m_pCtrl->m_freeSemId); }
642  ESemaphoreData &semMsgs() { return ESynchObjects::getSemaphore(m_pCtrl->m_msgsSemId); }
643 
644  int *getBumpPipe() { return m_pCtrl->m_bumppipe; }
646 
647 private:
648  typedef struct
649  {
650  Int m_refCnt;
651  Int m_numReaders;
652  Int m_numWriters;
653  Bool m_multipleReaders;
654  Bool m_multipleWriters;
655 
656  Int m_msgCnt;
657  Int m_head; // next location to write
658  Int m_tail; // next location to read
659 
660  Int m_mutexid;
661  Int m_freeSemId;
662  Int m_msgsSemId;
663 
664  int m_bumppipe[2];
665  } ethreadmessagequeue_ctrl_t;
666 
667  ESharedMemory m_sharedmem;
668  ethreadmessagequeue_ctrl_t *m_pCtrl;
669  T *m_pData;
670 };
671 
674 
680 template <class T>
682 {
683  template <class TQueue, class TMessage> friend class EThreadEvent;
684  template <class TQueue, class TMessage> friend class EThreadEventWorker;
685  template <class TQueue, class TMessage, class TWorker> friend class EThreadEventWorkGroup;
686 public:
689  : m_mutex(False), m_semFree(0, False), m_semMsgs(0, False)
690  {
691  m_refCnt = 0;
692  m_numReaders = 0;
693  m_numWriters = 0;
694  m_multipleReaders = False;
695  m_multipleWriters = False;
696 
697  m_msgCnt = 0;
698  m_head = 0; // next location to write
699  m_tail = 0; // next location to read
700  m_pData = NULL;
701  }
704  {
706  if (m_pData && refCnt() == 1)
707  delete[](pChar) m_pData;
708  m_pData = NULL;
709  }
718  Void init(Int nMsgCnt, Int threadId, Bool bMultipleWriters,
719  EThreadQueueMode eMode)
720  {
721  EThreadQueueBase<T>::init(nMsgCnt, threadId, bMultipleWriters, eMode);
722  }
723 
724 protected:
726  Void init(Int nMsgCnt, Int threadId, Bool bMultipleWriters,
727  EThreadQueueMode eMode, Bool bMultipleReaders)
728  {
729  EThreadQueueBase<T>::init(nMsgCnt, threadId, bMultipleWriters, eMode, bMultipleReaders);
730  }
731 
732  Bool isPublic() { return False; }
733  Int &msgCnt() { return m_msgCnt; }
734  Int &msgHead() { return m_head; }
735  Int &msgTail() { return m_tail; }
736  Bool &multipleReaders() { return m_multipleReaders; }
737  Bool &multipleWriters() { return m_multipleWriters; }
738  Int &numReaders() { return m_numReaders; }
739  Int &numWriters() { return m_numWriters; }
740  Int &refCnt() { return m_refCnt; }
741  T *data() { return m_pData; }
742  Void allocDataSpace(cpStr sFile, Char cId, Int nSize)
743  {
744  if (!m_pData)
745  {
746  m_pData = (EThreadMessage *)new Char[nSize];
747  memset((pChar)m_pData, 0, nSize);
748  }
749  }
750  Void initMutex()
751  {
752  m_mutex.init();
753  }
754  Void initSemFree(UInt initialCount)
755  {
756  m_semFree.init(initialCount);
757  }
758  Void initSemMsgs(UInt initialCount)
759  {
760  m_semMsgs.init(initialCount);
761  }
762 
763  EMutexData &mutex() { return m_mutex; }
764  ESemaphoreData &semFree() { return m_semFree; }
765  ESemaphoreData &semMsgs() { return m_semMsgs; }
766 
767  int *getBumpPipe() { return m_bumppipe; }
769 
770 private:
771  Int m_refCnt;
772  Int m_numReaders;
773  Int m_numWriters;
774  Bool m_multipleReaders;
775  Bool m_multipleWriters;
776 
777  Int m_msgCnt;
778  Int m_head; // next location to write
779  Int m_tail; // next location to read
780 
781  EMutexPrivate m_mutex;
782  ESemaphorePrivate m_semFree;
783  ESemaphorePrivate m_semMsgs;
784 
785  int m_bumppipe[2];
786 
787  T *m_pData;
788 };
789 
792 
794 #define EM_INIT 1
795 #define EM_QUIT 2
797 #define EM_SUSPEND 3
799 #define EM_TIMER 4
801 #define EM_SOCKETSELECT_READ 5
803 #define EM_SOCKETSELECT_WRITE 6
805 #define EM_SOCKETSELECT_ERROR 7
807 #define EM_SOCKETSELECT_EXCEPTION 8
809 #define EM_USER 10000
811 
814 
816 class _EThreadEventNotification
817 {
818 public:
819  virtual Bool _sendTimerExpiration(const _EThreadEventMessageBase &msg, Bool wait = True) = 0;
820  virtual Bool _sendThreadMessage(const _EThreadEventMessageBase &msg, Bool wait = True) = 0;
821 };
823 
829 {
830  friend class EThreadEventTimerHandler;
831  template <class TQueue, class TMessage> friend class EThreadEvent;
832  template <class TQueue, class TMessage, class TWorker> friend class EThreadEventWorkGroup;
834 protected:
835  Void init(_EThreadEventNotification *notify, _EThreadEventMessageBase *msg)
836  {
837  if (isInitialized())
838  throw EThreadTimerError_AlreadyInitialized();
839 
840  m_notify = notify;
841  m_msg = msg;
842 
843  struct sigevent sev = {};
844  sev.sigev_notify = SIGEV_SIGNAL;
845  sev.sigev_signo = SIGRTMIN;
846  sev.sigev_value.sival_ptr = this;
847  if (timer_create(CLOCK_REALTIME, &sev, &m_timer) == -1)
848  throw EThreadTimerError_UnableToInitialize();
849  m_initialized = True;
850  }
852 
853 public:
856  {
857  m_initialized = False;
858  // assign the id
859  m_id = atomic_inc(m_nextid);
860  // m_thread = NULL;
861  m_notify = NULL;
862  m_msg = NULL;
863  m_interval = 0;
864  m_oneshot = True;
865  m_timer = NULL;
866  }
870  EThreadEventTimer(Long milliseconds, Bool oneshot = False)
871  {
872  m_initialized = False;
873  // assign the id
874  m_id = atomic_inc(m_nextid);
875  // m_thread = NULL;
876  m_notify = NULL;
877  m_msg = NULL;
878  m_interval = milliseconds;
879  m_oneshot = oneshot;
880  m_timer = NULL;
881  }
883  {
884  destroy();
885  }
889  Void destroy()
890  {
891  if (isInitialized())
892  {
893  stop();
894  timer_delete(m_timer);
895  m_timer = NULL;
896  m_initialized = False;
897  }
898  if (m_msg)
899  {
900  delete m_msg;
901  m_msg = NULL;
902  }
903  m_notify = NULL;
904  }
908  Void start()
909  {
910  if (!isInitialized())
911  throw EThreadTimerError_NotInitialized();
912 
913  struct itimerspec its;
914  its.it_value.tv_sec = m_interval / 1000; // seconds
915  its.it_value.tv_nsec = (m_interval % 1000) * 1000000; // nano-seconds
916  its.it_interval.tv_sec = m_oneshot ? 0 : its.it_value.tv_sec;
917  its.it_interval.tv_nsec = m_oneshot ? 0 : its.it_value.tv_nsec;
918  if (timer_settime(m_timer, 0, &its, NULL) == -1)
919  throw EThreadTimerError_UnableToStart();
920  }
922  Void stop()
923  {
924  if (isInitialized())
925  {
926  struct itimerspec its;
927  its.it_value.tv_sec = 0; // seconds
928  its.it_value.tv_nsec = 0; // nano-seconds
929  its.it_interval.tv_sec = its.it_value.tv_sec;
930  its.it_interval.tv_nsec = its.it_value.tv_nsec;
931  timer_settime(m_timer, 0, &its, NULL);
932  }
933  }
935  Long getInterval() { return m_interval; }
938  Void setInterval(LongLong interval) { m_interval = interval; }
945  Void setInterval(const ETime &t)
946  {
947  ETime dur = t >= ETime::Now() ? t - ETime::Now() : t;
948  m_interval = dur.getTimeVal().tv_sec * 1000 + dur.getTimeVal().tv_usec / 1000;
949  }
952  Void setOneShot(Bool oneshot) { m_oneshot = oneshot; }
957  Long getId() { return m_id; }
962  Bool isInitialized() { return m_initialized; }
963 
964 protected:
966  static void _timerHandler(int signo, siginfo_t *pinfo, void *pcontext)
967  {
968  EThreadEventTimer *timer = (EThreadEventTimer*)pinfo->si_value.sival_ptr;
969  if (timer)
970  timer->m_notify->_sendTimerExpiration(*timer->m_msg);
971  }
973 
974 private:
975  static Long m_nextid;
976 
977  Bool m_initialized;
978  Long m_id;
979  _EThreadEventNotification *m_notify;
981  Bool m_oneshot;
982  LongLong m_interval;
983  timer_t m_timer;
984 };
985 
987 class EThreadEventTimerHandler : EStatic
988 {
989 public:
990  EThreadEventTimerHandler() {}
991  ~EThreadEventTimerHandler() {}
992 
993  virtual Int getInitType() { return STATIC_INIT_TYPE_THREADS; }
994  Void init(EGetOpt &options)
995  {
996  struct sigaction sa;
997  sa.sa_flags = SA_SIGINFO;
998  sa.sa_sigaction = EThreadEventTimer::_timerHandler;
999  sigemptyset(&sa.sa_mask);
1000  int signo = SIGRTMIN;
1001  if (sigaction(signo, &sa, NULL) == -1)
1002  throw EThreadTimerError_UnableToRegisterTimerHandler();
1003  }
1004  Void uninit()
1005  {
1006  }
1007 };
1009 
1012 
1015 #define DECLARE_MESSAGE_MAP() \
1016 protected: \
1017  static const msgmap_t *GetThisMessageMap(); \
1018  virtual const msgmap_t *GetMessageMap() const;
1019 
1025 
1026 
1027 #define BEGIN_MESSAGE_MAP(theClass, baseClass) \
1028  const theClass::msgmap_t *theClass::GetMessageMap() const \
1029  { \
1030  return GetThisMessageMap(); \
1031  } \
1032  const theClass::msgmap_t *theClass::GetThisMessageMap() \
1033  { \
1034  typedef baseClass TheBaseClass; \
1035  _Pragma("GCC diagnostic push") \
1036  _Pragma("GCC diagnostic ignored \"-Wpmf-conversions\"") \
1037  static const msgentry_t _msgEntries[] = \
1038  {
1039 
1041 #define ON_MESSAGE(id, memberFxn) \
1042  {id, (msgfxn_t)&memberFxn},
1043 
1045 #define END_MESSAGE_MAP() \
1046  {0, (msgfxn_t)NULL} \
1047  }; \
1048  _Pragma("GCC diagnostic pop") \
1049  static const msgmap_t msgMap = \
1050  {&TheBaseClass::GetThisMessageMap, &_msgEntries[0]}; \
1051  return &msgMap; \
1052  }
1053 
1061 template <class TQueue, class TMessage>
1062 class EThreadEvent : public EThreadBasic, public _EThreadEventNotification
1063 {
1064 public:
1066  typedef Void (EThreadEvent::*msgfxn_t)(TMessage &);
1067  typedef struct
1068  {
1069  UInt nMessage; // message
1070  msgfxn_t pFn; // routine to call (or special value)
1071  } msgentry_t;
1072 
1073  struct msgmap_t
1074  {
1075  const msgmap_t *(*pfnGetBaseMap)();
1076  const msgentry_t *lpEntries;
1077  };
1079 
1082  : EThreadBasic(),
1083  m_tid(-1),
1084  m_arg(NULL),
1085  m_stacksize(0),
1086  m_suspendCnt(0),
1087  m_suspendSem(0)
1088  {
1089  }
1092  {
1093  }
1094 
1101  Bool sendMessage(UInt message, Bool wait = True)
1102  {
1103  TMessage msg(message);
1104  Bool result = m_queue.push(msg, wait);
1105  if (result)
1106  onMessageQueued(msg);
1107  return result;
1108  }
1116  Bool sendMessage(UInt message, pVoid voidptr, Bool wait = True)
1117  {
1118  TMessage msg(message);
1119  msg.setVoidPtr(voidptr);
1120  Bool result = m_queue.push(msg, wait);
1121  if (result)
1122  onMessageQueued(msg);
1123  return result;
1124  }
1130  Bool sendMessage(const TMessage &msg, Bool wait = True)
1131  {
1132  Bool result = m_queue.push(msg, wait);
1133  if (result)
1134  onMessageQueued(msg);
1135  return result;
1136  }
1137 
1146  virtual Void init(Short appId, UShort threadId, pVoid arg, Int queueSize = 16384, Bool suspended = False, Dword stackSize = 0)
1147  {
1148  m_appId = appId;
1149  m_threadId = threadId;
1150  m_queueSize = queueSize;
1151  m_stacksize = stackSize;
1152 
1153  long id = m_appId * 100000 + 10000 + m_threadId;
1154 
1155  m_queue.init(m_queueSize, id, True, EThreadQueueMode::ReadWrite);
1156 
1157  if (!suspended)
1158  start();
1159  }
1160 
1162  Void quit()
1163  {
1164  sendMessage(EM_QUIT);
1165  }
1167  Void start()
1168  {
1169  if (!isInitialized())
1170  EThreadBasic::init(m_arg, m_stacksize);
1171  }
1178  Void suspend()
1179  {
1180  if (atomic_inc(m_suspendCnt) == 1)
1181  sendMessage(EM_SUSPEND);
1182  }
1184  Void resume()
1185  {
1186  if (atomic_dec(m_suspendCnt) == 0)
1187  m_suspendSem.Increment();
1188  }
1190  virtual Void onInit()
1191  {
1192  }
1194  virtual Void onQuit()
1195  {
1196  }
1198  virtual Void onSuspend()
1199  {
1200  }
1203  virtual Void onTimer(EThreadEventTimer *ptimer)
1204  {
1205  }
1209  {
1210  TMessage *msg = new TMessage(EM_TIMER);
1211  msg->setVoidPtr(&t);
1212  t.init(this, msg);
1213  }
1216  {
1217  return m_queue.semMsgs();
1218  }
1219 
1220 protected:
1222  virtual const msgmap_t *GetMessageMap() const
1223  {
1224  return GetThisMessageMap();
1225  }
1226  static const msgmap_t *GetThisMessageMap()
1227  {
1228  return NULL;
1229  }
1230  int *getBumpPipe()
1231  {
1232  return m_queue.getBumpPipe();
1233  }
1234  Bool _sendTimerExpiration(const _EThreadEventMessageBase &msg, Bool wait = True)
1235  {
1236  Bool result = m_queue.push(static_cast<const TMessage &>(msg), wait);
1237  if (result)
1238  onMessageQueued(static_cast<const TMessage &>(msg));
1239  return result;
1240  }
1241  Bool _sendThreadMessage(const _EThreadEventMessageBase &msg, Bool wait = True)
1242  {
1243  Bool result = m_queue.push(static_cast<const TMessage &>(msg), wait);
1244  if (result)
1245  onMessageQueued(static_cast<const TMessage &>(msg));
1246  return result;
1247  }
1249 
1254  virtual Void onMessageQueued(const TMessage &msg)
1255  {
1256  }
1269  Bool pumpMessage(TMessage &msg, Bool wait = true)
1270  {
1271  Bool bMsg = m_queue.pop(msg, wait);
1272  if (bMsg)
1273  dispatch(msg);
1274 
1275  return bMsg;
1276  }
1283  virtual Void pumpMessages()
1284  {
1285  TMessage msg;
1286 
1287  onInit();
1288 
1289  try
1290  {
1291  while (True)
1292  {
1293  if (pumpMessage(msg))
1294  {
1295  if (msg.getMessageId() == EM_QUIT)
1296  break;
1297  if (msg.getMessageId() == EM_SUSPEND)
1298  m_suspendSem.Decrement();
1299  }
1300  }
1301  }
1302  catch (EError &e)
1303  {
1304  throw;
1305  }
1306  catch (...)
1307  {
1308  throw;
1309  }
1310  }
1316  virtual Void defaultMessageHandler(TMessage &msg)
1317  {
1318  }
1321  pid_t getThreadId()
1322  {
1323  if (m_tid == -1)
1324  m_tid = syscall(SYS_gettid);
1325  return m_tid;
1326  }
1327 
1328 private:
1329  Dword threadProc(pVoid arg)
1330  {
1331  pumpMessages();
1332  return 0;
1333  }
1334 
1335  Bool dispatch(TMessage &msg)
1336  {
1337  Bool keepgoing = True;
1338  const msgmap_t *pMap;
1339  const msgentry_t *pEntries;
1340 
1341  if (msg.getMessageId() >= EM_USER)
1342  {
1343  // interate through each map
1344  for (pMap = GetMessageMap(); keepgoing && pMap && pMap->pfnGetBaseMap != NULL; pMap = (*pMap->pfnGetBaseMap)())
1345  {
1346  // interate through each entry for the map
1347  for (pEntries = pMap->lpEntries; pEntries->nMessage; pEntries++)
1348  {
1349  if (pEntries->nMessage == msg.getMessageId())
1350  {
1351  (this->*pEntries->pFn)(msg);
1352  keepgoing = False;
1353  break;
1354  }
1355  }
1356  }
1357 
1358  if (pMap == NULL)
1359  defaultMessageHandler(msg);
1360  }
1361  else
1362  {
1363  switch (msg.getMessageId())
1364  {
1365  case EM_INIT: { onInit(); break; }
1366  case EM_QUIT: { onQuit(); break; }
1367  case EM_SUSPEND: { onSuspend(); break; }
1368  case EM_TIMER: { onTimer( (EThreadEventTimer*)msg.getVoidPtr() ); break; }
1369  default: { break; }
1370  }
1371  }
1372 
1373  return keepgoing;
1374  }
1375 
1376  pid_t m_tid;
1377  pVoid m_arg;
1378  size_t m_stacksize;
1379  Int m_suspendCnt;
1380  ESemaphorePrivate m_suspendSem;
1381 
1382  Short m_appId;
1383  UShort m_threadId;
1384  Int m_queueSize;
1385  TQueue m_queue;
1386 };
1387 
1390 
1393 
1395 class EThreadEventWorkerBase
1396 {
1397 public:
1398  // typedef Void (*msgfxn_t)(TMessage &);
1399  typedef Void (EThreadEventWorkerBase::*msgfxnvoid_t)();
1400  typedef struct
1401  {
1402  UInt nMessage; // message
1403  msgfxnvoid_t pFn; // routine to call (or special value)
1404  } msgentry_t;
1405 
1406  struct msgmap_t
1407  {
1408  // const _msgmap_t *(*pfnGetBaseMap)();
1409  const msgmap_t *(*pfnGetBaseMap)();
1410  const msgentry_t *lpEntries;
1411  };
1412 };
1414 
1415 #define BEGIN_MESSAGE_MAP2(theClass, baseClass) \
1416  virtual const EThreadEventWorkerBase::msgmap_t *GetMessageMap() const \
1417  { \
1418  return GetThisMessageMap(); \
1419  } \
1420  static const EThreadEventWorkerBase::msgmap_t *GetThisMessageMap() \
1421  { \
1422  typedef baseClass TheBaseClass; \
1423  _Pragma("GCC diagnostic push") \
1424  _Pragma("GCC diagnostic ignored \"-Wpmf-conversions\"") \
1425  static const EThreadEventWorkerBase::msgentry_t _msgEntries[] = \
1426  {
1427 
1429 #define ON_MESSAGE2(id, memberFxn) \
1430  {id, (EThreadEventWorkerBase::msgfxnvoid_t)&memberFxn},
1431 
1433 #define END_MESSAGE_MAP2() \
1434  {0, (EThreadEventWorkerBase::msgfxnvoid_t)NULL} \
1435  }; \
1436  _Pragma("GCC diagnostic pop") \
1437  static const EThreadEventWorkerBase::msgmap_t msgMap = \
1438  {&TheBaseClass::GetThisMessageMap, &_msgEntries[0]}; \
1439  return &msgMap; \
1440  }
1441 
1443 template <class TQueue, class TMessage>
1444 class EThreadEventWorker : public EThreadBasic, public EThreadEventWorkerBase
1445 {
1446  template <class T1, class T2, class T3> friend class EThreadEventWorkGroup;
1447 public:
1448  Int workerId() const
1449  {
1450  return m_workerid;
1451  }
1452 
1454  virtual Void onInit()
1455  {
1456  }
1458  virtual Void onQuit()
1459  {
1460  }
1463  virtual Void onTimer(EThreadEventTimer *ptimer)
1464  {
1465  }
1468  {
1469  return m_queue->semMsgs();
1470  }
1471 
1478  Bool sendMessage(UInt message, Bool wait = True)
1479  {
1480  TMessage msg(message);
1481  Bool result = m_queue->push(msg, wait);
1482  if (result)
1483  onMessageQueued(msg);
1484  return result;
1485  }
1493  Bool sendMessage(UInt message, pVoid voidptr, Bool wait = True)
1494  {
1495  TMessage msg(message);
1496  msg.setVoidPtr(voidptr);
1497  Bool result = m_queue->push(msg, wait);
1498  if (result)
1499  onMessageQueued(msg);
1500  return result;
1501  }
1507  Bool sendMessage(const TMessage &msg, Bool wait = True)
1508  {
1509  Bool result = m_queue->push(msg, wait);
1510  if (result)
1511  onMessageQueued(msg);
1512  return result;
1513  }
1514 
1515 protected:
1518  : EThreadBasic(),
1519  m_workerid(0),
1520  m_queue(nullptr),
1521  m_arg(nullptr),
1522  m_stacksize(0),
1523  m_tid(-1)
1524  {
1525  }
1528  {
1529  }
1530 
1536  virtual Void onMessageQueued(const TMessage &msg)
1537  {
1538  }
1539 
1546  virtual Void init(TQueue &queue, Int workerid, pVoid arg, Dword stackSize = 0)
1547  {
1548  m_queue = &queue;
1549  m_workerid = workerid;
1550  m_stacksize = stackSize;
1551  m_arg = arg;
1552  m_queue->attach(EThreadQueueMode::ReadWrite);
1553  start();
1554  }
1555 
1557  Void start()
1558  {
1559  if (!isInitialized())
1560  EThreadBasic::init(m_arg, m_stacksize);
1561  }
1562 
1564  virtual const msgmap_t *GetMessageMap() const
1565  {
1566  return GetThisMessageMap();
1567  }
1568  static const msgmap_t *GetThisMessageMap()
1569  {
1570  return NULL;
1571  }
1573 
1586  Bool pumpMessage(TMessage &msg, Bool wait = true)
1587  {
1588  Bool bMsg = m_queue->pop(msg, wait);
1589  if (bMsg)
1590  dispatch(msg);
1591 
1592  return bMsg;
1593  }
1600  virtual Void pumpMessages()
1601  {
1602  TMessage msg;
1603 
1604  onInit();
1605 
1606  try
1607  {
1608  while (True)
1609  {
1610  if (pumpMessage(msg))
1611  {
1612  if (msg.getMessageId() == EM_QUIT)
1613  break;
1614  }
1615  }
1616  }
1617  catch (EError &e)
1618  {
1619  throw;
1620  }
1621  catch (...)
1622  {
1623  throw;
1624  }
1625  }
1631  virtual Void defaultMessageHandler(TMessage &msg)
1632  {
1633  }
1636  pid_t getThreadId()
1637  {
1638  if (m_tid == -1)
1639  m_tid = syscall(SYS_gettid);
1640  return m_tid;
1641  }
1642 
1643 private:
1644  Dword threadProc(pVoid arg)
1645  {
1646  pumpMessages();
1647  return 0;
1648  }
1649 
1650  Bool dispatch(TMessage &msg)
1651  {
1652  Bool keepgoing = True;
1653  const msgmap_t *pMap;
1654  const msgentry_t *pEntries;
1655 
1656  if (msg.getMessageId() >= EM_USER)
1657  {
1658  // interate through each map
1659  for (pMap = (msgmap_t*)GetMessageMap(); keepgoing && pMap && pMap->pfnGetBaseMap != NULL; pMap = (msgmap_t*)(*pMap->pfnGetBaseMap)())
1660  {
1661  // interate through each entry for the map
1662  for (pEntries = pMap->lpEntries; pEntries->nMessage; pEntries++)
1663  {
1664  if (pEntries->nMessage == msg.getMessageId())
1665  {
1666  (this->*((void(EThreadEventWorkerBase::*)(TMessage&))pEntries->pFn))(msg);
1667  keepgoing = False;
1668  break;
1669  }
1670  }
1671  }
1672 
1673  if (pMap == NULL)
1674  defaultMessageHandler(msg);
1675  }
1676  else
1677  {
1678  switch (msg.getMessageId())
1679  {
1680  case EM_QUIT: { onQuit(); break; }
1681  case EM_TIMER: { onTimer( (EThreadEventTimer*)msg.getVoidPtr() ); break; }
1682  default: { break; }
1683  }
1684  }
1685 
1686  return keepgoing;
1687  }
1688 
1689  Int m_workerid;
1690  TQueue *m_queue;
1691  pVoid m_arg;
1692  size_t m_stacksize;
1693  pid_t m_tid;
1694 };
1695 
1698 
1703 template <class TQueue, class TMessage, class TWorker>
1704 class EThreadEventWorkGroup : public _EThreadEventNotification
1705 {
1706 public:
1709  : m_initialized(False),
1710  m_arg(NULL),
1711  m_stacksize(0),
1712  m_appId(0),
1713  m_workGroupId(0),
1714  m_queueSize(0),
1715  m_minWorkers(0),
1716  m_maxWorkers(0),
1717  m_actvWorkers(0)
1718  {
1719  }
1722  {
1723  for (int i=0; i<m_actvWorkers; i++)
1724  {
1725  delete m_workers[i];
1726  m_workers[i] = nullptr;
1727  }
1728  }
1729 
1732  Bool isInitialized() { return m_initialized; }
1733 
1740  Bool sendMessage(UInt message, Bool wait = True)
1741  {
1742  TMessage msg(message);
1743  Bool result = m_queue.push(msg, wait);
1744  if (result)
1745  onMessageQueued(msg);
1746  return result;
1747  }
1755  Bool sendMessage(UInt message, pVoid voidptr, Bool wait = True)
1756  {
1757  TMessage msg(message);
1758  msg.setVoidPtr(voidptr);
1759  Bool result = m_queue.push(msg, wait);
1760  if (result)
1761  onMessageQueued(msg);
1762  return result;
1763  }
1769  Bool sendMessage(const TMessage &msg, Bool wait = True)
1770  {
1771  Bool result = m_queue.push(msg, wait);
1772  if (result)
1773  onMessageQueued(msg);
1774  return result;
1775  }
1776 
1788  virtual Void init(Short appId, UShort workGroupId, Int minWorkers, Int maxWorkers = -1,
1789  Int queueSize = 16384, pVoid arg = nullptr, Bool suspended = False, Dword stackSize = 0)
1790  {
1791  m_appId = appId;
1792  m_workGroupId = workGroupId;
1793  m_arg = arg;
1794  m_queueSize = queueSize;
1795  m_stacksize = stackSize;
1796  m_minWorkers = minWorkers;
1797  m_maxWorkers = maxWorkers < minWorkers ? minWorkers : maxWorkers;
1798 
1799  long id = m_appId * 100000 + 20000 + m_workGroupId;
1800 
1801  m_queue.init(m_queueSize, id, True, EThreadQueueMode::ReadWrite, True);
1802 
1803  if (!suspended)
1804  start();
1805  }
1806 
1808  Void join()
1809  {
1810  for (auto worker : m_workers)
1811  worker->join();
1812  }
1814  Void quit()
1815  {
1816  for (int i=0; i<m_actvWorkers; i++)
1817  sendMessage(EM_QUIT);
1818  }
1820  Void start()
1821  {
1822  if (!isInitialized())
1823  {
1824  m_workers.resize(m_maxWorkers);
1825  for (Int i=0; i<m_minWorkers; i++)
1826  {
1827  m_workers[i] = nullptr;
1828  if (i < m_minWorkers)
1829  addWorker();
1830  }
1831  }
1832  }
1833 
1836  Bool addWorker()
1837  {
1838  EMutexLock l(m_mutex);
1839  if (m_actvWorkers >= m_maxWorkers)
1840  return False;
1841  addWorker(m_actvWorkers++);
1842  return True;
1843  }
1844 
1849  {
1850  TMessage *msg = new TMessage(EM_TIMER);
1851  msg->setVoidPtr(&t);
1852  t.init(this, msg);
1853  }
1856  {
1857  return m_queue.semMsgs();
1858  }
1859 
1860 protected:
1866  virtual Void onMessageQueued(const TMessage &msg)
1867  {
1868  }
1871  virtual Void onCreateWorker(TWorker &worker)
1872  {
1873  }
1874 
1875 private:
1876  Bool _sendTimerExpiration(const _EThreadEventMessageBase &msg, Bool wait = True)
1877  {
1878  Bool result = m_queue.push(static_cast<const TMessage &>(msg), wait);
1879  if (result)
1880  onMessageQueued(static_cast<const TMessage &>(msg));
1881  return result;
1882  }
1883  Bool _sendThreadMessage(const _EThreadEventMessageBase &msg, Bool wait = True)
1884  {
1885  Bool result = m_queue.push(static_cast<const TMessage &>(msg), wait);
1886  if (result)
1887  onMessageQueued(static_cast<const TMessage &>(msg));
1888  return result;
1889  }
1890 
1891  Void addWorker(Int idx)
1892  {
1893  TWorker *worker = new TWorker();
1894  // worker->setGroup(*this);
1895  m_workers[idx] = worker;
1896  worker->init(m_queue, idx + 1, m_arg, m_stacksize);
1897  onCreateWorker(*worker);
1898  }
1899 
1900  EMutexPrivate m_mutex;
1901  Bool m_initialized;
1902  pVoid m_arg;
1903  size_t m_stacksize;
1904 
1905  Short m_appId;
1906  UShort m_workGroupId;
1907  Int m_queueSize;
1908  TQueue m_queue;
1909 
1910  Int m_minWorkers;
1911  Int m_maxWorkers;
1912  Int m_actvWorkers;
1913  std::vector<TWorker*> m_workers;
1914 };
1915 
1917 template <class TWorker> using EThreadWorkGroupPublic = EThreadEventWorkGroup<EThreadQueuePublic<EThreadMessage>,EThreadMessage,TWorker>;
1918 
1920 template <class TWorker> using EThreadWorkGroupPrivate = EThreadEventWorkGroup<EThreadQueuePrivate<EThreadMessage>,EThreadMessage,TWorker>;
1921 
1924 
1925 #endif // #ifndef __ETEVENT_H
#define atomic_inc(a)
atomic increment - increments a by 1
Definition: eatomic.h:27
Bool sendMessage(UInt message, pVoid voidptr, Bool wait=True)
Sends event message to this thread.
Definition: etevent.h:1116
virtual Void onMessageQueued(const TMessage &msg)
Called when an event message is queued.
Definition: etevent.h:1536
Definition of a public event thread message queue.
Definition: etevent.h:566
virtual Void init(Short appId, UShort threadId, pVoid arg, Int queueSize=16384, Bool suspended=False, Dword stackSize=0)
Initializes the thread object.
Definition: etevent.h:1146
virtual Void onMessageQueued(const TMessage &msg)
Called when an event message is queued.
Definition: etevent.h:1866
~EThreadEvent()
The class destructor.
Definition: etevent.h:1091
#define True
True.
Definition: ebase.h:25
Bool sendMessage(UInt message, Bool wait=True)
Sends event message to this work group.
Definition: etevent.h:1478
EThreadEventMessageBase< EThreadEventMessageData > _EThreadMessage
Definition: etevent.h:259
Bool pumpMessage(TMessage &msg, Bool wait=true)
Dispatches the next thread event message.
Definition: etevent.h:1586
Allows read only access.
Void init(Int nMsgCnt, Int threadId, Bool bMultipleWriters, EThreadQueueMode eMode)
Initializes this public event thead message queue object.
Definition: etevent.h:591
Work group template definition. The work group contains the event queue that all of the associated wo...
Definition: etevent.h:1704
EThreadMessage(UInt msgid, Char v1, Char v2=0, Char v3=0, Char v4=0, Char v5=0, Char v6=0, Char v7=0, Char v8=0)
Class constructor.
Definition: etevent.h:322
EThreadEventMessageDataBase(UInt msgid)
Class constructor.
Definition: etevent.h:65
static ETime Now()
Retrieves the current time.
Definition: etime.cpp:1147
EThreadQueueMode mode()
Retrieves the access mode associated with this queue object.
Definition: etevent.h:431
~EThreadEventTimer()
Class destructor.
Definition: etevent.h:882
EThreadEventMessageData(UInt msgid, Short v1, Short v2, Short v3, Short v4)
Class constructor.
Definition: etevent.h:151
Macros for various standard C library functions and standard includes.
virtual ~_EThreadEventMessageBase()
Virtual class destructor.
Definition: etevent.h:207
EThreadEventMessageData(UInt msgid, UShort v1, UShort v2, UShort v3, UShort v4)
Class constructor.
Definition: etevent.h:158
EThreadMessage(UInt msgid)
Class constructor.
Definition: etevent.h:271
virtual Void onInit()
Called in the context of the thread prior to processing teh first event message.
Definition: etevent.h:1454
Bool sendMessage(UInt message, pVoid voidptr, Bool wait=True)
Sends event message to this work group.
Definition: etevent.h:1493
The event message base class.
Definition: etevent.h:203
EThreadEventMessageData()
Default constructor.
Definition: etevent.h:115
Defines a class for access to shared memory.
EThreadEvent()
Default class constructor.
Definition: etevent.h:1081
base class for EThreadPrivate and EThreadPublic
Definition: etevent.h:1062
Void init(pVoid arg, size_t stackSize=0)
Initialize and start the thread.
Definition: etbasic.cpp:106
EThreadEventMessageBase & setMessageId(UInt msgid)
Sets the event message ID for this event message.
Definition: etevent.h:241
EThreadQueuePublic()
Default constructor.
Definition: etevent.h:573
The shared memory access class.
Definition: eshmem.h:43
virtual ~EThreadMessage()
Class destructor.
Definition: etevent.h:337
EThreadEventMessageData(UInt msgid, UChar v1, UChar v2, UChar v3, UChar v4, UChar v5, UChar v6, UChar v7, UChar v8)
Class constructor.
Definition: etevent.h:180
Void resume()
Resumes a suspended thread.
Definition: etevent.h:1184
Bool pumpMessage(TMessage &msg, Bool wait=true)
Dispatches the next thread event message.
Definition: etevent.h:1269
EThreadQueueMode
Defines how a client can access a thread queue.
Definition: etevent.h:344
Represents a public semaphore, the semaphore data is located in shared memory.
Definition: esynch.h:446
Any EpcTools that needs to have initialization performed as part of EpcTools::Initialize() should der...
Definition: estatic.h:51
EThreadMessage(UInt msgid, pVoid v)
Class constructor.
Definition: etevent.h:279
Bool sendMessage(const TMessage &msg, Bool wait=True)
Sends event message to this work group.
Definition: etevent.h:1507
Definition of a private event thread message queue.
Definition: etevent.h:681
EThreadMessage(UInt msgid, UShort v1, UShort v2=0, UShort v3=0, UShort v4=0)
Class constructor.
Definition: etevent.h:311
Void quit()
Posts the quit message to all of the worker threads.
Definition: etevent.h:1814
virtual Void onMessageQueued(const TMessage &msg)
Called when an event message is queued.
Definition: etevent.h:1254
Manages configuration parameters from a file and the command line.
Allows read or write access.
virtual ~EThreadEventMessageBase()
Class destructor.
Definition: etevent.h:221
virtual Void defaultMessageHandler(TMessage &msg)
The default event message handler.
Definition: etevent.h:1631
Void initTimer(EThreadEventTimer &t)
Intializes an EThreadEvent::Timer object and associates it with this work group.
Definition: etevent.h:1848
Long getId()
Returns the unique timer id.
Definition: etevent.h:957
EThreadMessage()
Default class constructor.
Definition: etevent.h:268
EThreadMessage(UInt msgid, UChar v1, UChar v2=0, UChar v3=0, UChar v4=0, UChar v5=0, UChar v6=0, UChar v7=0, UChar v8=0)
Class constructor.
Definition: etevent.h:334
#define epc_sprintf_s
epc_sprintf_s - sprintf_s
Definition: ebase.h:46
EThreadEventWorker()
Default class constructor.
Definition: etevent.h:1517
virtual Void onQuit()
Called in the context of the thread when the EM_QUIT event is processed.
Definition: etevent.h:1458
EThreadEventMessageDataBase & setMessageId(UInt msgid)
Sets the event message ID.
Definition: etevent.h:75
EThreadEventTimer(Long milliseconds, Bool oneshot=False)
Class constructor with configuration parameters.
Definition: etevent.h:870
Implements a stopwatch style timer.
Definition: etimer.h:26
Contains the data associated with a public or private mutex.
Definition: esynch.h:72
Provides class for manipulating time of day values.
virtual Void onTimer(EThreadEventTimer *ptimer)
Called in the context of the thread when th EM_TIMER event is processed.
Definition: etevent.h:1203
EThreadEventMessageBase()
Default constructor.
Definition: etevent.h:216
Contains the data associated with a public or private semaphore.
Definition: esynch.h:268
Definition: egetopt.h:31
virtual Void onTimer(EThreadEventTimer *ptimer)
Called in the context of the thread when th EM_TIMER event is processed.
Definition: etevent.h:1463
Void destroy()
Stops and destroys the underlying timer object.
Definition: etevent.h:889
EThreadEventTimer()
Default class constructor.
Definition: etevent.h:855
Void detach()
Detaches from the semaphore data.
Definition: esynch.cpp:569
virtual Void pumpMessages()
Process event messages.
Definition: etevent.h:1600
Void setVoidPtr(pVoid p)
Sets the void pointer.
Definition: etevent.h:194
Void init(Int nMsgCnt, Int threadId, Bool bMultipleWriters, EThreadQueueMode eMode)
Initializes this public event thead message queue object.
Definition: etevent.h:718
pVoid getVoidPtr()
Retrieves the void pointer.
Definition: etevent.h:190
virtual ~EThreadEventMessageData()
Class destructor.
Definition: etevent.h:182
#define False
False.
Definition: ebase.h:27
UInt getMessageId()
Retrieves the event message ID.
Definition: etevent.h:71
EThreadEventMessageData(UInt msgid, LongLong v)
Class constructor.
Definition: etevent.h:130
Class for manipulating date and time of day values.
Definition: etime.h:199
EThreadEventMessageDataBase()
Default constructor.
Definition: etevent.h:62
Void setInterval(const ETime &t)
sets the timer interval.
Definition: etevent.h:945
#define EM_INIT
thread initialization event
Definition: etevent.h:794
ESemaphoreData & getMsgSemaphore()
Returns the semaphore associated with this thread&#39;s event queue.
Definition: etevent.h:1215
A template class that all event message classes should derive from.
Definition: etevent.h:212
A public mutex (the mutex data is located in shared memory).
Definition: esynch.h:223
Represents a worker thread that is part of a work group.
Definition: etevent.h:1444
EThreadMessage(UInt msgid, Long v1, Long v2=0)
Class constructor.
Definition: etevent.h:292
ETimer & getTimer()
Retrieves the timer associated with this event message.
Definition: etevent.h:89
virtual Void setVoidPtr(pVoid p)=0
Sets the void pointer.
virtual Void init(Short appId, UShort workGroupId, Int minWorkers, Int maxWorkers=-1, Int queueSize=16384, pVoid arg=nullptr, Bool suspended=False, Dword stackSize=0)
Initializes the thread object.
Definition: etevent.h:1788
#define EM_USER
beginning of user events
Definition: etevent.h:810
Bool sendMessage(const TMessage &msg, Bool wait=True)
Sends event message to this thread.
Definition: etevent.h:1130
EThreadEventWorkGroup()
Default class constructor.
Definition: etevent.h:1708
virtual Void pumpMessages()
Process event messages.
Definition: etevent.h:1283
~EThreadQueuePublic()
Class destructor.
Definition: etevent.h:579
Void detach()
Detaches from the public mutex object.
Definition: esynch.cpp:432
Thread timer class.
Definition: etevent.h:828
Defines base class for exceptions and declaration helper macros.
Bool push(const T &msg, Bool wait=True)
Adds the specified message to the thread event queue.
Definition: etevent.h:372
DataUnion & data()
Retrieves the data union object.
Definition: etevent.h:186
Void setOneShot(Bool oneshot)
sets the type of timer
Definition: etevent.h:952
#define EM_SUSPEND
thread suspend event
Definition: etevent.h:798
An event message that is to be sent to a thread.
Definition: etevent.h:264
EThreadEventMessageData(UInt msgid, Long v1, Long v2)
Class constructor.
Definition: etevent.h:139
Allows read only access.
Int & semIndex()
Retrieves the semaphore data index.
Definition: esynch.cpp:555
Bool isInitialized()
Indicates if this timer object has been initialized.
Definition: etevent.h:962
Void setVoidPtr(pVoid p)
Sets the void pointer for this event message.
Definition: etevent.h:248
EThreadEvent< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > EThreadPrivate
Definition: etevent.h:1389
Void initTimer(EThreadEventTimer &t)
Intializes an EThreadEvent::Timer object and associates with this thread.
Definition: etevent.h:1208
Bool isInitialized()
Retrieves indication if this queue object has been initialized.
Definition: etevent.h:428
An abstract class that represents contains the threadProc() that will be run in a separate thread...
Definition: etbasic.h:53
Int mutexId()
Retrieves the mutex ID associated with this public mutex.
Definition: esynch.h:253
virtual Void init(TQueue &queue, Int workerid, pVoid arg, Dword stackSize=0)
Initializes the thread object.
Definition: etevent.h:1546
EThreadMessage(UInt msgid, EThreadEventMessageData &data)
Class constructor.
Definition: etevent.h:275
The base class for exceptions derived from std::exception.
Definition: eerror.h:94
Bool pop(T &msg, Bool wait=True)
Removes the next message from the thread event queue.
Definition: etevent.h:401
Long getInterval()
Returns the timer interval in milliseconds.
Definition: etevent.h:935
Void setInterval(LongLong interval)
sets the timer interval
Definition: etevent.h:938
Void quit()
Posts the quit message to this thread.
Definition: etevent.h:1162
Defines the functionality for the thread queue.
Definition: etevent.h:359
virtual ~EThreadEventMessageDataBase()
Class destructor.
Definition: etevent.h:67
EThreadEventMessageData(UInt msgid, Char v1, Char v2, Char v3, Char v4, Char v5, Char v6, Char v7, Char v8)
Class constructor.
Definition: etevent.h:169
EThreadQueuePrivate()
Default constructor.
Definition: etevent.h:688
virtual Void init(EGetOpt &opt)
Performs class specific initialization.
Definition: estatic.h:61
Void stop()
Stops the timer.
Definition: etevent.h:922
#define EM_QUIT
thread quit event
Definition: etevent.h:796
virtual pVoid getVoidPtr()=0
Retrieves the void pointer.
virtual Void defaultMessageHandler(TMessage &msg)
The default event message handler.
Definition: etevent.h:1316
Bool addWorker()
Creates a new worker thread if the current number of workers is less than the maximum configured...
Definition: etevent.h:1836
Bool sendMessage(UInt message, Bool wait=True)
Sends event message to this work group.
Definition: etevent.h:1740
Void start()
Initializes the thread when it was suspended at init().
Definition: etevent.h:1167
Bool sendMessage(UInt message, Bool wait=True)
Sends event message to this thread.
Definition: etevent.h:1101
virtual Void onCreateWorker(TWorker &worker)
Called when a new worker thread object is created.
Definition: etevent.h:1871
Int workerId() const
Definition: etevent.h:1448
Acquires and holds a lock on the specified mutex.
Definition: esynch.h:133
EThreadEventMessageBase(const T &data)
Copy constructor.
Definition: etevent.h:219
Bool sendMessage(const TMessage &msg, Bool wait=True)
Sends event message to this work group.
Definition: etevent.h:1769
Bool acquire(Bool wait=True)
Manually acquires a lock on the mutex.
Definition: esynch.h:155
ESemaphoreData & getMsgSemaphore()
Returns the semaphore associated with this thread&#39;s event queue.
Definition: etevent.h:1855
Void suspend()
Suspends a running thread.
Definition: etevent.h:1178
~EThreadEventWorker()
The class destructor.
Definition: etevent.h:1527
#define DECLARE_ERROR(__e__)
Declares exception class derived from EError with no constructor parameters.
Definition: eerror.h:53
EThreadMessage(UInt msgid, ULong v1, ULong v2=0)
Class constructor.
Definition: etevent.h:297
pid_t getThreadId()
Retrieves the internal thread ID.
Definition: etevent.h:1636
EThreadMessage(UInt msgid, ULongLong v)
Class constructor.
Definition: etevent.h:287
EThreadEventMessageData(UInt msgid, EThreadEventMessageData &data)
Class constructor.
Definition: etevent.h:122
pid_t getThreadId()
Retrieves the internal thread ID.
Definition: etevent.h:1321
virtual Void onInit()
Called in the context of the thread prior to processing teh first event message.
Definition: etevent.h:1190
#define atomic_dec(a)
atomic decrement - decrements a by 1
Definition: eatomic.h:25
~EThreadQueuePrivate()
Class destructor.
Definition: etevent.h:703
T & operator=(const T &data)
Assignment operator.
Definition: etevent.h:224
#define EPC_FILENAME_MAX
maximum file name length
Definition: ebase.h:37
Int queueSize() const
Returns the maximum number of events that can be present in the event queue.
Definition: etevent.h:364
Bool sendMessage(UInt message, pVoid voidptr, Bool wait=True)
Sends event message to this work group.
Definition: etevent.h:1755
EThreadEvent< EThreadQueuePublic< EThreadMessage >, EThreadMessage > EThreadPublic
Definition: etevent.h:1388
EThreadEventMessageData(UInt msgid, ULongLong v)
Class constructor.
Definition: etevent.h:134
Represents a private semaphore, the semaphore data is allocated from either the stack or heap...
Definition: esynch.h:382
The base class that all event message data objects should be derived from.
Definition: etevent.h:58
A private mutex (the mutex data is allocated from either the heap or stack).
Definition: esynch.h:175
EThreadMessage(UInt msgid, LongLong v)
Class constructor.
Definition: etevent.h:283
virtual Void onSuspend()
Called in the context of the thread when th EM_SUSPEND event is processed.
Definition: etevent.h:1198
ESemaphoreData & getMsgSemaphore()
Returns the semaphore associated with this work groups&#39; event queue.
Definition: etevent.h:1467
Void start()
Starts the timer.
Definition: etevent.h:908
Bool isInitialized()
Retrieves indication if this work group object has been initialized.
Definition: etevent.h:1732
T & data()
Retrieves the data portion of the message.
Definition: etevent.h:231
#define DECLARE_ERROR_ADVANCED(__e__)
Declares exception class derived from EError with no constructor parameters and developer defined con...
Definition: eerror.h:61
EThreadMessage(UInt msgid, Short v1, Short v2=0, Short v3=0, Short v4=0)
Class constructor.
Definition: etevent.h:304
const timeval & getTimeVal() const
Retrieves a reference to the timeval structure.
Definition: etime.h:446
UInt getMessageId()
Retrieves the event message ID associated with this event message.
Definition: etevent.h:237
An event message data object that provides access to a union over 8 bytes of data.
Definition: etevent.h:97
Contains definitions for synchronization objects.
Void start()
Initializes the thread when it was suspended at init().
Definition: etevent.h:1820
EThreadEventMessageData(UInt msgid, ULong v1, ULong v2)
Class constructor.
Definition: etevent.h:144
Void join()
Returns after successfully joining each worker thread.
Definition: etevent.h:1808
#define EM_TIMER
thread timer expiration event
Definition: etevent.h:800
virtual Void onQuit()
Called in the context of the thread when the EM_QUIT event is processed.
Definition: etevent.h:1194
~EThreadEventWorkGroup()
The class destructor.
Definition: etevent.h:1721
pVoid getVoidPtr()
Retrieves the void pointer from the data portion of this event message.
Definition: etevent.h:244
Void start()
Initializes the thread when it was suspended at init().
Definition: etevent.h:1557
EThreadEventMessageData(UInt msgid, pVoid v)
Class constructor.
Definition: etevent.h:126
EThreadEventMessageData(UInt msgid)
Class constructor.
Definition: etevent.h:118
ETimer & getTimer()
Retrieves the timer associated with this event message.
Definition: etevent.h:253