diff --git a/desktop/app/src/Client.tsx b/desktop/app/src/Client.tsx index f1b5d50cc..8a70f18ac 100644 --- a/desktop/app/src/Client.tsx +++ b/desktop/app/src/Client.tsx @@ -11,8 +11,6 @@ import {PluginDefinition} from './plugin'; import BaseDevice, {OS} from './devices/BaseDevice'; import {Logger} from './fb-interfaces/Logger'; import {Store} from './reducers/index'; -import {Payload, ConnectionStatus} from 'rsocket-types'; -import {Flowable, Single} from 'rsocket-flowable'; import {performance} from 'perf_hooks'; import {reportPluginFailures} from './utils/metrics'; import {default as isProduction} from './utils/isProduction'; @@ -34,6 +32,11 @@ import { isFlipperMessageDebuggingEnabled, registerFlipperDebugMessage, } from './chrome/FlipperMessages'; +import { + ConnectionStatus, + ErrorType, + ClientConnection, +} from './comms/ClientConnection'; type Plugins = Set; type PluginsArr = Array; @@ -51,7 +54,6 @@ export type ClientExport = { query: ClientQuery; }; -type ErrorType = {message: string; stacktrace: string; name: string}; type Params = { api: string; method: string; @@ -85,13 +87,6 @@ const handleError = (store: Store, device: BaseDevice, error: ErrorType) => { crashReporterPlugin.instanceApi.reportCrash(payload); }; -export interface FlipperClientConnection { - connectionStatus(): Flowable; - close(): void; - fireAndForget(payload: Payload): void; - requestResponse(payload: Payload): Single>; -} - export default class Client extends EventEmitter { connected = createState(false); id: string; @@ -100,7 +95,7 @@ export default class Client extends EventEmitter { messageIdCounter: number; plugins: Plugins; backgroundPlugins: Plugins; - connection: FlipperClientConnection | null | undefined; + connection: ClientConnection | null | undefined; store: Store; activePlugins: Set; freezeData = GK.get('flipper_frozen_data'); @@ -135,7 +130,7 @@ export default class Client extends EventEmitter { constructor( id: string, query: ClientQuery, - conn: FlipperClientConnection | null | undefined, + conn: ClientConnection | null | undefined, logger: Logger, store: Store, plugins: Plugins | null | undefined, @@ -161,18 +156,13 @@ export default class Client extends EventEmitter { const client = this; if (conn) { - conn.connectionStatus().subscribe({ - onNext(payload) { - if (payload.kind == 'ERROR' || payload.kind == 'CLOSED') { - client.connected.set(false); - } - }, - onSubscribe(subscription) { - subscription.request(Number.MAX_SAFE_INTEGER); - }, - onError(payload) { - console.error('[client] connection status error ', payload); - }, + conn.subscribeToEvents((status) => { + if ( + status === ConnectionStatus.CLOSED || + status === ConnectionStatus.ERROR + ) { + client.connected.set(false); + } }); } } @@ -531,7 +521,7 @@ export default class Client extends EventEmitter { } rawCall(method: string, fromPlugin: boolean, params?: Params): Promise { - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { const id = this.messageIdCounter++; const metadata: RequestMetadata = { method, @@ -556,7 +546,7 @@ export default class Client extends EventEmitter { if (this.sdkVersion < 1) { this.startTimingRequestResponse({method, id, params}); if (this.connection) { - this.connection.fireAndForget({data: JSON.stringify(data)}); + this.connection.send(data); } return; } @@ -573,41 +563,33 @@ export default class Client extends EventEmitter { return; } if (!fromPlugin || this.isAcceptingMessagesFromPlugin(plugin)) { - this.connection!.requestResponse({ - data: JSON.stringify(data), - }).subscribe({ - onComplete: (payload) => { - if (!fromPlugin || this.isAcceptingMessagesFromPlugin(plugin)) { - const logEventName = this.getLogEventName(data); - this.logger.trackTimeSince(mark, logEventName); - emitBytesReceived(plugin || 'unknown', payload.data.length * 2); - const response: { - success?: Object; - error?: ErrorType; - } = JSON.parse(payload.data); + try { + const response = await this.connection!.sendExpectResponse(data); + if (!fromPlugin || this.isAcceptingMessagesFromPlugin(plugin)) { + const logEventName = this.getLogEventName(data); + this.logger.trackTimeSince(mark, logEventName); + emitBytesReceived(plugin || 'unknown', response.length * 2); - this.onResponse(response, resolve, reject); + this.onResponse(response, resolve, reject); - if (isFlipperMessageDebuggingEnabled()) { - registerFlipperDebugMessage({ - device: this.deviceSync?.displayTitle(), - app: this.query.app, - flipperInternalMethod: method, - payload: response, - plugin, - pluginMethod: params?.method, - direction: 'toFlipper:response', - }); - } + if (isFlipperMessageDebuggingEnabled()) { + registerFlipperDebugMessage({ + device: this.deviceSync?.displayTitle(), + app: this.query.app, + flipperInternalMethod: method, + payload: response, + plugin, + pluginMethod: params?.method, + direction: 'toFlipper:response', + }); } - }, - onError: (e) => { - // This is only called if the connection is dead. Not in expected - // and recoverable cases like a missing receiver/method. - this.disconnect(); - reject(new Error('Connection disconnected: ' + e)); - }, - }); + } + } catch (error) { + // This is only called if the connection is dead. Not in expected + // and recoverable cases like a missing receiver/method. + this.disconnect(); + reject(new Error('Unable to send, connection error: ' + error)); + } } else { reject( new Error( @@ -699,7 +681,7 @@ export default class Client extends EventEmitter { }; console.debug(data, 'message:send'); if (this.connection) { - this.connection.fireAndForget({data: JSON.stringify(data)}); + this.connection.send(data); } if (isFlipperMessageDebuggingEnabled()) { diff --git a/desktop/app/src/comms/ClientConnection.tsx b/desktop/app/src/comms/ClientConnection.tsx new file mode 100644 index 000000000..0192a8249 --- /dev/null +++ b/desktop/app/src/comms/ClientConnection.tsx @@ -0,0 +1,37 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @format + */ + +export type ErrorType = { + message: string; + stacktrace: string; + name: string; +}; + +export type ResponseType = { + success?: Object; + error?: ErrorType; + length: number; +}; + +export enum ConnectionStatus { + ERROR = 'error', + CLOSED = 'closed', + CONNECTED = 'connected', + NOT_CONNECTED = 'not_connected', + CONNECTING = 'connecting', +} + +export type ConnectionStatusChange = (status: ConnectionStatus) => void; + +export interface ClientConnection { + subscribeToEvents(subscriber: ConnectionStatusChange): void; + close(): void; + send(data: any): void; + sendExpectResponse(data: any): Promise; +} diff --git a/desktop/app/src/comms/ServerController.tsx b/desktop/app/src/comms/ServerController.tsx index 412a696df..edefcf889 100644 --- a/desktop/app/src/comms/ServerController.tsx +++ b/desktop/app/src/comms/ServerController.tsx @@ -18,7 +18,12 @@ import CertificateProvider from '../utils/CertificateProvider'; import {RSocketServer} from 'rsocket-core'; import RSocketTCPServer from 'rsocket-tcp-server'; import Client from '../Client'; -import {FlipperClientConnection} from '../Client'; +import { + ClientConnection, + ConnectionStatus, + ConnectionStatusChange, + ResponseType, +} from './ClientConnection'; import {UninitializedClient} from '../UninitializedClient'; import {reportPlatformFailures} from '../utils/metrics'; import {EventEmitter} from 'events'; @@ -43,7 +48,7 @@ import {sideEffect} from '../utils/sideEffect'; import {destroyDevice} from '../reducers/connections'; type ClientInfo = { - connection: FlipperClientConnection | null | undefined; + connection: ClientConnection | null | undefined; client: Client; }; @@ -234,7 +239,7 @@ class ServerController extends EventEmitter { if (resolvedClient) { resolvedClient.onMessage(message); } else { - client.then((c) => c.onMessage(message)); + client.then((c) => c.onMessage(message)).catch((_) => {}); } } }); @@ -242,10 +247,12 @@ class ServerController extends EventEmitter { } case 'disconnect': { const app = message.app; - (clients[app] || Promise.resolve()).then((c) => { - this.removeConnection(c.id); - delete clients[app]; - }); + (clients[app] || Promise.resolve()) + .then((c) => { + this.removeConnection(c.id); + delete clients[app]; + }) + .catch((_) => {}); break; } } @@ -297,8 +304,58 @@ class ServerController extends EventEmitter { }); } + const clientConnection: ClientConnection = { + subscribeToEvents(subscriber: ConnectionStatusChange): void { + socket.connectionStatus().subscribe({ + onNext(payload) { + let status = ConnectionStatus.CONNECTED; + + if (payload.kind == 'ERROR') status = ConnectionStatus.ERROR; + else if (payload.kind == 'CLOSED') status = ConnectionStatus.CLOSED; + else if (payload.kind == 'CONNECTED') + status = ConnectionStatus.CONNECTED; + else if (payload.kind == 'NOT_CONNECTED') + status = ConnectionStatus.NOT_CONNECTED; + else if (payload.kind == 'CONNECTING') + status = ConnectionStatus.CONNECTING; + + subscriber(status); + }, + onSubscribe(subscription) { + subscription.request(Number.MAX_SAFE_INTEGER); + }, + onError(payload) { + console.error('[client] connection status error ', payload); + }, + }); + }, + close(): void { + socket.close(); + }, + send(data: any): void { + socket.fireAndForget({data: JSON.stringify(data)}); + }, + sendExpectResponse(data: any): Promise { + return new Promise((resolve, reject) => { + socket + .requestResponse({ + data: JSON.stringify(data), + }) + .subscribe({ + onComplete: (payload: Payload) => { + const response: ResponseType = JSON.parse(payload.data); + response.length = payload.data.length; + resolve(response); + }, + onError: (e) => { + reject(e); + }, + }); + }); + }, + }; const client: Promise = this.addConnection( - socket, + clientConnection, { app, os, @@ -316,10 +373,16 @@ class ServerController extends EventEmitter { socket.connectionStatus().subscribe({ onNext(payload) { if (payload.kind == 'ERROR' || payload.kind == 'CLOSED') { - client.then((client) => { - console.log(`Device disconnected ${client.id}`, 'server', payload); - server.removeConnection(client.id); - }); + client + .then((client) => { + console.log( + `Device disconnected ${client.id}`, + 'server', + payload, + ); + server.removeConnection(client.id); + }) + .catch((_) => {}); } }, onSubscribe(subscription) { @@ -335,9 +398,11 @@ class ServerController extends EventEmitter { if (resolvedClient) { resolvedClient.onMessage(payload.data); } else { - client.then((client) => { - client.onMessage(payload.data); - }); + client + .then((client) => { + client.onMessage(payload.data); + }) + .catch((_) => {}); } }, }; @@ -469,7 +534,7 @@ class ServerController extends EventEmitter { transformCertificateExchangeMediumToType(medium), ) .catch((e) => { - console.error(e); + console.error('Unable to process CSR. Error:', e); }); } }, @@ -495,7 +560,7 @@ class ServerController extends EventEmitter { } async addConnection( - conn: FlipperClientConnection, + conn: ClientConnection, query: ClientQuery & {medium: CertificateExchangeMedium}, csrQuery: ClientCsrQuery, ): Promise { @@ -548,31 +613,34 @@ class ServerController extends EventEmitter { connection: conn, }; - client.init().then(() => { - console.debug( - `Device client initialised: ${id}. Supported plugins: ${Array.from( - client.plugins, - ).join(', ')}`, - 'server', - ); + client + .init() + .then(() => { + console.debug( + `Device client initialised: ${id}. Supported plugins: ${Array.from( + client.plugins, + ).join(', ')}`, + 'server', + ); - /* If a device gets disconnected without being cleaned up properly, - * Flipper won't be aware until it attempts to reconnect. - * When it does we need to terminate the zombie connection. - */ - if (this.connections.has(id)) { - const connectionInfo = this.connections.get(id); - connectionInfo && - connectionInfo.connection && - connectionInfo.connection.close(); - this.removeConnection(id); - } + /* If a device gets disconnected without being cleaned up properly, + * Flipper won't be aware until it attempts to reconnect. + * When it does we need to terminate the zombie connection. + */ + if (this.connections.has(id)) { + const connectionInfo = this.connections.get(id); + connectionInfo && + connectionInfo.connection && + connectionInfo.connection.close(); + this.removeConnection(id); + } - this.connections.set(id, info); - this.emit('new-client', client); - this.emit('clients-change'); - client.emit('plugins-change'); - }); + this.connections.set(id, info); + this.emit('new-client', client); + this.emit('clients-change'); + client.emit('plugins-change'); + }) + .catch((_) => {}); return client; }); @@ -650,7 +718,7 @@ async function findDeviceForConnection( const timeout = setTimeout(() => { unsubscribe(); const error = `Timed out waiting for device ${serial} for client ${clientId}`; - console.error(error); + console.error('Unable to find device for connection. Error:', error); reject(error); }, 15000); unsubscribe = sideEffect( diff --git a/desktop/app/src/test-utils/MockFlipper.tsx b/desktop/app/src/test-utils/MockFlipper.tsx index d8e48e864..b6fe8dc11 100644 --- a/desktop/app/src/test-utils/MockFlipper.tsx +++ b/desktop/app/src/test-utils/MockFlipper.tsx @@ -11,7 +11,11 @@ import {createStore} from 'redux'; import BaseDevice from '../devices/BaseDevice'; import {createRootReducer} from '../reducers'; import {Store} from '../reducers/index'; -import Client, {ClientQuery, FlipperClientConnection} from '../Client'; +import Client, {ClientQuery} from '../Client'; +import { + ClientConnection, + ConnectionStatusChange, +} from '../comms/ClientConnection'; import {buildClientId} from '../utils/clientUtils'; import {Logger} from '../fb-interfaces/Logger'; import {PluginDefinition} from '../plugin'; @@ -235,33 +239,17 @@ export default class MockFlipper { return client; } } -function createStubConnection(): - | FlipperClientConnection - | null - | undefined { +function createStubConnection(): ClientConnection | null | undefined { return { + subscribeToEvents(_: ConnectionStatusChange) {}, close() { throw new Error('Should not be called in test'); }, - fireAndForget() { + send(_: any) { throw new Error('Should not be called in test'); }, - requestResponse() { + sendExpectResponse(_: any): Promise { throw new Error('Should not be called in test'); }, - connectionStatus() { - return { - subscribe() {}, - lift() { - throw new Error('Should not be called in test'); - }, - map() { - throw new Error('Should not be called in test'); - }, - take() { - throw new Error('Should not be called in test'); - }, - }; - }, }; } diff --git a/desktop/app/src/utils/js-client-server-utils/serverUtils.tsx b/desktop/app/src/utils/js-client-server-utils/serverUtils.tsx index c2101c004..0195e9474 100644 --- a/desktop/app/src/utils/js-client-server-utils/serverUtils.tsx +++ b/desktop/app/src/utils/js-client-server-utils/serverUtils.tsx @@ -8,19 +8,21 @@ */ import Client, {ClientQuery} from '../../Client'; -import {FlipperClientConnection} from '../../Client'; +import { + ClientConnection, + ConnectionStatus, + ConnectionStatusChange, + ResponseType, +} from '../../comms/ClientConnection'; import {ipcRenderer, remote, IpcRendererEvent} from 'electron'; import JSDevice from '../../devices/JSDevice'; import {Store} from '../../reducers'; import {Logger} from '../../fb-interfaces/Logger'; - -import {Payload, ConnectionStatus, ISubscriber} from 'rsocket-types'; -import {Flowable, Single} from 'rsocket-flowable'; import ServerController from '../../comms/ServerController'; import {buildClientId} from '../clientUtils'; import {destroyDevice} from '../../reducers/connections'; -const connections: Map> = new Map(); +const connections: Map = new Map(); const availablePlugins: Map> = new Map(); @@ -35,7 +37,7 @@ export function initJsEmulatorIPC( flipperConnections: Map< string, { - connection: FlipperClientConnection | null | undefined; + connection: ClientConnection | null | undefined; client: Client; } >, @@ -79,46 +81,47 @@ export function initJsEmulatorIPC( client: client, }); - connection.connectionStatus().subscribe({ - onNext(payload) { - if (payload.kind == 'ERROR' || payload.kind == 'CLOSED') { - console.debug(`Device disconnected ${client.id}`, 'server'); - flipperServer.removeConnection(client.id); - destroyDevice(store, logger, jsDeviceId(windowId)); - connections.delete(windowId); - availablePlugins.delete(windowId); - } - }, - onSubscribe(subscription) { - subscription.request(Number.MAX_SAFE_INTEGER); - }, + connection.subscribeToEvents((status) => { + if ( + status == ConnectionStatus.ERROR || + status == ConnectionStatus.CLOSED + ) { + console.debug(`Device disconnected ${client.id}`, 'server'); + flipperServer.removeConnection(client.id); + destroyDevice(store, logger, jsDeviceId(windowId)); + connections.delete(windowId); + availablePlugins.delete(windowId); + } }); - client.init().then(() => { - console.log(client); - flipperServer.emit('new-client', client); - flipperServer.emit('clients-change'); - client.emit('plugins-change'); + client + .init() + .then(() => { + console.log(client); + flipperServer.emit('new-client', client); + flipperServer.emit('clients-change'); + client.emit('plugins-change'); - ipcRenderer.on( - 'from-js-emulator', - (_event: IpcRendererEvent, message: any) => { - const {command, payload} = message; - if (command === 'sendFlipperObject') { - client.onMessage( - JSON.stringify({ - params: { - api: payload.api, - method: payload.method, - params: JSON.parse(payload.params), - }, - method: 'execute', - }), - ); - } - }, - ); - }); + ipcRenderer.on( + 'from-js-emulator', + (_event: IpcRendererEvent, message: any) => { + const {command, payload} = message; + if (command === 'sendFlipperObject') { + client.onMessage( + JSON.stringify({ + params: { + api: payload.api, + method: payload.method, + params: JSON.parse(payload.params), + }, + method: 'execute', + }), + ); + } + }, + ); + }) + .catch((_) => {}); }, ); } @@ -156,65 +159,53 @@ export function launchJsEmulator(url: string, height: number, width: number) { }); } -export class JSClientFlipperConnection - implements FlipperClientConnection -{ +export class JSClientFlipperConnection implements ClientConnection { webContentsId: number; - connStatusSubscribers: Set> = new Set(); + connStatusSubscribers: Set = new Set(); connStatus: ConnectionStatus; constructor(webContentsId: number) { this.webContentsId = webContentsId; - this.connStatus = {kind: 'CONNECTED'}; + this.connStatus = ConnectionStatus.CONNECTED; } - - connectionStatus(): Flowable { - return new Flowable((subscriber) => { - subscriber.onSubscribe({ - cancel: () => { - this.connStatusSubscribers.delete(subscriber); - }, - request: (_) => { - this.connStatusSubscribers.add(subscriber); - subscriber.onNext(this.connStatus); - }, - }); - }); + subscribeToEvents(subscriber: ConnectionStatusChange): void { + this.connStatusSubscribers.add(subscriber); } - - close(): void { - this.connStatus = {kind: 'CLOSED'}; - this.connStatusSubscribers.forEach((subscriber) => { - subscriber.onNext(this.connStatus); - }); - } - - fireAndForget(payload: Payload): void { + send(data: any): void { ipcRenderer.sendTo( this.webContentsId, 'message-to-plugin', - JSON.parse(payload.data != null ? payload.data : '{}'), + JSON.parse(data != null ? data : '{}'), ); } - // TODO: fully implement and return actual result - requestResponse(payload: Payload): Single> { - return new Single((subscriber) => { - const method = - payload.data != null ? JSON.parse(payload.data).method : 'not-defined'; - if (method != 'getPlugins') { - this.fireAndForget(payload); + sendExpectResponse(data: any): Promise { + return new Promise((resolve, _) => { + const method = data != null ? JSON.parse(data).method : 'not-defined'; + + if (method !== 'getPlugins') { + this.send(data); } - subscriber.onSubscribe(() => {}); - subscriber.onComplete( - method == 'getPlugins' - ? { - data: JSON.stringify({ - success: {plugins: availablePlugins.get(this.webContentsId)}, - }), - } - : {data: JSON.stringify({success: null})}, - ); + + if (method === 'getPlugins') { + resolve({ + success: { + plugins: availablePlugins.get(this.webContentsId), + }, + length: 0, + }); + } else { + resolve({ + success: undefined, + length: 0, + }); + } + }); + } + close(): void { + this.connStatus = ConnectionStatus.CLOSED; + this.connStatusSubscribers.forEach((subscriber) => { + subscriber(this.connStatus); }); } } diff --git a/desktop/app/src/utils/js-client-server-utils/websocketClientFlipperConnection.tsx b/desktop/app/src/utils/js-client-server-utils/websocketClientFlipperConnection.tsx index 56d6887bf..985ea966a 100644 --- a/desktop/app/src/utils/js-client-server-utils/websocketClientFlipperConnection.tsx +++ b/desktop/app/src/utils/js-client-server-utils/websocketClientFlipperConnection.tsx @@ -7,71 +7,48 @@ * @format */ -import {FlipperClientConnection} from '../../Client'; -import {Flowable, Single} from 'rsocket-flowable'; -import {Payload, ConnectionStatus, ISubscriber} from 'rsocket-types'; import WebSocket from 'ws'; +import { + ConnectionStatusChange, + ConnectionStatus, + ClientConnection, + ResponseType, +} from '../../comms/ClientConnection'; -export class WebsocketClientFlipperConnection - implements FlipperClientConnection -{ +export class WebsocketClientFlipperConnection implements ClientConnection { websocket: WebSocket; - connStatusSubscribers: Set> = new Set(); + connStatusSubscribers: Set = new Set(); connStatus: ConnectionStatus; app: string; plugins: string[] | undefined = undefined; constructor(ws: WebSocket, app: string, plugins: string[]) { this.websocket = ws; - this.connStatus = {kind: 'CONNECTED'}; + this.connStatus = ConnectionStatus.CONNECTED; this.app = app; this.plugins = plugins; } - - connectionStatus(): Flowable { - return new Flowable((subscriber) => { - subscriber.onSubscribe({ - cancel: () => { - this.connStatusSubscribers.delete(subscriber); - }, - request: (_) => { - this.connStatusSubscribers.add(subscriber); - subscriber.onNext(this.connStatus); - }, - }); - }); + subscribeToEvents(subscriber: ConnectionStatusChange): void { + this.connStatusSubscribers.add(subscriber); } - - close(): void { - this.connStatus = {kind: 'CLOSED'}; - this.connStatusSubscribers.forEach((subscriber) => { - subscriber.onNext(this.connStatus); - }); - this.websocket.send(JSON.stringify({type: 'disconnect', app: this.app})); - } - - fireAndForget(payload: Payload): void { + send(data: any): void { this.websocket.send( JSON.stringify({ type: 'send', app: this.app, - payload: payload.data != null ? payload.data : {}, + payload: data != null ? data : {}, }), ); } - - requestResponse(payload: Payload): Single> { - return new Single((subscriber) => { + sendExpectResponse(data: any): Promise { + return new Promise((resolve, reject) => { const {id: callId = undefined, method = undefined} = - payload.data != null ? JSON.parse(payload.data) : {}; - - subscriber.onSubscribe(() => {}); + data != null ? data : {}; if (method === 'getPlugins' && this.plugins != null) { - subscriber.onComplete({ - data: JSON.stringify({ - success: {plugins: this.plugins}, - }), + resolve({ + success: {plugins: this.plugins}, + length: 0, }); return; } @@ -80,7 +57,7 @@ export class WebsocketClientFlipperConnection JSON.stringify({ type: 'call', app: this.app, - payload: payload.data != null ? payload.data : {}, + payload: data != null ? data : {}, }), ); @@ -88,9 +65,20 @@ export class WebsocketClientFlipperConnection const {app, payload} = JSON.parse(message); if (app === this.app && payload?.id === callId) { - subscriber.onComplete({data: JSON.stringify(payload)}); + resolve(payload); } }); + this.websocket.on('error', (error: Error) => { + reject(error); + }); }); } + + close(): void { + this.connStatus = ConnectionStatus.CLOSED; + this.connStatusSubscribers.forEach((subscriber) => { + subscriber(this.connStatus); + }); + this.websocket.send(JSON.stringify({type: 'disconnect', app: this.app})); + } }