From 558b2ffa41f604493846057cd11d1d8c5f12861e Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Sun, 27 Nov 2016 19:59:52 -0800 Subject: [PATCH] NetworkStream: Add setBlocking() and getNativeHandle(). (#161) Also add checking for "would block" errors in send() and receive(). Check for set nonblocking failures in TCPConnector as well (generate warnings rather than errors) --- include/tcpsockets/NetworkStream.h | 7 ++++- include/tcpsockets/TCPStream.h | 3 ++ src/tcpsockets/TCPConnector.cpp | 30 ++++++++++++++++---- src/tcpsockets/TCPStream.cpp | 44 ++++++++++++++++++++++++++++-- 4 files changed, 74 insertions(+), 10 deletions(-) diff --git a/include/tcpsockets/NetworkStream.h b/include/tcpsockets/NetworkStream.h index f7dfe4533e..03b3a93ae6 100644 --- a/include/tcpsockets/NetworkStream.h +++ b/include/tcpsockets/NetworkStream.h @@ -22,7 +22,8 @@ class NetworkStream { enum Error { kConnectionClosed = 0, kConnectionReset = -1, - kConnectionTimedOut = -2 + kConnectionTimedOut = -2, + kWouldBlock = -3 }; virtual std::size_t send(const char* buffer, std::size_t len, Error* err) = 0; @@ -34,6 +35,10 @@ class NetworkStream { virtual int getPeerPort() const = 0; virtual void setNoDelay() = 0; + // returns false on failure + virtual bool setBlocking(bool enabled) = 0; + virtual int getNativeHandle() const = 0; + NetworkStream(const NetworkStream&) = delete; NetworkStream& operator=(const NetworkStream&) = delete; }; diff --git a/include/tcpsockets/TCPStream.h b/include/tcpsockets/TCPStream.h index e7bb2f0d1a..89e3779778 100644 --- a/include/tcpsockets/TCPStream.h +++ b/include/tcpsockets/TCPStream.h @@ -43,6 +43,7 @@ class TCPStream : public NetworkStream { int m_sd; std::string m_peerIP; int m_peerPort; + bool m_blocking; public: friend class TCPAcceptor; @@ -58,6 +59,8 @@ class TCPStream : public NetworkStream { llvm::StringRef getPeerIP() const override; int getPeerPort() const override; void setNoDelay() override; + bool setBlocking(bool enabled) override; + int getNativeHandle() const override; TCPStream(const TCPStream& stream) = delete; TCPStream& operator=(const TCPStream&) = delete; diff --git a/src/tcpsockets/TCPConnector.cpp b/src/tcpsockets/TCPConnector.cpp index 8800acd7dd..9c6d6128af 100644 --- a/src/tcpsockets/TCPConnector.cpp +++ b/src/tcpsockets/TCPConnector.cpp @@ -130,12 +130,21 @@ std::unique_ptr TCPConnector::connect(const char* server, // Set socket to non-blocking #ifdef _WIN32 u_long mode = 1; - ioctlsocket(sd, FIONBIO, &mode); + if (ioctlsocket(sd, FIONBIO, &mode) == SOCKET_ERROR) + WPI_WARNING(logger, + "could not set socket to non-blocking: " << SocketStrerror()); #else long arg; arg = fcntl(sd, F_GETFL, nullptr); - arg |= O_NONBLOCK; - fcntl(sd, F_SETFL, arg); + if (arg < 0) { + WPI_WARNING(logger, + "could not set socket to non-blocking: " << SocketStrerror()); + } else { + arg |= O_NONBLOCK; + if (fcntl(sd, F_SETFL, arg) < 0) + WPI_WARNING(logger, + "could not set socket to non-blocking: " << SocketStrerror()); + } #endif // Connect with time limit @@ -169,11 +178,20 @@ std::unique_ptr TCPConnector::connect(const char* server, // Return socket to blocking mode #ifdef _WIN32 mode = 0; - ioctlsocket(sd, FIONBIO, &mode); + if (ioctlsocket(sd, FIONBIO, &mode) == SOCKET_ERROR) + WPI_WARNING(logger, + "could not set socket to blocking: " << SocketStrerror()); #else arg = fcntl(sd, F_GETFL, nullptr); - arg &= (~O_NONBLOCK); - fcntl(sd, F_SETFL, arg); + if (arg < 0) { + WPI_WARNING(logger, + "could not set socket to blocking: " << SocketStrerror()); + } else { + arg &= (~O_NONBLOCK); + if (fcntl(sd, F_SETFL, arg) < 0) + WPI_WARNING(logger, + "could not set socket to blocking: " << SocketStrerror()); + } #endif // Create stream object if connected, close if not. diff --git a/src/tcpsockets/TCPStream.cpp b/src/tcpsockets/TCPStream.cpp index aa464d9814..e7d028e21a 100644 --- a/src/tcpsockets/TCPStream.cpp +++ b/src/tcpsockets/TCPStream.cpp @@ -23,6 +23,7 @@ #include "tcpsockets/TCPStream.h" +#include #ifdef _WIN32 #include #else @@ -33,7 +34,8 @@ using namespace wpi; -TCPStream::TCPStream(int sd, sockaddr_in* address) : m_sd(sd) { +TCPStream::TCPStream(int sd, sockaddr_in* address) + : m_sd(sd), m_blocking(true) { char ip[50]; #ifdef _WIN32 unsigned long size = sizeof(ip) - 1; @@ -69,6 +71,10 @@ std::size_t TCPStream::send(const char* buffer, std::size_t len, Error* err) { result = false; break; } + if (!m_blocking) { + *err = kWouldBlock; + return 0; + } Sleep(1); } if (!result) { @@ -90,7 +96,10 @@ std::size_t TCPStream::send(const char* buffer, std::size_t len, Error* err) { ssize_t rv = ::send(m_sd, buffer, len, 0); #endif if (rv < 0) { - *err = kConnectionReset; + if (!m_blocking && (errno == EAGAIN || errno == EWOULDBLOCK)) + *err = kWouldBlock; + else + *err = kConnectionReset; return 0; } #endif @@ -126,7 +135,14 @@ std::size_t TCPStream::receive(char* buffer, std::size_t len, Error* err, return 0; } if (rv < 0) { - *err = kConnectionReset; +#ifdef _WIN32 + if (!m_blocking && WSAGetLastError() == WSAEWOULDBLOCK) +#else + if (!m_blocking && (errno == EAGAIN || errno == EWOULDBLOCK)) +#endif + *err = kWouldBlock; + else + *err = kConnectionReset; return 0; } return static_cast(rv); @@ -150,10 +166,32 @@ llvm::StringRef TCPStream::getPeerIP() const { return m_peerIP; } int TCPStream::getPeerPort() const { return m_peerPort; } void TCPStream::setNoDelay() { + if (m_sd < 0) return; int optval = 1; setsockopt(m_sd, IPPROTO_TCP, TCP_NODELAY, (char*)&optval, sizeof optval); } +bool TCPStream::setBlocking(bool enabled) { + if (m_sd < 0) return true; // silently accept +#ifdef _WIN32 + u_long mode = enabled ? 0 : 1; + if (ioctlsocket(m_sd, FIONBIO, &mode) == SOCKET_ERROR) return false; +#else + long flags = fcntl(m_sd, F_GETFL, nullptr); + if (flags < 0) return false; + if (enabled) + flags &= ~O_NONBLOCK; + else + flags |= O_NONBLOCK; + if (fcntl(m_sd, F_SETFL, flags) < 0) return false; +#endif + return true; +} + +int TCPStream::getNativeHandle() const { + return m_sd; +} + bool TCPStream::WaitForReadEvent(int timeout) { fd_set sdset; struct timeval tv;