diff --git a/android/build.gradle b/android/build.gradle index adfee99a2..22b667e32 100644 --- a/android/build.gradle +++ b/android/build.gradle @@ -20,7 +20,7 @@ android { buildConfigField "boolean", "IS_INTERNAL_BUILD", 'true' ndk { - abiFilters 'x86', 'x86_64', 'armeabi-v7a', 'arm64-v8a' + abiFilters 'x86_64', 'armeabi-v7a', 'arm64-v8a' } externalNativeBuild { diff --git a/android/third-party/native.gradle b/android/third-party/native.gradle index 861eec5ad..7ee0a051a 100644 --- a/android/third-party/native.gradle +++ b/android/third-party/native.gradle @@ -11,7 +11,7 @@ import org.apache.tools.ant.filters.ReplaceTokens // Increment this when making changes to any of the native // dependencies. // !!! -final def CACHE_REVISION = 3 +final def CACHE_REVISION = 27 final def externalDir = new File("$projectDir/external") final def downloadsDir = new File("$externalDir/downloads") @@ -117,7 +117,7 @@ task prepareBoost(dependsOn: [downloadBoost], type: Copy) { } task downloadFolly(dependsOn: createNativeDepsDirectories, type: Download) { - src 'https://github.com/facebook/folly/archive/v2018.09.10.00.tar.gz' + src 'https://github.com/facebook/folly/archive/v2019.06.10.00.tar.gz' onlyIfNewer true overwrite false dest new File(downloadsDir, 'folly-' + getDownloadFileName(src)) @@ -127,10 +127,10 @@ task prepareFolly(dependsOn: [downloadFolly], type: Copy) { onlyIf { isCacheOutOfDate(CACHE_REVISION) } from tarTree(downloadFolly.dest) from './overrides/Folly/' - include 'folly-2018.09.10.00/folly/**/*', 'build.gradle', 'CMakeLists.txt', 'ApplicationManifest.xml' - eachFile { it.path = it.path - "folly-2018.09.10.00/" } + include 'folly-2019.06.10.00/folly/**/*', 'build.gradle', 'CMakeLists.txt', 'ApplicationManifest.xml' + eachFile { it.path = it.path - "folly-2019.06.10.00/" } includeEmptyDirs = false - into "$externalDir/folly" + into "$externalDir/folly/" } //TODO: Get rid off this hack. @@ -143,8 +143,8 @@ task finalizeFollyWithDemangle(dependsOn: [prepareFolly], type: Copy) { //TODO: Get rid off this hack. task finalizeFolly(dependsOn: [finalizeFollyWithDemangle], type: Copy) { from './overrides/Folly/' - include 'AsyncServerSocket.cpp' - into "$externalDir/folly/folly/io/async/" + include 'AsyncServerSocket.cpp' + into "$externalDir/folly/folly/io/async/" } task downloadLibEvent(dependsOn: [], type: Download) { @@ -210,7 +210,7 @@ task finalizeOpenSSL(dependsOn: [configureOpenSSL], type: Exec) { } task downloadRSocket(dependsOn: [], type: Download) { - src 'https://github.com/priteshrnandgaonkar/rsocket-cpp/archive/0.10.3.tar.gz' + src 'https://github.com/priteshrnandgaonkar/rsocket-cpp/archive/0.10.7.tar.gz' onlyIfNewer true overwrite false dest new File(downloadsDir, 'rsocket-' + getDownloadFileName(src)) @@ -219,7 +219,7 @@ task downloadRSocket(dependsOn: [], type: Download) { task prepareRSocket(dependsOn: [downloadRSocket], type: Copy) { from tarTree(downloadRSocket.dest) from './overrides/RSocket/' - include 'rsocket-cpp-0.10.3/**/*', 'build.gradle', 'ApplicationManifest.xml', 'CMakeLists.txt' + include 'rsocket-cpp-0.10.7/**/*', 'build.gradle', 'ApplicationManifest.xml', 'CMakeLists.txt' includeEmptyDirs = false into "$externalDir/RSocket" } diff --git a/android/third-party/overrides/Folly/AsyncServerSocket.cpp b/android/third-party/overrides/Folly/AsyncServerSocket.cpp index 6beba061c..75559f51b 100644 --- a/android/third-party/overrides/Folly/AsyncServerSocket.cpp +++ b/android/third-party/overrides/Folly/AsyncServerSocket.cpp @@ -15,7 +15,7 @@ */ #ifndef __STDC_FORMAT_MACROS - #define __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS #endif #include @@ -35,10 +35,16 @@ #include #include -namespace fsp = folly::portability::sockets; - namespace folly { +#ifndef TCP_SAVE_SYN +#define TCP_SAVE_SYN 27 +#endif + +#ifndef TCP_SAVED_SYN +#define TCP_SAVED_SYN 28 +#endif + static constexpr bool msgErrQueueSupported = #ifdef FOLLY_HAVE_MSG_ERRQUEUE true; @@ -50,59 +56,32 @@ const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce; const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce; const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue; -int setCloseOnExec(int fd, int value) { - // Read the current flags - int old_flags = fcntl(fd, F_GETFD, 0); - - // If reading the flags failed, return error indication now - if (old_flags < 0) { - return -1; - } - - // Set just the flag we want to set - int new_flags; - if (value != 0) { - new_flags = old_flags | FD_CLOEXEC; - } else { - new_flags = old_flags & ~FD_CLOEXEC; - } - - // Store modified flag word in the descriptor - return fcntl(fd, F_SETFD, new_flags); -} - void AsyncServerSocket::RemoteAcceptor::start( - EventBase* eventBase, uint32_t maxAtOnce, uint32_t maxInQueue) { + EventBase* eventBase, + uint32_t maxAtOnce, + uint32_t maxInQueue) { setMaxReadAtOnce(maxAtOnce); queue_.setMaxQueueSize(maxInQueue); - if (!eventBase->runInEventBaseThread([=](){ - callback_->acceptStarted(); - this->startConsuming(eventBase, &queue_); - })) { - throw std::invalid_argument("unable to start waiting on accept " - "notification queue in the specified " - "EventBase thread"); - } + eventBase->runInEventBaseThread([=]() { + callback_->acceptStarted(); + this->startConsuming(eventBase, &queue_); + }); } void AsyncServerSocket::RemoteAcceptor::stop( - EventBase* eventBase, AcceptCallback* callback) { - if (!eventBase->runInEventBaseThread([=](){ - callback->acceptStopped(); - delete this; - })) { - throw std::invalid_argument("unable to start waiting on accept " - "notification queue in the specified " - "EventBase thread"); - } + EventBase* eventBase, + AcceptCallback* callback) { + eventBase->runInEventBaseThread([=]() { + callback->acceptStopped(); + delete this; + }); } void AsyncServerSocket::RemoteAcceptor::messageAvailable( QueueMessage&& msg) noexcept { switch (msg.type) { - case MessageType::MSG_NEW_CONN: - { + case MessageType::MSG_NEW_CONN: { if (connectionEventCallback_) { connectionEventCallback_->onConnectionDequeuedByAcceptorCallback( msg.fd, msg.address); @@ -110,18 +89,16 @@ void AsyncServerSocket::RemoteAcceptor::messageAvailable( callback_->connectionAccepted(msg.fd, msg.address); break; } - case MessageType::MSG_ERROR: - { + case MessageType::MSG_ERROR: { std::runtime_error ex(msg.msg); callback_->acceptError(ex); break; } - default: - { + default: { LOG(ERROR) << "invalid accept notification message type " << int(msg.type); std::runtime_error ex( - "received invalid accept notification message type"); + "received invalid accept notification message type"); callback_->acceptError(ex); } } @@ -137,7 +114,9 @@ class AsyncServerSocket::BackoffTimeout : public AsyncTimeout { explicit BackoffTimeout(AsyncServerSocket* socket) : AsyncTimeout(socket->getEventBase()), socket_(socket) {} - void timeoutExpired() noexcept override { socket_->backoffTimeoutExpired(); } + void timeoutExpired() noexcept override { + socket_->backoffTimeoutExpired(); + } private: AsyncServerSocket* socket_; @@ -160,7 +139,9 @@ AsyncServerSocket::AsyncServerSocket(EventBase* eventBase) backoffTimeout_(nullptr), callbacks_(), keepAliveEnabled_(true), - closeOnExec_(true) {} + closeOnExec_(true) { + disableTransparentTls(); +} void AsyncServerSocket::setShutdownSocketSet( const std::weak_ptr& wNewSS) { @@ -193,8 +174,7 @@ AsyncServerSocket::~AsyncServerSocket() { int AsyncServerSocket::stopAccepting(int shutdownFlags) { int result = 0; for (auto& handler : sockets_) { - VLOG(10) << "AsyncServerSocket::stopAccepting " << this << - handler.socket_; + VLOG(10) << "AsyncServerSocket::stopAccepting " << this << handler.socket_; } if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); @@ -258,7 +238,7 @@ void AsyncServerSocket::destroy() { DelayedDestruction::destroy(); } -void AsyncServerSocket::attachEventBase(EventBase *eventBase) { +void AsyncServerSocket::attachEventBase(EventBase* eventBase) { assert(eventBase_ == nullptr); eventBase->dcheckIsInEventBaseThread(); @@ -279,18 +259,19 @@ void AsyncServerSocket::detachEventBase() { } } -void AsyncServerSocket::useExistingSockets(const std::vector& fds) { +void AsyncServerSocket::useExistingSockets( + const std::vector& fds) { if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); } if (sockets_.size() > 0) { throw std::invalid_argument( - "cannot call useExistingSocket() on a " - "AsyncServerSocket that already has a socket"); + "cannot call useExistingSocket() on a " + "AsyncServerSocket that already has a socket"); } - for (auto fd: fds) { + for (auto fd : fds) { // Set addressFamily_ from this socket. // Note that the socket may not have been bound yet, but // setFromLocalAddress() will still work and get the correct address family. @@ -301,7 +282,7 @@ void AsyncServerSocket::useExistingSockets(const std::vector& fds) { #if __linux__ if (noTransparentTls_) { // Ignore return value, errors are ok - setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0); + netops::setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0); } #endif @@ -311,31 +292,30 @@ void AsyncServerSocket::useExistingSockets(const std::vector& fds) { } } -void AsyncServerSocket::useExistingSocket(int fd) { +void AsyncServerSocket::useExistingSocket(NetworkSocket fd) { useExistingSockets({fd}); } void AsyncServerSocket::bindSocket( - int fd, + NetworkSocket fd, const SocketAddress& address, bool isExistingSocket) { sockaddr_storage addrStorage; address.getAddress(&addrStorage); sockaddr* saddr = reinterpret_cast(&addrStorage); - if (fsp::bind(fd, saddr, address.getActualSize()) != 0) { + if (netops::bind(fd, saddr, address.getActualSize()) != 0) { if (!isExistingSocket) { closeNoInt(fd); } - folly::throwSystemError(errno, - "failed to bind to async server socket: " + - address.describe()); + folly::throwSystemError( + errno, "failed to bind to async server socket: " + address.describe()); } #if __linux__ if (noTransparentTls_) { // Ignore return value, errors are ok - setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0); + netops::setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0); } #endif @@ -347,9 +327,9 @@ void AsyncServerSocket::bindSocket( bool AsyncServerSocket::setZeroCopy(bool enable) { if (msgErrQueueSupported) { - int fd = getSocket(); int val = enable ? 1 : 0; - int ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val)); + int ret = netops::setsockopt( + getNetworkSocket(), SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val)); return (0 == ret); } @@ -366,19 +346,18 @@ void AsyncServerSocket::bind(const SocketAddress& address) { // However, in the normal case we need to create a new socket now. // Don't set socket_ yet, so that socket_ will remain uninitialized if an // error occurs. - int fd; + NetworkSocket fd; if (sockets_.size() == 0) { fd = createSocket(address.getFamily()); } else if (sockets_.size() == 1) { if (address.getFamily() != sockets_[0].addressFamily_) { throw std::invalid_argument( - "Attempted to bind address to socket with " - "different address family"); + "Attempted to bind address to socket with " + "different address family"); } fd = sockets_[0].socket_; } else { - throw std::invalid_argument( - "Attempted to bind to multiple fds"); + throw std::invalid_argument("Attempted to bind to multiple fds"); } bindSocket(fd, address, !sockets_.empty()); @@ -391,13 +370,14 @@ void AsyncServerSocket::bind( throw std::invalid_argument("No ip addresses were provided"); } if (!sockets_.empty()) { - throw std::invalid_argument("Cannot call bind on a AsyncServerSocket " - "that already has a socket."); + throw std::invalid_argument( + "Cannot call bind on a AsyncServerSocket " + "that already has a socket."); } for (const IPAddress& ipAddress : ipAddresses) { SocketAddress address(ipAddress.toFullyQualified(), port); - int fd = createSocket(address.getFamily()); + auto fd = createSocket(address.getFamily()); bindSocket(fd, address, false); } @@ -422,19 +402,21 @@ void AsyncServerSocket::bind(uint16_t port) { constexpr const char* kWildcardNode = kIsWindows ? "" : nullptr; if (getaddrinfo(kWildcardNode, sport, &hints, &res0)) { throw std::invalid_argument( - "Attempted to bind address to socket with " - "bad getaddrinfo"); + "Attempted to bind address to socket with " + "bad getaddrinfo"); } - SCOPE_EXIT { freeaddrinfo(res0); }; + SCOPE_EXIT { + freeaddrinfo(res0); + }; - auto setupAddress = [&] (struct addrinfo* res) { - int s = fsp::socket(res->ai_family, res->ai_socktype, res->ai_protocol); + auto setupAddress = [&](struct addrinfo* res) { + auto s = netops::socket(res->ai_family, res->ai_socktype, res->ai_protocol); // IPv6/IPv4 may not be supported by the kernel - if (s < 0 && errno == EAFNOSUPPORT) { + if (s == NetworkSocket() && errno == EAFNOSUPPORT) { return; } - CHECK_GE(s, 0); + CHECK_NE(s, NetworkSocket()); try { setupSocket(s, res->ai_family); @@ -445,12 +427,14 @@ void AsyncServerSocket::bind(uint16_t port) { if (res->ai_family == AF_INET6) { int v6only = 1; - CHECK(0 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, - &v6only, sizeof(v6only))); + CHECK( + 0 == + netops::setsockopt( + s, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only))); } // Bind to the socket - if (fsp::bind(s, res->ai_addr, socklen_t(res->ai_addrlen)) != 0) { + if (netops::bind(s, res->ai_addr, socklen_t(res->ai_addrlen)) != 0) { folly::throwSystemError( errno, "failed to bind to async server socket for port ", @@ -462,7 +446,7 @@ void AsyncServerSocket::bind(uint16_t port) { #if __linux__ if (noTransparentTls_) { // Ignore return value, errors are ok - setsockopt(s, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0); + netops::setsockopt(s, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0); } #endif @@ -509,7 +493,7 @@ void AsyncServerSocket::bind(uint16_t port) { // were opened, then restarting from scratch. if (port == 0 && !sockets_.empty() && tries != kNumTries) { for (const auto& socket : sockets_) { - if (socket.socket_ <= 0) { + if (socket.socket_ == NetworkSocket()) { continue; } else if ( const auto shutdownSocketSet = wShutdownSocketSet_.lock()) { @@ -532,8 +516,7 @@ void AsyncServerSocket::bind(uint16_t port) { } if (sockets_.size() == 0) { - throw std::runtime_error( - "did not bind any async server socket for port"); + throw std::runtime_error("did not bind any async server socket for port"); } } @@ -544,9 +527,8 @@ void AsyncServerSocket::listen(int backlog) { // Start listening for (auto& handler : sockets_) { - if (fsp::listen(handler.socket_, backlog) == -1) { - folly::throwSystemError(errno, - "failed to listen on async server socket"); + if (netops::listen(handler.socket_, backlog) == -1) { + folly::throwSystemError(errno, "failed to listen on async server socket"); } } } @@ -554,14 +536,13 @@ void AsyncServerSocket::listen(int backlog) { void AsyncServerSocket::getAddress(SocketAddress* addressReturn) const { CHECK(sockets_.size() >= 1); VLOG_IF(2, sockets_.size() > 1) - << "Warning: getAddress() called and multiple addresses available (" - << sockets_.size() << "). Returning only the first one."; + << "Warning: getAddress() called and multiple addresses available (" + << sockets_.size() << "). Returning only the first one."; addressReturn->setFromLocalAddress(sockets_[0].socket_); } -std::vector AsyncServerSocket::getAddresses() - const { +std::vector AsyncServerSocket::getAddresses() const { CHECK(sockets_.size() >= 1); auto tsaVec = std::vector(sockets_.size()); auto tsaIter = tsaVec.begin(); @@ -571,9 +552,10 @@ std::vector AsyncServerSocket::getAddresses() return tsaVec; } -void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback, - EventBase *eventBase, - uint32_t maxAtOnce) { +void AsyncServerSocket::addAcceptCallback( + AcceptCallback* callback, + EventBase* eventBase, + uint32_t maxAtOnce) { if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); } @@ -620,8 +602,9 @@ void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback, callbacks_.back().consumer = acceptor; } -void AsyncServerSocket::removeAcceptCallback(AcceptCallback *callback, - EventBase *eventBase) { +void AsyncServerSocket::removeAcceptCallback( + AcceptCallback* callback, + EventBase* eventBase) { if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); } @@ -634,8 +617,9 @@ void AsyncServerSocket::removeAcceptCallback(AcceptCallback *callback, uint32_t n = 0; while (true) { if (it == callbacks_.end()) { - throw std::runtime_error("AsyncServerSocket::removeAcceptCallback(): " - "accept callback not found"); + throw std::runtime_error( + "AsyncServerSocket::removeAcceptCallback(): " + "accept callback not found"); } if (it->callback == callback && (it->eventBase == eventBase || eventBase == nullptr)) { @@ -699,8 +683,7 @@ void AsyncServerSocket::startAccepting() { } for (auto& handler : sockets_) { - if (!handler.registerHandler( - EventHandler::READ | EventHandler::PERSIST)) { + if (!handler.registerHandler(EventHandler::READ | EventHandler::PERSIST)) { throw std::runtime_error("failed to register for accept events"); } } @@ -712,7 +695,7 @@ void AsyncServerSocket::pauseAccepting() { } accepting_ = false; for (auto& handler : sockets_) { - handler. unregisterHandler(); + handler.unregisterHandler(); } // If we were in the accept backoff state, disable the backoff timeout @@ -721,9 +704,9 @@ void AsyncServerSocket::pauseAccepting() { } } -int AsyncServerSocket::createSocket(int family) { - int fd = fsp::socket(family, SOCK_STREAM, 0); - if (fd == -1) { +NetworkSocket AsyncServerSocket::createSocket(int family) { + auto fd = netops::socket(family, SOCK_STREAM, 0); + if (fd == NetworkSocket()) { folly::throwSystemError(errno, "error creating async server socket"); } @@ -736,16 +719,45 @@ int AsyncServerSocket::createSocket(int family) { return fd; } -void AsyncServerSocket::setupSocket(int fd, int family) { +/** + * Enable/Disable TOS reflection for the server socket + * If enabled, the 'accepted' connections will reflect the + * TOS derived from the client's connect request + */ +void AsyncServerSocket::setTosReflect(bool enable) { + if (!kIsLinux || enable == false) { + tosReflect_ = false; + return; + } + + for (auto& handler : sockets_) { + if (handler.socket_ == NetworkSocket()) { + continue; + } + + int val = (enable) ? 1 : 0; + int ret = netops::setsockopt( + handler.socket_, IPPROTO_TCP, TCP_SAVE_SYN, &val, sizeof(val)); + + if (ret == 0) { + VLOG(10) << "Enabled SYN save for socket " << handler.socket_; + } else { + folly::throwSystemError(errno, "failed to enable TOS reflect"); + } + } + tosReflect_ = true; +} + +void AsyncServerSocket::setupSocket(NetworkSocket fd, int family) { // Put the socket in non-blocking mode - if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) { - folly::throwSystemError(errno, - "failed to put socket in non-blocking mode"); + if (netops::set_socket_non_blocking(fd) != 0) { + folly::throwSystemError(errno, "failed to put socket in non-blocking mode"); } // Set reuseaddr to avoid 2MSL delay on server restart int one = 1; - if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) { + if (netops::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != + 0) { // This isn't a fatal error; just log an error message and continue LOG(ERROR) << "failed to set SO_REUSEADDR on async server socket " << errno; } @@ -753,42 +765,46 @@ void AsyncServerSocket::setupSocket(int fd, int family) { // Set reuseport to support multiple accept threads int zero = 0; if (reusePortEnabled_ && - setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(int)) != 0) { + netops::setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(int)) != + 0) { LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket " - << strerror(errno); + << errnoStr(errno); #ifdef WIN32 folly::throwSystemError(errno, "failed to bind to the async server socket"); #else SocketAddress address; address.setFromLocalAddress(fd); - folly::throwSystemError(errno, - "failed to bind to async server socket: " + - address.describe()); + folly::throwSystemError( + errno, "failed to bind to async server socket: " + address.describe()); #endif } // Set keepalive as desired - if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, - (keepAliveEnabled_) ? &one : &zero, sizeof(int)) != 0) { - LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: " << - strerror(errno); + if (netops::setsockopt( + fd, + SOL_SOCKET, + SO_KEEPALIVE, + (keepAliveEnabled_) ? &one : &zero, + sizeof(int)) != 0) { + LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: " + << errnoStr(errno); } // Setup FD_CLOEXEC flag - if (closeOnExec_ && - (-1 == folly::setCloseOnExec(fd, closeOnExec_))) { - LOG(ERROR) << "failed to set FD_CLOEXEC on async server socket: " << - strerror(errno); + if (closeOnExec_ && (-1 == netops::set_socket_close_on_exec(fd))) { + LOG(ERROR) << "failed to set FD_CLOEXEC on async server socket: " + << errnoStr(errno); } // Set TCP nodelay if available, MAC OS X Hack // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html #ifndef TCP_NOPUSH if (family != AF_UNIX) { - if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0) { + if (netops::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != + 0) { // This isn't a fatal error; just log an error message and continue - LOG(ERROR) << "failed to set TCP_NODELAY on async server socket: " << - strerror(errno); + LOG(ERROR) << "failed to set TCP_NODELAY on async server socket: " + << errnoStr(errno); } } #else @@ -808,9 +824,10 @@ void AsyncServerSocket::setupSocket(int fd, int family) { } } -void AsyncServerSocket::handlerReady(uint16_t /* events */, - int fd, - sa_family_t addressFamily) noexcept { +void AsyncServerSocket::handlerReady( + uint16_t /* events */, + NetworkSocket fd, + sa_family_t addressFamily) noexcept { assert(!callbacks_.empty()); DestructorGuard dg(this); @@ -830,24 +847,59 @@ void AsyncServerSocket::handlerReady(uint16_t /* events */, } // Accept a new client socket -#ifdef SOCK_NONBLOCK -// int clientSocket = accept4(fd, saddr, &addrLen, SOCK_NONBLOCK); +// #ifdef SOCK_NONBLOCK +// auto clientSocket = NetworkSocket::fromFd( +// accept4(fd.toFd(), saddr, &addrLen, SOCK_NONBLOCK)); // #else - int clientSocket = accept(fd, saddr, &addrLen); -#endif + auto clientSocket = netops::accept(fd, saddr, &addrLen); +// #endif address.setFromSockaddr(saddr, addrLen); - if (clientSocket >= 0 && connectionEventCallback_) { + if (clientSocket != NetworkSocket() && connectionEventCallback_) { connectionEventCallback_->onConnectionAccepted(clientSocket, address); } + // Connection accepted, get the SYN packet from the client if + // TOS reflect is enabled + if (kIsLinux && clientSocket != NetworkSocket() && tosReflect_) { + std::array buffer; + socklen_t len = sizeof(buffer); + int ret = netops::getsockopt( + clientSocket, IPPROTO_TCP, TCP_SAVED_SYN, &buffer, &len); + + if (ret == 0) { + uint32_t tosWord = folly::Endian::big(buffer[0]); + if (addressFamily == AF_INET6) { + tosWord = (tosWord & 0x0FC00000) >> 20; + ret = netops::setsockopt( + clientSocket, + IPPROTO_IPV6, + IPV6_TCLASS, + &tosWord, + sizeof(tosWord)); + } else if (addressFamily == AF_INET) { + tosWord = (tosWord & 0x00FC0000) >> 16; + ret = netops::setsockopt( + clientSocket, IPPROTO_IP, IP_TOS, &tosWord, sizeof(tosWord)); + } + + if (ret != 0) { + LOG(ERROR) << "Unable to set TOS for accepted socket " + << clientSocket; + } + } else { + LOG(ERROR) << "Unable to get SYN packet for accepted socket " + << clientSocket; + } + } + std::chrono::time_point nowMs = - std::chrono::steady_clock::now(); + std::chrono::steady_clock::now(); auto timeSinceLastAccept = std::max( - 0, - nowMs.time_since_epoch().count() - - lastAccepTimestamp_.time_since_epoch().count()); + 0, + nowMs.time_since_epoch().count() - + lastAccepTimestamp_.time_since_epoch().count()); lastAccepTimestamp_ = nowMs; if (acceptRate_ < 1) { acceptRate_ *= 1 + acceptRateAdjustSpeed_ * timeSinceLastAccept; @@ -855,18 +907,18 @@ void AsyncServerSocket::handlerReady(uint16_t /* events */, acceptRate_ = 1; } else if (rand() > acceptRate_ * RAND_MAX) { ++numDroppedConnections_; - if (clientSocket >= 0) { + if (clientSocket != NetworkSocket()) { closeNoInt(clientSocket); if (connectionEventCallback_) { - connectionEventCallback_->onConnectionDropped(clientSocket, - address); + connectionEventCallback_->onConnectionDropped( + clientSocket, address); } } continue; } } - if (clientSocket < 0) { + if (clientSocket == NetworkSocket()) { if (errno == EAGAIN) { // No more sockets to accept right now. // Check for this code first, since it's the most common. @@ -876,7 +928,7 @@ void AsyncServerSocket::handlerReady(uint16_t /* events */, // too quickly. Pause accepting briefly to back off and give the server // a chance to recover. LOG(ERROR) << "accept failed: out of file descriptors; entering accept " - "back-off state"; + "back-off state"; enterBackoff(); // Dispatch the error message @@ -892,10 +944,10 @@ void AsyncServerSocket::handlerReady(uint16_t /* events */, #ifndef SOCK_NONBLOCK // Explicitly set the new connection to non-blocking mode - if (fcntl(clientSocket, F_SETFL, O_NONBLOCK) != 0) { + if (netops::set_socket_non_blocking(clientSocket) != 0) { closeNoInt(clientSocket); - dispatchError("failed to set accepted socket to non-blocking mode", - errno); + dispatchError( + "failed to set accepted socket to non-blocking mode", errno); if (connectionEventCallback_) { connectionEventCallback_->onConnectionDropped(clientSocket, address); } @@ -913,14 +965,15 @@ void AsyncServerSocket::handlerReady(uint16_t /* events */, } } -void AsyncServerSocket::dispatchSocket(int socket, - SocketAddress&& address) { +void AsyncServerSocket::dispatchSocket( + NetworkSocket socket, + SocketAddress&& address) { uint32_t startingIndex = callbackIndex_; // Short circuit if the callback is in the primary EventBase thread - CallbackInfo *info = nextCallback(); - if (info->eventBase == nullptr) { + CallbackInfo* info = nextCallback(); + if (info->eventBase == nullptr || info->eventBase == this->eventBase_) { info->callback->connectionAccepted(socket, address); return; } @@ -937,8 +990,7 @@ void AsyncServerSocket::dispatchSocket(int socket, if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) { if (connectionEventCallback_) { connectionEventCallback_->onConnectionEnqueuedForAcceptorCallback( - socket, - addr); + socket, addr); } // Success! return. return; @@ -953,7 +1005,6 @@ void AsyncServerSocket::dispatchSocket(int socket, acceptRate_ *= 1 - kAcceptRateDecreaseSpeed; } - if (callbackIndex_ == startingIndex) { // The notification queue was full // We can't really do anything at this point other than close the socket. @@ -977,9 +1028,9 @@ void AsyncServerSocket::dispatchSocket(int socket, } } -void AsyncServerSocket::dispatchError(const char *msgstr, int errnoValue) { +void AsyncServerSocket::dispatchError(const char* msgstr, int errnoValue) { uint32_t startingIndex = callbackIndex_; - CallbackInfo *info = nextCallback(); + CallbackInfo* info = nextCallback(); // Create a message to send over the notification queue QueueMessage msg; @@ -989,9 +1040,9 @@ void AsyncServerSocket::dispatchError(const char *msgstr, int errnoValue) { while (true) { // Short circuit if the callback is in the primary EventBase thread - if (info->eventBase == nullptr) { + if (info->eventBase == nullptr || info->eventBase == this->eventBase_) { std::runtime_error ex( - std::string(msgstr) + folly::to(errnoValue)); + std::string(msgstr) + folly::to(errnoValue)); info->callback->acceptError(ex); return; } @@ -1082,16 +1133,15 @@ void AsyncServerSocket::backoffTimeoutExpired() { // Register the handler. for (auto& handler : sockets_) { - if (!handler.registerHandler( - EventHandler::READ | EventHandler::PERSIST)) { + if (!handler.registerHandler(EventHandler::READ | EventHandler::PERSIST)) { // We're hosed. We could just re-schedule backoffTimeout_ to // re-try again after a little bit. However, we don't want to // loop retrying forever if we can't re-enable accepts. Just // abort the entire program in this state; things are really bad // and restarting the entire server is probably the best remedy. LOG(ERROR) - << "failed to re-enable AsyncServerSocket accepts after backoff; " - << "crashing now"; + << "failed to re-enable AsyncServerSocket accepts after backoff; " + << "crashing now"; abort(); } } diff --git a/android/third-party/overrides/Folly/CMakeLists.txt b/android/third-party/overrides/Folly/CMakeLists.txt index e3c4108ae..00d663f05 100644 --- a/android/third-party/overrides/Folly/CMakeLists.txt +++ b/android/third-party/overrides/Folly/CMakeLists.txt @@ -109,6 +109,7 @@ list(APPEND SRC_FILES ${FOLLY_DIR}/io/async/HHWheelTimer.cpp ${FOLLY_DIR}/Format.cpp ${FOLLY_DIR}/String.cpp ${FOLLY_DIR}/memory/detail/MallocImpl.cpp + ${FOLLY_DIR}/net/NetOps.cpp ) add_library(${PACKAGE_NAME} STATIC ${SRC_FILES}) diff --git a/android/third-party/overrides/RSocket/CMakeLists.txt b/android/third-party/overrides/RSocket/CMakeLists.txt index 82bb93b7d..b8d260487 100644 --- a/android/third-party/overrides/RSocket/CMakeLists.txt +++ b/android/third-party/overrides/RSocket/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required (VERSION 3.6.0) PROJECT(rsocket CXX) set(PACKAGE_NAME rsocket) -set(RSOCKET_VERSION 0.10.3) +set(RSOCKET_VERSION 0.10.7) set(CMAKE_CXX_STANDARD 14) set(CMAKE_CXX_EXTENSIONS OFF) diff --git a/xplat/CMakeLists.txt b/xplat/CMakeLists.txt index 30f4321de..a60e12f3a 100644 --- a/xplat/CMakeLists.txt +++ b/xplat/CMakeLists.txt @@ -4,7 +4,7 @@ set(CMAKE_VERBOSE_MAKEFILE on) set(PACKAGE_NAME flippercpp) set(CMAKE_CXX_STANDARD 14) set(CMAKE_CXX_EXTENSIONS OFF) -set(RSOCKET_VERSION 0.10.3) +set(RSOCKET_VERSION 0.10.7) set(external_DIR ${PROJECT_SOURCE_DIR}/../android/third-party/external) set(libfolly_DIR ${external_DIR}/folly/) set(rsocket_DIR ${external_DIR}/RSocket/)