Revert D36052198: Partially remove dependency on folly async

Differential Revision:
D36052198 (ade685c621)

Original commit changeset: 170d64a324a1

Original Phabricator Diff: D36052198 (ade685c621)

fbshipit-source-id: 69d2b18e70a6267667432d6ed9dc1c5bc545b417
This commit is contained in:
Billy Ng
2022-05-12 18:47:41 -07:00
committed by Facebook GitHub Bot
parent ade685c621
commit 3804ccf898
18 changed files with 116 additions and 132 deletions

View File

@@ -13,7 +13,6 @@
#include <fb/fbjni.h> #include <fb/fbjni.h>
#endif #endif
#include <folly/futures/Future.h>
#include <folly/io/async/AsyncSocketException.h> #include <folly/io/async/AsyncSocketException.h>
#include <folly/io/async/EventBase.h> #include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseManager.h> #include <folly/io/async/EventBaseManager.h>
@@ -24,7 +23,6 @@
#include <Flipper/FlipperClient.h> #include <Flipper/FlipperClient.h>
#include <Flipper/FlipperConnection.h> #include <Flipper/FlipperConnection.h>
#include <Flipper/FlipperConnectionManager.h> #include <Flipper/FlipperConnectionManager.h>
#include <Flipper/FlipperFollyScheduler.h>
#include <Flipper/FlipperResponder.h> #include <Flipper/FlipperResponder.h>
#include <Flipper/FlipperSocket.h> #include <Flipper/FlipperSocket.h>
#include <Flipper/FlipperSocketProvider.h> #include <Flipper/FlipperSocketProvider.h>
@@ -44,9 +42,6 @@ void handleException(const std::exception& e) {
__android_log_write(ANDROID_LOG_ERROR, "FLIPPER", message.c_str()); __android_log_write(ANDROID_LOG_ERROR, "FLIPPER", message.c_str());
} }
std::unique_ptr<facebook::flipper::Scheduler> sonarScheduler;
std::unique_ptr<facebook::flipper::Scheduler> connectionScheduler;
class JEventBase : public jni::HybridClass<JEventBase> { class JEventBase : public jni::HybridClass<JEventBase> {
public: public:
constexpr static auto kJavaDescriptor = constexpr static auto kJavaDescriptor =
@@ -417,7 +412,7 @@ class JFlipperSocketProvider : public facebook::flipper::FlipperSocketProvider {
virtual std::unique_ptr<facebook::flipper::FlipperSocket> create( virtual std::unique_ptr<facebook::flipper::FlipperSocket> create(
facebook::flipper::FlipperConnectionEndpoint endpoint, facebook::flipper::FlipperConnectionEndpoint endpoint,
std::unique_ptr<facebook::flipper::FlipperSocketBasePayload> payload, std::unique_ptr<facebook::flipper::FlipperSocketBasePayload> payload,
facebook::flipper::Scheduler* scheduler) override { folly::EventBase* eventBase) override {
return std::make_unique<JFlipperWebSocket>( return std::make_unique<JFlipperWebSocket>(
std::move(endpoint), std::move(payload)); std::move(endpoint), std::move(payload));
; ;
@@ -425,7 +420,7 @@ class JFlipperSocketProvider : public facebook::flipper::FlipperSocketProvider {
virtual std::unique_ptr<facebook::flipper::FlipperSocket> create( virtual std::unique_ptr<facebook::flipper::FlipperSocket> create(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
facebook::flipper::Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore) override { ConnectionContextStore* connectionContextStore) override {
return std::make_unique<JFlipperWebSocket>( return std::make_unique<JFlipperWebSocket>(
std::move(endpoint), std::move(payload), connectionContextStore); std::move(endpoint), std::move(payload), connectionContextStore);
@@ -944,11 +939,6 @@ class JFlipperClient : public jni::HybridClass<JFlipperClient> {
const std::string app, const std::string app,
const std::string appId, const std::string appId,
const std::string privateAppDirectory) { const std::string privateAppDirectory) {
sonarScheduler =
std::make_unique<FollyScheduler>(callbackWorker->eventBase());
connectionScheduler =
std::make_unique<FollyScheduler>(connectionWorker->eventBase());
FlipperClient::init( FlipperClient::init(
{{std::move(host), {{std::move(host),
std::move(os), std::move(os),
@@ -957,8 +947,8 @@ class JFlipperClient : public jni::HybridClass<JFlipperClient> {
std::move(app), std::move(app),
std::move(appId), std::move(appId),
std::move(privateAppDirectory)}, std::move(privateAppDirectory)},
sonarScheduler.get(), callbackWorker->eventBase(),
connectionScheduler.get(), connectionWorker->eventBase(),
insecurePort, insecurePort,
securePort, securePort,
altInsecurePort, altInsecurePort,

View File

@@ -10,8 +10,9 @@
#import "FlipperClient.h" #import "FlipperClient.h"
#import <Flipper/FlipperCertificateProvider.h> #import <Flipper/FlipperCertificateProvider.h>
#import <Flipper/FlipperClient.h> #import <Flipper/FlipperClient.h>
#import <Flipper/FlipperFollyScopedThreadScheduler.h>
#import <Flipper/FlipperSocketProvider.h> #import <Flipper/FlipperSocketProvider.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/ScopedEventBaseThread.h>
#include <memory> #include <memory>
#import "FlipperClient+Testing.h" #import "FlipperClient+Testing.h"
#import "FlipperCppWrapperPlugin.h" #import "FlipperCppWrapperPlugin.h"
@@ -31,9 +32,8 @@ using WrapperPlugin = facebook::flipper::FlipperCppWrapperPlugin;
@implementation FlipperClient { @implementation FlipperClient {
facebook::flipper::FlipperClient* _cppClient; facebook::flipper::FlipperClient* _cppClient;
std::unique_ptr<facebook::flipper::Scheduler> sonarScheduler; folly::ScopedEventBaseThread sonarThread;
std::unique_ptr<facebook::flipper::Scheduler> connectionScheduler; folly::ScopedEventBaseThread connectionThread;
id<FlipperKitCertificateProvider> _certProvider; id<FlipperKitCertificateProvider> _certProvider;
#if !TARGET_OS_OSX && !TARGET_OS_SIMULATOR && !TARGET_OS_MACCATALYST #if !TARGET_OS_OSX && !TARGET_OS_SIMULATOR && !TARGET_OS_MACCATALYST
FKPortForwardingServer* _secureServer; FKPortForwardingServer* _secureServer;
@@ -90,10 +90,6 @@ using WrapperPlugin = facebook::flipper::FlipperCppWrapperPlugin;
deviceName = [[NSHost currentHost] localizedName]; deviceName = [[NSHost currentHost] localizedName];
#endif #endif
sonarScheduler =
std::make_unique<facebook::flipper::FollyScopedThreadScheduler>();
connectionScheduler =
std::make_unique<facebook::flipper::FollyScopedThreadScheduler>();
static const std::string UNKNOWN = std::string("unknown"); static const std::string UNKNOWN = std::string("unknown");
try { try {
facebook::flipper::FlipperClient::init( facebook::flipper::FlipperClient::init(
@@ -106,8 +102,8 @@ using WrapperPlugin = facebook::flipper::FlipperCppWrapperPlugin;
[appId UTF8String] ?: UNKNOWN, [appId UTF8String] ?: UNKNOWN,
[privateAppDirectory UTF8String], [privateAppDirectory UTF8String],
}, },
sonarScheduler.get(), sonarThread.getEventBase(),
connectionScheduler.get(), connectionThread.getEventBase(),
[SKEnvironmentVariables getInsecurePort], [SKEnvironmentVariables getInsecurePort],
[SKEnvironmentVariables getSecurePort], [SKEnvironmentVariables getSecurePort],
[SKEnvironmentVariables getAltInsecurePort], [SKEnvironmentVariables getAltInsecurePort],

View File

@@ -9,11 +9,11 @@
#pragma once #pragma once
#import <Flipper/FlipperScheduler.h>
#import <Flipper/FlipperSocket.h> #import <Flipper/FlipperSocket.h>
#import <Flipper/FlipperSocketProvider.h> #import <Flipper/FlipperSocketProvider.h>
#import <Flipper/FlipperTransportTypes.h> #import <Flipper/FlipperTransportTypes.h>
#import <folly/dynamic.h> #import <folly/dynamic.h>
#import <folly/io/async/EventBase.h>
#import <future> #import <future>
#import <memory> #import <memory>
@@ -67,14 +67,14 @@ class FlipperWebSocketProvider : public FlipperSocketProvider {
virtual std::unique_ptr<FlipperSocket> create( virtual std::unique_ptr<FlipperSocket> create(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler) override { folly::EventBase* eventBase) override {
return std::make_unique<FlipperWebSocket>( return std::make_unique<FlipperWebSocket>(
std::move(endpoint), std::move(payload)); std::move(endpoint), std::move(payload));
} }
virtual std::unique_ptr<FlipperSocket> create( virtual std::unique_ptr<FlipperSocket> create(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore) override { ConnectionContextStore* connectionContextStore) override {
return std::make_unique<FlipperWebSocket>( return std::make_unique<FlipperWebSocket>(
std::move(endpoint), std::move(payload), connectionContextStore); std::move(endpoint), std::move(payload), connectionContextStore);

View File

@@ -8,7 +8,6 @@
#include "FlipperClient.h" #include "FlipperClient.h"
#include <fstream> #include <fstream>
#include <iostream> #include <iostream>
#include <sstream>
#include <stdexcept> #include <stdexcept>
#include <vector> #include <vector>
#include "ConnectionContextStore.h" #include "ConnectionContextStore.h"

View File

@@ -8,7 +8,6 @@
#pragma once #pragma once
#include <map> #include <map>
#include <memory>
#include <mutex> #include <mutex>
#include <vector> #include <vector>
#include "FlipperCertificateProvider.h" #include "FlipperCertificateProvider.h"

View File

@@ -95,8 +95,8 @@ FlipperConnectionManagerImpl::FlipperConnectionManagerImpl(
securePort(config.securePort), securePort(config.securePort),
altInsecurePort(config.altInsecurePort), altInsecurePort(config.altInsecurePort),
altSecurePort(config.altSecurePort), altSecurePort(config.altSecurePort),
flipperScheduler_(config.callbackWorker), flipperEventBase_(config.callbackWorker),
connectionScheduler_(config.connectionWorker), connectionEventBase_(config.connectionWorker),
contextStore_(contextStore), contextStore_(contextStore),
implWrapper_(std::make_shared<FlipperConnectionManagerWrapper>(this)) { implWrapper_(std::make_shared<FlipperConnectionManagerWrapper>(this)) {
CHECK_THROW(config.callbackWorker, std::invalid_argument); CHECK_THROW(config.callbackWorker, std::invalid_argument);
@@ -131,7 +131,10 @@ void FlipperConnectionManagerImpl::start() {
auto step = flipperState_->start("Start connection thread"); auto step = flipperState_->start("Start connection thread");
flipperScheduler_->schedule([this, step]() { folly::makeFuture()
.via(flipperEventBase_->getEventBase())
.delayed(std::chrono::milliseconds(0))
.thenValue([this, step](auto&&) {
step->complete(); step->complete();
startSync(); startSync();
}); });
@@ -207,7 +210,7 @@ bool FlipperConnectionManagerImpl::connectAndExchangeCertificate() {
payload->medium = medium; payload->medium = medium;
auto newClient = FlipperSocketProvider::socketCreate( auto newClient = FlipperSocketProvider::socketCreate(
endpoint, std::move(payload), flipperScheduler_); endpoint, std::move(payload), connectionEventBase_);
newClient->setEventHandler(ConnectionEvents(implWrapper_)); newClient->setEventHandler(ConnectionEvents(implWrapper_));
auto connectingInsecurely = flipperState_->start("Connect insecurely"); auto connectingInsecurely = flipperState_->start("Connect insecurely");
@@ -256,7 +259,7 @@ bool FlipperConnectionManagerImpl::connectSecurely() {
payload->csr_path = contextStore_->getCertificateDirectoryPath().c_str(); payload->csr_path = contextStore_->getCertificateDirectoryPath().c_str();
auto newClient = FlipperSocketProvider::socketCreate( auto newClient = FlipperSocketProvider::socketCreate(
endpoint, std::move(payload), connectionScheduler_, contextStore_.get()); endpoint, std::move(payload), connectionEventBase_, contextStore_.get());
newClient->setEventHandler(ConnectionEvents(implWrapper_)); newClient->setEventHandler(ConnectionEvents(implWrapper_));
newClient->setMessageHandler([this](const std::string& msg) { newClient->setMessageHandler([this](const std::string& msg) {
std::unique_ptr<FireAndForgetBasedFlipperResponder> responder; std::unique_ptr<FireAndForgetBasedFlipperResponder> responder;
@@ -291,8 +294,10 @@ void FlipperConnectionManagerImpl::reconnect() {
log("Not started"); log("Not started");
return; return;
} }
flipperScheduler_->scheduleAfter( folly::makeFuture()
[this]() { startSync(); }, reconnectIntervalSeconds * 1000.0f); .via(flipperEventBase_->getEventBase())
.delayed(std::chrono::seconds(reconnectIntervalSeconds))
.thenValue([this](auto&&) { startSync(); });
} }
void FlipperConnectionManagerImpl::stop() { void FlipperConnectionManagerImpl::stop() {
@@ -320,7 +325,7 @@ void FlipperConnectionManagerImpl::setCallbacks(Callbacks* callbacks) {
} }
void FlipperConnectionManagerImpl::sendMessage(const folly::dynamic& message) { void FlipperConnectionManagerImpl::sendMessage(const folly::dynamic& message) {
flipperScheduler_->schedule([this, message]() { flipperEventBase_->add([this, message]() {
try { try {
if (client_) { if (client_) {
client_->send(message, []() {}); client_->send(message, []() {});
@@ -452,11 +457,11 @@ void FlipperConnectionManagerImpl::requestSignedCertificate() {
auto gettingCert = flipperState_->start("Getting cert from desktop"); auto gettingCert = flipperState_->start("Getting cert from desktop");
certificateExchangeCompleted_ = false; certificateExchangeCompleted_ = false;
flipperScheduler_->schedule([this, message, gettingCert]() { flipperEventBase_->add([this, message, gettingCert]() {
client_->sendExpectResponse( client_->sendExpectResponse(
folly::toJson(message), folly::toJson(message),
[this, gettingCert](const std::string& response, bool isError) { [this, gettingCert](const std::string& response, bool isError) {
flipperScheduler_->schedule([this, gettingCert, response, isError]() { flipperEventBase_->add([this, gettingCert, response, isError]() {
this->processSignedCertificateResponse( this->processSignedCertificateResponse(
gettingCert, response, isError); gettingCert, response, isError);
}); });
@@ -466,7 +471,7 @@ void FlipperConnectionManagerImpl::requestSignedCertificate() {
} }
bool FlipperConnectionManagerImpl::isRunningInOwnThread() { bool FlipperConnectionManagerImpl::isRunningInOwnThread() {
return flipperScheduler_->isRunningInOwnThread(); return flipperEventBase_->isInEventBaseThread();
} }
} // namespace flipper } // namespace flipper

View File

@@ -7,10 +7,11 @@
#pragma once #pragma once
#include <folly/Executor.h>
#include <folly/io/async/EventBase.h>
#include <mutex> #include <mutex>
#include "FlipperConnectionManager.h" #include "FlipperConnectionManager.h"
#include "FlipperInitConfig.h" #include "FlipperInitConfig.h"
#include "FlipperScheduler.h"
#include "FlipperSocket.h" #include "FlipperSocket.h"
#include "FlipperState.h" #include "FlipperState.h"
@@ -63,8 +64,8 @@ class FlipperConnectionManagerImpl : public FlipperConnectionManager {
int altInsecurePort; int altInsecurePort;
int altSecurePort; int altSecurePort;
Scheduler* flipperScheduler_; folly::EventBase* flipperEventBase_;
Scheduler* connectionScheduler_; folly::EventBase* connectionEventBase_;
std::unique_ptr<FlipperSocket> client_; std::unique_ptr<FlipperSocket> client_;

View File

@@ -7,8 +7,8 @@
#pragma once #pragma once
#include <folly/io/async/EventBase.h>
#include <map> #include <map>
#include "FlipperScheduler.h"
namespace facebook { namespace facebook {
namespace flipper { namespace flipper {
@@ -30,14 +30,14 @@ struct FlipperInitConfig {
DeviceData deviceData; DeviceData deviceData;
/** /**
Scheduler on which client callbacks should be called. EventBase on which client callbacks should be called.
*/ */
Scheduler* callbackWorker; folly::EventBase* callbackWorker;
/** /**
Scheduler to be used to maintain the network connection. EventBase to be used to maintain the network connection.
*/ */
Scheduler* connectionWorker; folly::EventBase* connectionWorker;
int insecurePort = 9089; int insecurePort = 9089;
int securePort = 9088; int securePort = 9088;

View File

@@ -21,19 +21,19 @@ std::unique_ptr<FlipperSocketProvider> FlipperSocketProvider::shelvedProvider_ =
std::unique_ptr<FlipperSocket> FlipperSocketProvider::socketCreate( std::unique_ptr<FlipperSocket> FlipperSocketProvider::socketCreate(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler) { folly::EventBase* eventBase) {
return provider_->create(std::move(endpoint), std::move(payload), scheduler); return provider_->create(std::move(endpoint), std::move(payload), eventBase);
} }
std::unique_ptr<FlipperSocket> FlipperSocketProvider::socketCreate( std::unique_ptr<FlipperSocket> FlipperSocketProvider::socketCreate(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore) { ConnectionContextStore* connectionContextStore) {
return provider_->create( return provider_->create(
std::move(endpoint), std::move(endpoint),
std::move(payload), std::move(payload),
scheduler, eventBase,
connectionContextStore); connectionContextStore);
} }

View File

@@ -7,8 +7,8 @@
#pragma once #pragma once
#include <folly/io/async/EventBase.h>
#include <memory> #include <memory>
#include "FlipperScheduler.h"
namespace facebook { namespace facebook {
namespace flipper { namespace flipper {
@@ -32,37 +32,35 @@ class FlipperSocketProvider {
@param endpoint Endpoint to connect to. @param endpoint Endpoint to connect to.
@param payload Any configuration payload to establish a connection with @param payload Any configuration payload to establish a connection with
the specified endpoint. the specified endpoint.
@param scheduler An scheduler used to schedule and execute connection @param eventBase A folly event base used to execute connection operations.
operations.
*/ */
virtual std::unique_ptr<FlipperSocket> create( virtual std::unique_ptr<FlipperSocket> create(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler) = 0; folly::EventBase* eventBase) = 0;
/** /**
Create an instance of FlipperSocket. Create an instance of FlipperSocket.
@param endpoint Endpoint to connect to. @param endpoint Endpoint to connect to.
@param payload Any configuration payload to establish a connection with @param payload Any configuration payload to establish a connection with
the specified endpoint. the specified endpoint.
@param scheduler An scheduler used to schedule and execute connection @param eventBase A folly event base used to execute connection operations.
operations.
@param connectionContextStore A connection context store used for obtaining @param connectionContextStore A connection context store used for obtaining
the certificate used for secure connections. the certificate used for secure connections.
*/ */
virtual std::unique_ptr<FlipperSocket> create( virtual std::unique_ptr<FlipperSocket> create(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore) = 0; ConnectionContextStore* connectionContextStore) = 0;
static std::unique_ptr<FlipperSocket> socketCreate( static std::unique_ptr<FlipperSocket> socketCreate(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler); folly::EventBase* eventBase);
static std::unique_ptr<FlipperSocket> socketCreate( static std::unique_ptr<FlipperSocket> socketCreate(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore); ConnectionContextStore* connectionContextStore);
static void setDefaultProvider( static void setDefaultProvider(

View File

@@ -6,11 +6,10 @@
*/ */
#include <Flipper/FlipperConnectionManagerImpl.h> #include <Flipper/FlipperConnectionManagerImpl.h>
#include <Flipper/FlipperFollyScheduler.h>
#include <FlipperTestLib/ConnectionContextStoreMock.h> #include <FlipperTestLib/ConnectionContextStoreMock.h>
#include <folly/Singleton.h> #include <folly/Singleton.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <memory>
namespace facebook { namespace facebook {
namespace flipper { namespace flipper {
@@ -22,16 +21,12 @@ class FlipperConnectionManagerImplTerminationTest : public ::testing::Test {
protected: protected:
std::shared_ptr<FlipperState> state; std::shared_ptr<FlipperState> state;
std::shared_ptr<ConnectionContextStore> contextStore; std::shared_ptr<ConnectionContextStore> contextStore;
std::unique_ptr<Scheduler> sonarScheduler;
std::unique_ptr<Scheduler> connectionScheduler;
void SetUp() override { void SetUp() override {
// Folly singletons must be registered before they are used. // Folly singletons must be registered before they are used.
// Without this, test fails in phabricator. // Without this, test fails in phabricator.
folly::SingletonVault::singleton()->registrationComplete(); folly::SingletonVault::singleton()->registrationComplete();
state = std::make_shared<FlipperState>(); state = std::make_shared<FlipperState>();
contextStore = std::make_shared<ConnectionContextStoreMock>(); contextStore = std::make_shared<ConnectionContextStoreMock>();
sonarScheduler = std::make_unique<FollyScheduler>(new EventBase());
connectionScheduler = std::make_unique<FollyScheduler>(new EventBase());
} }
}; };
@@ -40,7 +35,7 @@ TEST_F(
testNullEventBaseGetsRejected) { testNullEventBaseGetsRejected) {
try { try {
auto instance = std::make_shared<FlipperConnectionManagerImpl>( auto instance = std::make_shared<FlipperConnectionManagerImpl>(
FlipperInitConfig{DeviceData{}, nullptr, connectionScheduler.get()}, FlipperInitConfig{DeviceData{}, nullptr, new EventBase()},
state, state,
contextStore); contextStore);
FAIL(); FAIL();
@@ -49,7 +44,7 @@ TEST_F(
} }
try { try {
auto instance = std::make_shared<FlipperConnectionManagerImpl>( auto instance = std::make_shared<FlipperConnectionManagerImpl>(
FlipperInitConfig{DeviceData{}, sonarScheduler.get(), nullptr}, FlipperInitConfig{DeviceData{}, new EventBase(), nullptr},
state, state,
contextStore); contextStore);
FAIL(); FAIL();
@@ -61,8 +56,8 @@ TEST_F(
TEST_F( TEST_F(
FlipperConnectionManagerImplTerminationTest, FlipperConnectionManagerImplTerminationTest,
testNonStartedEventBaseDoesntHang) { testNonStartedEventBaseDoesntHang) {
auto config = FlipperInitConfig{ auto config =
DeviceData{}, sonarScheduler.get(), connectionScheduler.get()}; FlipperInitConfig{DeviceData{}, new EventBase(), new EventBase()};
auto instance = std::make_shared<FlipperConnectionManagerImpl>( auto instance = std::make_shared<FlipperConnectionManagerImpl>(
config, state, contextStore); config, state, contextStore);
instance->start(); instance->start();
@@ -77,11 +72,8 @@ TEST_F(
std::thread([flipperEventBase]() { flipperEventBase->loopForever(); }); std::thread([flipperEventBase]() { flipperEventBase->loopForever(); });
auto connectionThread = std::thread( auto connectionThread = std::thread(
[connectionEventBase]() { connectionEventBase->loopForever(); }); [connectionEventBase]() { connectionEventBase->loopForever(); });
auto localSonarScheduler = std::make_unique<FollyScheduler>(flipperEventBase); auto config =
auto localConnectionScheduler = FlipperInitConfig{DeviceData{}, flipperEventBase, connectionEventBase};
std::make_unique<FollyScheduler>(connectionEventBase);
auto config = FlipperInitConfig{
DeviceData{}, localSonarScheduler.get(), localConnectionScheduler.get()};
auto instance = std::make_shared<FlipperConnectionManagerImpl>( auto instance = std::make_shared<FlipperConnectionManagerImpl>(
config, state, contextStore); config, state, contextStore);

View File

@@ -9,11 +9,11 @@
#pragma once #pragma once
#include <Flipper/FlipperScheduler.h>
#include <Flipper/FlipperSocket.h> #include <Flipper/FlipperSocket.h>
#include <Flipper/FlipperSocketProvider.h> #include <Flipper/FlipperSocketProvider.h>
#include <Flipper/FlipperTransportTypes.h> #include <Flipper/FlipperTransportTypes.h>
#include <folly/dynamic.h> #include <folly/dynamic.h>
#include <folly/io/async/EventBase.h>
#include <future> #include <future>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@@ -37,18 +37,18 @@ class BaseClient {
BaseClient( BaseClient(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler) folly::EventBase* eventBase)
: endpoint_(std::move(endpoint)), : endpoint_(std::move(endpoint)),
payload_(std::move(payload)), payload_(std::move(payload)),
scheduler_(scheduler) {} eventBase_(eventBase) {}
BaseClient( BaseClient(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore) ConnectionContextStore* connectionContextStore)
: endpoint_(std::move(endpoint)), : endpoint_(std::move(endpoint)),
payload_(std::move(payload)), payload_(std::move(payload)),
scheduler_(scheduler), eventBase_(eventBase),
connectionContextStore_(connectionContextStore) {} connectionContextStore_(connectionContextStore) {}
BaseClient(const BaseClient&) = delete; BaseClient(const BaseClient&) = delete;
@@ -84,7 +84,7 @@ class BaseClient {
protected: protected:
FlipperConnectionEndpoint endpoint_; FlipperConnectionEndpoint endpoint_;
std::unique_ptr<FlipperSocketBasePayload> payload_; std::unique_ptr<FlipperSocketBasePayload> payload_;
Scheduler* scheduler_; folly::EventBase* eventBase_;
ConnectionContextStore* connectionContextStore_; ConnectionContextStore* connectionContextStore_;
SocketEventHandler eventHandler_; SocketEventHandler eventHandler_;

View File

@@ -14,6 +14,8 @@
#include <Flipper/Log.h> #include <Flipper/Log.h>
#include <folly/String.h> #include <folly/String.h>
#include <folly/futures/Future.h> #include <folly/futures/Future.h>
#include <folly/io/async/AsyncSocketException.h>
#include <folly/io/async/SSLContext.h>
#include <folly/json.h> #include <folly/json.h>
#include <cctype> #include <cctype>
#include <iomanip> #include <iomanip>
@@ -30,24 +32,24 @@ namespace flipper {
FlipperWebSocket::FlipperWebSocket( FlipperWebSocket::FlipperWebSocket(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler) folly::EventBase* eventBase)
: FlipperWebSocket( : FlipperWebSocket(
std::move(endpoint), std::move(endpoint),
std::move(payload), std::move(payload),
scheduler, eventBase,
nullptr) {} nullptr) {}
FlipperWebSocket::FlipperWebSocket( FlipperWebSocket::FlipperWebSocket(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore) { ConnectionContextStore* connectionContextStore) {
if (endpoint.secure) { if (endpoint.secure) {
socket_ = std::make_unique<WebSocketTLSClient>( socket_ = std::make_unique<WebSocketTLSClient>(
endpoint, std::move(payload), scheduler, connectionContextStore); endpoint, std::move(payload), eventBase, connectionContextStore);
} else { } else {
socket_ = std::make_unique<WebSocketClient>( socket_ = std::make_unique<WebSocketClient>(
endpoint, std::move(payload), scheduler, connectionContextStore); endpoint, std::move(payload), eventBase, connectionContextStore);
} }
} }

View File

@@ -9,11 +9,11 @@
#pragma once #pragma once
#include <Flipper/FlipperScheduler.h>
#include <Flipper/FlipperSocket.h> #include <Flipper/FlipperSocket.h>
#include <Flipper/FlipperSocketProvider.h> #include <Flipper/FlipperSocketProvider.h>
#include <Flipper/FlipperTransportTypes.h> #include <Flipper/FlipperTransportTypes.h>
#include <folly/dynamic.h> #include <folly/dynamic.h>
#include <folly/io/async/EventBase.h>
#include <future> #include <future>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@@ -29,11 +29,11 @@ class FlipperWebSocket : public FlipperSocket {
FlipperWebSocket( FlipperWebSocket(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler); folly::EventBase* eventBase);
FlipperWebSocket( FlipperWebSocket(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore); ConnectionContextStore* connectionContextStore);
virtual ~FlipperWebSocket(); virtual ~FlipperWebSocket();
@@ -62,19 +62,19 @@ class FlipperWebSocketProvider : public FlipperSocketProvider {
virtual std::unique_ptr<FlipperSocket> create( virtual std::unique_ptr<FlipperSocket> create(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler) override { folly::EventBase* eventBase) override {
return std::make_unique<FlipperWebSocket>( return std::make_unique<FlipperWebSocket>(
std::move(endpoint), std::move(payload), scheduler); std::move(endpoint), std::move(payload), eventBase);
} }
virtual std::unique_ptr<FlipperSocket> create( virtual std::unique_ptr<FlipperSocket> create(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore) override { ConnectionContextStore* connectionContextStore) override {
return std::make_unique<FlipperWebSocket>( return std::make_unique<FlipperWebSocket>(
std::move(endpoint), std::move(endpoint),
std::move(payload), std::move(payload),
scheduler, eventBase,
connectionContextStore); connectionContextStore);
} }
}; };

View File

@@ -14,6 +14,7 @@
#include <Flipper/Log.h> #include <Flipper/Log.h>
#include <folly/String.h> #include <folly/String.h>
#include <folly/futures/Future.h> #include <folly/futures/Future.h>
#include <folly/io/async/SSLContext.h>
#include <folly/json.h> #include <folly/json.h>
#include <websocketpp/common/memory.hpp> #include <websocketpp/common/memory.hpp>
#include <websocketpp/common/thread.hpp> #include <websocketpp/common/thread.hpp>
@@ -30,22 +31,22 @@ namespace flipper {
WebSocketClient::WebSocketClient( WebSocketClient::WebSocketClient(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler) folly::EventBase* eventBase)
: WebSocketClient( : WebSocketClient(
std::move(endpoint), std::move(endpoint),
std::move(payload), std::move(payload),
scheduler, eventBase,
nullptr) {} nullptr) {}
WebSocketClient::WebSocketClient( WebSocketClient::WebSocketClient(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore) ConnectionContextStore* connectionContextStore)
: BaseClient( : BaseClient(
std::move(endpoint), std::move(endpoint),
std::move(payload), std::move(payload),
scheduler, eventBase,
connectionContextStore) { connectionContextStore) {
status_ = Status::Unconnected; status_ = Status::Unconnected;
@@ -147,7 +148,7 @@ void WebSocketClient::disconnect() {
thread_->join(); thread_->join();
} }
thread_ = nullptr; thread_ = nullptr;
scheduler_->schedule( eventBase_->add(
[eventHandler = eventHandler_]() { eventHandler(SocketEvent::CLOSE); }); [eventHandler = eventHandler_]() { eventHandler(SocketEvent::CLOSE); });
} }
@@ -179,11 +180,11 @@ void WebSocketClient::send(
void WebSocketClient::sendExpectResponse( void WebSocketClient::sendExpectResponse(
const std::string& message, const std::string& message,
SocketSendExpectResponseHandler completion) { SocketSendExpectResponseHandler completion) {
connection_->set_message_handler([completion, scheduler = scheduler_]( connection_->set_message_handler(
websocketpp::connection_hdl hdl, [completion, eventBase = eventBase_](
SocketClient::message_ptr msg) { websocketpp::connection_hdl hdl, SocketClient::message_ptr msg) {
const std::string& payload = msg->get_payload(); const std::string& payload = msg->get_payload();
scheduler->schedule([completion, payload] { completion(payload, false); }); eventBase->add([completion, payload] { completion(payload, false); });
}); });
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
socket_.send( socket_.send(
@@ -204,7 +205,7 @@ void WebSocketClient::onOpen(SocketClient* c, websocketpp::connection_hdl hdl) {
} }
status_ = Status::Initializing; status_ = Status::Initializing;
scheduler_->schedule( eventBase_->add(
[eventHandler = eventHandler_]() { eventHandler(SocketEvent::OPEN); }); [eventHandler = eventHandler_]() { eventHandler(SocketEvent::OPEN); });
} }
@@ -214,7 +215,7 @@ void WebSocketClient::onMessage(
SocketClient::message_ptr msg) { SocketClient::message_ptr msg) {
const std::string& payload = msg->get_payload(); const std::string& payload = msg->get_payload();
if (messageHandler_) { if (messageHandler_) {
scheduler_->schedule([payload, messageHandler = messageHandler_]() { eventBase_->add([payload, messageHandler = messageHandler_]() {
messageHandler(payload); messageHandler(payload);
}); });
} }
@@ -229,7 +230,7 @@ void WebSocketClient::onFail(SocketClient* c, websocketpp::connection_hdl hdl) {
connected_.set_value(false); connected_.set_value(false);
} }
status_ = Status::Failed; status_ = Status::Failed;
scheduler_->schedule( eventBase_->add(
[eventHandler = eventHandler_]() { eventHandler(SocketEvent::ERROR); }); [eventHandler = eventHandler_]() { eventHandler(SocketEvent::ERROR); });
} }
@@ -237,7 +238,7 @@ void WebSocketClient::onClose(
SocketClient* c, SocketClient* c,
websocketpp::connection_hdl hdl) { websocketpp::connection_hdl hdl) {
status_ = Status::Closed; status_ = Status::Closed;
scheduler_->schedule( eventBase_->add(
[eventHandler = eventHandler_]() { eventHandler(SocketEvent::CLOSE); }); [eventHandler = eventHandler_]() { eventHandler(SocketEvent::CLOSE); });
} }

View File

@@ -9,11 +9,11 @@
#pragma once #pragma once
#include <Flipper/FlipperScheduler.h>
#include <Flipper/FlipperSocket.h> #include <Flipper/FlipperSocket.h>
#include <Flipper/FlipperSocketProvider.h> #include <Flipper/FlipperSocketProvider.h>
#include <Flipper/FlipperTransportTypes.h> #include <Flipper/FlipperTransportTypes.h>
#include <folly/dynamic.h> #include <folly/dynamic.h>
#include <folly/io/async/EventBase.h>
#include <future> #include <future>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@@ -35,11 +35,11 @@ class WebSocketClient : public BaseClient {
WebSocketClient( WebSocketClient(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler); folly::EventBase* eventBase);
WebSocketClient( WebSocketClient(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore); ConnectionContextStore* connectionContextStore);
WebSocketClient(const WebSocketClient&) = delete; WebSocketClient(const WebSocketClient&) = delete;

View File

@@ -33,22 +33,22 @@ namespace flipper {
WebSocketTLSClient::WebSocketTLSClient( WebSocketTLSClient::WebSocketTLSClient(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler) folly::EventBase* eventBase)
: WebSocketTLSClient( : WebSocketTLSClient(
std::move(endpoint), std::move(endpoint),
std::move(payload), std::move(payload),
scheduler, eventBase,
nullptr) {} nullptr) {}
WebSocketTLSClient::WebSocketTLSClient( WebSocketTLSClient::WebSocketTLSClient(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore) ConnectionContextStore* connectionContextStore)
: BaseClient( : BaseClient(
std::move(endpoint), std::move(endpoint),
std::move(payload), std::move(payload),
scheduler, eventBase,
connectionContextStore) { connectionContextStore) {
status_ = Status::Unconnected; status_ = Status::Unconnected;
@@ -157,7 +157,7 @@ void WebSocketTLSClient::disconnect() {
} }
thread_ = nullptr; thread_ = nullptr;
scheduler_->schedule( eventBase_->add(
[eventHandler = eventHandler_]() { eventHandler(SocketEvent::CLOSE); }); [eventHandler = eventHandler_]() { eventHandler(SocketEvent::CLOSE); });
} }
@@ -189,11 +189,11 @@ void WebSocketTLSClient::send(
void WebSocketTLSClient::sendExpectResponse( void WebSocketTLSClient::sendExpectResponse(
const std::string& message, const std::string& message,
SocketSendExpectResponseHandler completion) { SocketSendExpectResponseHandler completion) {
connection_->set_message_handler([completion, scheduler = scheduler_]( connection_->set_message_handler(
websocketpp::connection_hdl hdl, [completion, eventBase = eventBase_](
SocketTLSClient::message_ptr msg) { websocketpp::connection_hdl hdl, SocketTLSClient::message_ptr msg) {
const std::string& payload = msg->get_payload(); const std::string& payload = msg->get_payload();
scheduler->schedule([completion, payload] { completion(payload, false); }); eventBase->add([completion, payload] { completion(payload, false); });
}); });
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
socket_.send( socket_.send(
@@ -216,7 +216,7 @@ void WebSocketTLSClient::onOpen(
} }
status_ = Status::Initializing; status_ = Status::Initializing;
scheduler_->schedule( eventBase_->add(
[eventHandler = eventHandler_]() { eventHandler(SocketEvent::OPEN); }); [eventHandler = eventHandler_]() { eventHandler(SocketEvent::OPEN); });
} }
@@ -226,7 +226,7 @@ void WebSocketTLSClient::onMessage(
SocketTLSClient::message_ptr msg) { SocketTLSClient::message_ptr msg) {
const std::string& payload = msg->get_payload(); const std::string& payload = msg->get_payload();
if (messageHandler_) { if (messageHandler_) {
scheduler_->schedule([payload, messageHandler = messageHandler_]() { eventBase_->add([payload, messageHandler = messageHandler_]() {
messageHandler(payload); messageHandler(payload);
}); });
} }
@@ -257,7 +257,7 @@ void WebSocketTLSClient::onFail(
} }
} }
status_ = Status::Failed; status_ = Status::Failed;
scheduler_->schedule([eventHandler = eventHandler_, sslError]() { eventBase_->add([eventHandler = eventHandler_, sslError]() {
if (sslError) { if (sslError) {
eventHandler(SocketEvent::SSL_ERROR); eventHandler(SocketEvent::SSL_ERROR);
} else { } else {
@@ -270,7 +270,7 @@ void WebSocketTLSClient::onClose(
SocketTLSClient* c, SocketTLSClient* c,
websocketpp::connection_hdl hdl) { websocketpp::connection_hdl hdl) {
status_ = Status::Closed; status_ = Status::Closed;
scheduler_->schedule( eventBase_->add(
[eventHandler = eventHandler_]() { eventHandler(SocketEvent::CLOSE); }); [eventHandler = eventHandler_]() { eventHandler(SocketEvent::CLOSE); });
} }

View File

@@ -13,6 +13,7 @@
#include <Flipper/FlipperSocketProvider.h> #include <Flipper/FlipperSocketProvider.h>
#include <Flipper/FlipperTransportTypes.h> #include <Flipper/FlipperTransportTypes.h>
#include <folly/dynamic.h> #include <folly/dynamic.h>
#include <folly/io/async/EventBase.h>
#include <future> #include <future>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@@ -38,11 +39,11 @@ class WebSocketTLSClient : public BaseClient {
WebSocketTLSClient( WebSocketTLSClient(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler); folly::EventBase* eventBase);
WebSocketTLSClient( WebSocketTLSClient(
FlipperConnectionEndpoint endpoint, FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload, std::unique_ptr<FlipperSocketBasePayload> payload,
Scheduler* scheduler, folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore); ConnectionContextStore* connectionContextStore);
WebSocketTLSClient(const WebSocketTLSClient&) = delete; WebSocketTLSClient(const WebSocketTLSClient&) = delete;