Summary: Flipper exposes a call() API to plugins which lets them call their SDK component, and it returns a promise with the response. Currently this is done by sending a fireAndForget request, noting the id of the request, and then receiving fireAndForget requests and matching up the ids to give the result back to the right plugin promise. It will be simpler to use rsocket requestResponse, which exists for this exact use case, instead of fireAndForget. This diff adds a requestResponse handler to the SDK, so that it can deal with such requests and respond accordingly, while preserving the current functionality if it receives a fireAndForget. So this part is backwards compatible and should be safe to land in isolation. A later diff will change the desktop app to use requestResponse, which may not be backwards compatible, so that will have to be deployed more carefully. Reviewed By: passy Differential Revision: D13974049 fbshipit-source-id: b371d94a86b1f186375161ed8f2242a462ce418f
408 lines
14 KiB
C++
408 lines
14 KiB
C++
/**
|
|
* Copyright (c) Facebook, Inc. and its affiliates.
|
|
*
|
|
* This source code is licensed under the MIT license found in the LICENSE
|
|
* file in the root directory of this source tree.
|
|
*/
|
|
#include "FlipperConnectionManagerImpl.h"
|
|
#include <folly/String.h>
|
|
#include <folly/futures/Future.h>
|
|
#include <folly/io/async/AsyncSocketException.h>
|
|
#include <folly/io/async/SSLContext.h>
|
|
#include <folly/json.h>
|
|
#include <rsocket/Payload.h>
|
|
#include <rsocket/RSocket.h>
|
|
#include <rsocket/transports/tcp/TcpConnectionFactory.h>
|
|
#include <stdexcept>
|
|
#include <thread>
|
|
#include "ConnectionContextStore.h"
|
|
#include "FireAndForgetBasedFlipperResponder.h"
|
|
#include "FlipperResponderImpl.h"
|
|
#include "FlipperStep.h"
|
|
#include "Log.h"
|
|
#include "yarpl/Single.h"
|
|
|
|
// Logged by startSync() when it is invoked from a thread other than the
// flipper event base thread; the connection attempt is abandoned in that case.
#define WRONG_THREAD_EXIT_MSG \
  "ERROR: Aborting flipper initialization because it's not running in the flipper thread."

// Delay, in seconds, before reconnect() runs the next startSync() attempt.
static constexpr int reconnectIntervalSeconds = 2;
// Keepalive interval, in seconds, passed to the rsocket client on creation.
static constexpr int connectionKeepaliveSeconds = 10;

// Largest serialized payload, in bytes, accepted by toRSocketPayload().
static constexpr int maxPayloadSize = 0xFFFFFF;
|
|
|
|
namespace facebook {
|
|
namespace flipper {
|
|
|
|
// Forward declaration; the definition is at the bottom of this file.
// Serializes `data` to JSON and wraps it in an rsocket payload, rejecting
// payloads whose length exceeds maxPayloadSize.
rsocket::Payload toRSocketPayload(dynamic data);
|
|
|
|
class ConnectionEvents : public rsocket::RSocketConnectionEvents {
|
|
private:
|
|
FlipperConnectionManagerImpl* websocket_;
|
|
|
|
public:
|
|
ConnectionEvents(FlipperConnectionManagerImpl* websocket)
|
|
: websocket_(websocket) {}
|
|
|
|
void onConnected() {
|
|
websocket_->isOpen_ = true;
|
|
if (websocket_->connectionIsTrusted_) {
|
|
websocket_->callbacks_->onConnected();
|
|
}
|
|
}
|
|
|
|
void onDisconnected(const folly::exception_wrapper&) {
|
|
if (!websocket_->isOpen_)
|
|
return;
|
|
websocket_->isOpen_ = false;
|
|
if (websocket_->connectionIsTrusted_) {
|
|
websocket_->connectionIsTrusted_ = false;
|
|
websocket_->callbacks_->onDisconnected();
|
|
}
|
|
websocket_->reconnect();
|
|
}
|
|
|
|
void onClosed(const folly::exception_wrapper& e) {
|
|
onDisconnected(e);
|
|
}
|
|
};
|
|
|
|
class Responder : public rsocket::RSocketResponder {
|
|
private:
|
|
FlipperConnectionManagerImpl* websocket_;
|
|
|
|
public:
|
|
Responder(FlipperConnectionManagerImpl* websocket) : websocket_(websocket) {}
|
|
|
|
void handleFireAndForget(
|
|
rsocket::Payload request,
|
|
rsocket::StreamId streamId) {
|
|
const auto payload = request.moveDataToString();
|
|
std::unique_ptr<FireAndForgetBasedFlipperResponder> responder;
|
|
auto message = folly::parseJson(payload);
|
|
if (message.find("id") != message.items().end()) {
|
|
auto id = message["id"].getInt();
|
|
responder =
|
|
std::make_unique<FireAndForgetBasedFlipperResponder>(websocket_, id);
|
|
}
|
|
|
|
websocket_->callbacks_->onMessageReceived(
|
|
folly::parseJson(payload), std::move(responder));
|
|
}
|
|
|
|
std::shared_ptr<yarpl::single::Single<rsocket::Payload>>
|
|
handleRequestResponse(rsocket::Payload request, rsocket::StreamId streamId) {
|
|
const auto requestString = request.moveDataToString();
|
|
|
|
auto dynamicSingle = yarpl::single::Single<folly::dynamic>::create(
|
|
[payload = std::move(requestString), this](auto observer) {
|
|
auto responder = std::make_unique<FlipperResponderImpl>(observer);
|
|
websocket_->callbacks_->onMessageReceived(
|
|
folly::parseJson(payload), std::move(responder));
|
|
});
|
|
|
|
auto rsocketSingle = yarpl::single::Single<rsocket::Payload>::create(
|
|
[payload = std::move(requestString), dynamicSingle, this](
|
|
auto observer) {
|
|
observer->onSubscribe(
|
|
yarpl::single::SingleSubscriptions::empty());
|
|
dynamicSingle->subscribe(
|
|
[observer, this](folly::dynamic d) {
|
|
websocket_->connectionEventBase_->runInEventBaseThread(
|
|
[observer, d]() {
|
|
try {
|
|
observer->onSuccess(toRSocketPayload(d));
|
|
|
|
} catch (std::exception& e) {
|
|
log(e.what());
|
|
observer->onError(e);
|
|
}
|
|
});
|
|
},
|
|
[observer, this](folly::exception_wrapper e) {
|
|
websocket_->connectionEventBase_->runInEventBaseThread(
|
|
[observer, e]() { observer->onError(e); });
|
|
});
|
|
});
|
|
|
|
return rsocketSingle;
|
|
}
|
|
};
|
|
|
|
/**
 * Stores the configuration and collaborators needed to connect to Flipper.
 *
 * @param config       device data, ports and the two worker event bases
 * @param state        sink for connection-progress steps
 * @param contextStore source of certificates, device id and SSL context
 * @throws std::invalid_argument if either worker in `config` is null
 */
FlipperConnectionManagerImpl::FlipperConnectionManagerImpl(
    FlipperInitConfig config,
    std::shared_ptr<FlipperState> state,
    std::shared_ptr<ConnectionContextStore> contextStore)
    // The by-value parameters are sinks, so move them into the members
    // instead of copying. Only config.deviceData is moved from; the other
    // config fields are still read below.
    : deviceData_(std::move(config.deviceData)),
      flipperState_(std::move(state)),
      insecurePort(config.insecurePort),
      securePort(config.securePort),
      flipperEventBase_(config.callbackWorker),
      connectionEventBase_(config.connectionWorker),
      contextStore_(std::move(contextStore)) {
  CHECK_THROW(config.callbackWorker, std::invalid_argument);
  CHECK_THROW(config.connectionWorker, std::invalid_argument);
}
|
|
|
|
// Tears down any active client connection before the manager goes away.
FlipperConnectionManagerImpl::~FlipperConnectionManagerImpl() {
  this->stop();
}
|
|
|
|
void FlipperConnectionManagerImpl::start() {
|
|
auto step = flipperState_->start("Start connection thread");
|
|
|
|
folly::makeFuture()
|
|
.via(flipperEventBase_->getEventBase())
|
|
.delayed(std::chrono::milliseconds(0))
|
|
.thenValue([this, step](auto&&) {
|
|
step->complete();
|
|
startSync();
|
|
});
|
|
}
|
|
|
|
void FlipperConnectionManagerImpl::startSync() {
|
|
if (!isRunningInOwnThread()) {
|
|
log(WRONG_THREAD_EXIT_MSG);
|
|
return;
|
|
}
|
|
if (isOpen()) {
|
|
log("Already connected");
|
|
return;
|
|
}
|
|
bool isClientSetupStep = isCertificateExchangeNeeded();
|
|
auto step = flipperState_->start(
|
|
isClientSetupStep ? "Establish pre-setup connection"
|
|
: "Establish main connection");
|
|
try {
|
|
if (isClientSetupStep) {
|
|
doCertificateExchange();
|
|
} else {
|
|
connectSecurely();
|
|
}
|
|
step->complete();
|
|
} catch (const folly::AsyncSocketException& e) {
|
|
if (e.getType() == folly::AsyncSocketException::NOT_OPEN ||
|
|
e.getType() == folly::AsyncSocketException::NETWORK_ERROR) {
|
|
// The expected code path when flipper desktop is not running.
|
|
// Don't count as a failed attempt, or it would invalidate the connection
|
|
// files for no reason. On iOS devices, we can always connect to the local
|
|
// port forwarding server even when it can't connect to flipper. In that
|
|
// case we get a Network error instead of a Port not open error, so we
|
|
// treat them the same.
|
|
step->fail(
|
|
"No route to flipper found. Is flipper desktop running? Retrying...");
|
|
} else {
|
|
log(e.what());
|
|
failedConnectionAttempts_++;
|
|
step->fail(e.what());
|
|
}
|
|
reconnect();
|
|
} catch (const std::exception& e) {
|
|
log(e.what());
|
|
step->fail(e.what());
|
|
failedConnectionAttempts_++;
|
|
reconnect();
|
|
}
|
|
}
|
|
|
|
void FlipperConnectionManagerImpl::doCertificateExchange() {
|
|
rsocket::SetupParameters parameters;
|
|
folly::SocketAddress address;
|
|
|
|
parameters.payload = rsocket::Payload(
|
|
folly::toJson(folly::dynamic::object("os", deviceData_.os)(
|
|
"device", deviceData_.device)("app", deviceData_.app)));
|
|
address.setFromHostPort(deviceData_.host, insecurePort);
|
|
|
|
auto connectingInsecurely = flipperState_->start("Connect insecurely");
|
|
connectionIsTrusted_ = false;
|
|
client_ =
|
|
rsocket::RSocket::createConnectedClient(
|
|
std::make_unique<rsocket::TcpConnectionFactory>(
|
|
*connectionEventBase_->getEventBase(), std::move(address)),
|
|
std::move(parameters),
|
|
nullptr,
|
|
std::chrono::seconds(connectionKeepaliveSeconds), // keepaliveInterval
|
|
nullptr, // stats
|
|
std::make_shared<ConnectionEvents>(this))
|
|
.get();
|
|
connectingInsecurely->complete();
|
|
|
|
requestSignedCertFromFlipper();
|
|
}
|
|
|
|
void FlipperConnectionManagerImpl::connectSecurely() {
|
|
rsocket::SetupParameters parameters;
|
|
folly::SocketAddress address;
|
|
|
|
auto loadingDeviceId = flipperState_->start("Load Device Id");
|
|
auto deviceId = contextStore_->getDeviceId();
|
|
if (deviceId.compare("unknown")) {
|
|
loadingDeviceId->complete();
|
|
}
|
|
parameters.payload = rsocket::Payload(folly::toJson(folly::dynamic::object(
|
|
"os", deviceData_.os)("device", deviceData_.device)(
|
|
"device_id", deviceId)("app", deviceData_.app)));
|
|
address.setFromHostPort(deviceData_.host, securePort);
|
|
|
|
std::shared_ptr<folly::SSLContext> sslContext =
|
|
contextStore_->getSSLContext();
|
|
auto connectingSecurely = flipperState_->start("Connect securely");
|
|
connectionIsTrusted_ = true;
|
|
client_ =
|
|
rsocket::RSocket::createConnectedClient(
|
|
std::make_unique<rsocket::TcpConnectionFactory>(
|
|
*connectionEventBase_->getEventBase(),
|
|
std::move(address),
|
|
std::move(sslContext)),
|
|
std::move(parameters),
|
|
std::make_shared<Responder>(this),
|
|
std::chrono::seconds(connectionKeepaliveSeconds), // keepaliveInterval
|
|
nullptr, // stats
|
|
std::make_shared<ConnectionEvents>(this))
|
|
.get();
|
|
connectingSecurely->complete();
|
|
failedConnectionAttempts_ = 0;
|
|
}
|
|
|
|
// Schedules another startSync() on the flipper event base after
// reconnectIntervalSeconds.
void FlipperConnectionManagerImpl::reconnect() {
  const auto retryDelay = std::chrono::seconds(reconnectIntervalSeconds);
  auto executor = flipperEventBase_->getEventBase();
  folly::makeFuture()
      .via(executor)
      .delayed(retryDelay)
      .thenValue([this](auto&&) { startSync(); });
}
|
|
|
|
// Disconnects and releases the current client, if there is one.
void FlipperConnectionManagerImpl::stop() {
  if (!client_) {
    // Nothing to tear down; client_ is already empty.
    return;
  }
  client_->disconnect();
  client_ = nullptr;
}
|
|
|
|
bool FlipperConnectionManagerImpl::isOpen() const {
|
|
return isOpen_ && connectionIsTrusted_;
|
|
}
|
|
|
|
// Registers the (non-owned) receiver for connection events and incoming
// messages, replacing any previously set one.
void FlipperConnectionManagerImpl::setCallbacks(Callbacks* callbacks) {
  this->callbacks_ = callbacks;
}
|
|
|
|
// Queues `message` to be sent to Flipper as a fireAndForget request. The
// work runs on the flipper event base; the message is copied into the task.
// If there is no client when the task runs, the message is silently dropped.
void FlipperConnectionManagerImpl::sendMessage(const folly::dynamic& message) {
  flipperEventBase_->add([this, message]() {
    try {
      rsocket::Payload outgoing = toRSocketPayload(message);
      if (!client_) {
        return;
      }
      auto requester = client_->getRequester();
      requester->fireAndForget(std::move(outgoing))->subscribe([]() {});
    } catch (const std::length_error& tooLarge) {
      // Skip sending messages that are too large.
      log(tooLarge.what());
    }
  });
}
|
|
|
|
bool FlipperConnectionManagerImpl::isCertificateExchangeNeeded() {
|
|
if (failedConnectionAttempts_ >= 2) {
|
|
return true;
|
|
}
|
|
|
|
auto step = flipperState_->start("Check required certificates are present");
|
|
bool hasRequiredFiles = contextStore_->hasRequiredFiles();
|
|
if (hasRequiredFiles) {
|
|
step->complete();
|
|
}
|
|
return !hasRequiredFiles;
|
|
}
|
|
|
|
void FlipperConnectionManagerImpl::requestSignedCertFromFlipper() {
|
|
auto generatingCSR = flipperState_->start("Generate CSR");
|
|
std::string csr = contextStore_->createCertificateSigningRequest();
|
|
generatingCSR->complete();
|
|
|
|
folly::dynamic message =
|
|
folly::dynamic::object("method", "signCertificate")("csr", csr.c_str())(
|
|
"destination", contextStore_->getCertificateDirectoryPath().c_str());
|
|
auto gettingCert = flipperState_->start("Getting cert from desktop");
|
|
|
|
flipperEventBase_->add([this, message, gettingCert]() {
|
|
client_->getRequester()
|
|
->requestResponse(rsocket::Payload(folly::toJson(message)))
|
|
->subscribe(
|
|
[this, gettingCert](rsocket::Payload p) {
|
|
auto response = p.moveDataToString();
|
|
if (!response.empty()) {
|
|
folly::dynamic config = folly::parseJson(response);
|
|
contextStore_->storeConnectionConfig(config);
|
|
}
|
|
gettingCert->complete();
|
|
log("Certificate exchange complete.");
|
|
// Disconnect after message sending is complete.
|
|
// This will trigger a reconnect which should use the secure
|
|
// channel.
|
|
// TODO: Connect immediately, without waiting for reconnect
|
|
client_ = nullptr;
|
|
},
|
|
[this, message](folly::exception_wrapper e) {
|
|
e.handle(
|
|
[&](rsocket::ErrorWithPayload& errorWithPayload) {
|
|
std::string errorMessage =
|
|
errorWithPayload.payload.moveDataToString();
|
|
|
|
if (errorMessage.compare("not implemented")) {
|
|
log("Desktop failed to provide certificates. Error from flipper desktop:\n" +
|
|
errorMessage);
|
|
client_ = nullptr;
|
|
} else {
|
|
sendLegacyCertificateRequest(message);
|
|
}
|
|
},
|
|
[e](...) {
|
|
log(("Error during certificate exchange:" + e.what())
|
|
.c_str());
|
|
});
|
|
});
|
|
});
|
|
failedConnectionAttempts_ = 0;
|
|
}
|
|
|
|
// Fallback for desktop versions that do not implement requestResponse:
// sends the certificate request as fireAndForget, then stores an empty
// connection config and drops the client.
void FlipperConnectionManagerImpl::sendLegacyCertificateRequest(
    folly::dynamic message) {
  // Desktop is using an old version of Flipper.
  // Fall back to fireAndForget, instead of requestResponse.
  auto sendingRequest =
      flipperState_->start("Sending fallback certificate request");

  rsocket::Payload requestPayload(folly::toJson(message));
  auto requester = client_->getRequester();
  requester->fireAndForget(std::move(requestPayload))
      ->subscribe([this, sendingRequest]() {
        sendingRequest->complete();
        // No config is returned on this path, so an empty one is stored.
        folly::dynamic config = folly::dynamic::object();
        contextStore_->storeConnectionConfig(config);
        client_ = nullptr;
      });
}
|
|
|
|
bool FlipperConnectionManagerImpl::isRunningInOwnThread() {
|
|
return flipperEventBase_->isInEventBaseThread();
|
|
}
|
|
|
|
// Serializes `data` to JSON and wraps it in an rsocket payload.
//
// @throws std::length_error (by value) when the payload is larger than
//         maxPayloadSize. Debug builds additionally trip the DCHECK below.
rsocket::Payload toRSocketPayload(dynamic data) {
  std::string json = folly::toJson(data);
  rsocket::Payload payload = rsocket::Payload(json);
  auto payloadLength = payload.data->computeChainDataLength();

  DCHECK_LE(payloadLength, maxPayloadSize);
  if (payloadLength > maxPayloadSize) {
    // Only a short prefix of the message goes into the error text: callers
    // log e.what(), and the full JSON is by definition very large here.
    auto logMessage =
        std::string(
            "Error: Skipping sending message larger than max rsocket payload: ") +
        json.substr(0, 100) + "...";
    // Throw by value. The previous `throw new` threw a pointer, which no
    // `catch (std::length_error&)` / `catch (std::exception&)` handler can
    // match, and leaked the exception object.
    throw std::length_error(logMessage);
  }
  return payload;
}
|
|
|
|
} // namespace flipper
|
|
} // namespace facebook
|