Extract Responder class out of FlipperConnectionManagerImpl
Summary: For one thing, this file is too big so it's good to split it up. Also, having this responder defined in the .cpp file makes it hard to test. Extracting it for testability. Reviewed By: passy Differential Revision: D14000079 fbshipit-source-id: 8da4e0e325f48c0ada8efc7cd6fffcb3440c6e26
This commit is contained in:
committed by
Facebook Github Bot
parent
4a3de26a88
commit
c48c1a728a
@@ -17,6 +17,7 @@
|
||||
#include <thread>
|
||||
#include "ConnectionContextStore.h"
|
||||
#include "FireAndForgetBasedFlipperResponder.h"
|
||||
#include "FlipperRSocketResponder.h"
|
||||
#include "FlipperResponderImpl.h"
|
||||
#include "FlipperStep.h"
|
||||
#include "Log.h"
|
||||
@@ -33,7 +34,6 @@ static constexpr int maxPayloadSize = 0xFFFFFF;
|
||||
namespace facebook {
|
||||
namespace flipper {
|
||||
|
||||
rsocket::Payload toRSocketPayload(dynamic data);
|
||||
|
||||
class ConnectionEvents : public rsocket::RSocketConnectionEvents {
|
||||
private:
|
||||
@@ -66,68 +66,6 @@ class ConnectionEvents : public rsocket::RSocketConnectionEvents {
|
||||
}
|
||||
};
|
||||
|
||||
class Responder : public rsocket::RSocketResponder {
|
||||
private:
|
||||
FlipperConnectionManagerImpl* websocket_;
|
||||
|
||||
public:
|
||||
Responder(FlipperConnectionManagerImpl* websocket) : websocket_(websocket) {}
|
||||
|
||||
void handleFireAndForget(
|
||||
rsocket::Payload request,
|
||||
rsocket::StreamId streamId) {
|
||||
const auto payload = request.moveDataToString();
|
||||
std::unique_ptr<FireAndForgetBasedFlipperResponder> responder;
|
||||
auto message = folly::parseJson(payload);
|
||||
if (message.find("id") != message.items().end()) {
|
||||
auto id = message["id"].getInt();
|
||||
responder =
|
||||
std::make_unique<FireAndForgetBasedFlipperResponder>(websocket_, id);
|
||||
}
|
||||
|
||||
websocket_->callbacks_->onMessageReceived(
|
||||
folly::parseJson(payload), std::move(responder));
|
||||
}
|
||||
|
||||
std::shared_ptr<yarpl::single::Single<rsocket::Payload>>
|
||||
handleRequestResponse(rsocket::Payload request, rsocket::StreamId streamId) {
|
||||
const auto requestString = request.moveDataToString();
|
||||
|
||||
auto dynamicSingle = yarpl::single::Single<folly::dynamic>::create(
|
||||
[payload = std::move(requestString), this](auto observer) {
|
||||
auto responder = std::make_unique<FlipperResponderImpl>(observer);
|
||||
websocket_->callbacks_->onMessageReceived(
|
||||
folly::parseJson(payload), std::move(responder));
|
||||
});
|
||||
|
||||
auto rsocketSingle = yarpl::single::Single<rsocket::Payload>::create(
|
||||
[payload = std::move(requestString), dynamicSingle, this](
|
||||
auto observer) {
|
||||
observer->onSubscribe(
|
||||
yarpl::single::SingleSubscriptions::empty());
|
||||
dynamicSingle->subscribe(
|
||||
[observer, this](folly::dynamic d) {
|
||||
websocket_->connectionEventBase_->runInEventBaseThread(
|
||||
[observer, d]() {
|
||||
try {
|
||||
observer->onSuccess(toRSocketPayload(d));
|
||||
|
||||
} catch (std::exception& e) {
|
||||
log(e.what());
|
||||
observer->onError(e);
|
||||
}
|
||||
});
|
||||
},
|
||||
[observer, this](folly::exception_wrapper e) {
|
||||
websocket_->connectionEventBase_->runInEventBaseThread(
|
||||
[observer, e]() { observer->onError(e); });
|
||||
});
|
||||
});
|
||||
|
||||
return rsocketSingle;
|
||||
}
|
||||
};
|
||||
|
||||
FlipperConnectionManagerImpl::FlipperConnectionManagerImpl(
|
||||
FlipperInitConfig config,
|
||||
std::shared_ptr<FlipperState> state,
|
||||
@@ -255,7 +193,7 @@ void FlipperConnectionManagerImpl::connectSecurely() {
|
||||
std::move(address),
|
||||
std::move(sslContext)),
|
||||
std::move(parameters),
|
||||
std::make_shared<Responder>(this),
|
||||
std::make_shared<FlipperRSocketResponder>(this),
|
||||
std::chrono::seconds(connectionKeepaliveSeconds), // keepaliveInterval
|
||||
nullptr, // stats
|
||||
std::make_shared<ConnectionEvents>(this))
|
||||
|
||||
@@ -19,11 +19,13 @@ namespace flipper {
|
||||
|
||||
class ConnectionEvents;
|
||||
class ConnectionContextStore;
|
||||
class Responder;
|
||||
class FlipperRSocketResponder;
|
||||
|
||||
rsocket::Payload toRSocketPayload(folly::dynamic data);
|
||||
|
||||
class FlipperConnectionManagerImpl : public FlipperConnectionManager {
|
||||
friend ConnectionEvents;
|
||||
friend Responder;
|
||||
friend FlipperRSocketResponder;
|
||||
|
||||
public:
|
||||
FlipperConnectionManagerImpl(FlipperInitConfig config, std::shared_ptr<FlipperState> state, std::shared_ptr<ConnectionContextStore> contextStore);
|
||||
|
||||
77
xplat/Flipper/FlipperRSocketResponder.cpp
Normal file
77
xplat/Flipper/FlipperRSocketResponder.cpp
Normal file
@@ -0,0 +1,77 @@
|
||||
/**
|
||||
* Copyright (c) Facebook, Inc. and its affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the LICENSE
|
||||
* file in the root directory of this source tree.
|
||||
*/
|
||||
#include "FlipperRSocketResponder.h"
|
||||
#include <folly/json.h>
|
||||
#include <rsocket/RSocket.h>
|
||||
#include "FireAndForgetBasedFlipperResponder.h"
|
||||
#include "FlipperConnectionManagerImpl.h"
|
||||
#include "FlipperResponderImpl.h"
|
||||
#include "Log.h"
|
||||
|
||||
using folly::dynamic;
|
||||
|
||||
namespace facebook {
|
||||
namespace flipper {
|
||||
|
||||
rsocket::Payload toRSocketPayload(dynamic data);
|
||||
|
||||
void FlipperRSocketResponder::handleFireAndForget(
|
||||
rsocket::Payload request,
|
||||
rsocket::StreamId streamId) {
|
||||
const auto payload = request.moveDataToString();
|
||||
std::unique_ptr<FireAndForgetBasedFlipperResponder> responder;
|
||||
auto message = folly::parseJson(payload);
|
||||
if (message.find("id") != message.items().end()) {
|
||||
auto id = message["id"].getInt();
|
||||
responder =
|
||||
std::make_unique<FireAndForgetBasedFlipperResponder>(websocket_, id);
|
||||
}
|
||||
|
||||
websocket_->callbacks_->onMessageReceived(
|
||||
folly::parseJson(payload), std::move(responder));
|
||||
}
|
||||
|
||||
std::shared_ptr<yarpl::single::Single<rsocket::Payload>>
|
||||
FlipperRSocketResponder::handleRequestResponse(
|
||||
rsocket::Payload request,
|
||||
rsocket::StreamId streamId) {
|
||||
const auto requestString = request.moveDataToString();
|
||||
|
||||
auto dynamicSingle = yarpl::single::Single<folly::dynamic>::create(
|
||||
[payload = std::move(requestString), this](auto observer) {
|
||||
auto responder = std::make_unique<FlipperResponderImpl>(observer);
|
||||
websocket_->callbacks_->onMessageReceived(
|
||||
folly::parseJson(payload), std::move(responder));
|
||||
});
|
||||
|
||||
auto rsocketSingle = yarpl::single::Single<rsocket::Payload>::create(
|
||||
[payload = std::move(requestString), dynamicSingle, this](auto observer) {
|
||||
observer->onSubscribe(yarpl::single::SingleSubscriptions::empty());
|
||||
dynamicSingle->subscribe(
|
||||
[observer, this](folly::dynamic d) {
|
||||
websocket_->connectionEventBase_->runInEventBaseThread(
|
||||
[observer, d]() {
|
||||
try {
|
||||
observer->onSuccess(toRSocketPayload(d));
|
||||
|
||||
} catch (std::exception& e) {
|
||||
log(e.what());
|
||||
observer->onError(e);
|
||||
}
|
||||
});
|
||||
},
|
||||
[observer, this](folly::exception_wrapper e) {
|
||||
websocket_->connectionEventBase_->runInEventBaseThread(
|
||||
[observer, e]() { observer->onError(e); });
|
||||
});
|
||||
});
|
||||
|
||||
return rsocketSingle;
|
||||
}
|
||||
|
||||
} // namespace flipper
|
||||
} // namespace facebook
|
||||
31
xplat/Flipper/FlipperRSocketResponder.h
Normal file
31
xplat/Flipper/FlipperRSocketResponder.h
Normal file
@@ -0,0 +1,31 @@
|
||||
/**
|
||||
* Copyright (c) Facebook, Inc. and its affiliates.
|
||||
*
|
||||
* This source code is licensed under the MIT license found in the LICENSE
|
||||
* file in the root directory of this source tree.
|
||||
*/
|
||||
#include <rsocket/RSocketResponder.h>
|
||||
|
||||
namespace facebook {
|
||||
namespace flipper {
|
||||
|
||||
class FlipperConnectionManagerImpl;
|
||||
|
||||
class FlipperRSocketResponder : public rsocket::RSocketResponder {
|
||||
private:
|
||||
FlipperConnectionManagerImpl* websocket_;
|
||||
|
||||
public:
|
||||
FlipperRSocketResponder(FlipperConnectionManagerImpl* websocket)
|
||||
: websocket_(websocket){};
|
||||
|
||||
void handleFireAndForget(
|
||||
rsocket::Payload request,
|
||||
rsocket::StreamId streamId);
|
||||
|
||||
std::shared_ptr<yarpl::single::Single<rsocket::Payload>>
|
||||
handleRequestResponse(rsocket::Payload request, rsocket::StreamId streamId);
|
||||
};
|
||||
|
||||
} // namespace flipper
|
||||
} // namespace facebook
|
||||
Reference in New Issue
Block a user