EpcTools
An event-based, multi-threaded C++ development framework.
esocket.h
Go to the documentation of this file.
1 /*
2 * Copyright (c) 2009-2019 Brian Waters
3 * Copyright (c) 2019 Sprint
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 
18 #ifndef __esocket_h_included
19 #define __esocket_h_included
20 
#include <csignal>
#include <cstring>
#include <unordered_map>

#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>

#include "ebase.h"
#include "ecbuf.h"
#include "eerror.h"
#include "estatic.h"
#include "estring.h"
#include "etevent.h"
#include "ehash.h"
39 
42 
44 namespace ESocket
45 {
47  // (Maximum message length) = (max IP packet size) - (min IP header length) - (udp header length)
48  // 65507 = 65535 - 20 - 8
49  const Int UPD_MAX_MSG_LENGTH = 65507;
50 
51  const Int EPC_INVALID_SOCKET = -1;
52  const Int EPC_SOCKET_ERROR = -1;
53  typedef Int EPC_SOCKET;
54  typedef void *PSOCKETOPT;
55  typedef void *PSNDRCVBUFFER;
56  typedef socklen_t EPC_SOCKLEN;
57 
58  namespace TCP
59  {
60  template<class TQueue, class TMessage> class Talker;
61  template<class TQueue, class TMessage> class Listener;
62  }
63  template<class TQueue, class TMessage> class UDP;
64  template<class TQueue, class TMessage> class Thread;
66 
/// @brief Address family of a socket address, as reported by Address::getFamily().
68  enum class Family
69  {
/// The family is neither AF_INET nor AF_INET6 (e.g. a cleared address).
71  Undefined,
/// IPv4 (AF_INET).
73  INET,
/// IPv6 (AF_INET6).
75  INET6
76  };
77 
/// @brief Identifies the kind of socket object; passed to the Base constructor
///        by each concrete socket class.
enum class SocketType
{
   /// No socket type assigned.
   Undefined,
   /// TCP talker socket (TCP::Talker).
   TcpTalker,
   /// TCP listener socket (TCP::Listener).
   TcpListener,
   /// UDP socket (UDP).
   Udp
};
90 
/// @brief Connection state of a TCP socket.
enum class SocketState
{
   /// State has not been set.
   Undefined,
   /// The socket is not connected.
   Disconnected,
   /// A non-blocking connect is in progress.
   Connecting,
   /// The socket is listening for incoming connections.
   Listening,
   /// The socket is connected.
   Connected
};
105 
108 
110  DECLARE_ERROR_ADVANCED(AddressError_UnknownAddressType);
111  DECLARE_ERROR_ADVANCED(AddressError_CannotConvertInet2Inet6);
112  DECLARE_ERROR_ADVANCED(AddressError_CannotConvertInet62Inet);
113  DECLARE_ERROR_ADVANCED(AddressError_ConvertingToString);
114  DECLARE_ERROR_ADVANCED(AddressError_UndefinedFamily);
115 
116  DECLARE_ERROR_ADVANCED(BaseError_UnableToCreateSocket);
117  DECLARE_ERROR_ADVANCED(BaseError_GetPeerNameError);
118 
119  DECLARE_ERROR_ADVANCED(TcpTalkerError_InvalidRemoteAddress);
120  DECLARE_ERROR_ADVANCED(TcpTalkerError_UnableToConnect);
121  DECLARE_ERROR_ADVANCED(TcpTalkerError_UnableToRecvData);
122  DECLARE_ERROR_ADVANCED4(TcpTalkerError_InvalidSendState);
123  DECLARE_ERROR_ADVANCED4(TcpTalkerError_InvalidReadState);
124  DECLARE_ERROR_ADVANCED4(TcpTalkerError_InvalidWriteState);
125  DECLARE_ERROR_ADVANCED4(TcpTalkerError_ReadingWritePacketLength);
126  DECLARE_ERROR_ADVANCED(TcpTalkerError_SendingPacket);
127  DECLARE_ERROR_ADVANCED(TcpTalkerError_InvalidReceiveState);
128 
129  DECLARE_ERROR_ADVANCED(TcpListenerError_UnableToListen);
130  DECLARE_ERROR_ADVANCED(TcpListenerError_UnableToBindSocket);
131  DECLARE_ERROR_ADVANCED(TcpListenerError_UnableToAcceptSocket);
132 
133  DECLARE_ERROR_ADVANCED(UdpError_AlreadyBound);
134  DECLARE_ERROR_ADVANCED(UdpError_UnableToBindSocket);
135  DECLARE_ERROR_ADVANCED(UdpError_UnableToRecvData);
136  DECLARE_ERROR_ADVANCED(UdpError_SendingPacket);
137  DECLARE_ERROR_ADVANCED4(UdpError_ReadingWritePacketLength);
138 
139  DECLARE_ERROR_ADVANCED(ThreadError_UnableToOpenPipe);
140  DECLARE_ERROR_ADVANCED(ThreadError_UnableToReadPipe);
141  DECLARE_ERROR_ADVANCED(ThreadError_UnableToWritePipe);
143 
146 
148  class Address
149  {
150  public:
152  Address() : m_addr() {}
156  Address(cpStr addr, UShort port) { setAddress(addr,port); }
160  Address(const struct in_addr &addr, UShort port) { setAddress(addr,port); }
164  Address(const struct in6_addr &addr, UShort port) { setAddress(addr,port); }
167  Address(struct sockaddr_in &addr) { memcpy(&m_addr, &addr, sizeof(addr)); }
170  Address(struct sockaddr_in6 &addr) { memcpy(&m_addr, &addr, sizeof(addr)); }
173  Address(const Address &addr) { memcpy(&m_addr, &addr, sizeof(addr)); }
174 
177  operator EString() const
178  {
179  Char buf[INET6_ADDRSTRLEN];
180 
181  if (m_addr.ss_family == AF_INET)
182  {
183  if (!inet_ntop(m_addr.ss_family,&((struct sockaddr_in*)&m_addr)->sin_addr.s_addr,buf,sizeof(buf)))
184  throw AddressError_ConvertingToString();
185  }
186  else // AF_INET6
187  {
188  if (!inet_ntop(m_addr.ss_family,&((struct sockaddr_in6*)&m_addr)->sin6_addr.s6_addr,buf,sizeof(buf)))
189  throw AddressError_ConvertingToString();
190  }
191  return EString(buf);
192  }
193 
196  operator UShort() const
197  {
198  if (m_addr.ss_family == AF_INET)
199  return ntohs(((struct sockaddr_in*)&m_addr)->sin_port);
200  if (m_addr.ss_family == AF_INET6)
201  return ntohs(((struct sockaddr_in6*)&m_addr)->sin6_port);
202  throw AddressError_UndefinedFamily();
203  }
204 
207  EString getAddress() const { return *this; }
210  UShort getPort() const { return *this; }
211 
214  const struct sockaddr_storage &getSockAddrStorage() const
215  {
216  return m_addr;
217  }
218 
221  struct sockaddr *getSockAddr() const
222  {
223  return (struct sockaddr *)&m_addr;
224  }
225 
228  socklen_t getSockAddrLen() const
229  {
230  if (m_addr.ss_family == AF_INET)
231  return sizeof(struct sockaddr_in);
232  if (m_addr.ss_family == AF_INET6)
233  return sizeof(struct sockaddr_in6);
234  return sizeof(struct sockaddr_storage);
235  }
236 
240  Address &operator=(const Address& addr)
241  {
242  memcpy(&m_addr, &addr, sizeof(m_addr));
243  return *this;
244  }
245 
249  Address &operator=(const sockaddr_in &addr)
250  {
251  return setAddress(addr);
252  }
253 
257  Address &operator=(const sockaddr_in6 &addr)
258  {
259  return setAddress(addr);
260  }
261 
265  Address &operator=(UShort port)
266  {
267  return setAddress(port);
268  }
269 
273  {
274  return m_addr.ss_family == AF_INET ? Family::INET :
275  m_addr.ss_family == AF_INET6 ? Family::INET6 : Family::Undefined;
276  }
277 
280  const struct sockaddr_storage &getStorage() const
281  {
282  return m_addr;
283  }
284 
287  const struct sockaddr_in &getInet() const
288  {
289  if (m_addr.ss_family != AF_INET)
290  throw AddressError_CannotConvertInet62Inet();
291  return (struct sockaddr_in &)m_addr;
292  }
293 
296  const struct sockaddr_in6 &getInet6() const
297  {
298  if (m_addr.ss_family != AF_INET6)
299  throw AddressError_CannotConvertInet2Inet6();
300  return (struct sockaddr_in6 &)m_addr;
301  }
302 
307  Address &setAddress(cpStr addr, UShort port)
308  {
309  clear();
310  if (inet_pton(AF_INET,addr,&((struct sockaddr_in*)&m_addr)->sin_addr) == 1)
311  {
312  ((struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
313  ((struct sockaddr_in*)&m_addr)->sin_port = htons(port);
314  return *this;
315  }
316 
317  clear();
318  if (inet_pton(AF_INET6,addr,&((struct sockaddr_in6*)&m_addr)->sin6_addr) == 1)
319  {
320  ((struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
321  ((struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
322  ((struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
323  ((struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
324  return *this;
325  }
326 
327  throw AddressError_UnknownAddressType();
328  }
329 
334  Address &setAddress(const struct in_addr &addr, UShort port)
335  {
336  clear();
337  std::memcpy(&((struct sockaddr_in*)&m_addr)->sin_addr, &addr, sizeof(((struct sockaddr_in*)&m_addr)->sin_addr));
338  ((struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
339  ((struct sockaddr_in*)&m_addr)->sin_port = htons(port);
340  return *this;
341  }
342 
347  Address &setAddress(const struct in6_addr &addr, UShort port)
348  {
349  clear();
350  std::memcpy(&((struct sockaddr_in6*)&m_addr)->sin6_addr, &addr, sizeof(((struct sockaddr_in6*)&m_addr)->sin6_addr));
351  ((struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
352  ((struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
353  ((struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
354  ((struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
355  return *this;
356  }
357 
362  Address &setAddress(UShort port, Family fam = Family::INET6)
363  {
364  switch (fam)
365  {
366  case Family::INET:
367  {
368  ((struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
369  ((struct sockaddr_in*)&m_addr)->sin_port = htons(port);
370  ((struct sockaddr_in*)&m_addr)->sin_addr.s_addr = INADDR_ANY;
371  break;
372  }
373  case Family::INET6:
374  {
375  ((struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
376  ((struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
377  ((struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
378  ((struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
379  ((struct sockaddr_in6*)&m_addr)->sin6_addr = in6addr_any;
380  break;
381  }
382  default:
383  {
384  throw AddressError_UndefinedFamily();
385  }
386  }
387  return *this;
388  }
389 
390  Address &setAddress(const sockaddr_in &addr)
391  {
392  clear();
393  ((struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
394  ((struct sockaddr_in*)&m_addr)->sin_port = addr.sin_port;
395  ((struct sockaddr_in*)&m_addr)->sin_addr.s_addr = addr.sin_addr.s_addr;
396  return *this;
397  }
398 
399  Address &setAddress(const sockaddr_in6 &addr)
400  {
401  clear();
402  ((struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
403  ((struct sockaddr_in6*)&m_addr)->sin6_port = addr.sin6_port;
404  ((struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
405  ((struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
406  ((struct sockaddr_in6*)&m_addr)->sin6_addr = addr.sin6_addr;
407  return *this;
408  }
409 
413  {
414  memset( &m_addr, 0, sizeof(m_addr) );
415  return *this;
416  }
417 
420  static Family getFamily(cpStr addr)
421  {
422  in_addr ipv4;
423  in6_addr ipv6;
424 
425  if (inet_pton(AF_INET,addr,&ipv4) == 1)
426  return Family::INET;
427  if (inet_pton(AF_INET6,addr,&ipv6) == 1)
428  return Family::INET6;
429  return Family::Undefined;
430  }
431 
432  private:
433  struct sockaddr_storage m_addr;
434  };
435 
438 
440  template <class TQueue, class TMessage>
441  class Base
442  {
443  friend class TCP::Talker<TQueue,TMessage>;
444  friend class TCP::Listener<TQueue,TMessage>;
445  friend class UDP<TQueue,TMessage>;
446  friend class Thread<TQueue,TMessage>;
447 
448  public:
450  virtual ~Base()
451  {
452  close();
453  }
454 
458  {
459  return m_thread;
460  }
461 
465  {
466  return m_socktype;
467  }
468 
471  Int getFamily()
472  {
473  return m_family;
474  }
475 
478  Int getType()
479  {
480  return m_type;
481  }
482 
486  {
487  return m_protocol;
488  }
489 
492  Int getError()
493  {
494  return m_error;
495  }
496 
498  {
499  static Char desc[256];
500  return strerror_r(m_error, desc, sizeof(desc));
501  }
502 
504  Void close()
505  {
506  disconnect();
507  onClose();
508  }
509 
511  virtual Void disconnect()
512  {
513  getThread().unregisterSocket(this);
514  if (m_handle != EPC_INVALID_SOCKET)
515  {
516  ::close(m_handle);
517  m_handle = EPC_INVALID_SOCKET;
518  }
519  }
520 
523  Int getHandle()
524  {
525  return m_handle;
526  }
527 
531  {
532  switch (state)
533  {
534  case SocketState::Disconnected: return "DISCONNECTED";
535  case SocketState::Connecting: return "CONNECTING";
536  case SocketState::Connected: return "CONNECTED";
537  case SocketState::Listening: return "LISTENING";
538  default: return "UNDEFINED";
539  }
540  }
541 
542  protected:
544  Base(Thread<TQueue,TMessage> &thread, SocketType socktype, Int family, Int type, Int protocol)
545  : m_thread( thread ),
546  m_socktype( socktype ),
547  m_family( family ),
548  m_type( type ),
549  m_protocol( protocol ),
550  m_error( 0 ),
551  m_handle( EPC_INVALID_SOCKET )
552  {
553  }
554 
555  Void createSocket(Int family, Int type, Int protocol)
556  {
557  m_handle = socket(family, type, protocol);
558  if (m_handle == EPC_INVALID_SOCKET)
559  throw BaseError_UnableToCreateSocket();
560 
561  m_family = family;
562  m_type = type;
563  m_protocol = protocol;
564 
565  setOptions();
566  }
567 
568  Void assignAddress(cpStr ipaddr, UShort port, Int family, Int socktype,
569  Int flags, Int protocol, struct addrinfo **address);
570  Int setError()
571  {
572  m_error = errno;
573  return m_error;
574  }
575 
576  Void setError(Int error)
577  {
578  m_error = error;
579  }
580 
581  Void setHandle(Int handle)
582  {
583  disconnect();
584  m_handle = handle;
585  setOptions();
586  }
587 
588  Base &setFamily(Int family)
589  {
590  m_family = family;
591  return *this;
592  }
593 
594  Address &setLocalAddress(Address &addr)
595  {
596  addr.clear();
597 
598  socklen_t sockaddrlen = addr.getSockAddrLen();;
599 
600  if (getsockname(m_handle, addr.getSockAddr(), &sockaddrlen) < 0)
601  throw BaseError_GetPeerNameError();
602 
603  return addr;
604  }
605 
606  Address &setRemoteAddress(Address &addr)
607  {
608  addr.clear();
609 
610  socklen_t sockaddrlen = addr.getSockAddrLen();;
611 
612  if (getpeername(m_handle, addr.getSockAddr(), &sockaddrlen) < 0)
613  throw BaseError_GetPeerNameError();
614 
615  return addr;
616  }
617 
618  virtual Void onReceive()
619  {
620  }
621 
622  virtual Void onConnect()
623  {
624  }
625 
626  virtual Void onClose()
627  {
628  }
629 
630  virtual Void onError()
631  {
632  }
634 
635  private:
636  Void setOptions()
637  {
638  struct linger l;
639  l.l_onoff = 1;
640  l.l_linger = 0;
641  setsockopt(m_handle, SOL_SOCKET, SO_LINGER, (PSOCKETOPT)&l, sizeof(l));
642 
643  fcntl(m_handle, F_SETFL, O_NONBLOCK);
644 
645  getThread().registerSocket(this);
646  }
647 
648  Thread<TQueue,TMessage> &m_thread;
649 
650  SocketType m_socktype;
651  Int m_family;
652  Int m_type;
653  Int m_protocol;
654  Int m_error;
655 
656  Int m_handle;
657  };
658 
661 
663  namespace TCP
664  {
667 
669  template <class TQueue, class TMessage>
670  class Talker : public Base<TQueue,TMessage>
671  {
672  friend class Thread<TQueue,TMessage>;
673 
674  public:
678  Talker(Thread<TQueue,TMessage> &thread, Int bufsize=2097152)
679  : Base<TQueue,TMessage>(thread, SocketType::TcpTalker, AF_INET6, SOCK_STREAM, IPPROTO_TCP),
680  m_state( SocketState::Undefined ),
681  m_sending(False),
682  m_rbuf(bufsize),
683  m_wbuf(bufsize)
684  {
685  }
687  virtual ~Talker()
688  {
689  }
693  {
694  return m_local;
695  }
699  {
700  return m_local;
701  }
704  UShort getLocalPort() const
705  {
706  return m_local;
707  }
712  Talker &setLocal(cpStr addr, UShort port)
713  {
714  m_local.setAddress(addr,port);
715  return *this;
716  }
720  Talker &setLocal(const Address &addr)
721  {
722  m_local = addr;
723  return *this;
724  }
728  {
729  return m_remote;
730  }
734  {
735  return m_remote;
736  }
739  UShort getRemotePort() const
740  {
741  return m_remote;
742  }
747  Talker &setRemote(cpStr addr, UShort port)
748  {
749  m_remote.setAddress(addr,port);
750  return *this;
751  }
755  Talker &setRemote(const Address &addr)
756  {
757  m_remote = addr;
758  return *this;
759  }
761  Void connect()
762  {
763  if (getRemote().getFamily() != Family::INET && getRemote().getFamily() != Family::INET6)
764  throw TcpTalkerError_InvalidRemoteAddress();
765 
766  Int family = getRemote().getFamily() == Family::INET ? AF_INET : AF_INET6;
767  Int type = this->getType();
768  Int protocol = this->getProtocol();
769 
770  this->createSocket( family, type, protocol );
771 
772  int result = ::connect(this->getHandle(), getRemote().getSockAddr(), getRemote().getSockAddrLen());
773 
774  if (result == 0)
775  {
776  setState( SocketState::Connected );
777  onConnect();
778  }
779  else if (result == -1)
780  {
781  this->setError();
782  if (this->getError() != EINPROGRESS && this->getError() != EWOULDBLOCK)
783  throw TcpTalkerError_UnableToConnect();
784 
785  setState( SocketState::Connecting );
786 
787  this->getThread().bump();
788 
789 
790 
791  }
792  }
795  Void connect(Address &addr)
796  {
797  m_remote = addr;
798  connect();
799  }
803  Void connect(cpStr addr, UShort port)
804  {
805  m_remote.setAddress( addr, port );
806  connect();
807  }
811  {
812  return m_rbuf.used();
813  }
819  Int peek(pUChar dest, Int len)
820  {
821  return m_rbuf.peekData(dest, 0, len);
822  }
827  Int read(pUChar dest, Int len)
828  {
829  return m_rbuf.readData(dest, 0, len);
830  }
834  Void write(cpUChar src, Int len)
835  {
836  {
837  EMutexLock l(m_wbuf.getMutex());
838  m_wbuf.writeData((cpUChar)&len, 0, sizeof(len), True);
839  m_wbuf.writeData(src, 0, len, True);
840  }
841 
842  send();
843  }
846  Bool getSending()
847  {
848  return m_sending;
849  }
853  {
854  return m_state;
855  }
859  {
861  }
863  Void disconnect()
864  {
866  m_state = SocketState::Disconnected;
867  m_remote.clear();
868  }
870  virtual Void onReceive()
871  {
872  }
874  virtual Void onConnect()
875  {
876  }
878  virtual Void onClose()
879  {
880  this->close();
881  }
883  virtual Void onError()
884  {
885  }
886 
887  protected:
889  Talker &setAddresses()
890  {
893  return *this;
894  }
895 
896  Talker &setState(SocketState state)
897  {
898  m_state = state;
899  return *this;
900  }
901 
902  Int recv()
903  {
904  //
905  // modified this routine to use a buffer allocated from the stack
906  // instead of a single buffer allocated from the heap (which had
907  // been used for both reading and writing) to avoid between the
908  // read and write process
909  //
910  UChar buf[2048];
911  Int totalReceived = 0;
912 
913  while (True)
914  {
915  Int amtReceived = ::recv(this->getHandle(), (PSNDRCVBUFFER)buf, sizeof(buf), 0);
916  if (amtReceived > 0)
917  {
918  m_rbuf.writeData(buf, 0, amtReceived);
919  totalReceived += amtReceived;
920  }
921  else if (amtReceived == 0)
922  {
923  setState( SocketState::Disconnected );
924  break;
925  }
926  else
927  {
928  this->setError();
929  if (this->getError() == EWOULDBLOCK)
930  break;
931  throw TcpTalkerError_UnableToRecvData();
932  }
933  }
934 
935  return totalReceived;
936  }
937 
938  Void send(Bool override = False)
939  {
940  UChar buf[2048];
941 
942  EMutexLock lck(m_sendmtx, False);
943  if (!lck.acquire(False))
944  return;
945 
946  if (!override && m_sending)
947  return;
948 
949  if (m_wbuf.isEmpty())
950  {
951  m_sending = false;
952  return;
953  }
954 
955  if (getState() != SocketState::Connected)
956  {
957  // std::raise(SIGINT);
958  throw TcpTalkerError_InvalidSendState(getStateDescription());
959  }
960 
961  m_sending = true;
962  while (true)
963  {
964  if (m_wbuf.isEmpty())
965  {
966  m_sending = false;
967  break;
968  }
969 
970  Int packetLength = 0;
971  Int amtRead = m_wbuf.peekData((pUChar)&packetLength, 0, sizeof(packetLength));
972  if (amtRead != sizeof(packetLength))
973  {
974  EString msg;
975  msg.format("expected %d bytes, read %d bytes", sizeof(packetLength), amtRead);
976  throw TcpTalkerError_ReadingWritePacketLength(msg.c_str());
977  }
978 
979  Int sentLength = 0;
980  while (sentLength < packetLength)
981  {
982  Int sendLength = packetLength - sentLength;
983  if (sendLength > (Int)sizeof(buf))
984  sendLength = sizeof(buf);
985 
986  // get data from the circular buffer
987  amtRead = m_wbuf.peekData((pUChar)buf, sizeof(packetLength) + sentLength, sendLength);
988  if (amtRead != sendLength)
989  {
990  EString msg;
991  msg.format("expected %d bytes, read %d bytes", sendLength, amtRead);
992  throw TcpTalkerError_ReadingWritePacketLength(msg.c_str());
993  }
994 
995  // write the data to the socket
996  Int amtWritten = send(buf, sendLength);
997  if (amtWritten == -1) // EWOULDBLOCK
998  break;
999 
1000  sentLength += amtWritten;
1001  if (amtWritten != sendLength) // only part of the data was written
1002  break;
1003  }
1004 
1005  packetLength -= sentLength;
1006  m_wbuf.readData(NULL, 0, sentLength + (!packetLength ? sizeof(packetLength) : 0));
1007  if (packetLength > 0)
1008  {
1009  // need to update the buffer indicating the amount of the
1010  // message remaining in the circular buffer
1011  //fprintf(stderr,"wrote %d bytes of %d\n", sentLength, packetLength + sentLength);
1012  m_wbuf.modifyData((pUChar)&packetLength, 0, (Int)sizeof(packetLength));
1013  break;
1014  }
1015  }
1016  }
1018 
1019  private:
1020  Int send(pUChar pData, Int length)
1021  {
1022  Int result = ::send(this->getHandle(), (PSNDRCVBUFFER)pData, length, MSG_NOSIGNAL);
1023 
1024  if (result == -1)
1025  {
1026  this->setError();
1027  if (this->getError() != EWOULDBLOCK)
1028  throw TcpTalkerError_SendingPacket();
1029  }
1030 
1031  return result;
1032  }
1033 
1034  SocketState m_state;
1035  Address m_local;
1036  Address m_remote;
1037  EMutexPrivate m_sendmtx;
1038  Bool m_sending;
1039 
1040  ECircularBuffer m_rbuf;
1041  ECircularBuffer m_wbuf;
1042  };
1043 
1046 
1048  template <class TQueue, class TMessage>
1049  class Listener : public Base<TQueue,TMessage>
1050  {
1051  friend class Thread<TQueue,TMessage>;
1052 
1053  public:
1058  : Base<TQueue,TMessage>(thread, SocketType::TcpListener,
1059  family == Family::INET ? AF_INET : AF_INET6,
1060  SOCK_STREAM, IPPROTO_TCP),
1061  m_state( SocketState::Undefined ),
1062  m_backlog( -1 )
1063  {
1064  }
1069  Listener(Thread<TQueue,TMessage> &thread, UShort port, Family family = Family::INET6)
1070  : Base<TQueue,TMessage>(thread, SocketType::TcpListener,
1071  family == Family::INET ? AF_INET : AF_INET6,
1072  SOCK_STREAM, IPPROTO_TCP),
1073  m_state( SocketState::Undefined ),
1074  m_backlog( -1 )
1075  {
1076  setPort( port );
1077  }
1083  Listener(Thread<TQueue,TMessage> &thread, UShort port, Int backlog, Family family = Family::INET6)
1084  : Base<TQueue,TMessage>(thread, SocketType::TcpListener,
1085  family == Family::INET ? AF_INET : AF_INET6,
1086  SOCK_STREAM, IPPROTO_TCP),
1087  m_state( SocketState::Undefined ),
1088  m_backlog( backlog )
1089  {
1090  setPort( port );
1091  }
1093  virtual ~Listener()
1094  {
1095  }
1099  {
1100  return m_state;
1101  }
1105  {
1106  return m_local;
1107  }
1110  Void setPort(UShort port)
1111  {
1112  m_local = port;
1113  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1114  }
1117  UShort getPort()
1118  {
1119  return m_local;
1120  }
1123  Void setBacklog(Int backlog)
1124  {
1125  m_backlog = backlog;
1126  }
1130  {
1131  return m_backlog;
1132  }
1134  Void listen()
1135  {
1136  bind();
1137  if (::listen(this->getHandle(), getBacklog()) == EPC_SOCKET_ERROR)
1138  throw TcpListenerError_UnableToListen();
1139  setState( SocketState::Listening );
1140  }
1144  Void listen(UShort port, Int backlog)
1145  {
1146  setPort(port);
1147  setBacklog(backlog);
1148  listen();
1149  }
1153  virtual Talker<TQueue,TMessage> *createSocket(Thread<TQueue,TMessage> &thread) = 0;
1157  {
1158  return createSocket(this->getThread());
1159  }
1161  virtual Void onClose()
1162  {
1164  setState( SocketState::Undefined );
1165  }
1167  virtual Void onError()
1168  {
1169  }
1170 
1171  private:
1172  Void bind()
1173  {
1174  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1175 
1176  Base<TQueue,TMessage>::createSocket(this->getFamily(), this->getType(), this->getProtocol());
1177 
1178  int result = ::bind(this->getHandle(), getLocalAddress().getSockAddr(), getLocalAddress().getSockAddrLen());
1179  if (result == -1)
1180  {
1181  TcpListenerError_UnableToBindSocket err;
1182  this->close();
1183  throw err;
1184  }
1185  }
1186 
1187  Listener<TQueue,TMessage> &setState( SocketState state )
1188  {
1189  m_state = state;
1190  return *this;
1191  }
1192 
1193  SocketState m_state;
1194  Address m_local;
1195  Int m_backlog;
1196  };
1197  }
1198 
1201 
1203  template <class TQueue, class TMessage>
1204  class UDP : public Base<TQueue,TMessage>
1205  {
1206  friend class Thread<TQueue,TMessage>;
1207 
1208  public:
1212  UDP(Thread<TQueue,TMessage> &thread, Int bufsize=2097152)
1213  : Base<TQueue,TMessage>(thread, SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1214  m_sending(False),
1215  m_rbuf(bufsize),
1216  m_wbuf(bufsize),
1217  m_rcvmsg(NULL),
1218  m_sndmsg(NULL)
1219  {
1220  m_rcvmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1221  m_sndmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1222  }
1227  UDP(Thread<TQueue,TMessage> &thread, UShort port, Int bufsize=2097152)
1228  : Base<TQueue,TMessage>(thread, SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1229  m_sending(False),
1230  m_rbuf(bufsize),
1231  m_wbuf(bufsize),
1232  m_rcvmsg(NULL),
1233  m_sndmsg(NULL)
1234  {
1235  m_local = port;
1236  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1237  this->bind();
1238  m_rcvmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1239  m_sndmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1240  }
1246  UDP(Thread<TQueue,TMessage> &thread, cpStr ipaddr, UShort port, Int bufsize=2097152)
1247  : Base<TQueue,TMessage>(thread, SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1248  m_sending(False),
1249  m_rbuf(bufsize),
1250  m_wbuf(bufsize),
1251  m_rcvmsg(NULL),
1252  m_sndmsg(NULL)
1253  {
1254  m_local.setAddress( ipaddr, port );
1255  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1256  this->bind();
1257  m_rcvmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1258  m_sndmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1259  }
1264  UDP(Thread <TQueue,TMessage>&thread, Address &addr, Int bufsize=2097152)
1265  : Base<TQueue,TMessage>(thread, SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1266  m_sending(False),
1267  m_rbuf(bufsize),
1268  m_wbuf(bufsize),
1269  m_rcvmsg(NULL),
1270  m_sndmsg(NULL)
1271  {
1272  m_local = addr;
1273  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1274  this->bind();
1275  m_rcvmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1276  m_sndmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1277  }
1279  virtual ~UDP()
1280  {
1281  if (m_rcvmsg)
1282  delete [] reinterpret_cast<pUChar>(m_rcvmsg);
1283  if (m_sndmsg)
1284  delete [] reinterpret_cast<pUChar>(m_sndmsg);
1285  }
1289  {
1290  return m_local;
1291  }
1295  {
1296  return m_local;
1297  }
1300  UShort getLocalPort()
1301  {
1302  return m_local;
1303  }
1308  UDP &setLocal(cpStr addr, UShort port)
1309  {
1310  m_local.setAddress(addr,port);
1311  return *this;
1312  }
1316  UDP &setLocal(const Address &addr)
1317  {
1318  m_local = addr;
1319  return *this;
1320  }
1325  Void write(const Address &to, cpUChar src, Int len)
1326  {
1327  write(Address(), to, src, len);
1328  }
1334  Void write(const Address &from, const Address &to, cpUChar src, Int len)
1335  {
1336  UDPMessage msg;
1337  msg.total_length = sizeof(msg) + len;
1338  msg.data_length = len;
1339  msg.local = from;
1340  msg.remote = to;
1341 
1342  {
1343  EMutexLock l(m_wbuf.getMutex());
1344  m_wbuf.writeData(reinterpret_cast<cpUChar>(&msg), 0, sizeof(msg), True);
1345  m_wbuf.writeData(src, 0, len, True);
1346  }
1347 
1348  send();
1349  }
/// @brief Retrieves the sending flag, which send() sets while queued messages
///        are still waiting to be written and clears once the write buffer is empty.
/// @return the current value of the flag.
1352  Bool getSending()
1353  {
1354  return m_sending;
1355  }
1358  Void bind(UShort port)
1359  {
1360  if (this->getHandle() != EPC_INVALID_SOCKET)
1361  throw UdpError_AlreadyBound();
1362  m_local = port;
1363  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1364  bind();
1365  }
1369  Void bind(cpStr ipaddr, UShort port)
1370  {
1371  if (this->getHandle() != EPC_INVALID_SOCKET)
1372  throw UdpError_AlreadyBound();
1373  m_local.setAddress( ipaddr, port );
1374  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1375  bind();
1376  }
1379  Void bind(const Address &addr)
1380  {
1381  if (this->getHandle() != EPC_INVALID_SOCKET)
1382  throw UdpError_AlreadyBound();
1383  m_local = addr;
1384  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1385  bind();
1386  }
1388  Void disconnect()
1389  {
1391  m_local.clear();
1392  }
/// @brief Handler invoked for each received datagram; does nothing by default.
/// @param from the address passed as the message's remote address.
/// @param to the address passed as the message's local address.
/// @param msg the datagram payload.
/// @param len the payload length in bytes.
1398  virtual Void onReceive(const Address &from, const Address &to, cpUChar msg, Int len)
1399  {
1400  }
/// @brief Error handler; does nothing by default.
1402  virtual Void onError()
1403  {
1404  }
1405 
1406  protected:
1408  UDP &setAddresses()
1409  {
1411  }
1412 
1413  Int recv()
1414  {
1415  union ControlData
1416  {
1417  struct cmsghdr header;
1418  struct in_pktinfo pktinfo;
1419  struct in6_pktinfo pktinfo6;
1420  };
1421 
1422  Int totalReceived = 0;
1423  Address local;
1424  Address remote;
1425  // socklen_t addrlen;
1426  Int flags = 0;
1427  UChar cmsgdata[CMSG_SPACE(sizeof(ControlData))];
1428  struct iovec iov[1];
1429  struct msghdr mh;
1430 
1431  std::memset(&mh, 0, sizeof(mh));
1432  mh.msg_control = cmsgdata;
1433  mh.msg_controllen = sizeof(cmsgdata);
1434  mh.msg_iov = iov;
1435  mh.msg_iovlen = 1;
1436  iov[0].iov_base = m_rcvmsg->data;
1437  iov[0].iov_len = UPD_MAX_MSG_LENGTH;
1438  mh.msg_name = (pVoid)&remote.getStorage();
1439  mh.msg_namelen = sizeof(remote.getStorage());
1440 
1441  while (True)
1442  {
1443  // addrlen = addr.getSockAddrLen();
1444  // Int amtReceived = ::recvfrom(this->getHandle(), m_rcvmsg->data, UPD_MAX_MSG_LENGTH, flags, addr.getSockAddr(), &addrlen);
1445  Int amtReceived = ::recvmsg(this->getHandle(), &mh, flags);
1446  if (amtReceived >= 0)
1447  {
1448  m_rcvmsg->total_length = sizeof(UDPMessage) + amtReceived;
1449  m_rcvmsg->data_length = amtReceived;
1450  m_rcvmsg->local = getLocal();
1451  m_rcvmsg->remote = remote;
1452 
1453  for (struct cmsghdr *cp = CMSG_FIRSTHDR(&mh); cp != NULL; cp = CMSG_NXTHDR(&mh,cp))
1454  {
1455  if (cp->cmsg_level == IPPROTO_IP && cp->cmsg_type == IP_PKTINFO)
1456  {
1457  struct in_pktinfo *p = (struct in_pktinfo *)CMSG_DATA(cp);
1458  sockaddr_in ipv4;
1459  std::memset(&ipv4, 0, sizeof(ipv4));
1460  ipv4.sin_family = AF_INET;
1461  ipv4.sin_port = ((struct sockaddr_in&)getLocal().getStorage()).sin_port;
1462  ipv4.sin_addr.s_addr = p->ipi_spec_dst.s_addr;
1463  m_rcvmsg->local = ipv4;
1464  break;
1465  }
1466  if (cp->cmsg_level == IPPROTO_IPV6 && cp->cmsg_type == IPV6_PKTINFO)
1467  {
1468  struct in6_pktinfo *p = (struct in6_pktinfo *)CMSG_DATA(cp);
1469  sockaddr_in6 ipv6;
1470  std::memset(&ipv6, 0, sizeof(ipv6));
1471  ipv6.sin6_family = AF_INET6;
1472  ipv6.sin6_port = ((struct sockaddr_in6&)getLocal().getStorage()).sin6_port;
1473  ipv6.sin6_flowinfo = 0;
1474  ipv6.sin6_scope_id = 0;
1475  ipv6.sin6_addr = p->ipi6_addr;
1476  m_rcvmsg->local = ipv6;
1477  break;
1478  }
1479  }
1480 
1481  m_rbuf.writeData( reinterpret_cast<pUChar>(m_rcvmsg), 0, m_rcvmsg->total_length);
1482  totalReceived += amtReceived;
1483  }
1484  else
1485  {
1486  this->setError();
1487  if (this->getError() == EWOULDBLOCK)
1488  break;
1489  throw UdpError_UnableToRecvData();
1490  }
1491  }
1492 
1493  return totalReceived;
1494  }
1495 
1496  Void send(Bool override = False)
1497  {
1498  EMutexLock lck(m_sendmtx, False);
1499  if (!lck.acquire(False))
1500  return;
1501 
1502  if (!override && m_sending)
1503  return;
1504 
1505  if (m_wbuf.isEmpty())
1506  {
1507  m_sending = false;
1508  return;
1509  }
1510 
1511  m_sending = true;
1512  while (true)
1513  {
1514  if (m_wbuf.isEmpty())
1515  {
1516  m_sending = false;
1517  break;
1518  }
1519 
1520  size_t packetLength = 0;
1521  Int amtRead = m_wbuf.peekData(reinterpret_cast<pUChar>(&packetLength), 0, sizeof(packetLength));
1522  if ((size_t)amtRead != sizeof(packetLength))
1523  {
1524  EString msg;
1525  msg.format("expected %d bytes, read %d bytes", sizeof(packetLength), amtRead);
1526  throw UdpError_ReadingWritePacketLength(msg.c_str());
1527  }
1528 
1529  amtRead = m_wbuf.peekData(reinterpret_cast<pUChar>(m_sndmsg), 0, packetLength);
1530  if ((size_t)amtRead != packetLength)
1531  {
1532  EString msg;
1533  msg.format("expected %d bytes, read %d bytes", sizeof(packetLength), amtRead);
1534  throw UdpError_ReadingWritePacketLength(msg.c_str());
1535  }
1536 
1537  if (send(m_sndmsg->local, m_sndmsg->remote, m_sndmsg->data, m_sndmsg->data_length) == -1)
1538  {
1539  // unable to send this message so get out, it will be sent when the socket is ready for writing
1540  break;
1541  }
1542 
1543  m_wbuf.readData(NULL, 0, m_sndmsg->total_length);
1544  }
1545  }
1547 
1548  private:
// Header plus trailing payload for one datagram as it is stored in the
// circular buffers (m_rbuf / m_wbuf). Packed to 1-byte alignment.
1549  #pragma pack(push,1)
1550  struct UDPMessage
1551  {
// Size in bytes of the whole record; this is the byte count written to and
// consumed from the circular buffer for one message.
1552  size_t total_length;
// Number of payload bytes in data[]; passed as the length when sending and to onReceive().
1553  size_t data_length;
// Local address associated with the datagram.
1554  Address local;
// Peer address: the destination when sending, the "from" argument of onReceive().
1555  Address remote;
// Start of the variable-length payload (zero-length array).
1556  UChar data[0];
1557  };
1558  #pragma pack(pop)
1559 
// Connect notification handler; the body is empty, so no action is taken.
1560  Void onConnect()
1561  {
1562  }
1563 
// Close notification handler; the body is empty, so no action is taken.
1564  Void onClose()
1565  {
1566  }
1567 
1568  Void onReceive()
1569  {
1570  while (readMessage(*m_rcvmsg))
1571  {
1572  onReceive(m_rcvmsg->remote, m_rcvmsg->local, m_rcvmsg->data, m_rcvmsg->data_length);
1573  }
1574  }
1575 
1576  Void bind()
1577  {
1578  if (this->getHandle() != EPC_INVALID_SOCKET)
1579  throw UdpError_AlreadyBound();
1580 
1581  Base<TQueue,TMessage>::createSocket(this->getFamily(), this->getType(), this->getProtocol());
1582 
1583  int result;
1584  int sockopt = 1;
1585  result = setsockopt(this->getHandle(), IPPROTO_IP, IP_PKTINFO, &sockopt, sizeof(sockopt));
1586  if (result == -1)
1587  {
1588  UdpError_UnableToBindSocket err;
1589  err.appendTextf(" - %s:%u", getLocal().getAddress().c_str(), getLocal().getPort());
1590  this->close();
1591  throw err;
1592  }
1593  if (getLocal().getFamily() == Family::INET6)
1594  {
1595  result = setsockopt(this->getHandle(), IPPROTO_IPV6, IPV6_RECVPKTINFO, &sockopt, sizeof(sockopt));
1596  if (result == -1)
1597  {
1598  UdpError_UnableToBindSocket err;
1599  err.appendTextf(" - %s:%u", getLocal().getAddress().c_str(), getLocal().getPort());
1600  this->close();
1601  throw err;
1602  }
1603  }
1604 
1605  result = ::bind(this->getHandle(), getLocal().getSockAddr(), getLocal().getSockAddrLen());
1606  if (result == -1)
1607  {
1608  UdpError_UnableToBindSocket err;
1609  err.appendTextf(" - %s:%u", getLocal().getAddress().c_str(), getLocal().getPort());
1610  this->close();
1611  throw err;
1612  }
1613  }
1614 
1615  Bool readMessage(UDPMessage &msg)
1616  {
1617  if (m_rbuf.peekData(reinterpret_cast<pUChar>(&msg), 0, sizeof(msg)))
1618  {
1619  m_rbuf.readData(reinterpret_cast<pUChar>(&msg), 0, msg.total_length);
1620  return True;
1621  }
1622 
1623  return False;
1624  }
1625 
1626  Int send(Address &from, Address &to, cpVoid pData, Int length)
1627  {
1628  Int flags = MSG_NOSIGNAL;
1629  Int result = sendto(this->getHandle(), pData, length, flags, to.getSockAddr(), to.getSockAddrLen());
1630 
1631  if (result == -1)
1632  {
1633  this->setError();
1634  if (this->getError() != EMSGSIZE)
1635  throw UdpError_SendingPacket();
1636  }
1637 
1638  return result;
1639  }
1640 
// NOTE(review): presumably the local address returned by getLocal(); its use is outside this excerpt.
1641  Address m_local;
// Taken (without waiting) by send(Bool) while the write buffer is drained.
1642  EMutexPrivate m_sendmtx;
// Set by send(Bool) while queued datagrams are pending; cleared once m_wbuf is empty.
1643  Bool m_sending;
1644 
// Received records waiting to be delivered by onReceive().
1645  ECircularBuffer m_rbuf;
// Outbound records waiting to be transmitted by send(Bool).
1646  ECircularBuffer m_wbuf;
// Scratch record used on the receive path (filled by recv()/readMessage()).
1647  UDPMessage *m_rcvmsg;
// Scratch record that send(Bool) peeks the next outbound record into.
1648  UDPMessage *m_sndmsg;
1649  };
1650 
1653 
1655  template <class TQueue, class TMessage>
1656  class Thread : public EThreadEvent<TQueue,TMessage>
1657  {
1658  friend class TCP::Talker<TQueue,TMessage>;
1659  friend class TCP::Listener<TQueue,TMessage>;
1660  friend class UDP<TQueue,TMessage>;
1661 
1662  public:
1665  {
// Body of the default constructor Thread(); the signature line (source line 1664)
// is absent from this listing.
// getBumpPipe() supplies the storage for the two pipe descriptors.
1666  int *pipefd = this->getBumpPipe();
1667 
1668  m_error = 0;
1669 
// Create the pipe that bump()/clearBump() use to wake the select() loop.
1670  int result = pipe(pipefd);
1671  if (result == -1)
1672  throw ThreadError_UnableToOpenPipe();
// Make the read end non-blocking so clearBump() can drain it until EWOULDBLOCK.
// NOTE(review): the fcntl() result is not checked.
1673  fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
1674 
// No sockets are registered yet.
1675  FD_ZERO(&m_master);
1676 
// Force the cached maximum descriptor to be calculated.
1677  getMaxFileDescriptor(True);
1678  }
1680  virtual ~Thread()
1681  {
1682  int *pipefd = this->getBumpPipe();
1683  close(pipefd[0]);
1684  close(pipefd[1]);
1685  }
1689  {
// Body of registerSocket(Base<TQueue,TMessage> *socket); the signature line
// (source line 1688) is absent from this listing.
// Track the socket by handle and add the handle to the master fd_set.
1690  m_socketmap.insert(std::make_pair(socket->getHandle(), socket));
1691  FD_SET(socket->getHandle(), &m_master);
// Recalculate the cached maximum descriptor, then write to the bump pipe so
// a pending select() returns and picks up the new descriptor set.
1692  getMaxFileDescriptor(True);
1693  bump();
1694  }
1698  {
// Body of unregisterSocket(Base<TQueue,TMessage> *socket); the signature line
// (source line 1697) is absent from this listing.
// Only act if the handle was actually registered (erase() returned non-zero).
1699  if (m_socketmap.erase(socket->getHandle()))
1700  {
1701  FD_CLR(socket->getHandle(), &m_master);
// Recalculate the cached maximum descriptor and wake the select() loop.
1702  getMaxFileDescriptor(True);
1703  bump();
1704  }
1705  }
// Returns the error value most recently stored with setError().
1707  Int getError() { return m_error; }
1708 
1709  protected:
1711  virtual Void pumpMessages()
1712  {
1713  int maxfd, fd, fdcnt;
1714  fd_set readworking, writeworking, errorworking;
1715 
1716  onInit();
1717 
1718  while (true)
1719  {
1720  {
1721  memcpy(&readworking, &m_master, sizeof(m_master));
1722  FD_SET(this->getBumpPipe()[0], &readworking);
1723 
1724  FD_ZERO(&writeworking);
1725  for (auto it = m_socketmap.begin(); it != m_socketmap.end(); it++)
1726  {
1727  Base<TQueue,TMessage> *pSocket = it->second;
1728  if ((pSocket->getSocketType() == SocketType::TcpTalker &&
1729  ((static_cast<TCP::Talker<TQueue,TMessage>*>(pSocket))->getSending() ||
1730  (static_cast<TCP::Talker<TQueue,TMessage>*>(pSocket))->getState() == SocketState::Connecting)) ||
1731  (pSocket->getSocketType() == SocketType::Udp && (static_cast<UDP<TQueue,TMessage>*>(pSocket))->getSending()))
1732  {
1733  FD_SET(it->first, &writeworking);
1734  }
1735  }
1736 
1737  memcpy(&errorworking, &m_master, sizeof(m_master));
1738 
1739  maxfd = getMaxFileDescriptor() + 1;
1740  }
1741 
1742  fdcnt = select(maxfd, &readworking, &writeworking, &errorworking, NULL);
1743  if (fdcnt == -1)
1744  {
1745  if (errno == EINTR || errno == 514 /*ERESTARTNOHAND*/)
1746  {
1747  if (!pumpMessagesInternal())
1748  break;
1749  }
1750  else
1751  {
1752  onError();
1753  }
1754  continue;
1755  }
1756 
1758  // Process any thread messages
1760  if (FD_ISSET(this->getBumpPipe()[0], &readworking))
1761  {
1762  --fdcnt;
1763  if (!pumpMessagesInternal())
1764  break;
1765  }
1766 
1768  // Process any socket messages
1770  for (fd = 0; fd < maxfd && fdcnt > 0; fd++)
1771  {
1772  if (FD_ISSET(fd, &errorworking))
1773  {
1774  auto socket_it = m_socketmap.find(fd);
1775  if (socket_it != m_socketmap.end())
1776  {
1777  Base<TQueue,TMessage> *pSocket = socket_it->second;
1778  if (pSocket)
1779  {
1780  int error;
1781  socklen_t optlen = sizeof(error);
1782  getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
1783  pSocket->setError(error);
1784  processSelectError(pSocket);
1785  }
1786  }
1787  fdcnt--;
1788  }
1789 
1790  Bool result = True;
1791 
1792  if (fdcnt > 0 && FD_ISSET(fd, &readworking))
1793  {
1794  auto socket_it = m_socketmap.find(fd);
1795  if (socket_it != m_socketmap.end())
1796  {
1797  Base<TQueue,TMessage> *pSocket = socket_it->second;
1798  if (pSocket)
1799  result = processSelectRead(pSocket);
1800  }
1801  fdcnt--;
1802  }
1803 
1804  if (fdcnt > 0 && FD_ISSET(fd, &writeworking))
1805  {
1806  auto socket_it = m_socketmap.find(fd);
1807  if (result && socket_it != m_socketmap.end())
1808  {
1809  Base<TQueue,TMessage> *pSocket = socket_it->second;
1810  if (pSocket)
1811  processSelectWrite(pSocket);
1812  }
1813  fdcnt--;
1814  }
1815  }
1816 
1818  // Process any thread messages that may have been posted while
1819  // processing the socket events
1821  if (!pumpMessagesInternal())
1822  break;
1823 
1824  clearBump();
1825  }
1826 
1827  while (true)
1828  {
1829  auto it = m_socketmap.begin();
1830  if (it == m_socketmap.end())
1831  break;
1832  Base<TQueue,TMessage> *psocket = it->second;
1833  m_socketmap.erase(it);
1834  delete psocket;
1835  }
1836  }
1837 
// Must be implemented by derived classes. Invoked from the processSelect*()
// handlers when a socket operation throws an EError; psocket is the socket
// involved, or NULL when the error came from accepting a connection.
1838  virtual Void errorHandler(EError &err, Base<TQueue,TMessage> *psocket) = 0;
// Hook called by processSelectClose() after psocket->onClose(); the default does nothing.
1839  virtual Void onSocketClosed(Base<TQueue,TMessage> *psocket)
1840  {
1841  }
// Hook called by processSelectError() after psocket->onError(); the default does nothing.
1842  virtual Void onSocketError(Base<TQueue,TMessage> *psocket)
1843  {
1844  }
1845 
// Called once at the start of pumpMessages(), before the select() loop.
// NOTE(review): source line 1848 is absent from this listing, so the body
// may not be empty in the real file (possibly a base-class call) -- confirm.
1846  virtual Void onInit()
1847  {
1849  }
1850 
// Quit notification hook.
// NOTE(review): source line 1853 is absent from this listing, so the body
// may not be empty in the real file (possibly a base-class call) -- confirm.
1851  virtual Void onQuit()
1852  {
1854  }
1855 
// Called when an event message is queued. Writes to the bump pipe so that a
// select() blocked in pumpMessages() returns and the message gets processed.
// NOTE(review): source line 1858 is absent from this listing; a statement
// probably precedes bump() in the real file -- confirm.
1856  virtual Void onMessageQueued(const TMessage &msg)
1857  {
1859  bump();
1860  }
1861 
// Hook called by pumpMessages() when select() fails with an error other than
// EINTR / ERESTARTNOHAND; the default does nothing.
1862  virtual Void onError()
1863  {
1864  }
1865 
1866  Void bump()
1867  {
1868  if (write(this->getBumpPipe()[1], "~", 1) == -1)
1869  throw ThreadError_UnableToWritePipe();
1870  }
1871 
1872  Void clearBump()
1873  {
1874  char buf[1];
1875  while (true)
1876  {
1877  if (read(this->getBumpPipe()[0], buf, 1) == -1)
1878  {
1879  if (errno == EWOULDBLOCK)
1880  break;
1881  throw ThreadError_UnableToReadPipe();
1882  }
1883  }
1884  }
1885 
// Returns this class's message map (the object provided by GetThisMessageMap()).
1886  virtual const typename EThreadEvent<TQueue,TMessage>::msgmap_t *GetMessageMap() const
1887  {
1888  return GetThisMessageMap();
1889  }
1890 
// Returns a pointer to the function-local static message map for this class.
// The entry table holds only the terminating {0, NULL} entry.
// NOTE(review): source line 1898 (the initializer of msgMap) is absent from
// this listing.
1891  static const typename EThreadEvent<TQueue,TMessage>::msgmap_t *GetThisMessageMap()
1892  {
1893  static const typename EThreadEvent<TQueue,TMessage>::msgentry_t _msgEntries[] =
1894  {
1895  {0, (typename EThreadEvent<TQueue,TMessage>::msgfxn_t)NULL}
1896  };
1897  static const typename EThreadEvent<TQueue,TMessage>::msgmap_t msgMap =
1899  return &msgMap;
1900  }
1902 
1903  private:
// Stores the error value later returned by getError().
1904  Void setError(Int error) { m_error = error; }
1905 
1906  Bool pumpMessagesInternal()
1907  {
1908  TMessage msg;
1909 
1910  try
1911  {
1912  while (True)
1913  {
1914  if (!EThreadEvent<TQueue,TMessage>::pumpMessage(msg, false) || msg.getMessageId() == EM_QUIT)
1915  break;
1916  }
1917  }
1918  catch (...)
1919  {
1920  throw;
1921  }
1922 
1924  // get out if the thread has been told to stop
1926  //return (keepGoing() && msg.getMsgId() != EM_QUIT);
1927  return msg.getMessageId() != EM_QUIT;
1928  }
1929 
1930  Void processSelectAccept(Base<TQueue,TMessage> *psocket)
1931  {
1932  if (psocket->getSocketType() == SocketType::TcpListener)
1933  {
1934  bool more = true;
1935  while (more)
1936  {
1937  try
1938  {
1939  struct sockaddr ipaddr;
1940  socklen_t ipaddrlen = sizeof(ipaddr);
1941 
1942  EPC_SOCKET handle = ::accept((static_cast<TCP::Listener<TQueue,TMessage>*>(psocket))->getHandle(), &ipaddr, &ipaddrlen);
1943  if (handle == EPC_INVALID_SOCKET)
1944  {
1945  Int err = errno;
1946  if (err == EWOULDBLOCK)
1947  break;
1948  throw TcpListenerError_UnableToAcceptSocket();
1949  }
1950 
1951  TCP::Talker<TQueue,TMessage> *pnewsocket = (static_cast<TCP::Listener<TQueue,TMessage>*>(psocket))->createSocket(*this);
1952  if (pnewsocket)
1953  {
1954  pnewsocket->setHandle(handle);
1955  pnewsocket->setAddresses();
1956  pnewsocket->setState( SocketState::Connected );
1957  registerSocket(pnewsocket);
1958  pnewsocket->onConnect();
1959  }
1960  else
1961  {
1962  // the connection is being refused, so close the handle
1963  close(handle);
1964  }
1965  }
1966  catch (EError &err)
1967  {
1968  if (err.getLastOsError() != EWOULDBLOCK)
1969  {
1970  //printf("errorHandler() 1 %d\n", err->getLastOsError());
1971  errorHandler(err, NULL);
1972  }
1973  more = false;
1974  }
1975  }
1976  }
1977  }
1978 
1979  Void processSelectConnect(Base<TQueue,TMessage> *psocket)
1980  {
1981  if (psocket->getSocketType() == SocketType::TcpTalker)
1982  ((TCP::Talker<TQueue,TMessage>*)psocket)->onConnect();
1983  }
1984 
// Handles a "readable" notification from select() for psocket.
//   - TcpListener: accepts pending connections.
//   - TcpTalker:   acts on the talker's current state (see the switch below).
//   - Udp:         receives datagrams, then calls the socket's onReceive().
// Returns False when the talker was put into the Undefined state and
// processSelectError() was called; pumpMessages() then skips write handling
// for the same descriptor. Returns True otherwise.
// NOTE(review): the two `case` labels of the switch (source lines 1995 and
// 2025) are absent from this listing. Judging by the bodies, the first block
// handles a socket that is still connecting and the second a connected one
// -- confirm against the real file.
1985  Bool processSelectRead(Base<TQueue,TMessage> *psocket)
1986  {
1987  if (psocket->getSocketType() == SocketType::TcpListener)
1988  {
1989  processSelectAccept(psocket);
1990  }
1991  else if (psocket->getSocketType() == SocketType::TcpTalker)
1992  {
1993  switch ((static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->getState())
1994  {
1996  {
1997  Int result, socketError;
1998  socklen_t socketErrorLen = sizeof(socketError);
1999 
// SO_ERROR reports the pending error on the socket, if any.
2000  result = getsockopt(psocket->getHandle(), SOL_SOCKET, SO_ERROR, &socketError, &socketErrorLen);
2001 
2002  try
2003  {
2004  if (result == -1 || socketError != 0)
2005  {
2006  psocket->setError(socketError);
2007  TcpTalkerError_UnableToConnect ex;
2008  ex.appendTextf(" ESocket::Thread<TQueue,TMessage>::processSelectRead() socketError=%d (%s)",
2009  psocket->getError(), psocket->getErrorDescription());
2010  throw ex;
2011  }
2012 
// No error: mark the talker connected and notify it.
2013  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Connected );
2014  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setAddresses();
2015  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->onConnect();
2016  }
2017  catch (EError &ex)
2018  {
2019  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Undefined );
2020  processSelectError(psocket);
2021  return False;
2022  }
// Deliberately continues into the next block to read any data already available.
2023  // fall thru
2024  }
2026  {
// Pull data from the socket until recv() reports nothing more (<= 0).
// NOTE(review): after errorHandler() returns the loop calls recv() again;
// confirm a persistently failing recv() cannot spin here.
2027  while (true)
2028  {
2029  try
2030  {
2031  Int amtRead = (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->recv();
2032  if (amtRead <= 0)
2033  break;
2034  }
2035  catch (EError &err)
2036  {
2037  //printf("errorHandler() 2\n");
2038  errorHandler(err, psocket);
2039  }
2040  }
2041 
2042  ((TCP::Talker<TQueue,TMessage>*)psocket)->onReceive();
2043 
// If the talker ended up Disconnected, run the close notifications.
2044  if ((static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->getState() == SocketState::Disconnected)
2045  processSelectClose(psocket);
2046 
2047  break;
2048  }
2049  default:
2050  {
// Any other state is treated as an error.
2051  // throw TcpTalkerError_InvalidReadState(
2052  // static_cast<TCP::Talker<TQueue,TMessage>*>(psocket)->getStateDescription());
2053  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Undefined );
2054  processSelectError(psocket);
2055  return False;
2056  }
2057  }
2058  }
2059  else if (psocket->getSocketType() == SocketType::Udp)
2060  {
// Receive until recv() reports nothing more (<= 0), then deliver what was received.
2061  while (true)
2062  {
2063  try
2064  {
2065  Int amtRead = (static_cast<UDP<TQueue,TMessage>*>(psocket))->recv();
2066  if (amtRead <= 0)
2067  break;
2068  }
2069  catch (EError &err)
2070  {
2071  //printf("errorHandler() 2\n");
2072  errorHandler(err, psocket);
2073  }
2074  }
2075 
// NOTE(review): reinterpret_cast here, while the same downcast uses static_cast elsewhere in this function.
2076  (reinterpret_cast<UDP<TQueue,TMessage>*>(psocket))->onReceive();
2077  }
2078 
2079  return True;
2080  }
2081 
// Handles a "writable" notification from select() for psocket.
//   - TcpTalker: acts on the talker's current state (see the switch below).
//   - Udp:       resumes sending queued datagrams with send(True).
// NOTE(review): the two `case` labels of the switch (source lines 2088 and
// 2118) are absent from this listing. Judging by the bodies, the first block
// completes a pending connect and the second resumes sending on a connected
// socket -- confirm against the real file.
2082  Void processSelectWrite(Base<TQueue,TMessage> *psocket)
2083  {
2084  if (psocket->getSocketType() == SocketType::TcpTalker)
2085  {
2086  switch ((static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->getState())
2087  {
2089  {
2090  Int result, socketError;
2091  socklen_t socketErrorLen = sizeof(socketError);
2092 
// SO_ERROR reports the pending error on the socket, if any.
2093  result = getsockopt(psocket->getHandle(), SOL_SOCKET, SO_ERROR, &socketError, &socketErrorLen);
2094 
2095  try
2096  {
2097  if (result == -1 || socketError != 0)
2098  {
2099  psocket->setError(socketError);
2100  TcpTalkerError_UnableToConnect ex;
2101  ex.appendTextf(" ESocket::Thread<TQueue,TMessage>::processSelectWrite() socketError=%d (%s)",
2102  psocket->getError(), psocket->getErrorDescription());
2103  throw ex;
2104  }
2105 
// No error: mark the talker connected and notify it.
2106  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Connected );
2107  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setAddresses();
2108  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->onConnect();
2109  }
2110  catch (EError &ex)
2111  {
2112  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Undefined );
2113  processSelectError(psocket);
2114  return;
2115  }
2116  break;
2117  }
2119  {
// send(True) proceeds even though the socket's "sending" flag is already set.
2120  try
2121  {
2122  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->send(True);
2123  }
2124  catch (EError &err)
2125  {
// A send failure is reported through the socket error path, not errorHandler().
2126  // errorHandler(err, psocket);
2127  processSelectError(psocket);
2128  }
2129  break;
2130  }
2131  default:
2132  {
// Any other state is treated as an error.
2133  // throw TcpTalkerError_InvalidWriteState(
2134  // (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->getStateDescription());
2135  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Undefined );
2136  processSelectError(psocket);
2137  return;
2138  }
2139  }
2140  }
2141  else if (psocket->getSocketType() == SocketType::Udp)
2142  {
// Unlike the TCP path above, a UDP send failure is passed to errorHandler().
2143  try
2144  {
2145  (static_cast<UDP<TQueue,TMessage>*>(psocket))->send(True);
2146  }
2147  catch (EError &err)
2148  {
2149  //printf("errorHandler() 3\n");
2150  errorHandler(err, psocket);
2151  }
2152  }
2153  }
2154 
// Reports an error on psocket: first the socket's own onError(), then the
// thread-level onSocketError() hook.
2155  Void processSelectError(Base<TQueue,TMessage> *psocket)
2156  {
2157  psocket->onError();
2158  onSocketError(psocket);
2159  }
2160 
// Reports that psocket has closed: first the socket's own onClose(), then the
// thread-level onSocketClosed() hook.
2161  Void processSelectClose(Base<TQueue,TMessage> *psocket)
2162  {
2163  psocket->onClose();
2164  onSocketClosed(psocket);
2165  }
2166 
2167  int getMaxFileDescriptor(Bool calc=False)
2168  {
2169  if (calc)
2170  {
2171  m_maxfd = this->getBumpPipe()[0];
2172 
2173  for (auto entry : m_socketmap)
2174  if (entry.second->getHandle() > m_maxfd)
2175  m_maxfd = entry.second->getHandle();
2176  }
2177 
2178  return m_maxfd;
2179  }
2180 
// Error value stored by setError() and returned by getError().
2181  Int m_error;
// Registered sockets, keyed by the handle each had when registerSocket() was called.
2182  std::unordered_map<Int,Base<TQueue,TMessage>*> m_socketmap;
// Handles of all registered sockets; copied into the working sets before each select().
2183  fd_set m_master;
// Cached highest descriptor; maintained by getMaxFileDescriptor(True).
2184  Int m_maxfd;
2185  };
2186 
2191  namespace TCP
2192  {
2197  }
2200 }
2201 
2202 namespace std
2203 {
2204  template<>
2205  struct hash<ESocket::Address>
2206  {
2207  std::size_t operator()(const ESocket::Address &addr) const noexcept
2208  {
2209  size_t addrhash = EMurmurHash64::getHash(reinterpret_cast<cpUChar>(addr.getSockAddr()), addr.getSockAddrLen());
2210  size_t porthash = std::hash<UShort>{}(addr.getPort());
2211  return EMurmurHash64::combine(addrhash, porthash);
2212  }
2213  };
2214 }
2215 
2216 #endif // #define __esocket_h_included
Address(const Address &addr)
Copy constructor.
Definition: esocket.h:173
Performs static initialization associated with any EpcTools class that requires it. Initialization and uninitialization is performed by EpcTools::Initialize() and EpcTools::UnInitialize().
socklen_t getSockAddrLen() const
retrieves the length of the current socket address.
Definition: esocket.h:228
virtual ~Thread()
Class destructor.
Definition: esocket.h:1680
EString getAddress() const
Retrieves the printable IP address.
Definition: esocket.h:207
const struct sockaddr_storage & getSockAddrStorage() const
Retrieves a sockaddr_storage reference to the socket address.
Definition: esocket.h:214
Encapsulates and extends a std::string object.
#define True
True.
Definition: ebase.h:25
UShort getPort() const
Retrieves the port.
Definition: esocket.h:210
Void setBacklog(Int backlog)
Assigns the maximum number of "unaccepted" connections.
Definition: esocket.h:1123
Implements a circular buffer.
virtual Void onReceive(const Address &from, const Address &to, cpUChar msg, Int len)
Called for each message that is received.
Definition: esocket.h:1398
Address & getLocal()
Retrieves the local socket address.
Definition: esocket.h:692
const struct sockaddr_in6 & getInet6() const
Retrieves a reference to this address as an IPv6 address.
Definition: esocket.h:296
#define DECLARE_ERROR_ADVANCED4(__e__)
Declares exception class derived from EError with an const char* as a constructor parameter and devel...
Definition: eerror.h:85
Macros for various standard C library functions and standard includes.
Int getHandle()
Retrieves the socket file handle.
Definition: esocket.h:523
Address & operator=(const sockaddr_in &addr)
Assignment operator.
Definition: esocket.h:249
Int getType()
Retrieves the socket type.
Definition: esocket.h:478
Talker< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > TalkerPrivate
Definition: esocket.h:2194
Thread< EThreadQueuePublic< EThreadMessage >, EThreadMessage > ThreadPublic
Definition: esocket.h:2189
Listener(Thread< TQueue, TMessage > &thread, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:1057
Void setPort(UShort port)
Assigns the port to listen for incoming connections on.
Definition: esocket.h:1110
EString getLocalAddress() const
Retrieves the IP address associated with the local socket.
Definition: esocket.h:698
Listener< EThreadQueuePublic< EThreadMessage >, EThreadMessage > ListenerPublic
Definition: esocket.h:2195
base class for EThreadPrivate and EThreadPublic
Definition: etevent.h:1062
cpStr getErrorDescription()
Definition: esocket.h:497
Listens for incoming TCP/IP connections.
Definition: esocket.h:1049
SocketState
The socket connection state.
Definition: esocket.h:92
EString getLocalAddress()
Retrieves the IP address for this socket.
Definition: esocket.h:1294
Talker(Thread< TQueue, TMessage > &thread, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:678
static Family getFamily(cpStr addr)
Determines the address family.
Definition: esocket.h:420
Address & operator=(UShort port)
Assigns a port value (allowing IPADDR_ANY).
Definition: esocket.h:265
UDP(Thread< TQueue, TMessage > &thread, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1212
Talker & setRemote(cpStr addr, UShort port)
Assigns the remote socket address.
Definition: esocket.h:747
Listener(Thread< TQueue, TMessage > &thread, UShort port, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:1069
Int getError()
Retrieves the last error value.
Definition: esocket.h:1707
Void disconnect()
Disconnects this socket.
Definition: esocket.h:863
SocketType
Defines the possible socket types.
Definition: esocket.h:79
virtual Void onMessageQueued(const TMessage &msg)
Called when an event message is queued.
Definition: etevent.h:1254
STL namespace.
Address & setAddress(const struct in_addr &addr, UShort port)
Assigns the IPv4 socket address.
Definition: esocket.h:334
virtual Void onClose()
Called when this socket is closed.
Definition: esocket.h:1161
a TCP talker socket
Thread< TQueue, TMessage > & getThread()
Retrieves the socket thread that this socket is associated with.
Definition: esocket.h:457
Bool getSending()
Retrieves indication if this socket is in the process of sending data.
Definition: esocket.h:846
Thread< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > ThreadPrivate
Definition: esocket.h:2190
A TCP socket class capable of sending and receiving data.
Definition: esocket.h:670
Int getFamily()
Retrieves the address family.
Definition: esocket.h:471
Address & operator=(const Address &addr)
Assignment operator.
Definition: esocket.h:240
Void bind(UShort port)
Binds this socket to a local port and IPADDR_ANY.
Definition: esocket.h:1358
const struct sockaddr_in & getInet() const
Retrieves a reference to this address as an IPv4 address.
Definition: esocket.h:287
Void unregisterSocket(Base< TQueue, TMessage > *socket)
Called by the framework to unregister a Base derived socket object with this thread.
Definition: esocket.h:1697
Base< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > BasePrivate
Definition: esocket.h:2188
Listener(Thread< TQueue, TMessage > &thread, UShort port, Int backlog, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:1083
cpStr getStateDescription(SocketState state)
Retrieves the description of the connection state.
Definition: esocket.h:530
Void listen()
Starts listening for incoming connections.
Definition: esocket.h:1134
#define False
False.
Definition: ebase.h:27
Void registerSocket(Base< TQueue, TMessage > *socket)
Called by the framework to register a Base derived socket object with this thread.
Definition: esocket.h:1688
Address(struct sockaddr_in &addr)
Class constructor.
Definition: esocket.h:167
SocketState getState()
Retrieves the current socket state.
Definition: esocket.h:1098
Address & setAddress(const struct in6_addr &addr, UShort port)
Assigns the IPv6 socket address.
Definition: esocket.h:347
Address(cpStr addr, UShort port)
Class constructor.
Definition: esocket.h:156
UDP< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > UdpPrivate
Definition: esocket.h:2199
Void disconnect()
Disconnects the socket.
Definition: esocket.h:1388
UShort getRemotePort() const
Retrieves the port associated with the remote socket.
Definition: esocket.h:739
Void connect(cpStr addr, UShort port)
Initiates an IP connection.
Definition: esocket.h:803
Void close()
Closes this socket.
Definition: esocket.h:504
Bool getSending()
Retrieves indication if this socket is in the process of sending data.
Definition: esocket.h:1352
virtual Void onError()
Called when an error is detected on this socket.
Definition: esocket.h:1167
SocketType getSocketType()
Retrieves the socket type.
Definition: esocket.h:464
Void connect(Address &addr)
Initiates an IP connection.
Definition: esocket.h:795
virtual Void onConnect()
Called when a connection has been established.
Definition: esocket.h:874
cpStr getStateDescription()
Retrieves the description of the current connection state.
Definition: esocket.h:858
virtual Void disconnect()
Disconnects this socket.
Definition: esocket.h:511
Thread()
Default constructor.
Definition: esocket.h:1664
Talker & setLocal(cpStr addr, UShort port)
Assigns the local socket address.
Definition: esocket.h:712
const Address & getLocal()
Retrieves the local address for this socket.
Definition: esocket.h:1288
Address(const struct in_addr &addr, UShort port)
Class constructor.
Definition: esocket.h:160
UDP(Thread< TQueue, TMessage > &thread, Address &addr, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1264
Talker & setLocal(const Address &addr)
Assigns the local socket address.
Definition: esocket.h:720
Defines base class for exceptions and declaration helper macros.
A UDP socket class capable of sending and receiving data.
Definition: esocket.h:1204
Address & operator=(const sockaddr_in6 &addr)
Assignment operator.
Definition: esocket.h:257
An event message that is to be sent to a thread.
Definition: etevent.h:264
The socket thread base class. An event based thread class capable of surfacing socket events...
Definition: esocket.h:1656
virtual Void onError()
Called when an error is detected on this socket.
Definition: esocket.h:1402
Listener< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > ListenerPrivate
Definition: esocket.h:2196
Address & getLocalAddress()
Retrieves the local listening address.
Definition: esocket.h:1104
Base< EThreadQueuePublic< EThreadMessage >, EThreadMessage > BasePublic
Definition: esocket.h:2187
Int read(pUChar dest, Int len)
Retrieves the specified number of bytes from the receive buffer.
Definition: esocket.h:827
The base class for exceptions derived from std::exception.
Definition: eerror.h:94
Hash calculation functions for strings and byte arrays.
struct sockaddr * getSockAddr() const
Retrieves a sockaddr pointer to the socket address.
Definition: esocket.h:221
Talker< TQueue, TMessage > * createSocket()
Called to create a talking socket when a incoming connection is received.
Definition: esocket.h:1156
#define EM_QUIT
thread quit event
Definition: etevent.h:796
Void bind(cpStr ipaddr, UShort port)
Binds this socket to a local address.
Definition: esocket.h:1369
Address(struct sockaddr_in6 &addr)
Class constructor.
Definition: esocket.h:170
Int getProtocol()
Retrieves the protocol.
Definition: esocket.h:485
Implements a circular buffer.
Definition: ecbuf.h:45
std::size_t operator()(const ESocket::Address &addr) const noexcept
Definition: esocket.h:2207
UDP(Thread< TQueue, TMessage > &thread, cpStr ipaddr, UShort port, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1246
static size_t combine(size_t h1, size_t h2)
Combines 2 64-bit hash values.
Definition: ehash.h:300
The base socket class.
Definition: esocket.h:441
Address(const struct in6_addr &addr, UShort port)
Class constructor.
Definition: esocket.h:164
Void write(cpUChar src, Int len)
Writes data to the socket. This is a thread safe method.
Definition: esocket.h:834
virtual Void onError()
Called when an error is detected on the socket.
Definition: esocket.h:883
virtual ~Base()
Virtual class destructor.
Definition: esocket.h:450
Void write(const Address &from, const Address &to, cpUChar src, Int len)
Sends data to the specified recipient address.
Definition: esocket.h:1334
Acquires and holds a lock on the specified mutex.
Definition: esynch.h:133
Int getError()
Retrieves the last error value.
Definition: esocket.h:492
Bool acquire(Bool wait=True)
Manually acquires a lock on the mutex.
Definition: esynch.h:155
a TCP listener socket
socket is disconnected
IPv4 address.
Address()
Default constructor.
Definition: esocket.h:152
EString getRemoteAddress() const
Retrieves the IP address associated with the remote socket.
Definition: esocket.h:733
virtual ~Talker()
Class destructor.
Definition: esocket.h:687
virtual Void onClose()
Called when the socket has been closed.
Definition: esocket.h:878
virtual Void onInit()
Called in the context of the thread prior to processing the first event message.
Definition: etevent.h:1190
Void write(const Address &to, cpUChar src, Int len)
Sends data to the specified recipient address.
Definition: esocket.h:1325
UShort getLocalPort() const
Retrieves the port associated with the local socket.
Definition: esocket.h:704
UShort getPort()
Retrieves the port being listened on for incoming connections.
Definition: esocket.h:1117
UDP & setLocal(const Address &addr)
Assigns the socket address for this socket.
Definition: esocket.h:1316
UDP(Thread< TQueue, TMessage > &thread, UShort port, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1227
Address & setAddress(const sockaddr_in6 &addr)
Definition: esocket.h:399
Address & setAddress(const sockaddr_in &addr)
Definition: esocket.h:390
virtual Void onReceive()
Called when data has been received.
Definition: esocket.h:870
A private mutex (the mutex data is allocated from either the heap or stack).
Definition: esynch.h:175
const struct sockaddr_storage & getStorage() const
Retrieves the sockaddr_storage.
Definition: esocket.h:280
static size_t getHash(cChar val, size_t seed=0xc70f6907UL)
Calculates a 64-bit murmur hash for the value.
Definition: ehash.h:273
Talker & setRemote(const Address &addr)
Assigns the remote socket address.
Definition: esocket.h:755
EString & format(cpChar pszFormat,...)
Sets the value to the string using a "printf" style format string and arguments.
Definition: estring.cpp:38
UDP< EThreadQueuePublic< EThreadMessage >, EThreadMessage > UdpPublic
Definition: esocket.h:2198
UShort getLocalPort()
Retrieves the port for this socket.
Definition: esocket.h:1300
Address & setAddress(UShort port, Family fam=Family::INET6)
Assigns the socket address.
Definition: esocket.h:362
#define DECLARE_ERROR_ADVANCED(__e__)
Declares exception class derived from EError with no constructor parameters and developer defined con...
Definition: eerror.h:61
Address & clear()
Clears this address.
Definition: esocket.h:412
String class.
Definition: estring.h:31
Encapsulates a sockaddr_storage structure that represents a socket address.
Definition: esocket.h:148
Int peek(pUChar dest, Int len)
Retrieves the specified number of bytes from the receive buffer without updating the read position...
Definition: esocket.h:819
UDP & setLocal(cpStr addr, UShort port)
Assigns the socket address for this socket.
Definition: esocket.h:1308
SocketState getState()
Retrieves the connection state.
Definition: esocket.h:852
Family getFamily() const
Retrieves the address family for this address.
Definition: esocket.h:272
Int getBacklog()
Retrieves the maximum number of "unaccepted" connections.
Definition: esocket.h:1129
The namespace for all socket related classes.
Definition: esocket.h:44
virtual ~UDP()
Class destructor.
Definition: esocket.h:1279
Dword getLastOsError()
Returns the current value of m_dwError.
Definition: eerror.h:303
Int bytesPending()
Retrieves the number of bytes in the receive buffer.
Definition: esocket.h:810
virtual Void onQuit()
Called in the context of the thread when the EM_QUIT event is processed.
Definition: etevent.h:1194
Void bind(const Address &addr)
Binds this socket to a local address.
Definition: esocket.h:1379
Talker< EThreadQueuePublic< EThreadMessage >, EThreadMessage > TalkerPublic
Definition: esocket.h:2193
virtual ~Listener()
Class destructor.
Definition: esocket.h:1093
Address & setAddress(cpStr addr, UShort port)
Assigns the socket address.
Definition: esocket.h:307
Void listen(UShort port, Int backlog)
Starts listening for incoming connections.
Definition: esocket.h:1144
Family
Defines the possible address family values.
Definition: esocket.h:68
Address & getRemote()
Retrieves the remote socket address.
Definition: esocket.h:727
Void connect()
Initiates an IP connection to the previously assigned remote socket address. ...
Definition: esocket.h:761