18 #ifndef __esocket_h_included 19 #define __esocket_h_included 23 #include <unordered_map> 26 #include <sys/types.h> 27 #include <sys/socket.h> 28 #include <netinet/in.h> 29 #include <arpa/inet.h> 49 const Int UPD_MAX_MSG_LENGTH = 65507;
51 const Int EPC_INVALID_SOCKET = -1;
52 const Int EPC_SOCKET_ERROR = -1;
53 typedef Int EPC_SOCKET;
54 typedef void *PSOCKETOPT;
55 typedef void *PSNDRCVBUFFER;
56 typedef socklen_t EPC_SOCKLEN;
60 template<
class TQueue,
class TMessage>
class Talker;
61 template<
class TQueue,
class TMessage>
class Listener;
63 template<
class TQueue,
class TMessage>
class UDP;
64 template<
class TQueue,
class TMessage>
class Thread;
156 Address(cpStr addr, UShort port) { setAddress(addr,port); }
160 Address(
const struct in_addr &addr, UShort port) { setAddress(addr,port); }
164 Address(
const struct in6_addr &addr, UShort port) { setAddress(addr,port); }
167 Address(
struct sockaddr_in &addr) { memcpy(&m_addr, &addr,
sizeof(addr)); }
170 Address(
struct sockaddr_in6 &addr) { memcpy(&m_addr, &addr,
sizeof(addr)); }
179 Char buf[INET6_ADDRSTRLEN];
181 if (m_addr.ss_family == AF_INET)
183 if (!inet_ntop(m_addr.ss_family,&((
struct sockaddr_in*)&m_addr)->sin_addr.s_addr,buf,
sizeof(buf)))
184 throw AddressError_ConvertingToString();
188 if (!inet_ntop(m_addr.ss_family,&((
struct sockaddr_in6*)&m_addr)->sin6_addr.s6_addr,buf,
sizeof(buf)))
189 throw AddressError_ConvertingToString();
196 operator UShort()
const 198 if (m_addr.ss_family == AF_INET)
199 return ntohs(((
struct sockaddr_in*)&m_addr)->sin_port);
200 if (m_addr.ss_family == AF_INET6)
201 return ntohs(((
struct sockaddr_in6*)&m_addr)->sin6_port);
202 throw AddressError_UndefinedFamily();
223 return (
struct sockaddr *)&m_addr;
230 if (m_addr.ss_family == AF_INET)
231 return sizeof(
struct sockaddr_in);
232 if (m_addr.ss_family == AF_INET6)
233 return sizeof(
struct sockaddr_in6);
234 return sizeof(
struct sockaddr_storage);
242 memcpy(&m_addr, &addr,
sizeof(m_addr));
251 return setAddress(addr);
259 return setAddress(addr);
267 return setAddress(port);
289 if (m_addr.ss_family != AF_INET)
290 throw AddressError_CannotConvertInet62Inet();
291 return (
struct sockaddr_in &)m_addr;
298 if (m_addr.ss_family != AF_INET6)
299 throw AddressError_CannotConvertInet2Inet6();
300 return (
struct sockaddr_in6 &)m_addr;
310 if (inet_pton(AF_INET,addr,&((
struct sockaddr_in*)&m_addr)->sin_addr) == 1)
312 ((
struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
313 ((
struct sockaddr_in*)&m_addr)->sin_port = htons(port);
318 if (inet_pton(AF_INET6,addr,&((
struct sockaddr_in6*)&m_addr)->sin6_addr) == 1)
320 ((
struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
321 ((
struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
322 ((
struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
323 ((
struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
327 throw AddressError_UnknownAddressType();
337 std::memcpy(&((
struct sockaddr_in*)&m_addr)->sin_addr, &addr,
sizeof(((
struct sockaddr_in*)&m_addr)->sin_addr));
338 ((
struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
339 ((
struct sockaddr_in*)&m_addr)->sin_port = htons(port);
350 std::memcpy(&((
struct sockaddr_in6*)&m_addr)->sin6_addr, &addr,
sizeof(((
struct sockaddr_in6*)&m_addr)->sin6_addr));
351 ((
struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
352 ((
struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
353 ((
struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
354 ((
struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
368 ((
struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
369 ((
struct sockaddr_in*)&m_addr)->sin_port = htons(port);
370 ((
struct sockaddr_in*)&m_addr)->sin_addr.s_addr = INADDR_ANY;
375 ((
struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
376 ((
struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
377 ((
struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
378 ((
struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
379 ((
struct sockaddr_in6*)&m_addr)->sin6_addr = in6addr_any;
384 throw AddressError_UndefinedFamily();
393 ((
struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
394 ((
struct sockaddr_in*)&m_addr)->sin_port = addr.sin_port;
395 ((
struct sockaddr_in*)&m_addr)->sin_addr.s_addr = addr.sin_addr.s_addr;
402 ((
struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
403 ((
struct sockaddr_in6*)&m_addr)->sin6_port = addr.sin6_port;
404 ((
struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
405 ((
struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
406 ((
struct sockaddr_in6*)&m_addr)->sin6_addr = addr.sin6_addr;
414 memset( &m_addr, 0,
sizeof(m_addr) );
425 if (inet_pton(AF_INET,addr,&ipv4) == 1)
427 if (inet_pton(AF_INET6,addr,&ipv6) == 1)
433 struct sockaddr_storage m_addr;
440 template <
class TQueue,
class TMessage>
445 friend class UDP<TQueue,TMessage>;
499 static Char desc[256];
500 return strerror_r(m_error, desc,
sizeof(desc));
513 getThread().unregisterSocket(
this);
514 if (m_handle != EPC_INVALID_SOCKET)
517 m_handle = EPC_INVALID_SOCKET;
538 default:
return "UNDEFINED";
545 : m_thread( thread ),
546 m_socktype( socktype ),
549 m_protocol( protocol ),
551 m_handle( EPC_INVALID_SOCKET )
555 Void createSocket(Int family, Int type, Int protocol)
557 m_handle = socket(family, type, protocol);
558 if (m_handle == EPC_INVALID_SOCKET)
559 throw BaseError_UnableToCreateSocket();
563 m_protocol = protocol;
568 Void assignAddress(cpStr ipaddr, UShort port, Int family, Int socktype,
569 Int flags, Int protocol,
struct addrinfo **address);
576 Void setError(Int error)
581 Void setHandle(Int handle)
588 Base &setFamily(Int family)
600 if (getsockname(m_handle, addr.
getSockAddr(), &sockaddrlen) < 0)
601 throw BaseError_GetPeerNameError();
612 if (getpeername(m_handle, addr.
getSockAddr(), &sockaddrlen) < 0)
613 throw BaseError_GetPeerNameError();
618 virtual Void onReceive()
622 virtual Void onConnect()
626 virtual Void onClose()
630 virtual Void onError()
641 setsockopt(m_handle, SOL_SOCKET, SO_LINGER, (PSOCKETOPT)&l,
sizeof(l));
643 fcntl(m_handle, F_SETFL, O_NONBLOCK);
645 getThread().registerSocket(
this);
669 template <
class TQueue,
class TMessage>
714 m_local.setAddress(addr,port);
749 m_remote.setAddress(addr,port);
764 throw TcpTalkerError_InvalidRemoteAddress();
766 Int family = getRemote().getFamily() ==
Family::INET ? AF_INET : AF_INET6;
767 Int type = this->getType();
768 Int protocol = this->getProtocol();
770 this->createSocket( family, type, protocol );
772 int result = ::connect(this->getHandle(), getRemote().getSockAddr(), getRemote().getSockAddrLen());
779 else if (result == -1)
782 if (this->getError() != EINPROGRESS && this->getError() != EWOULDBLOCK)
783 throw TcpTalkerError_UnableToConnect();
787 this->getThread().bump();
805 m_remote.setAddress( addr, port );
812 return m_rbuf.used();
821 return m_rbuf.peekData(dest, 0, len);
829 return m_rbuf.readData(dest, 0, len);
838 m_wbuf.writeData((cpUChar)&len, 0,
sizeof(len),
True);
839 m_wbuf.writeData(src, 0, len,
True);
911 Int totalReceived = 0;
915 Int amtReceived = ::recv(this->getHandle(), (PSNDRCVBUFFER)buf,
sizeof(buf), 0);
918 m_rbuf.writeData(buf, 0, amtReceived);
919 totalReceived += amtReceived;
921 else if (amtReceived == 0)
929 if (this->getError() == EWOULDBLOCK)
931 throw TcpTalkerError_UnableToRecvData();
935 return totalReceived;
938 Void send(Bool
override =
False)
946 if (!
override && m_sending)
949 if (m_wbuf.isEmpty())
958 throw TcpTalkerError_InvalidSendState(getStateDescription());
964 if (m_wbuf.isEmpty())
970 Int packetLength = 0;
971 Int amtRead = m_wbuf.peekData((pUChar)&packetLength, 0,
sizeof(packetLength));
972 if (amtRead !=
sizeof(packetLength))
975 msg.
format(
"expected %d bytes, read %d bytes",
sizeof(packetLength), amtRead);
976 throw TcpTalkerError_ReadingWritePacketLength(msg.c_str());
980 while (sentLength < packetLength)
982 Int sendLength = packetLength - sentLength;
983 if (sendLength > (Int)
sizeof(buf))
984 sendLength =
sizeof(buf);
987 amtRead = m_wbuf.peekData((pUChar)buf,
sizeof(packetLength) + sentLength, sendLength);
988 if (amtRead != sendLength)
991 msg.
format(
"expected %d bytes, read %d bytes", sendLength, amtRead);
992 throw TcpTalkerError_ReadingWritePacketLength(msg.c_str());
996 Int amtWritten = send(buf, sendLength);
997 if (amtWritten == -1)
1000 sentLength += amtWritten;
1001 if (amtWritten != sendLength)
1005 packetLength -= sentLength;
1006 m_wbuf.readData(NULL, 0, sentLength + (!packetLength ?
sizeof(packetLength) : 0));
1007 if (packetLength > 0)
1012 m_wbuf.modifyData((pUChar)&packetLength, 0, (Int)
sizeof(packetLength));
1020 Int send(pUChar pData, Int length)
1022 Int result = ::send(this->getHandle(), (PSNDRCVBUFFER)pData, length, MSG_NOSIGNAL);
1027 if (this->getError() != EWOULDBLOCK)
1028 throw TcpTalkerError_SendingPacket();
1048 template <
class TQueue,
class TMessage>
1060 SOCK_STREAM, IPPROTO_TCP),
1072 SOCK_STREAM, IPPROTO_TCP),
1086 SOCK_STREAM, IPPROTO_TCP),
1088 m_backlog( backlog )
1113 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1125 m_backlog = backlog;
1137 if (::listen(this->getHandle(), getBacklog()) == EPC_SOCKET_ERROR)
1138 throw TcpListenerError_UnableToListen();
1147 setBacklog(backlog);
1158 return createSocket(this->getThread());
1174 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1178 int result = ::bind(this->getHandle(), getLocalAddress().getSockAddr(), getLocalAddress().getSockAddrLen());
1181 TcpListenerError_UnableToBindSocket err;
1203 template <
class TQueue,
class TMessage>
1213 :
Base<TQueue,TMessage>(thread,
SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1220 m_rcvmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1221 m_sndmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1228 :
Base<TQueue,TMessage>(thread,
SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1236 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1238 m_rcvmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1239 m_sndmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1247 :
Base<TQueue,TMessage>(thread,
SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1254 m_local.setAddress( ipaddr, port );
1255 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1257 m_rcvmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1258 m_sndmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1265 :
Base<TQueue,TMessage>(thread,
SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1273 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1275 m_rcvmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1276 m_sndmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1282 delete []
reinterpret_cast<pUChar
>(m_rcvmsg);
1284 delete []
reinterpret_cast<pUChar
>(m_sndmsg);
1310 m_local.setAddress(addr,port);
1327 write(
Address(), to, src, len);
1337 msg.total_length =
sizeof(msg) + len;
1338 msg.data_length = len;
1344 m_wbuf.writeData(reinterpret_cast<cpUChar>(&msg), 0,
sizeof(msg),
True);
1345 m_wbuf.writeData(src, 0, len,
True);
1360 if (this->getHandle() != EPC_INVALID_SOCKET)
1361 throw UdpError_AlreadyBound();
1363 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1369 Void
bind(cpStr ipaddr, UShort port)
1371 if (this->getHandle() != EPC_INVALID_SOCKET)
1372 throw UdpError_AlreadyBound();
1373 m_local.setAddress( ipaddr, port );
1374 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1381 if (this->getHandle() != EPC_INVALID_SOCKET)
1382 throw UdpError_AlreadyBound();
1384 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1417 struct cmsghdr header;
1418 struct in_pktinfo pktinfo;
1419 struct in6_pktinfo pktinfo6;
1422 Int totalReceived = 0;
1427 UChar cmsgdata[CMSG_SPACE(
sizeof(ControlData))];
1428 struct iovec iov[1];
1431 std::memset(&mh, 0,
sizeof(mh));
1432 mh.msg_control = cmsgdata;
1433 mh.msg_controllen =
sizeof(cmsgdata);
1436 iov[0].iov_base = m_rcvmsg->data;
1437 iov[0].iov_len = UPD_MAX_MSG_LENGTH;
1439 mh.msg_namelen =
sizeof(remote.
getStorage());
1445 Int amtReceived = ::recvmsg(this->getHandle(), &mh, flags);
1446 if (amtReceived >= 0)
1448 m_rcvmsg->total_length =
sizeof(UDPMessage) + amtReceived;
1449 m_rcvmsg->data_length = amtReceived;
1450 m_rcvmsg->local = getLocal();
1451 m_rcvmsg->remote = remote;
1453 for (
struct cmsghdr *cp = CMSG_FIRSTHDR(&mh); cp != NULL; cp = CMSG_NXTHDR(&mh,cp))
1455 if (cp->cmsg_level == IPPROTO_IP && cp->cmsg_type == IP_PKTINFO)
1457 struct in_pktinfo *p = (
struct in_pktinfo *)CMSG_DATA(cp);
1459 std::memset(&ipv4, 0,
sizeof(ipv4));
1460 ipv4.sin_family = AF_INET;
1461 ipv4.sin_port = ((
struct sockaddr_in&)getLocal().getStorage()).sin_port;
1462 ipv4.sin_addr.s_addr = p->ipi_spec_dst.s_addr;
1463 m_rcvmsg->local = ipv4;
1466 if (cp->cmsg_level == IPPROTO_IPV6 && cp->cmsg_type == IPV6_PKTINFO)
1468 struct in6_pktinfo *p = (
struct in6_pktinfo *)CMSG_DATA(cp);
1470 std::memset(&ipv6, 0,
sizeof(ipv6));
1471 ipv6.sin6_family = AF_INET6;
1472 ipv6.sin6_port = ((
struct sockaddr_in6&)getLocal().getStorage()).sin6_port;
1473 ipv6.sin6_flowinfo = 0;
1474 ipv6.sin6_scope_id = 0;
1475 ipv6.sin6_addr = p->ipi6_addr;
1476 m_rcvmsg->local = ipv6;
1481 m_rbuf.writeData( reinterpret_cast<pUChar>(m_rcvmsg), 0, m_rcvmsg->total_length);
1482 totalReceived += amtReceived;
1487 if (this->getError() == EWOULDBLOCK)
1489 throw UdpError_UnableToRecvData();
1493 return totalReceived;
1496 Void send(Bool
override =
False)
1502 if (!
override && m_sending)
1505 if (m_wbuf.isEmpty())
1514 if (m_wbuf.isEmpty())
1520 size_t packetLength = 0;
1521 Int amtRead = m_wbuf.peekData(reinterpret_cast<pUChar>(&packetLength), 0,
sizeof(packetLength));
1522 if ((
size_t)amtRead !=
sizeof(packetLength))
1525 msg.
format(
"expected %d bytes, read %d bytes",
sizeof(packetLength), amtRead);
1526 throw UdpError_ReadingWritePacketLength(msg.c_str());
1529 amtRead = m_wbuf.peekData(reinterpret_cast<pUChar>(m_sndmsg), 0, packetLength);
1530 if ((
size_t)amtRead != packetLength)
1533 msg.
format(
"expected %d bytes, read %d bytes",
sizeof(packetLength), amtRead);
1534 throw UdpError_ReadingWritePacketLength(msg.c_str());
1537 if (send(m_sndmsg->local, m_sndmsg->remote, m_sndmsg->data, m_sndmsg->data_length) == -1)
1543 m_wbuf.readData(NULL, 0, m_sndmsg->total_length);
1549 #pragma pack(push,1) 1552 size_t total_length;
1570 while (readMessage(*m_rcvmsg))
1572 onReceive(m_rcvmsg->remote, m_rcvmsg->local, m_rcvmsg->data, m_rcvmsg->data_length);
1578 if (this->getHandle() != EPC_INVALID_SOCKET)
1579 throw UdpError_AlreadyBound();
1585 result = setsockopt(this->getHandle(), IPPROTO_IP, IP_PKTINFO, &sockopt,
sizeof(sockopt));
1588 UdpError_UnableToBindSocket err;
1589 err.appendTextf(
" - %s:%u", getLocal().getAddress().c_str(), getLocal().getPort());
1595 result = setsockopt(this->getHandle(), IPPROTO_IPV6, IPV6_RECVPKTINFO, &sockopt,
sizeof(sockopt));
1598 UdpError_UnableToBindSocket err;
1599 err.appendTextf(
" - %s:%u", getLocal().getAddress().c_str(), getLocal().getPort());
1605 result = ::bind(this->getHandle(), getLocal().getSockAddr(), getLocal().getSockAddrLen());
1608 UdpError_UnableToBindSocket err;
1609 err.appendTextf(
" - %s:%u", getLocal().getAddress().c_str(), getLocal().getPort());
1615 Bool readMessage(UDPMessage &msg)
1617 if (m_rbuf.peekData(reinterpret_cast<pUChar>(&msg), 0,
sizeof(msg)))
1619 m_rbuf.readData(reinterpret_cast<pUChar>(&msg), 0, msg.total_length);
1628 Int flags = MSG_NOSIGNAL;
1634 if (this->getError() != EMSGSIZE)
1635 throw UdpError_SendingPacket();
1647 UDPMessage *m_rcvmsg;
1648 UDPMessage *m_sndmsg;
1655 template <
class TQueue,
class TMessage>
1660 friend class UDP<TQueue,TMessage>;
1666 int *pipefd = this->getBumpPipe();
1670 int result = pipe(pipefd);
1672 throw ThreadError_UnableToOpenPipe();
1673 fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
1677 getMaxFileDescriptor(
True);
1682 int *pipefd = this->getBumpPipe();
1690 m_socketmap.insert(std::make_pair(socket->
getHandle(), socket));
1692 getMaxFileDescriptor(
True);
1699 if (m_socketmap.erase(socket->
getHandle()))
1702 getMaxFileDescriptor(
True);
1711 virtual Void pumpMessages()
1713 int maxfd, fd, fdcnt;
1714 fd_set readworking, writeworking, errorworking;
1721 memcpy(&readworking, &m_master,
sizeof(m_master));
1722 FD_SET(this->getBumpPipe()[0], &readworking);
1724 FD_ZERO(&writeworking);
1725 for (
auto it = m_socketmap.begin(); it != m_socketmap.end(); it++)
1733 FD_SET(it->first, &writeworking);
1737 memcpy(&errorworking, &m_master,
sizeof(m_master));
1739 maxfd = getMaxFileDescriptor() + 1;
1742 fdcnt = select(maxfd, &readworking, &writeworking, &errorworking, NULL);
1745 if (errno == EINTR || errno == 514 )
1747 if (!pumpMessagesInternal())
1760 if (FD_ISSET(this->getBumpPipe()[0], &readworking))
1763 if (!pumpMessagesInternal())
1770 for (fd = 0; fd < maxfd && fdcnt > 0; fd++)
1772 if (FD_ISSET(fd, &errorworking))
1774 auto socket_it = m_socketmap.find(fd);
1775 if (socket_it != m_socketmap.end())
1781 socklen_t optlen =
sizeof(error);
1782 getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
1783 pSocket->setError(error);
1784 processSelectError(pSocket);
1792 if (fdcnt > 0 && FD_ISSET(fd, &readworking))
1794 auto socket_it = m_socketmap.find(fd);
1795 if (socket_it != m_socketmap.end())
1799 result = processSelectRead(pSocket);
1804 if (fdcnt > 0 && FD_ISSET(fd, &writeworking))
1806 auto socket_it = m_socketmap.find(fd);
1807 if (result && socket_it != m_socketmap.end())
1811 processSelectWrite(pSocket);
1821 if (!pumpMessagesInternal())
1829 auto it = m_socketmap.begin();
1830 if (it == m_socketmap.end())
1833 m_socketmap.erase(it);
1846 virtual Void onInit()
1851 virtual Void onQuit()
1856 virtual Void onMessageQueued(
const TMessage &msg)
1862 virtual Void onError()
1868 if (write(this->getBumpPipe()[1],
"~", 1) == -1)
1869 throw ThreadError_UnableToWritePipe();
1877 if (read(this->getBumpPipe()[0], buf, 1) == -1)
1879 if (errno == EWOULDBLOCK)
1881 throw ThreadError_UnableToReadPipe();
1888 return GetThisMessageMap();
1904 Void setError(Int error) { m_error = error; }
1906 Bool pumpMessagesInternal()
1927 return msg.getMessageId() !=
EM_QUIT;
1939 struct sockaddr ipaddr;
1940 socklen_t ipaddrlen =
sizeof(ipaddr);
1943 if (handle == EPC_INVALID_SOCKET)
1946 if (err == EWOULDBLOCK)
1948 throw TcpListenerError_UnableToAcceptSocket();
1954 pnewsocket->setHandle(handle);
1955 pnewsocket->setAddresses();
1957 registerSocket(pnewsocket);
1971 errorHandler(err, NULL);
1989 processSelectAccept(psocket);
1997 Int result, socketError;
1998 socklen_t socketErrorLen =
sizeof(socketError);
2000 result = getsockopt(psocket->
getHandle(), SOL_SOCKET, SO_ERROR, &socketError, &socketErrorLen);
2004 if (result == -1 || socketError != 0)
2006 psocket->setError(socketError);
2007 TcpTalkerError_UnableToConnect ex;
2008 ex.appendTextf(
" ESocket::Thread<TQueue,TMessage>::processSelectRead() socketError=%d (%s)",
2020 processSelectError(psocket);
2038 errorHandler(err, psocket);
2045 processSelectClose(psocket);
2054 processSelectError(psocket);
2072 errorHandler(err, psocket);
2090 Int result, socketError;
2091 socklen_t socketErrorLen =
sizeof(socketError);
2093 result = getsockopt(psocket->
getHandle(), SOL_SOCKET, SO_ERROR, &socketError, &socketErrorLen);
2097 if (result == -1 || socketError != 0)
2099 psocket->setError(socketError);
2100 TcpTalkerError_UnableToConnect ex;
2101 ex.appendTextf(
" ESocket::Thread<TQueue,TMessage>::processSelectWrite() socketError=%d (%s)",
2113 processSelectError(psocket);
2127 processSelectError(psocket);
2136 processSelectError(psocket);
2150 errorHandler(err, psocket);
2158 onSocketError(psocket);
2164 onSocketClosed(psocket);
2167 int getMaxFileDescriptor(Bool calc=
False)
2171 m_maxfd = this->getBumpPipe()[0];
2173 for (
auto entry : m_socketmap)
2174 if (entry.second->getHandle() > m_maxfd)
2175 m_maxfd = entry.second->getHandle();
2182 std::unordered_map<Int,Base<TQueue,TMessage>*> m_socketmap;
2209 size_t addrhash =
EMurmurHash64::getHash(reinterpret_cast<cpUChar>(addr.getSockAddr()), addr.getSockAddrLen());
2210 size_t porthash = std::hash<UShort>{}(addr.getPort());
2216 #endif // #define __esocket_h_included Address(const Address &addr)
Copy constructor.
Definition: esocket.h:173
Performs static initialization associated with any EpcTools class that requires it. Initialization and uninitialization is performed by EpcTools::Initialize() and EpcTools::UnInitialize().
socklen_t getSockAddrLen() const
retrieves the length of the current socket address.
Definition: esocket.h:228
virtual ~Thread()
Class destructor.
Definition: esocket.h:1680
EString getAddress() const
Retrieves the printable IP address.
Definition: esocket.h:207
const struct sockaddr_storage & getSockAddrStorage() const
Retrieves a sockaddr_storage reference to the socket address.
Definition: esocket.h:214
Encapsulates and extends a std::string object.
#define True
True.
Definition: ebase.h:25
UShort getPort() const
Retrieves the port.
Definition: esocket.h:210
Void setBacklog(Int backlog)
Assigns the maximum number of "unaccepted" connections.
Definition: esocket.h:1123
Implements a circular buffer.
virtual Void onReceive(const Address &from, const Address &to, cpUChar msg, Int len)
Called for each message that is received.
Definition: esocket.h:1398
Address & getLocal()
Retrieves the local socket address.
Definition: esocket.h:692
const struct sockaddr_in6 & getInet6() const
Retrieves a reference to this address as an IPv6 address.
Definition: esocket.h:296
#define DECLARE_ERROR_ADVANCED4(__e__)
Declares exception class derived from EError with a const char* as a constructor parameter and devel...
Definition: eerror.h:85
Macros for various standard C library functions and standard includes.
Int getHandle()
Retrieves the socket file handle.
Definition: esocket.h:523
Address & operator=(const sockaddr_in &addr)
Assignment operator.
Definition: esocket.h:249
Int getType()
Retrieves the socket type.
Definition: esocket.h:478
Talker< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > TalkerPrivate
Definition: esocket.h:2194
Thread< EThreadQueuePublic< EThreadMessage >, EThreadMessage > ThreadPublic
Definition: esocket.h:2189
Listener(Thread< TQueue, TMessage > &thread, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:1057
Void setPort(UShort port)
Assigns the port to listen for incoming connections on.
Definition: esocket.h:1110
EString getLocalAddress() const
Retrieves the IP address associated with the local socket.
Definition: esocket.h:698
Listener< EThreadQueuePublic< EThreadMessage >, EThreadMessage > ListenerPublic
Definition: esocket.h:2195
base class for EThreadPrivate and EThreadPublic
Definition: etevent.h:1062
cpStr getErrorDescription()
Definition: esocket.h:497
Listens for incoming TCP/IP connections.
Definition: esocket.h:1049
SocketState
The socket connection state.
Definition: esocket.h:92
EString getLocalAddress()
Retrieves the IP address for this socket.
Definition: esocket.h:1294
Talker(Thread< TQueue, TMessage > &thread, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:678
static Family getFamily(cpStr addr)
Determines the address family.
Definition: esocket.h:420
Address & operator=(UShort port)
Assigns a port value (allowing IPADDR_ANY).
Definition: esocket.h:265
UDP(Thread< TQueue, TMessage > &thread, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1212
Talker & setRemote(cpStr addr, UShort port)
Assigns the remote socket address.
Definition: esocket.h:747
Listener(Thread< TQueue, TMessage > &thread, UShort port, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:1069
Int getError()
Retrieves the last error value.
Definition: esocket.h:1707
Void disconnect()
Disconnects this socket.
Definition: esocket.h:863
SocketType
Defines the possible socket types.
Definition: esocket.h:79
virtual Void onMessageQueued(const TMessage &msg)
Called when an event message is queued.
Definition: etevent.h:1254
Address & setAddress(const struct in_addr &addr, UShort port)
Assigns the IPv4 socket address.
Definition: esocket.h:334
virtual Void onClose()
Called when this socket is closed.
Definition: esocket.h:1161
Thread< TQueue, TMessage > & getThread()
Retrieves the socket thread that this socket is associated with.
Definition: esocket.h:457
Bool getSending()
Retrieves indication if this socket is in the process of sending data.
Definition: esocket.h:846
Thread< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > ThreadPrivate
Definition: esocket.h:2190
A TCP socket class capable of sending and receiving data.
Definition: esocket.h:670
Int getFamily()
Retrieves the address family.
Definition: esocket.h:471
Address & operator=(const Address &addr)
Assignment operator.
Definition: esocket.h:240
Void bind(UShort port)
Binds this socket to a local port and IPADDR_ANY.
Definition: esocket.h:1358
const struct sockaddr_in & getInet() const
Retrieves a reference to this address as an IPv4 address.
Definition: esocket.h:287
Void unregisterSocket(Base< TQueue, TMessage > *socket)
Called by the framework to unregister a Base derived socket object with this thread.
Definition: esocket.h:1697
Base< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > BasePrivate
Definition: esocket.h:2188
Listener(Thread< TQueue, TMessage > &thread, UShort port, Int backlog, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:1083
cpStr getStateDescription(SocketState state)
Retrieves the description of the connection state.
Definition: esocket.h:530
Void listen()
Starts listening for incoming connections.
Definition: esocket.h:1134
#define False
False.
Definition: ebase.h:27
Void registerSocket(Base< TQueue, TMessage > *socket)
Called by the framework to register a Base derived socket object with this thread.
Definition: esocket.h:1688
Address(struct sockaddr_in &addr)
Class constructor.
Definition: esocket.h:167
SocketState getState()
Retrieves the current socket state.
Definition: esocket.h:1098
Address & setAddress(const struct in6_addr &addr, UShort port)
Assigns the IPv6 socket address.
Definition: esocket.h:347
Address(cpStr addr, UShort port)
Class constructor.
Definition: esocket.h:156
UDP< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > UdpPrivate
Definition: esocket.h:2199
Void disconnect()
Disconnects the socket.
Definition: esocket.h:1388
UShort getRemotePort() const
Retrieves the port associated with the remote socket.
Definition: esocket.h:739
Void connect(cpStr addr, UShort port)
Initiates an IP connection.
Definition: esocket.h:803
Void close()
Closes this socket.
Definition: esocket.h:504
Bool getSending()
Retrieves indication if this socket is in the process of sending data.
Definition: esocket.h:1352
virtual Void onError()
Called when an error is detected on this socket.
Definition: esocket.h:1167
SocketType getSocketType()
Retrieves the socket type.
Definition: esocket.h:464
Void connect(Address &addr)
Initiates an IP connection.
Definition: esocket.h:795
virtual Void onConnect()
Called when a connection has been established.
Definition: esocket.h:874
cpStr getStateDescription()
Retrieves the description of the current connection state.
Definition: esocket.h:858
virtual Void disconnect()
Disconnects this socket.
Definition: esocket.h:511
Thread()
Default constructor.
Definition: esocket.h:1664
Talker & setLocal(cpStr addr, UShort port)
Assigns the local socket address.
Definition: esocket.h:712
const Address & getLocal()
Retrieves the local address for this socket.
Definition: esocket.h:1288
Address(const struct in_addr &addr, UShort port)
Class constructor.
Definition: esocket.h:160
UDP(Thread< TQueue, TMessage > &thread, Address &addr, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1264
Talker & setLocal(const Address &addr)
Assigns the local socket address.
Definition: esocket.h:720
Defines base class for exceptions and declaration helper macros.
A UDP socket class capable of sending and receiving data.
Definition: esocket.h:1204
Address & operator=(const sockaddr_in6 &addr)
Assignment operator.
Definition: esocket.h:257
An event message that is to be sent to a thread.
Definition: etevent.h:264
The socket thread base class. An event based thread class capable of surfacing socket events...
Definition: esocket.h:1656
virtual Void onError()
Called when an error is detected on this socket.
Definition: esocket.h:1402
Listener< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > ListenerPrivate
Definition: esocket.h:2196
Address & getLocalAddress()
Retrieves the local listening address.
Definition: esocket.h:1104
Base< EThreadQueuePublic< EThreadMessage >, EThreadMessage > BasePublic
Definition: esocket.h:2187
Int read(pUChar dest, Int len)
Retrieves the specified number of bytes from the receive buffer.
Definition: esocket.h:827
The base class for exceptions derived from std::exception.
Definition: eerror.h:94
Hash calculation functions for strings and byte arrays.
struct sockaddr * getSockAddr() const
Retrieves a sockaddr pointer to the socket address.
Definition: esocket.h:221
Talker< TQueue, TMessage > * createSocket()
Called to create a talking socket when a incoming connection is received.
Definition: esocket.h:1156
#define EM_QUIT
thread quit event
Definition: etevent.h:796
Void bind(cpStr ipaddr, UShort port)
Binds this socket to a local address.
Definition: esocket.h:1369
Address(struct sockaddr_in6 &addr)
Class constructor.
Definition: esocket.h:170
Int getProtocol()
Retrieves the protocol.
Definition: esocket.h:485
Implements a circular buffer.
Definition: ecbuf.h:45
std::size_t operator()(const ESocket::Address &addr) const noexcept
Definition: esocket.h:2207
UDP(Thread< TQueue, TMessage > &thread, cpStr ipaddr, UShort port, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1246
static size_t combine(size_t h1, size_t h2)
Combines 2 64-bit hash values.
Definition: ehash.h:300
The base socket class.
Definition: esocket.h:441
Address(const struct in6_addr &addr, UShort port)
Class constructor.
Definition: esocket.h:164
Void write(cpUChar src, Int len)
Writes data to the socket. This is a thread safe method.
Definition: esocket.h:834
virtual Void onError()
Called when an error is detected on the socket.
Definition: esocket.h:883
virtual ~Base()
Virtual class destructor.
Definition: esocket.h:450
Void write(const Address &from, const Address &to, cpUChar src, Int len)
Sends data to the specified recipient address.
Definition: esocket.h:1334
Acquires and holds a lock on the specified mutex.
Definition: esynch.h:133
Int getError()
Retrieves the last error value.
Definition: esocket.h:492
Bool acquire(Bool wait=True)
Manually acquires a lock on the mutex.
Definition: esynch.h:155
Address()
Default constructor.
Definition: esocket.h:152
EString getRemoteAddress() const
Retrieves the IP address associated with the remote socket.
Definition: esocket.h:733
virtual ~Talker()
Class destructor.
Definition: esocket.h:687
virtual Void onClose()
Called when the socket has been closed.
Definition: esocket.h:878
virtual Void onInit()
Called in the context of the thread prior to processing the first event message.
Definition: etevent.h:1190
Void write(const Address &to, cpUChar src, Int len)
Sends data to the specified recipient address.
Definition: esocket.h:1325
UShort getLocalPort() const
Retrieves the port associated with the local socket.
Definition: esocket.h:704
UShort getPort()
Retrieves the port being listened on for incoming connections.
Definition: esocket.h:1117
UDP & setLocal(const Address &addr)
Assigns the socket address for this socket.
Definition: esocket.h:1316
UDP(Thread< TQueue, TMessage > &thread, UShort port, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1227
Address & setAddress(const sockaddr_in6 &addr)
Definition: esocket.h:399
Address & setAddress(const sockaddr_in &addr)
Definition: esocket.h:390
virtual Void onReceive()
Called when data has been received.
Definition: esocket.h:870
A private mutex (the mutex data is allocated from either the heap or stack).
Definition: esynch.h:175
const struct sockaddr_storage & getStorage() const
Retrieves the sockaddr_storage.
Definition: esocket.h:280
static size_t getHash(cChar val, size_t seed=0xc70f6907UL)
Calculates a 64-bit murmur hash for the value.
Definition: ehash.h:273
Talker & setRemote(const Address &addr)
Assigns the remote socket address.
Definition: esocket.h:755
EString & format(cpChar pszFormat,...)
Sets the value to the string using a "printf" style format string and arguments.
Definition: estring.cpp:38
UDP< EThreadQueuePublic< EThreadMessage >, EThreadMessage > UdpPublic
Definition: esocket.h:2198
UShort getLocalPort()
Retrieves the port for this socket.
Definition: esocket.h:1300
Address & setAddress(UShort port, Family fam=Family::INET6)
Assigns the socket address.
Definition: esocket.h:362
#define DECLARE_ERROR_ADVANCED(__e__)
Declares exception class derived from EError with no constructor parameters and developer defined con...
Definition: eerror.h:61
Address & clear()
Clears this address.
Definition: esocket.h:412
String class.
Definition: estring.h:31
Encapsulates a sockaddr_storage structure that represents a socket address.
Definition: esocket.h:148
Int peek(pUChar dest, Int len)
Retrieves the specified number of bytes from the receive buffer without updating the read position...
Definition: esocket.h:819
UDP & setLocal(cpStr addr, UShort port)
Assigns the socket address for this socket.
Definition: esocket.h:1308
SocketState getState()
Retrieves the connection state.
Definition: esocket.h:852
Family getFamily() const
Retrieves the address family for this address.
Definition: esocket.h:272
Int getBacklog()
Retrieves the maximum number of "unaccepted" connections.
Definition: esocket.h:1129
The namespace for all socket related classes.
Definition: esocket.h:44
virtual ~UDP()
Class destructor.
Definition: esocket.h:1279
Dword getLastOsError()
Returns the current value of m_dwError.
Definition: eerror.h:303
Int bytesPending()
Retrieves the number of bytes in the receive buffer.
Definition: esocket.h:810
virtual Void onQuit()
Called in the context of the thread when the EM_QUIT event is processed.
Definition: etevent.h:1194
Void bind(const Address &addr)
Binds this socket to a local address.
Definition: esocket.h:1379
Talker< EThreadQueuePublic< EThreadMessage >, EThreadMessage > TalkerPublic
Definition: esocket.h:2193
virtual ~Listener()
Class destructor.
Definition: esocket.h:1093
Address & setAddress(cpStr addr, UShort port)
Assigns the socket address.
Definition: esocket.h:307
Void listen(UShort port, Int backlog)
Starts listening for incoming connections.
Definition: esocket.h:1144
Family
Defines the possible address family values.
Definition: esocket.h:68
Address & getRemote()
Retrieves the remote socket address.
Definition: esocket.h:727
Void connect()
Initiates an IP connection to the previously assigned remote socket address. ...
Definition: esocket.h:761