Socket connect no longer synchronous and blocking

Summary:
Never really liked this code. Before this change, calls to connect were blocking.

Because of this, we had to make use of promises and a bit of really not that good-looking code.

So, this change makes connect non-blocking meaning that we make full use of our event handler.

These changes contain:
- CSR is not getting generated after each failed attempt.
- Connect is no longer blocking.
- Do not report events via the handler when explicitly disconnecting.

Reviewed By: jknoxville

Differential Revision: D46853228

fbshipit-source-id: 00e6a9c7c039a756175fe14982959e078d92bacb
This commit is contained in:
Lorenzo Blasa
2023-06-28 12:09:58 -07:00
committed by Facebook GitHub Bot
parent 65e515bdaa
commit e42db220ee
22 changed files with 286 additions and 436 deletions

View File

@@ -20,12 +20,12 @@
#define WRONG_THREAD_EXIT_MSG \
"ERROR: Aborting flipper initialization because it's not running in the flipper thread."
static constexpr int reconnectIntervalSeconds = 2;
static constexpr int RECONNECT_INTERVAL_SECONDS = 3;
// Not a public-facing version number.
// Used for compatibility checking with desktop flipper.
// To be bumped for every core platform interface change.
static constexpr int sdkVersion = 4;
static constexpr int SDK_VERSION = 4;
using namespace folly;
@@ -66,14 +66,14 @@ FlipperConnectionManagerImpl::FlipperConnectionManagerImpl(
std::shared_ptr<FlipperState> state,
std::shared_ptr<ConnectionContextStore> contextStore)
: deviceData_(config.deviceData),
flipperState_(state),
state_(state),
insecurePort(config.insecurePort),
securePort(config.securePort),
altInsecurePort(config.altInsecurePort),
altSecurePort(config.altSecurePort),
flipperScheduler_(config.callbackWorker),
scheduler_(config.callbackWorker),
connectionScheduler_(config.connectionWorker),
contextStore_(contextStore),
store_(contextStore),
implWrapper_(std::make_shared<FlipperConnectionManagerWrapper>(this)) {
CHECK_THROW(config.callbackWorker, std::invalid_argument);
CHECK_THROW(config.connectionWorker, std::invalid_argument);
@@ -85,39 +85,52 @@ FlipperConnectionManagerImpl::~FlipperConnectionManagerImpl() {
void FlipperConnectionManagerImpl::setCertificateProvider(
const std::shared_ptr<FlipperCertificateProvider> provider) {
certProvider_ = provider;
certificateProvider_ = provider;
};
std::shared_ptr<FlipperCertificateProvider>
FlipperConnectionManagerImpl::getCertificateProvider() {
return certProvider_;
return certificateProvider_;
}
void FlipperConnectionManagerImpl::handleSocketEvent(SocketEvent event) {
switch (event) {
case SocketEvent::OPEN:
isConnected_ = true;
if (connectionIsTrusted_) {
callbacks_->onConnected();
}
break;
case SocketEvent::SSL_ERROR:
// SSL errors are not handled as a connection event
// on this handler.
break;
case SocketEvent::CLOSE:
case SocketEvent::ERROR:
if (!isConnected_) {
return;
}
isConnected_ = false;
if (connectionIsTrusted_) {
connectionIsTrusted_ = false;
callbacks_->onDisconnected();
}
reconnect();
break;
}
// Ensure that the event is handled on the correct thread i.e. scheduler.
scheduler_->schedule([this, event]() {
switch (event) {
case SocketEvent::OPEN:
isConnected_ = true;
if (isConnectionTrusted_) {
failedConnectionAttempts_ = 0;
callbacks_->onConnected();
} else {
requestSignedCertificate();
}
break;
case SocketEvent::SSL_ERROR:
log("[conn] handleSocketEvent(SSL_ERROR)");
failedConnectionAttempts_++;
reconnect();
break;
case SocketEvent::CLOSE:
case SocketEvent::ERROR:
log("[conn] handleSocketEvent(CLOSE_ERROR)");
if (!isConnected_) {
reconnect();
return;
}
failedConnectionAttempts_++;
isConnected_ = false;
if (isConnectionTrusted_) {
isConnectionTrusted_ = false;
callbacks_->onDisconnected();
}
reconnect();
break;
}
});
}
void FlipperConnectionManagerImpl::start() {
@@ -126,22 +139,22 @@ void FlipperConnectionManagerImpl::start() {
return;
}
if (isStarted_) {
if (started_) {
log("Already started");
return;
}
isStarted_ = true;
started_ = true;
auto step = flipperState_->start("Start connection thread");
auto step = state_->start("Start connection thread");
flipperScheduler_->schedule([this, step]() {
scheduler_->schedule([this, step]() {
step->complete();
startSync();
});
}
void FlipperConnectionManagerImpl::startSync() {
if (!isStarted_) {
if (!started_) {
log("Not started");
return;
}
@@ -153,52 +166,28 @@ void FlipperConnectionManagerImpl::startSync() {
log("Already connected");
return;
}
socket_ = nullptr;
bool isClientSetupStep = isCertificateExchangeNeeded();
auto step = flipperState_->start(
isClientSetupStep ? "Establish pre-setup connection"
auto step = state_->start(
isClientSetupStep ? "Establish certificate exchange connection"
: "Establish main connection");
try {
if (isClientSetupStep) {
bool success = connectAndExchangeCertificate();
if (!success) {
reconnect();
return;
}
} else {
if (!connectSecurely()) {
// The expected code path when flipper desktop is not running.
// Don't count as a failed attempt, or it would invalidate the
// connection files for no reason. On iOS devices, we can always connect
// to the local port forwarding server even when it can't connect to
// flipper. In that case we get a Network error instead of a Port not
// open error, so we treat them the same.
step->fail(
"No route to flipper found. Is flipper desktop running? Retrying...");
reconnect();
}
}
step->complete();
} catch (const SSLException& e) {
auto message = std::string(e.what()) +
"\nMake sure the date and time of your device is up to date.";
log(message);
step->fail(message);
failedConnectionAttempts_++;
reconnect();
} catch (const std::exception& e) {
log(e.what());
step->fail(e.what());
failedConnectionAttempts_++;
reconnect();
if (isClientSetupStep) {
connectAndExchangeCertificate();
} else {
connectSecurely();
}
step->complete();
}
bool FlipperConnectionManagerImpl::connectAndExchangeCertificate() {
void FlipperConnectionManagerImpl::connectAndExchangeCertificate() {
log("[conn] connectAndExchangeCertificate()");
auto port = insecurePort;
auto endpoint = FlipperConnectionEndpoint(deviceData_.host, port, false);
int medium = certProvider_ != nullptr
? certProvider_->getCertificateExchangeMedium()
int medium = certificateProvider_ != nullptr
? certificateProvider_->getCertificateExchangeMedium()
: FlipperCertificateExchangeMedium::FS_ACCESS;
auto payload = std::make_unique<FlipperSocketBasePayload>();
@@ -206,47 +195,31 @@ bool FlipperConnectionManagerImpl::connectAndExchangeCertificate() {
payload->device = deviceData_.device;
payload->device_id = "unknown";
payload->app = deviceData_.app;
payload->sdk_version = sdkVersion;
payload->sdk_version = SDK_VERSION;
payload->medium = medium;
client_ = FlipperSocketProvider::socketCreate(
endpoint, std::move(payload), flipperScheduler_);
client_->setEventHandler(ConnectionEvents(implWrapper_));
socket_ = FlipperSocketProvider::socketCreate(
endpoint, std::move(payload), scheduler_);
socket_->setEventHandler(ConnectionEvents(implWrapper_));
connectionIsTrusted_ = false;
auto step =
flipperState_->start("Attempt to connect for certificate exchange");
isConnectionTrusted_ = false;
auto step = state_->start("Attempt to connect for certificate exchange");
step->complete();
// NON-TLS:
// On failure: clear the client.
// On success: proceed to request the client certificate.
// Connect is just handled here, move this elsewhere.
if (!client_->connect(this)) {
client_ = nullptr;
return false;
}
requestSignedCertificate();
return true;
socket_->connect(this);
}
bool FlipperConnectionManagerImpl::connectSecurely() {
client_ = nullptr;
void FlipperConnectionManagerImpl::connectSecurely() {
auto port = securePort;
auto endpoint = FlipperConnectionEndpoint(deviceData_.host, port, true);
int medium = certProvider_ != nullptr
? certProvider_->getCertificateExchangeMedium()
int medium = certificateProvider_ != nullptr
? certificateProvider_->getCertificateExchangeMedium()
: FlipperCertificateExchangeMedium::FS_ACCESS;
auto loadingDeviceId = flipperState_->start("Load Device Id");
auto deviceId = contextStore_->getDeviceId();
auto loadingDeviceId = state_->start("Load Device Id");
auto deviceId = store_->getDeviceId();
if (deviceId.compare("unknown")) {
loadingDeviceId->complete();
}
@@ -256,15 +229,15 @@ bool FlipperConnectionManagerImpl::connectSecurely() {
payload->device = deviceData_.device;
payload->device_id = deviceId;
payload->app = deviceData_.app;
payload->sdk_version = sdkVersion;
payload->sdk_version = SDK_VERSION;
payload->medium = medium;
payload->csr = contextStore_->getCertificateSigningRequest().c_str();
payload->csr_path = contextStore_->getCertificateDirectoryPath().c_str();
payload->csr = store_->getCertificateSigningRequest().c_str();
payload->csr_path = store_->getCertificateDirectoryPath().c_str();
client_ = FlipperSocketProvider::socketCreate(
endpoint, std::move(payload), connectionScheduler_, contextStore_.get());
client_->setEventHandler(ConnectionEvents(implWrapper_));
client_->setMessageHandler([this](const std::string& msg) {
socket_ = FlipperSocketProvider::socketCreate(
endpoint, std::move(payload), connectionScheduler_, store_.get());
socket_->setEventHandler(ConnectionEvents(implWrapper_));
socket_->setMessageHandler([this](const std::string& msg) {
std::unique_ptr<FireAndForgetBasedFlipperResponder> responder;
auto message = folly::parseJson(msg);
auto idItr = message.find("id");
@@ -278,54 +251,41 @@ bool FlipperConnectionManagerImpl::connectSecurely() {
this->onMessageReceived(folly::parseJson(msg), std::move(responder));
});
connectionIsTrusted_ = true;
auto step = flipperState_->start(
"Attempt to connect with existing client certificate");
isConnectionTrusted_ = true;
auto step =
state_->start("Attempt to connect with existing client certificate");
step->complete();
// TLS:
// On failure: clear the client.
// On success: clear number of failed attempts.
// Connect is just handled here, move this elsewhere.
if (!client_->connect(this)) {
client_ = nullptr;
return false;
}
failedConnectionAttempts_ = 0;
return true;
socket_->connect(this);
}
void FlipperConnectionManagerImpl::reconnect() {
if (!isStarted_) {
if (!started_) {
log("Not started");
return;
}
flipperScheduler_->scheduleAfter(
[this]() { startSync(); }, reconnectIntervalSeconds * 1000.0f);
log("[conn] reconnect()");
scheduler_->scheduleAfter(
[this]() { startSync(); }, RECONNECT_INTERVAL_SECONDS * 1000.0f);
}
void FlipperConnectionManagerImpl::stop() {
if (certProvider_ && certProvider_->shouldResetCertificateFolder()) {
contextStore_->resetState();
if (certificateProvider_ &&
certificateProvider_->shouldResetCertificateFolder()) {
store_->resetState();
}
if (!isStarted_) {
if (!started_) {
log("Not started");
return;
}
isStarted_ = false;
started_ = false;
std::shared_ptr<std::promise<void>> joinPromise =
std::make_shared<std::promise<void>>();
std::future<void> join = joinPromise->get_future();
flipperScheduler_->schedule([this, joinPromise]() {
if (client_) {
client_->disconnect();
}
client_ = nullptr;
scheduler_->schedule([this, joinPromise]() {
socket_ = nullptr;
joinPromise->set_value();
});
@@ -333,7 +293,7 @@ void FlipperConnectionManagerImpl::stop() {
}
bool FlipperConnectionManagerImpl::isConnected() const {
return isConnected_ && connectionIsTrusted_;
return isConnected_ && isConnectionTrusted_;
}
void FlipperConnectionManagerImpl::setCallbacks(Callbacks* callbacks) {
@@ -341,10 +301,10 @@ void FlipperConnectionManagerImpl::setCallbacks(Callbacks* callbacks) {
}
void FlipperConnectionManagerImpl::sendMessage(const folly::dynamic& message) {
flipperScheduler_->schedule([this, message]() {
scheduler_->schedule([this, message]() {
try {
if (client_) {
client_->send(message, []() {});
if (socket_) {
socket_->send(message, []() {});
}
} catch (std::length_error& e) {
// Skip sending messages that are too large.
@@ -355,10 +315,10 @@ void FlipperConnectionManagerImpl::sendMessage(const folly::dynamic& message) {
}
void FlipperConnectionManagerImpl::sendMessageRaw(const std::string& message) {
flipperScheduler_->schedule([this, message]() {
scheduler_->schedule([this, message]() {
try {
if (client_) {
client_->send(message, []() {});
if (socket_) {
socket_->send(message, []() {});
}
} catch (std::length_error& e) {
// Skip sending messages that are too large.
@@ -379,7 +339,7 @@ bool FlipperConnectionManagerImpl::isCertificateExchangeNeeded() {
return true;
}
auto last_known_medium = contextStore_->getLastKnownMedium();
auto last_known_medium = store_->getLastKnownMedium();
if (!last_known_medium) {
return true;
}
@@ -387,15 +347,15 @@ bool FlipperConnectionManagerImpl::isCertificateExchangeNeeded() {
// When we exchange certs over WWW, we use a fake generated serial number and
// a virtual device. If medium changes to FS_ACCESS at some point, we should
// restart the exchange process to get the device ID of the real device.
int medium = certProvider_ != nullptr
? certProvider_->getCertificateExchangeMedium()
int medium = certificateProvider_ != nullptr
? certificateProvider_->getCertificateExchangeMedium()
: FlipperCertificateExchangeMedium::FS_ACCESS;
if (last_known_medium != medium) {
return true;
}
auto step = flipperState_->start("Check required certificates are present");
bool hasRequiredFiles = contextStore_->hasRequiredFiles();
auto step = state_->start("Check required certificates are present");
bool hasRequiredFiles = store_->hasRequiredFiles();
if (hasRequiredFiles) {
step->complete();
}
@@ -406,109 +366,88 @@ void FlipperConnectionManagerImpl::processSignedCertificateResponse(
std::shared_ptr<FlipperStep> gettingCert,
std::string response,
bool isError) {
/**
Need to keep track of whether the response has been
handled. On success, the completion handler deallocates the socket which in
turn triggers a disconnect. A disconnect is called within
the context of a subscription handler. This means that the completion
handler can be called again to notify that the stream has
been interrupted because we are effectively still handing the response
read. So, if already handled, ignore and return;
*/
if (certificateExchangeCompleted_)
return;
certificateExchangeCompleted_ = true;
if (isError) {
auto error =
"Desktop failed to provide certificates. Error from flipper desktop:\n" +
"Flipper failed to provide certificates. Error from Flipper Desktop:\n" +
response;
log(error);
gettingCert->fail(error);
client_ = nullptr;
return;
}
int medium = certProvider_ != nullptr
? certProvider_->getCertificateExchangeMedium()
: FlipperCertificateExchangeMedium::FS_ACCESS;
if (!response.empty()) {
folly::dynamic config = folly::parseJson(response);
config["medium"] = medium;
contextStore_->storeConnectionConfig(config);
}
if (certProvider_) {
certProvider_->setFlipperState(flipperState_);
auto gettingCertFromProvider =
flipperState_->start("Getting cert from Cert Provider");
} else {
int medium = certificateProvider_ != nullptr
? certificateProvider_->getCertificateExchangeMedium()
: FlipperCertificateExchangeMedium::FS_ACCESS;
try {
// Certificates should be present in app's sandbox after it is
// returned. The reason we can't have a completion block here
// is because if the certs are not present after it returns
// then the flipper tries to reconnect on insecured channel
// and recreates the app.csr. By the time completion block is
// called the DeviceCA cert doesn't match app's csr and it
// throws an SSL error.
certProvider_->getCertificates(
contextStore_->getCertificateDirectoryPath(),
contextStore_->getDeviceId());
gettingCertFromProvider->complete();
} catch (std::exception& e) {
gettingCertFromProvider->fail(e.what());
gettingCert->fail(e.what());
} catch (...) {
gettingCertFromProvider->fail("Exception from certProvider");
gettingCert->fail("Exception from certProvider");
if (!response.empty()) {
folly::dynamic config = folly::parseJson(response);
config["medium"] = medium;
store_->storeConnectionConfig(config);
}
}
log("Certificate exchange complete.");
gettingCert->complete();
if (certificateProvider_) {
certificateProvider_->setFlipperState(state_);
auto gettingCertFromProvider =
state_->start("Getting client certificate from Certificate Provider");
// Disconnect after message sending is complete.
// The client destructor will send a disconnected event
// which will be handled by Flipper which will initiate
// a reconnect sequence.
client_ = nullptr;
try {
// Certificates should be present in app's sandbox after it is
// returned. The reason we can't have a completion block here
// is because if the certs are not present after it returns
// then the flipper tries to reconnect on insecured channel
// and recreates the app.csr. By the time completion block is
// called the DeviceCA cert doesn't match app's csr and it
// throws an SSL error.
certificateProvider_->getCertificates(
store_->getCertificateDirectoryPath(), store_->getDeviceId());
gettingCertFromProvider->complete();
} catch (std::exception& e) {
gettingCertFromProvider->fail(e.what());
gettingCert->fail(e.what());
} catch (...) {
gettingCertFromProvider->fail(
"Exception thrown from Certificate Provider");
gettingCert->fail("Exception thrown from Certificate Provider");
}
}
log("Certificate exchange complete.");
gettingCert->complete();
}
socket_ = nullptr;
reconnect();
}
void FlipperConnectionManagerImpl::requestSignedCertificate() {
auto resettingState = flipperState_->start("Reset connection store state");
contextStore_->resetState();
auto resettingState = state_->start("Reset connection store state");
store_->resetState();
resettingState->complete();
auto generatingCSR = flipperState_->start("Generate CSR");
std::string csr = contextStore_->getCertificateSigningRequest();
auto generatingCSR = state_->start("Generate CSR");
std::string csr = store_->getCertificateSigningRequest();
generatingCSR->complete();
int medium = certProvider_ != nullptr
? certProvider_->getCertificateExchangeMedium()
int medium = certificateProvider_ != nullptr
? certificateProvider_->getCertificateExchangeMedium()
: FlipperCertificateExchangeMedium::FS_ACCESS;
folly::dynamic message = folly::dynamic::object("method", "signCertificate")(
"csr", csr.c_str())(
"destination",
contextStore_->getCertificateDirectoryPath().c_str())("medium", medium);
folly::dynamic message =
folly::dynamic::object("method", "signCertificate")("csr", csr.c_str())(
"destination",
store_->getCertificateDirectoryPath().c_str())("medium", medium);
auto gettingCert = flipperState_->start("Getting cert from desktop");
auto gettingCert = state_->start("Getting cert from desktop");
certificateExchangeCompleted_ = false;
flipperScheduler_->schedule([this, message, gettingCert]() {
if (!client_) {
return;
}
client_->sendExpectResponse(
folly::toJson(message),
[this, gettingCert](const std::string& response, bool isError) {
flipperScheduler_->schedule([this, gettingCert, response, isError]() {
this->processSignedCertificateResponse(
gettingCert, response, isError);
});
socket_->sendExpectResponse(
folly::toJson(message),
[this, gettingCert](const std::string& response, bool isError) {
scheduler_->schedule([this, gettingCert, response, isError]() {
this->processSignedCertificateResponse(
gettingCert, response, isError);
});
});
});
failedConnectionAttempts_ = 0;
}
bool FlipperConnectionManagerImpl::isRunningInOwnThread() {
return flipperScheduler_->isRunningInOwnThread();
return scheduler_->isRunningInOwnThread();
}
} // namespace flipper