diff --git a/xplat/Flipper/FlipperConnectionManagerImpl.cpp b/xplat/Flipper/FlipperConnectionManagerImpl.cpp index 491f4dcc1..cdd87cf6f 100644 --- a/xplat/Flipper/FlipperConnectionManagerImpl.cpp +++ b/xplat/Flipper/FlipperConnectionManagerImpl.cpp @@ -17,6 +17,7 @@ #include #include "ConnectionContextStore.h" #include "FireAndForgetBasedFlipperResponder.h" +#include "FlipperRSocketResponder.h" #include "FlipperResponderImpl.h" #include "FlipperStep.h" #include "Log.h" @@ -33,7 +34,6 @@ static constexpr int maxPayloadSize = 0xFFFFFF; namespace facebook { namespace flipper { -rsocket::Payload toRSocketPayload(dynamic data); class ConnectionEvents : public rsocket::RSocketConnectionEvents { private: @@ -66,68 +66,6 @@ class ConnectionEvents : public rsocket::RSocketConnectionEvents { } }; -class Responder : public rsocket::RSocketResponder { - private: - FlipperConnectionManagerImpl* websocket_; - - public: - Responder(FlipperConnectionManagerImpl* websocket) : websocket_(websocket) {} - - void handleFireAndForget( - rsocket::Payload request, - rsocket::StreamId streamId) { - const auto payload = request.moveDataToString(); - std::unique_ptr responder; - auto message = folly::parseJson(payload); - if (message.find("id") != message.items().end()) { - auto id = message["id"].getInt(); - responder = - std::make_unique(websocket_, id); - } - - websocket_->callbacks_->onMessageReceived( - folly::parseJson(payload), std::move(responder)); - } - - std::shared_ptr> - handleRequestResponse(rsocket::Payload request, rsocket::StreamId streamId) { - const auto requestString = request.moveDataToString(); - - auto dynamicSingle = yarpl::single::Single::create( - [payload = std::move(requestString), this](auto observer) { - auto responder = std::make_unique(observer); - websocket_->callbacks_->onMessageReceived( - folly::parseJson(payload), std::move(responder)); - }); - - auto rsocketSingle = yarpl::single::Single::create( - [payload = std::move(requestString), dynamicSingle, this]( - auto observer) { - observer->onSubscribe( - yarpl::single::SingleSubscriptions::empty()); - dynamicSingle->subscribe( - [observer, this](folly::dynamic d) { - websocket_->connectionEventBase_->runInEventBaseThread( - [observer, d]() { - try { - observer->onSuccess(toRSocketPayload(d)); - - } catch (std::exception& e) { - log(e.what()); - observer->onError(e); - } - }); - }, - [observer, this](folly::exception_wrapper e) { - websocket_->connectionEventBase_->runInEventBaseThread( - [observer, e]() { observer->onError(e); }); - }); - }); - - return rsocketSingle; - } -}; - FlipperConnectionManagerImpl::FlipperConnectionManagerImpl( FlipperInitConfig config, std::shared_ptr state, @@ -255,7 +193,7 @@ void FlipperConnectionManagerImpl::connectSecurely() { std::move(address), std::move(sslContext)), std::move(parameters), - std::make_shared(this), + std::make_shared(this), std::chrono::seconds(connectionKeepaliveSeconds), // keepaliveInterval nullptr, // stats std::make_shared(this)) diff --git a/xplat/Flipper/FlipperConnectionManagerImpl.h b/xplat/Flipper/FlipperConnectionManagerImpl.h index 89a6b8b77..a72c32e2e 100644 --- a/xplat/Flipper/FlipperConnectionManagerImpl.h +++ b/xplat/Flipper/FlipperConnectionManagerImpl.h @@ -19,11 +19,13 @@ namespace flipper { class ConnectionEvents; class ConnectionContextStore; -class Responder; +class FlipperRSocketResponder; + +rsocket::Payload toRSocketPayload(folly::dynamic data); class FlipperConnectionManagerImpl : public FlipperConnectionManager { friend ConnectionEvents; - friend Responder; + friend FlipperRSocketResponder; public: FlipperConnectionManagerImpl(FlipperInitConfig config, std::shared_ptr state, std::shared_ptr contextStore); diff --git a/xplat/Flipper/FlipperRSocketResponder.cpp b/xplat/Flipper/FlipperRSocketResponder.cpp new file mode 100644 index 000000000..3073de49f --- /dev/null +++ b/xplat/Flipper/FlipperRSocketResponder.cpp @@ -0,0 +1,77 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the LICENSE + * file in the root directory of this source tree. + */ +#include "FlipperRSocketResponder.h" +#include +#include +#include "FireAndForgetBasedFlipperResponder.h" +#include "FlipperConnectionManagerImpl.h" +#include "FlipperResponderImpl.h" +#include "Log.h" + +using folly::dynamic; + +namespace facebook { +namespace flipper { + +rsocket::Payload toRSocketPayload(dynamic data); + +void FlipperRSocketResponder::handleFireAndForget( + rsocket::Payload request, + rsocket::StreamId streamId) { + const auto payload = request.moveDataToString(); + std::unique_ptr responder; + auto message = folly::parseJson(payload); + if (message.find("id") != message.items().end()) { + auto id = message["id"].getInt(); + responder = + std::make_unique(websocket_, id); + } + + websocket_->callbacks_->onMessageReceived( + folly::parseJson(payload), std::move(responder)); +} + +std::shared_ptr> +FlipperRSocketResponder::handleRequestResponse( + rsocket::Payload request, + rsocket::StreamId streamId) { + const auto requestString = request.moveDataToString(); + + auto dynamicSingle = yarpl::single::Single::create( + [payload = std::move(requestString), this](auto observer) { + auto responder = std::make_unique(observer); + websocket_->callbacks_->onMessageReceived( + folly::parseJson(payload), std::move(responder)); + }); + + auto rsocketSingle = yarpl::single::Single::create( + [payload = std::move(requestString), dynamicSingle, this](auto observer) { + observer->onSubscribe(yarpl::single::SingleSubscriptions::empty()); + dynamicSingle->subscribe( + [observer, this](folly::dynamic d) { + websocket_->connectionEventBase_->runInEventBaseThread( + [observer, d]() { + try { + observer->onSuccess(toRSocketPayload(d)); + + } catch (std::exception& e) { + log(e.what()); + observer->onError(e); + } + }); + }, + [observer, this](folly::exception_wrapper e) { + websocket_->connectionEventBase_->runInEventBaseThread( + [observer, e]() { observer->onError(e); }); + }); + }); + + return rsocketSingle; +} + +} // namespace flipper +} // namespace facebook diff --git a/xplat/Flipper/FlipperRSocketResponder.h b/xplat/Flipper/FlipperRSocketResponder.h new file mode 100644 index 000000000..0071b2bbd --- /dev/null +++ b/xplat/Flipper/FlipperRSocketResponder.h @@ -0,0 +1,31 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the LICENSE + * file in the root directory of this source tree. + */ +#include + +namespace facebook { +namespace flipper { + +class FlipperConnectionManagerImpl; + +class FlipperRSocketResponder : public rsocket::RSocketResponder { + private: + FlipperConnectionManagerImpl* websocket_; + + public: + FlipperRSocketResponder(FlipperConnectionManagerImpl* websocket) + : websocket_(websocket){}; + + void handleFireAndForget( + rsocket::Payload request, + rsocket::StreamId streamId); + + std::shared_ptr> + handleRequestResponse(rsocket::Payload request, rsocket::StreamId streamId); +}; + +} // namespace flipper +} // namespace facebook