From c48c1a728ae475b99183c28a5b8d620f609d9376 Mon Sep 17 00:00:00 2001 From: John Knox Date: Mon, 11 Feb 2019 14:01:32 -0800 Subject: [PATCH] Extract Responder class out of FlipperConnectionManagerImpl Summary: For one thing, this file is too big so it's good to split it up. Also, having this responder defined in the .cpp file makes it hard to test. Extracting it for testability. Reviewed By: passy Differential Revision: D14000079 fbshipit-source-id: 8da4e0e325f48c0ada8efc7cd6fffcb3440c6e26 --- .../Flipper/FlipperConnectionManagerImpl.cpp | 66 +--------------- xplat/Flipper/FlipperConnectionManagerImpl.h | 6 +- xplat/Flipper/FlipperRSocketResponder.cpp | 77 +++++++++++++++++++ xplat/Flipper/FlipperRSocketResponder.h | 31 ++++++++ 4 files changed, 114 insertions(+), 66 deletions(-) create mode 100644 xplat/Flipper/FlipperRSocketResponder.cpp create mode 100644 xplat/Flipper/FlipperRSocketResponder.h diff --git a/xplat/Flipper/FlipperConnectionManagerImpl.cpp b/xplat/Flipper/FlipperConnectionManagerImpl.cpp index 491f4dcc1..cdd87cf6f 100644 --- a/xplat/Flipper/FlipperConnectionManagerImpl.cpp +++ b/xplat/Flipper/FlipperConnectionManagerImpl.cpp @@ -17,6 +17,7 @@ #include #include "ConnectionContextStore.h" #include "FireAndForgetBasedFlipperResponder.h" +#include "FlipperRSocketResponder.h" #include "FlipperResponderImpl.h" #include "FlipperStep.h" #include "Log.h" @@ -33,7 +34,6 @@ static constexpr int maxPayloadSize = 0xFFFFFF; namespace facebook { namespace flipper { -rsocket::Payload toRSocketPayload(dynamic data); class ConnectionEvents : public rsocket::RSocketConnectionEvents { private: @@ -66,68 +66,6 @@ class ConnectionEvents : public rsocket::RSocketConnectionEvents { } }; -class Responder : public rsocket::RSocketResponder { - private: - FlipperConnectionManagerImpl* websocket_; - - public: - Responder(FlipperConnectionManagerImpl* websocket) : websocket_(websocket) {} - - void handleFireAndForget( - rsocket::Payload request, - rsocket::StreamId streamId) { - const auto payload = request.moveDataToString(); - std::unique_ptr responder; - auto message = folly::parseJson(payload); - if (message.find("id") != message.items().end()) { - auto id = message["id"].getInt(); - responder = - std::make_unique(websocket_, id); - } - - websocket_->callbacks_->onMessageReceived( - folly::parseJson(payload), std::move(responder)); - } - - std::shared_ptr> - handleRequestResponse(rsocket::Payload request, rsocket::StreamId streamId) { - const auto requestString = request.moveDataToString(); - - auto dynamicSingle = yarpl::single::Single::create( - [payload = std::move(requestString), this](auto observer) { - auto responder = std::make_unique(observer); - websocket_->callbacks_->onMessageReceived( - folly::parseJson(payload), std::move(responder)); - }); - - auto rsocketSingle = yarpl::single::Single::create( - [payload = std::move(requestString), dynamicSingle, this]( - auto observer) { - observer->onSubscribe( - yarpl::single::SingleSubscriptions::empty()); - dynamicSingle->subscribe( - [observer, this](folly::dynamic d) { - websocket_->connectionEventBase_->runInEventBaseThread( - [observer, d]() { - try { - observer->onSuccess(toRSocketPayload(d)); - - } catch (std::exception& e) { - log(e.what()); - observer->onError(e); - } - }); - }, - [observer, this](folly::exception_wrapper e) { - websocket_->connectionEventBase_->runInEventBaseThread( - [observer, e]() { observer->onError(e); }); - }); - }); - - return rsocketSingle; - } -}; - FlipperConnectionManagerImpl::FlipperConnectionManagerImpl( FlipperInitConfig config, std::shared_ptr state, @@ -255,7 +193,7 @@ void FlipperConnectionManagerImpl::connectSecurely() { std::move(address), std::move(sslContext)), std::move(parameters), - std::make_shared(this), + std::make_shared(this), std::chrono::seconds(connectionKeepaliveSeconds), // keepaliveInterval nullptr, // stats std::make_shared(this)) diff --git a/xplat/Flipper/FlipperConnectionManagerImpl.h b/xplat/Flipper/FlipperConnectionManagerImpl.h index 89a6b8b77..a72c32e2e 100644 --- a/xplat/Flipper/FlipperConnectionManagerImpl.h +++ b/xplat/Flipper/FlipperConnectionManagerImpl.h @@ -19,11 +19,13 @@ namespace flipper { class ConnectionEvents; class ConnectionContextStore; -class Responder; +class FlipperRSocketResponder; + +rsocket::Payload toRSocketPayload(folly::dynamic data); class FlipperConnectionManagerImpl : public FlipperConnectionManager { friend ConnectionEvents; - friend Responder; + friend FlipperRSocketResponder; public: FlipperConnectionManagerImpl(FlipperInitConfig config, std::shared_ptr state, std::shared_ptr contextStore); diff --git a/xplat/Flipper/FlipperRSocketResponder.cpp b/xplat/Flipper/FlipperRSocketResponder.cpp new file mode 100644 index 000000000..3073de49f --- /dev/null +++ b/xplat/Flipper/FlipperRSocketResponder.cpp @@ -0,0 +1,77 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the LICENSE + * file in the root directory of this source tree. + */ +#include "FlipperRSocketResponder.h" +#include +#include +#include "FireAndForgetBasedFlipperResponder.h" +#include "FlipperConnectionManagerImpl.h" +#include "FlipperResponderImpl.h" +#include "Log.h" + +using folly::dynamic; + +namespace facebook { +namespace flipper { + +rsocket::Payload toRSocketPayload(dynamic data); + +void FlipperRSocketResponder::handleFireAndForget( + rsocket::Payload request, + rsocket::StreamId streamId) { + const auto payload = request.moveDataToString(); + std::unique_ptr responder; + auto message = folly::parseJson(payload); + if (message.find("id") != message.items().end()) { + auto id = message["id"].getInt(); + responder = + std::make_unique(websocket_, id); + } + + websocket_->callbacks_->onMessageReceived( + folly::parseJson(payload), std::move(responder)); +} + +std::shared_ptr> +FlipperRSocketResponder::handleRequestResponse( + rsocket::Payload request, + rsocket::StreamId streamId) { + const auto requestString = request.moveDataToString(); + + auto dynamicSingle = yarpl::single::Single::create( + [payload = std::move(requestString), this](auto observer) { + auto responder = std::make_unique(observer); + websocket_->callbacks_->onMessageReceived( + folly::parseJson(payload), std::move(responder)); + }); + + auto rsocketSingle = yarpl::single::Single::create( + [payload = std::move(requestString), dynamicSingle, this](auto observer) { + observer->onSubscribe(yarpl::single::SingleSubscriptions::empty()); + dynamicSingle->subscribe( + [observer, this](folly::dynamic d) { + websocket_->connectionEventBase_->runInEventBaseThread( + [observer, d]() { + try { + observer->onSuccess(toRSocketPayload(d)); + + } catch (std::exception& e) { + log(e.what()); + observer->onError(e); + } + }); + }, + [observer, this](folly::exception_wrapper e) { + websocket_->connectionEventBase_->runInEventBaseThread( + [observer, e]() { observer->onError(e); }); + }); + }); + + return rsocketSingle; +} + +} // namespace flipper +} // namespace facebook diff --git a/xplat/Flipper/FlipperRSocketResponder.h b/xplat/Flipper/FlipperRSocketResponder.h new file mode 100644 index 000000000..0071b2bbd --- /dev/null +++ b/xplat/Flipper/FlipperRSocketResponder.h @@ -0,0 +1,31 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the LICENSE + * file in the root directory of this source tree. + */ +#include + +namespace facebook { +namespace flipper { + +class FlipperConnectionManagerImpl; + +class FlipperRSocketResponder : public rsocket::RSocketResponder { + private: + FlipperConnectionManagerImpl* websocket_; + + public: + FlipperRSocketResponder(FlipperConnectionManagerImpl* websocket) + : websocket_(websocket){}; + + void handleFireAndForget( + rsocket::Payload request, + rsocket::StreamId streamId); + + std::shared_ptr> + handleRequestResponse(rsocket::Payload request, rsocket::StreamId streamId); +}; + +} // namespace flipper +} // namespace facebook