Remove RSocket

Summary:
^
Changelog: Remove rsocket references from xplat

Reviewed By: aigoncharov

Differential Revision: D34418118

fbshipit-source-id: bd49b9da119e3a2a1ce396d14e0dca73e1b9c692
This commit is contained in:
Lorenzo Blasa
2022-02-24 23:56:22 -08:00
committed by Facebook GitHub Bot
parent c15605afd3
commit c4f80a826e
12 changed files with 10 additions and 667 deletions

View File

@@ -14,7 +14,6 @@
#include "FireAndForgetBasedFlipperResponder.h"
#include "FlipperConnectionImpl.h"
#include "FlipperConnectionManagerImpl.h"
#include "FlipperResponderImpl.h"
#include "FlipperState.h"
#include "FlipperStep.h"
#include "Log.h"

View File

@@ -15,11 +15,9 @@
#include <thread>
#include "ConnectionContextStore.h"
#include "FireAndForgetBasedFlipperResponder.h"
#include "FlipperResponderImpl.h"
#include "FlipperSocketProvider.h"
#include "FlipperStep.h"
#include "Log.h"
#include "yarpl/Single.h"
#define WRONG_THREAD_EXIT_MSG \
"ERROR: Aborting flipper initialization because it's not running in the flipper thread."
@@ -120,6 +118,11 @@ FlipperConnectionManagerImpl::getCertificateProvider() {
};
void FlipperConnectionManagerImpl::start() {
if (!FlipperSocketProvider::hasProvider()) {
log("No socket provider has been set, unable to start");
return;
}
if (isStarted_) {
log("Already started");
return;
@@ -263,10 +266,6 @@ bool FlipperConnectionManagerImpl::connectSecurely() {
auto newClient = FlipperSocketProvider::socketCreate(
endpoint, std::move(payload), connectionEventBase_, contextStore_.get());
newClient->setEventHandler(ConnectionEvents(implWrapper_));
/**
Message handler is only ever used for WebSocket connections. RSocket uses a
different approach whereas a responder is used instead.
*/
newClient->setMessageHandler([this](const std::string& msg) {
std::unique_ptr<FireAndForgetBasedFlipperResponder> responder;
auto message = folly::parseJson(msg);

View File

@@ -1,217 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "FlipperRSocket.h"
#include <folly/String.h>
#include <folly/futures/Future.h>
#include <folly/io/async/AsyncSocketException.h>
#include <folly/io/async/SSLContext.h>
#include <folly/json.h>
#include <rsocket/Payload.h>
#include <rsocket/RSocket.h>
#include <rsocket/transports/tcp/TcpConnectionFactory.h>
#include <stdexcept>
#include <string>
#include <thread>
#include "ConnectionContextStore.h"
#include "FireAndForgetBasedFlipperResponder.h"
#include "FlipperRSocketResponder.h"
#include "FlipperResponderImpl.h"
#include "FlipperTransportTypes.h"
#include "Log.h"
#include "yarpl/Single.h"
static constexpr int connectionKeepaliveSeconds = 10;
static constexpr int maxPayloadSize = 0xFFFFFF;
namespace facebook {
namespace flipper {
rsocket::Payload toRSocketPayload(folly::dynamic data);
class RSocketEvents : public rsocket::RSocketConnectionEvents {
private:
const SocketEventHandler handler_;
public:
RSocketEvents(const SocketEventHandler eventHandler)
: handler_(std::move(eventHandler)) {}
void onConnected() {
handler_(SocketEvent::OPEN);
}
void onDisconnected(const folly::exception_wrapper&) {
handler_(SocketEvent::CLOSE);
}
void onClosed(const folly::exception_wrapper& e) {
handler_(SocketEvent::CLOSE);
}
};
class RSocketSerializer : public FlipperPayloadSerializer {
public:
void put(std::string key, std::string value) override {
object_[key] = value;
}
void put(std::string key, int value) override {
object_[key] = value;
}
std::string serialize() override {
return folly::toJson(object_);
}
~RSocketSerializer() {}
private:
folly::dynamic object_ = folly::dynamic::object();
};
rsocket::Payload toRSocketPayload(folly::dynamic data) {
std::string json = folly::toJson(data);
rsocket::Payload payload = rsocket::Payload(json);
auto payloadLength = payload.data->computeChainDataLength();
if (payloadLength > maxPayloadSize) {
auto logMessage =
std::string(
"Error: Skipping sending message larger than max rsocket payload: ") +
json.substr(0, 100) + "...";
log(logMessage);
throw std::length_error(logMessage);
}
return payload;
}
FlipperRSocket::FlipperRSocket(
FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload,
folly::EventBase* eventBase)
: endpoint_(std::move(endpoint)),
payload_(std::move(payload)),
eventBase_(eventBase) {}
FlipperRSocket::FlipperRSocket(
FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload,
folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore)
: endpoint_(std::move(endpoint)),
payload_(std::move(payload)),
eventBase_(eventBase),
connectionContextStore_(connectionContextStore) {}
void FlipperRSocket::setEventHandler(SocketEventHandler eventHandler) {
eventHandler_ = std::move(eventHandler);
}
void FlipperRSocket::setMessageHandler(SocketMessageHandler messageHandler) {
messageHandler_ = std::move(messageHandler);
}
bool FlipperRSocket::connect(FlipperConnectionManager* manager) {
folly::SocketAddress address;
address.setFromHostPort(endpoint_.host, endpoint_.port);
auto serializer = RSocketSerializer{};
payload_->serialize(serializer);
auto payload = serializer.serialize();
rsocket::SetupParameters parameters;
parameters.payload = rsocket::Payload(payload);
std::unique_ptr<rsocket::TcpConnectionFactory> tcpConnectionFactory = nullptr;
if (endpoint_.secure) {
tcpConnectionFactory = std::make_unique<rsocket::TcpConnectionFactory>(
*eventBase_->getEventBase(),
std::move(address),
connectionContextStore_->getSSLContext());
} else {
tcpConnectionFactory = std::make_unique<rsocket::TcpConnectionFactory>(
*eventBase_->getEventBase(), std::move(address));
}
auto newClient =
rsocket::RSocket::createConnectedClient(
std::move(tcpConnectionFactory),
std::move(parameters),
endpoint_.secure
? std::make_shared<FlipperRSocketResponder>(manager, eventBase_)
: nullptr,
std::chrono::seconds(connectionKeepaliveSeconds), // keepaliveInterval
nullptr, // stats
std::make_shared<RSocketEvents>(eventHandler_))
.thenError<folly::AsyncSocketException>([](const auto& e) {
if (e.getType() == folly::AsyncSocketException::NOT_OPEN ||
e.getType() == folly::AsyncSocketException::NETWORK_ERROR) {
// This is the state where no Flipper desktop client is connected.
// We don't want an exception thrown here.
return std::unique_ptr<rsocket::RSocketClient>(nullptr);
}
throw e;
})
.get();
if (newClient.get() == nullptr) {
return false;
}
client_ = std::move(newClient);
return true;
}
void FlipperRSocket::disconnect() {
if (client_.get() == nullptr)
return;
client_->disconnect();
}
void FlipperRSocket::send(
const folly::dynamic& message,
SocketSendHandler completion) {
if (client_.get() == nullptr)
return;
rsocket::Payload payload = toRSocketPayload(message);
client_->getRequester()
->fireAndForget(std::move(payload))
->subscribe(completion);
}
void FlipperRSocket::send(
const std::string& message,
SocketSendHandler completion) {
if (client_.get() == nullptr)
return;
client_->getRequester()
->fireAndForget(rsocket::Payload(message))
->subscribe(completion);
}
void FlipperRSocket::sendExpectResponse(
const std::string& message,
SocketSendExpectResponseHandler completion) {
if (client_.get() == nullptr)
return;
client_->getRequester()
->requestResponse(rsocket::Payload(message))
->subscribe(
[completion](rsocket::Payload payload) {
auto response = payload.moveDataToString();
completion(response, false);
},
[completion](folly::exception_wrapper e) {
e.handle(
[&](rsocket::ErrorWithPayload& errorWithPayload) {
auto error = errorWithPayload.payload.moveDataToString();
completion(error, true);
},
[e, completion](...) { completion(e.what().c_str(), true); });
});
}
} // namespace flipper
} // namespace facebook

View File

@@ -1,63 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <folly/dynamic.h>
#include <rsocket/RSocket.h>
#include <future>
#include <memory>
#include "FlipperSocket.h"
#include "FlipperTransportTypes.h"
namespace facebook {
namespace flipper {
class FlipperConnectionManager;
class ConnectionContextStore;
class FlipperRSocket : public FlipperSocket {
public:
FlipperRSocket(
FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload,
folly::EventBase* eventBase);
FlipperRSocket(
FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload,
folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore);
virtual ~FlipperRSocket() {}
virtual void setEventHandler(SocketEventHandler eventHandler) override;
virtual void setMessageHandler(SocketMessageHandler messageHandler) override;
virtual bool connect(FlipperConnectionManager* manager) override;
virtual void disconnect() override;
virtual void send(const folly::dynamic& message, SocketSendHandler completion)
override;
virtual void send(const std::string& message, SocketSendHandler completion)
override;
virtual void sendExpectResponse(
const std::string& message,
SocketSendExpectResponseHandler completion) override;
private:
FlipperConnectionEndpoint endpoint_;
std::unique_ptr<FlipperSocketBasePayload> payload_;
folly::EventBase* eventBase_;
ConnectionContextStore* connectionContextStore_;
std::unique_ptr<rsocket::RSocketClient> client_;
SocketEventHandler eventHandler_;
SocketMessageHandler messageHandler_;
};
} // namespace flipper
} // namespace facebook

View File

@@ -1,80 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "FlipperRSocketResponder.h"
#include <folly/json.h>
#include <rsocket/RSocket.h>
#include "FireAndForgetBasedFlipperResponder.h"
#include "FlipperConnectionManagerImpl.h"
#include "FlipperResponderImpl.h"
#include "Log.h"
using folly::dynamic;
namespace facebook {
namespace flipper {
rsocket::Payload toRSocketPayload(dynamic data);
void FlipperRSocketResponder::handleFireAndForget(
rsocket::Payload request,
rsocket::StreamId streamId) {
const auto payload = request.moveDataToString();
std::unique_ptr<FireAndForgetBasedFlipperResponder> responder;
auto message = folly::parseJson(payload);
auto idItr = message.find("id");
if (idItr == message.items().end()) {
responder =
std::make_unique<FireAndForgetBasedFlipperResponder>(websocket_);
} else {
responder = std::make_unique<FireAndForgetBasedFlipperResponder>(
websocket_, idItr->second.getInt());
}
websocket_->onMessageReceived(
folly::parseJson(payload), std::move(responder));
}
std::shared_ptr<yarpl::single::Single<rsocket::Payload>>
FlipperRSocketResponder::handleRequestResponse(
rsocket::Payload request,
rsocket::StreamId streamId) {
const auto requestString = request.moveDataToString();
auto dynamicSingle = yarpl::single::Single<folly::dynamic>::create(
[payload = std::move(requestString), this](auto observer) {
auto responder = std::make_unique<FlipperResponderImpl>(observer);
websocket_->onMessageReceived(
folly::parseJson(payload), std::move(responder));
});
auto rsocketSingle = yarpl::single::Single<rsocket::Payload>::create(
[payload = std::move(requestString), dynamicSingle, this](auto observer) {
observer->onSubscribe(yarpl::single::SingleSubscriptions::empty());
dynamicSingle->subscribe(
[observer, this](folly::dynamic d) {
eventBase_->runInEventBaseThread([observer, d]() {
try {
observer->onSuccess(toRSocketPayload(d));
} catch (std::exception& e) {
log(e.what());
observer->onError(e);
}
});
},
[observer, this](folly::exception_wrapper e) {
eventBase_->runInEventBaseThread(
[observer, e]() { observer->onError(e); });
});
});
return rsocketSingle;
}
} // namespace flipper
} // namespace facebook

View File

@@ -1,35 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <rsocket/RSocketResponder.h>
namespace facebook {
namespace flipper {
class FlipperConnectionManager;
class FlipperRSocketResponder : public rsocket::RSocketResponder {
private:
FlipperConnectionManager* websocket_;
folly::EventBase* eventBase_;
public:
FlipperRSocketResponder(
FlipperConnectionManager* websocket,
folly::EventBase* eventBase)
: websocket_(websocket), eventBase_(eventBase){};
void handleFireAndForget(
rsocket::Payload request,
rsocket::StreamId streamId);
std::shared_ptr<yarpl::single::Single<rsocket::Payload>>
handleRequestResponse(rsocket::Payload request, rsocket::StreamId streamId);
};
} // namespace flipper
} // namespace facebook

View File

@@ -1,63 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <folly/io/async/EventBase.h>
#include <folly/json.h>
#include <rsocket/RSocketResponder.h>
#include "FlipperConnectionManager.h"
#include "FlipperResponder.h"
#include "Log.h"
namespace facebook {
namespace flipper {
/* Responder to encapsulate yarpl observables and hide them from flipper core +
* plugins */
class FlipperResponderImpl : public FlipperResponder {
public:
FlipperResponderImpl(
std::shared_ptr<yarpl::single::SingleObserver<folly::dynamic>>
downstreamObserver)
: downstreamObserver_(downstreamObserver) {}
void success(const folly::dynamic& response) override {
const folly::dynamic message = folly::dynamic::object("success", response);
isCompleted = true;
downstreamObserver_->onSuccess(message);
}
void error(const folly::dynamic& response) override {
const folly::dynamic message = folly::dynamic::object("error", response);
isCompleted = true;
downstreamObserver_->onSuccess(message);
}
~FlipperResponderImpl() {
if (!isCompleted) {
try {
downstreamObserver_->onSuccess(
folly::dynamic::object("success", folly::dynamic::object()));
} catch (std::exception& e) {
log(std::string(
"Exception occurred when responding in FlipperResponder: ") +
e.what());
} catch (...) {
log("Exception occurred when responding in FlipperResponder");
}
}
}
private:
std::shared_ptr<yarpl::single::SingleObserver<folly::dynamic>>
downstreamObserver_;
bool isCompleted = false;
};
} // namespace flipper
} // namespace facebook

View File

@@ -28,11 +28,6 @@ class FlipperSocket {
virtual void setEventHandler(SocketEventHandler eventHandler) {}
/**
Sets the socket message handler. Used to handle received messages.
@discussion Message handler is only ever used for WebSocket connections.
RSocket uses a different approach whereas a responder is used instead. We
could create an RSocket responder that uses a message handler as well. For
simplicity, and given that RSocket will be removed in future releases, it
was decided not to follow that path.
@param messageHandler Received messages handler.
*/
virtual void setMessageHandler(SocketMessageHandler messageHandler) {}

View File

@@ -6,37 +6,14 @@
*/
#include "FlipperSocketProvider.h"
#include "FlipperRSocket.h"
#include "FlipperSocket.h"
#include "FlipperTransportTypes.h"
namespace facebook {
namespace flipper {
class FlipperDefaultSocketProvider : public FlipperSocketProvider {
public:
FlipperDefaultSocketProvider() {}
virtual std::unique_ptr<FlipperSocket> create(
FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload,
folly::EventBase* eventBase) override {
return std::make_unique<FlipperRSocket>(
std::move(endpoint), std::move(payload), eventBase);
}
virtual std::unique_ptr<FlipperSocket> create(
FlipperConnectionEndpoint endpoint,
std::unique_ptr<FlipperSocketBasePayload> payload,
folly::EventBase* eventBase,
ConnectionContextStore* connectionContextStore) override {
return std::make_unique<FlipperRSocket>(
std::move(endpoint),
std::move(payload),
eventBase,
connectionContextStore);
}
};
std::unique_ptr<FlipperSocketProvider> FlipperSocketProvider::provider_ =
std::make_unique<FlipperDefaultSocketProvider>();
nullptr;
std::unique_ptr<FlipperSocketProvider> FlipperSocketProvider::shelvedProvider_ =
nullptr;
@@ -65,13 +42,8 @@ void FlipperSocketProvider::setDefaultProvider(
provider_ = std::move(provider);
}
void FlipperSocketProvider::shelveDefault() {
shelvedProvider_ = std::move(provider_);
provider_ = std::make_unique<FlipperDefaultSocketProvider>();
}
void FlipperSocketProvider::unshelveDefault() {
provider_ = std::move(shelvedProvider_);
bool FlipperSocketProvider::hasProvider() {
return provider_ != nullptr;
}
} // namespace flipper

View File

@@ -66,15 +66,7 @@ class FlipperSocketProvider {
static void setDefaultProvider(
std::unique_ptr<FlipperSocketProvider> provider);
/**
Shelves the current default socket provider and promotes the internal
socket provider as default.
*/
static void shelveDefault();
/**
Restores a previously shelved socket provider.
*/
static void unshelveDefault();
static bool hasProvider();
private:
static std::unique_ptr<FlipperSocketProvider> provider_;

View File

@@ -1,70 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <Flipper/FlipperRSocketResponder.h>
#include <Flipper/Log.h>
#include <FlipperTestLib/FlipperConnectionManagerMock.h>
#include <folly/json.h>
#include <gtest/gtest.h>
namespace facebook {
namespace flipper {
namespace test {
using folly::dynamic;
class Callbacks
: public facebook::flipper::FlipperConnectionManager::Callbacks {
public:
void onConnected() {}
void onDisconnected() {}
void onMessageReceived(
const folly::dynamic& message,
std::unique_ptr<FlipperResponder> responder) {
message_ = message;
responder_ = std::move(responder);
}
folly::dynamic message_;
std::unique_ptr<FlipperResponder> responder_;
};
TEST(FlipperRSocketResponderTests, testFireAndForgetWithoutIdParam) {
auto socket = facebook::flipper::test::FlipperConnectionManagerMock();
auto callbacks = new Callbacks();
socket.setCallbacks(callbacks);
folly::EventBase* eb = new folly::EventBase();
auto responder = facebook::flipper::FlipperRSocketResponder(&socket, eb);
dynamic d = dynamic::object("my", "message");
auto json = folly::toJson(d);
responder.handleFireAndForget(rsocket::Payload(json), rsocket::StreamId(1));
EXPECT_EQ(socket.messagesReceived.size(), 1);
EXPECT_EQ(socket.messagesReceived[0]["my"], "message");
EXPECT_EQ(socket.respondersWithIdReceived, 0);
EXPECT_EQ(socket.respondersWithoutIdReceived, 1);
}
TEST(FlipperRSocketResponderTests, testFireAndForgetWithIdParam) {
auto socket = facebook::flipper::test::FlipperConnectionManagerMock();
auto callbacks = new Callbacks();
socket.setCallbacks(callbacks);
folly::EventBase* eb = new folly::EventBase();
auto responder = facebook::flipper::FlipperRSocketResponder(&socket, eb);
dynamic d = dynamic::object("my", "message")("id", 7);
auto json = folly::toJson(d);
responder.handleFireAndForget(rsocket::Payload(json), rsocket::StreamId(1));
EXPECT_EQ(socket.messagesReceived.size(), 1);
EXPECT_EQ(socket.messagesReceived[0]["my"], "message");
EXPECT_EQ(socket.messagesReceived[0]["id"], 7);
EXPECT_EQ(socket.respondersWithIdReceived, 1);
EXPECT_EQ(socket.respondersWithoutIdReceived, 0);
}
} // namespace test
} // namespace flipper
} // namespace facebook

View File

@@ -1,86 +0,0 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <Flipper/FlipperResponderImpl.h>
#include "yarpl/Flowable.h"
#include "yarpl/Single.h"
#include "yarpl/single/SingleTestObserver.h"
#include <folly/json.h>
#include <gtest/gtest.h>
namespace facebook {
namespace flipper {
namespace test {
using folly::dynamic;
void assertIsSuccess(folly::dynamic d);
void assertIsError(folly::dynamic d);
TEST(FlipperResponderImplTest, testSuccessWrapper) {
auto dynamicSingle =
yarpl::single::Single<folly::dynamic>::create([](auto observer) mutable {
observer->onSubscribe(yarpl::single::SingleSubscriptions::empty());
auto responder = std::make_shared<FlipperResponderImpl>(observer);
responder->success(folly::dynamic::object("my", "object"));
});
auto to = yarpl::single::SingleTestObserver<folly::dynamic>::create();
dynamicSingle->subscribe(to);
to->awaitTerminalEvent();
auto output = to->getOnSuccessValue();
assertIsSuccess(output);
EXPECT_EQ(output["success"]["my"], "object");
}
TEST(FlipperResponderImplTest, testErrorWrapper) {
auto dynamicSingle =
yarpl::single::Single<folly::dynamic>::create([](auto observer) mutable {
observer->onSubscribe(yarpl::single::SingleSubscriptions::empty());
auto responder = std::make_shared<FlipperResponderImpl>(observer);
responder->error(folly::dynamic::object("my", "object"));
});
auto to = yarpl::single::SingleTestObserver<folly::dynamic>::create();
dynamicSingle->subscribe(to);
to->awaitTerminalEvent();
auto output = to->getOnSuccessValue();
assertIsError(output);
EXPECT_EQ(output["error"]["my"], "object");
}
TEST(FlipperResponderImplTest, testNoExplicitResponseReturnsSuccess) {
auto to = yarpl::single::SingleTestObserver<folly::dynamic>::create();
{
auto dynamicSingle = yarpl::single::Single<folly::dynamic>::create(
[](auto observer) mutable {
observer->onSubscribe(yarpl::single::SingleSubscriptions::empty());
auto responder = std::make_shared<FlipperResponderImpl>(observer);
});
dynamicSingle->subscribe(to);
}
to->awaitTerminalEvent();
auto output = to->getOnSuccessValue();
assertIsSuccess(output);
EXPECT_TRUE(output["success"].empty());
}
void assertIsSuccess(folly::dynamic d) {
EXPECT_NE(d.find("success"), d.items().end());
EXPECT_EQ(d.find("error"), d.items().end());
}
void assertIsError(folly::dynamic d) {
EXPECT_NE(d.find("error"), d.items().end());
EXPECT_EQ(d.find("success"), d.items().end());
}
} // namespace test
} // namespace flipper
} // namespace facebook