diff --git a/xplat/Flipper/FlipperConnectionManager.h b/xplat/Flipper/FlipperConnectionManager.h index 96ff689a2..bc7e073a1 100644 --- a/xplat/Flipper/FlipperConnectionManager.h +++ b/xplat/Flipper/FlipperConnectionManager.h @@ -45,6 +45,13 @@ class FlipperConnectionManager { The callbacks should be set before a connection is established. */ virtual void setCallbacks(Callbacks* callbacks) = 0; + + /** + Called by ws server when a message has been received. + */ + virtual void onMessageReceived( + const folly::dynamic& message, + std::unique_ptr responder) = 0; }; class FlipperConnectionManager::Callbacks { diff --git a/xplat/Flipper/FlipperConnectionManagerImpl.cpp b/xplat/Flipper/FlipperConnectionManagerImpl.cpp index cdd87cf6f..8107c9142 100644 --- a/xplat/Flipper/FlipperConnectionManagerImpl.cpp +++ b/xplat/Flipper/FlipperConnectionManagerImpl.cpp @@ -193,7 +193,7 @@ void FlipperConnectionManagerImpl::connectSecurely() { std::move(address), std::move(sslContext)), std::move(parameters), - std::make_shared(this), + std::make_shared(this, connectionEventBase_), std::chrono::seconds(connectionKeepaliveSeconds), // keepaliveInterval nullptr, // stats std::make_shared(this)) @@ -241,6 +241,12 @@ void FlipperConnectionManagerImpl::sendMessage(const folly::dynamic& message) { }); } +void FlipperConnectionManagerImpl::onMessageReceived( + const folly::dynamic& message, + std::unique_ptr responder) { + callbacks_->onMessageReceived(message, std::move(responder)); +} + bool FlipperConnectionManagerImpl::isCertificateExchangeNeeded() { if (failedConnectionAttempts_ >= 2) { return true; diff --git a/xplat/Flipper/FlipperConnectionManagerImpl.h b/xplat/Flipper/FlipperConnectionManagerImpl.h index a72c32e2e..291b7ef66 100644 --- a/xplat/Flipper/FlipperConnectionManagerImpl.h +++ b/xplat/Flipper/FlipperConnectionManagerImpl.h @@ -25,7 +25,6 @@ rsocket::Payload toRSocketPayload(folly::dynamic data); class FlipperConnectionManagerImpl : public FlipperConnectionManager { friend ConnectionEvents; - friend FlipperRSocketResponder; public: FlipperConnectionManagerImpl(FlipperInitConfig config, std::shared_ptr state, std::shared_ptr contextStore); @@ -42,6 +41,10 @@ class FlipperConnectionManagerImpl : public FlipperConnectionManager { void sendMessage(const folly::dynamic& message) override; + void onMessageReceived( + const folly::dynamic& message, + std::unique_ptr responder) override; + void reconnect(); private: diff --git a/xplat/Flipper/FlipperRSocketResponder.cpp b/xplat/Flipper/FlipperRSocketResponder.cpp index 3073de49f..bb897a45d 100644 --- a/xplat/Flipper/FlipperRSocketResponder.cpp +++ b/xplat/Flipper/FlipperRSocketResponder.cpp @@ -31,7 +31,7 @@ void FlipperRSocketResponder::handleFireAndForget( std::make_unique(websocket_, id); } - websocket_->callbacks_->onMessageReceived( + websocket_->onMessageReceived( folly::parseJson(payload), std::move(responder)); } @@ -44,7 +44,7 @@ FlipperRSocketResponder::handleRequestResponse( auto dynamicSingle = yarpl::single::Single::create( [payload = std::move(requestString), this](auto observer) { auto responder = std::make_unique(observer); - websocket_->callbacks_->onMessageReceived( + websocket_->onMessageReceived( folly::parseJson(payload), std::move(responder)); }); @@ -53,19 +53,18 @@ FlipperRSocketResponder::handleRequestResponse( observer->onSubscribe(yarpl::single::SingleSubscriptions::empty()); dynamicSingle->subscribe( [observer, this](folly::dynamic d) { - websocket_->connectionEventBase_->runInEventBaseThread( - [observer, d]() { - try { - observer->onSuccess(toRSocketPayload(d)); + eventBase_->runInEventBaseThread([observer, d]() { + try { + observer->onSuccess(toRSocketPayload(d)); - } catch (std::exception& e) { - log(e.what()); - observer->onError(e); - } - }); + } catch (std::exception& e) { + log(e.what()); + observer->onError(e); + } + }); }, [observer, this](folly::exception_wrapper e) { - websocket_->connectionEventBase_->runInEventBaseThread( + eventBase_->runInEventBaseThread( [observer, e]() { observer->onError(e); }); }); }); diff --git a/xplat/Flipper/FlipperRSocketResponder.h b/xplat/Flipper/FlipperRSocketResponder.h index 0071b2bbd..5565d4a5e 100644 --- a/xplat/Flipper/FlipperRSocketResponder.h +++ b/xplat/Flipper/FlipperRSocketResponder.h @@ -9,15 +9,18 @@ namespace facebook { namespace flipper { -class FlipperConnectionManagerImpl; +class FlipperConnectionManager; class FlipperRSocketResponder : public rsocket::RSocketResponder { private: - FlipperConnectionManagerImpl* websocket_; + FlipperConnectionManager* websocket_; + folly::EventBase* eventBase_; public: - FlipperRSocketResponder(FlipperConnectionManagerImpl* websocket) - : websocket_(websocket){}; + FlipperRSocketResponder( + FlipperConnectionManager* websocket, + folly::EventBase* eventBase) + : websocket_(websocket), eventBase_(eventBase){}; void handleFireAndForget( rsocket::Payload request, diff --git a/xplat/FlipperTestLib/FlipperConnectionManagerMock.h b/xplat/FlipperTestLib/FlipperConnectionManagerMock.h index 9a3aa6b67..c1a357b79 100644 --- a/xplat/FlipperTestLib/FlipperConnectionManagerMock.h +++ b/xplat/FlipperTestLib/FlipperConnectionManagerMock.h @@ -1,11 +1,9 @@ -/* - * Copyright (c) 2018-present, Facebook, Inc. - * - * This source code is licensed under the MIT license found in the LICENSE - * file in the root directory of this source tree. +/** + * Copyright (c) Facebook, Inc. and its affiliates. * + * This source code is licensed under the MIT license found in the LICENSE + * file in the root directory of this source tree. */ - #pragma once #include @@ -40,6 +38,16 @@ class FlipperConnectionManagerMock : public FlipperConnectionManager { messages.push_back(message); } + void onMessageReceived( + const folly::dynamic& message, + std::unique_ptr responder) override { + if (responder) { + respondersReceived++; + } + callbacks->onMessageReceived(message, std::move(responder)); + messagesReceived.push_back(message); + } + void setCallbacks(Callbacks* aCallbacks) override { callbacks = aCallbacks; } @@ -48,6 +56,8 @@ class FlipperConnectionManagerMock : public FlipperConnectionManager { bool open = false; Callbacks* callbacks; std::vector messages; + std::vector messagesReceived; + int respondersReceived = 0; }; } // namespace test diff --git a/xplat/FlipperTests/FlipperClientTests.cpp b/xplat/FlipperTests/FlipperClientTests.cpp index 7e2826f2f..c9f9dd2d2 100644 --- a/xplat/FlipperTests/FlipperClientTests.cpp +++ b/xplat/FlipperTests/FlipperClientTests.cpp @@ -61,7 +61,7 @@ TEST_F(FlipperClientTest, testGetPlugins) { client->addPlugin(std::make_shared("Dog")); dynamic message = dynamic::object("id", 1)("method", "getPlugins"); - socket->callbacks->onMessageReceived(message, getResponder()); + socket->onMessageReceived(message, getResponder()); dynamic expected = dynamic::object("plugins", dynamic::array("Cat", "Dog")); EXPECT_EQ(successes[0], expected); @@ -92,8 +92,8 @@ TEST_F(FlipperClientTest, testRemovePlugin) { client->removePlugin(plugin); dynamic message = dynamic::object("id", 1)("method", "getPlugins"); - auto responder = std::make_shared(); - socket->callbacks->onMessageReceived(message, getResponder()); + auto responder = std::make_unique(); + socket->onMessageReceived(message, getResponder()); dynamic expected = dynamic::object("plugins", dynamic::array()); EXPECT_EQ(successes[0], expected); @@ -148,24 +148,24 @@ TEST_F(FlipperClientTest, testInitDeinit) { { dynamic messageInit = dynamic::object("method", "init")( "params", dynamic::object("plugin", "Test")); - auto responder = std::make_shared(); - socket->callbacks->onMessageReceived(messageInit, getResponder()); + auto responder = std::make_unique(); + socket->onMessageReceived(messageInit, getResponder()); EXPECT_TRUE(pluginConnected); } { dynamic messageDeinit = dynamic::object("method", "deinit")( "params", dynamic::object("plugin", "Test")); - auto responder = std::make_shared(); - socket->callbacks->onMessageReceived(messageDeinit, getResponder()); + auto responder = std::make_unique(); + socket->onMessageReceived(messageDeinit, getResponder()); EXPECT_FALSE(pluginConnected); } { dynamic messageReinit = dynamic::object("method", "init")( "params", dynamic::object("plugin", "Test")); - auto responder = std::make_shared(); - socket->callbacks->onMessageReceived(messageReinit, getResponder()); + auto responder = std::make_unique(); + socket->onMessageReceived(messageReinit, getResponder()); EXPECT_TRUE(pluginConnected); } @@ -198,14 +198,14 @@ TEST_F(FlipperClientTest, testUnhandleableMethod) { { dynamic messageInit = dynamic::object("method", "init")( "params", dynamic::object("plugin", "Test")); - auto responder = std::make_shared(); - socket->callbacks->onMessageReceived(messageInit, getResponder()); + auto responder = std::make_unique(); + socket->onMessageReceived(messageInit, getResponder()); } { dynamic messageExecute = dynamic::object("id", 1)("method", "unexpected"); - auto responder = std::make_shared(); - socket->callbacks->onMessageReceived(messageExecute, getResponder()); + auto responder = std::make_unique(); + socket->onMessageReceived(messageExecute, getResponder()); } dynamic expected = @@ -231,8 +231,8 @@ TEST_F(FlipperClientTest, testExecute) { { dynamic messageInit = dynamic::object("method", "init")( "params", dynamic::object("plugin", "Test")); - auto responder = std::make_shared(); - socket->callbacks->onMessageReceived(messageInit, getResponder()); + auto responder = std::make_unique(); + socket->onMessageReceived(messageInit, getResponder()); } { @@ -266,8 +266,8 @@ TEST_F(FlipperClientTest, testExecuteWithParams) { { dynamic messageInit = dynamic::object("method", "init")( "params", dynamic::object("plugin", "Test")); - auto responder = std::make_shared(); - socket->callbacks->onMessageReceived(messageInit, getResponder()); + auto responder = std::make_unique(); + socket->onMessageReceived(messageInit, getResponder()); } { @@ -275,8 +275,8 @@ TEST_F(FlipperClientTest, testExecuteWithParams) { "params", dynamic::object("api", "Test")("method", "animal_sounds")( "params", dynamic::object("first", "dog")("second", "cat"))); - auto responder = std::make_shared(); - socket->callbacks->onMessageReceived(messageExecute, getResponder()); + auto responder = std::make_unique(); + socket->onMessageReceived(messageExecute, getResponder()); } dynamic expected = dynamic::object("dog", "woof")("cat", "meow"); @@ -289,8 +289,8 @@ TEST_F(FlipperClientTest, testExceptionUnknownPlugin) { dynamic messageInit = dynamic::object("method", "init")( "params", dynamic::object("plugin", "Unknown")); - auto responder = std::make_shared(); - socket->callbacks->onMessageReceived(messageInit, getResponder()); + auto responder = std::make_unique(); + socket->onMessageReceived(messageInit, getResponder()); auto failure = failures[0]; EXPECT_EQ(failure["message"], "Plugin Unknown not found for method init"); @@ -302,8 +302,8 @@ TEST_F(FlipperClientTest, testExceptionUnknownApi) { dynamic messageInit = dynamic::object("method", "execute")( "params", dynamic::object("api", "Unknown")); - auto responder = std::make_shared(); - socket->callbacks->onMessageReceived(messageInit, getResponder()); + auto responder = std::make_unique(); + socket->onMessageReceived(messageInit, getResponder()); auto failure = failures[0]; EXPECT_EQ( failure["message"], "Connection Unknown not found for method execute"); diff --git a/xplat/FlipperTests/FlipperRSocketResponderTests.cpp b/xplat/FlipperTests/FlipperRSocketResponderTests.cpp new file mode 100644 index 000000000..00860090c --- /dev/null +++ b/xplat/FlipperTests/FlipperRSocketResponderTests.cpp @@ -0,0 +1,67 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the LICENSE + * file in the root directory of this source tree. + */ +#include +#include +#include +#include +#include + +namespace facebook { +namespace flipper { +namespace test { + +using folly::dynamic; + +class Callbacks + : public facebook::flipper::FlipperConnectionManager::Callbacks { + public: + void onConnected() {} + void onDisconnected() {} + void onMessageReceived( + const folly::dynamic& message, + std::unique_ptr responder) { + message_ = message; + responder_ = std::move(responder); + } + folly::dynamic message_; + std::unique_ptr responder_; +}; + +TEST(FlipperRSocketResponderTests, testFireAndForgetWithoutIdParam) { + auto socket = facebook::flipper::test::FlipperConnectionManagerMock(); + auto callbacks = new Callbacks(); + socket.setCallbacks(callbacks); + folly::EventBase* eb = new folly::EventBase(); + auto responder = facebook::flipper::FlipperRSocketResponder(&socket, eb); + dynamic d = dynamic::object("my", "message"); + auto json = folly::toJson(d); + + responder.handleFireAndForget(rsocket::Payload(json), rsocket::StreamId(1)); + EXPECT_EQ(socket.messagesReceived.size(), 1); + EXPECT_EQ(socket.messagesReceived[0]["my"], "message"); + EXPECT_EQ(socket.respondersReceived, 0); +} + +TEST(FlipperRSocketResponderTests, testFireAndForgetWithIdParam) { + auto socket = facebook::flipper::test::FlipperConnectionManagerMock(); + auto callbacks = new Callbacks(); + socket.setCallbacks(callbacks); + folly::EventBase* eb = new folly::EventBase(); + auto responder = facebook::flipper::FlipperRSocketResponder(&socket, eb); + dynamic d = dynamic::object("my", "message")("id", 7); + auto json = folly::toJson(d); + + responder.handleFireAndForget(rsocket::Payload(json), rsocket::StreamId(1)); + EXPECT_EQ(socket.messagesReceived.size(), 1); + EXPECT_EQ(socket.messagesReceived[0]["my"], "message"); + EXPECT_EQ(socket.messagesReceived[0]["id"], 7); + EXPECT_EQ(socket.respondersReceived, 1); +} + +} // namespace test +} // namespace flipper +} // namespace facebook