From 6c72e78d046db985edfc70bab58656fe1e313348 Mon Sep 17 00:00:00 2001 From: Lorenzo Blasa Date: Thu, 29 Jul 2021 03:19:01 -0700 Subject: [PATCH] RSocket server Summary: RSocket server which implements the ServerAdapter interface. Most of the code was taken from the ServerController class with a few minor adjustments. Once ServerController starts using ServerAdapter instances the code will be removed from there. Reviewed By: fabiomassimo Differential Revision: D29984578 fbshipit-source-id: e35e7635dff995314b3c1fbc85177f90384e025d --- desktop/app/src/comms/ServerRSocket.tsx | 268 ++++++++++++++++++++++++ 1 file changed, 268 insertions(+) create mode 100644 desktop/app/src/comms/ServerRSocket.tsx diff --git a/desktop/app/src/comms/ServerRSocket.tsx b/desktop/app/src/comms/ServerRSocket.tsx new file mode 100644 index 000000000..3f28d5b53 --- /dev/null +++ b/desktop/app/src/comms/ServerRSocket.tsx @@ -0,0 +1,268 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @format + */ + +import {SecureServerConfig} from '../utils/CertificateProvider'; +import ServerAdapter, { + SecureClientQuery, + ServerEventsListener, +} from './ServerAdapter'; +import tls from 'tls'; +import net, {Socket} from 'net'; +import {RSocketServer} from 'rsocket-core'; +import RSocketTCPServer from 'rsocket-tcp-server'; +import {Payload, ReactiveSocket, Responder} from 'rsocket-types'; +import Client, {ClientQuery} from '../Client'; +import {Single} from 'rsocket-flowable'; +import { + ClientConnection, + ConnectionStatusChange, + ConnectionStatus, + ResponseType, +} from './ClientConnection'; + +/** + * RSocket based server. RSocket uses its own protocol for communication between + * client and server. + */ +class ServerRSocket extends ServerAdapter { + rawServer_: RSocketServer | null | undefined; + constructor(listener: ServerEventsListener) { + super(listener); + this.rawServer_ = null; + } + + /** + * Start the server bound to the specified port. It configures + * the RSocket server factory and request handler based on the optional + * sslConfig argument. + */ + start(port: number, sslConfig?: SecureServerConfig): Promise { + const self = this; + return new Promise((resolve, reject) => { + // eslint-disable-next-line prefer-const + let rawServer: RSocketServer | undefined; + const serverFactory = (onConnect: (socket: Socket) => void) => { + const transportServer = sslConfig + ? tls.createServer(sslConfig, (socket) => { + onConnect(socket); + }) + : net.createServer(onConnect); + transportServer + .on('error', (err) => { + self.listener.onError(err); + console.error(`Error opening server on port ${port}`, 'server'); + reject(err); + }) + .on('listening', () => { + console.debug( + `${ + sslConfig ? 'Secure' : 'Certificate' + } server started on port ${port}`, + 'server', + ); + self.listener.onListening(port); + self.rawServer_ = rawServer; + resolve(true); + }); + return transportServer; + }; + rawServer = new RSocketServer({ + getRequestHandler: sslConfig + ? this._trustedRequestHandler + : this._untrustedRequestHandler, + transport: new RSocketTCPServer({ + port: port, + serverFactory: serverFactory, + }), + }); + rawServer && rawServer.start(); + }); + } + + stop(): Promise { + if (this.rawServer_) { + return Promise.resolve(this.rawServer_.stop()); + } + return Promise.resolve(); + } + + /** + * Handle an incoming connection request over TLS. + * @param socket Underlying socket connection. + * @param payload Payload or message received. + * @returns Returns a valid RSocket responder which will handle further + * communication from the client. + */ + _trustedRequestHandler = ( + socket: ReactiveSocket, + payload: Payload, + ): Partial> => { + if (!payload.data) { + return {}; + } + + const clientQuery: SecureClientQuery = JSON.parse(payload.data); + this.listener.onSecureConnectionAttempt(clientQuery); + + const clientConnection: ClientConnection = { + subscribeToEvents(subscriber: ConnectionStatusChange): void { + socket.connectionStatus().subscribe({ + onNext(payload) { + let status = ConnectionStatus.CONNECTED; + + if (payload.kind == 'ERROR') status = ConnectionStatus.ERROR; + else if (payload.kind == 'CLOSED') status = ConnectionStatus.CLOSED; + else if (payload.kind == 'CONNECTED') + status = ConnectionStatus.CONNECTED; + else if (payload.kind == 'NOT_CONNECTED') + status = ConnectionStatus.NOT_CONNECTED; + else if (payload.kind == 'CONNECTING') + status = ConnectionStatus.CONNECTING; + + subscriber(status); + }, + onSubscribe(subscription) { + subscription.request(Number.MAX_SAFE_INTEGER); + }, + onError(payload) { + console.error('[client] connection status error ', payload); + }, + }); + }, + close(): void { + socket.close(); + }, + send(data: any): void { + socket.fireAndForget({data: JSON.stringify(data)}); + }, + sendExpectResponse(data: any): Promise { + return new Promise((resolve, reject) => { + socket + .requestResponse({ + data: JSON.stringify(data), + }) + .subscribe({ + onComplete: (payload: Payload) => { + const response: ResponseType = JSON.parse(payload.data); + response.length = payload.data.length; + resolve(response); + }, + onError: (e) => { + reject(e); + }, + }); + }); + }, + }; + + let resolvedClient: Client | undefined; + const client: Promise = this.listener.onConnectionCreated( + clientQuery, + clientConnection, + ); + client.then((client) => (resolvedClient = client)).catch((_) => {}); + + return { + fireAndForget: (payload: {data: string}) => { + if (resolvedClient) { + resolvedClient.onMessage(payload.data); + } else { + client && + client + .then((client) => { + client.onMessage(payload.data); + }) + .catch((_) => {}); + } + }, + }; + }; + + /** + * Handle an incoming connection request over an insecure connection. + * @param socket Underlying socket connection. + * @param payload Payload or message received. + * @returns Returns a valid RSocket responder which will handle further + * communication from the client. + */ + _untrustedRequestHandler = ( + _socket: ReactiveSocket, + payload: Payload, + ): Partial> => { + if (!payload.data) { + return {}; + } + + const clientQuery: ClientQuery = JSON.parse(payload.data); + this.listener.onConnectionAttempt(clientQuery); + + return { + requestResponse: ( + payload: Payload, + ): Single> => { + if (typeof payload.data !== 'string') { + return new Single((_) => {}); + } + + let rawData: any; + try { + rawData = JSON.parse(payload.data); + } catch (err) { + console.error( + `Invalid JSON: ${payload.data}`, + 'clientMessage', + 'server', + ); + return new Single((_) => {}); + } + + return new Single((subscriber) => { + subscriber.onSubscribe(undefined); + this._onHandleUntrustedMessage(clientQuery, rawData) + .then((response) => { + subscriber.onComplete({ + data: response, + metadata: '', + }); + }) + .catch((err) => { + subscriber.onError(err); + }); + }); + }, + // Can probably refactor this out + // Leaving this here for a while for backwards compatibility, + // but for up to date SDKs it will no longer used. + fireAndForget: (payload: Payload) => { + if (typeof payload.data !== 'string') { + return; + } + + let rawData: any; + try { + rawData = JSON.parse(payload.data); + } catch (err) { + console.error(`Invalid JSON: ${payload.data}`, 'server'); + return; + } + + if (rawData && rawData.method === 'signCertificate') { + console.debug('CSR received from device', 'server'); + this._onHandleUntrustedMessage(clientQuery, rawData) + .then((_) => {}) + .catch((err) => { + console.error('Unable to process CSR, failed with error.', err); + }); + } + }, + }; + }; +} + +export default ServerRSocket;