From c1496f621ec26290205b591dc51b01df4d810d92 Mon Sep 17 00:00:00 2001 From: Lorenzo Blasa Date: Tue, 27 Jul 2021 08:28:37 -0700 Subject: [PATCH] FlipperClientConnection no longer redefines the RSocket Socket interface Summary: FlipperClientConnection used to define an interface which closely matched the Socket interface defined in RSocket. Presumably it tried to 'decouple' RSocket from consumers of the client socket connection. It also limited the amount of actions that could be triggered on said socket connection. This diff does two things: - Renames FlipperClientConnection to ClientConnection. - Changes the interface so that it no longer matches the RSocket Socket interface. The new interface doesn't use RSocket reactive types either. As a result, current implementations of the interface will act as a proxy to the RSocket Socket type. The change allows the usage of other implementations such as WebSocket. Reviewed By: fabiomassimo Differential Revision: D29934090 fbshipit-source-id: 02742e50cd6e801310698969c81b3bf1ef0fa2a2 --- desktop/app/src/Client.tsx | 100 +++++------ desktop/app/src/comms/ClientConnection.tsx | 37 ++++ desktop/app/src/comms/ServerController.tsx | 150 +++++++++++----- desktop/app/src/test-utils/MockFlipper.tsx | 30 +--- .../js-client-server-utils/serverUtils.tsx | 167 +++++++++--------- .../websocketClientFlipperConnection.tsx | 76 ++++---- 6 files changed, 307 insertions(+), 253 deletions(-) create mode 100644 desktop/app/src/comms/ClientConnection.tsx diff --git a/desktop/app/src/Client.tsx b/desktop/app/src/Client.tsx index f1b5d50cc..8a70f18ac 100644 --- a/desktop/app/src/Client.tsx +++ b/desktop/app/src/Client.tsx @@ -11,8 +11,6 @@ import {PluginDefinition} from './plugin'; import BaseDevice, {OS} from './devices/BaseDevice'; import {Logger} from './fb-interfaces/Logger'; import {Store} from './reducers/index'; -import {Payload, ConnectionStatus} from 'rsocket-types'; -import {Flowable, Single} from 'rsocket-flowable'; import {performance} from 'perf_hooks'; import {reportPluginFailures} from './utils/metrics'; import {default as isProduction} from './utils/isProduction'; @@ -34,6 +32,11 @@ import { isFlipperMessageDebuggingEnabled, registerFlipperDebugMessage, } from './chrome/FlipperMessages'; +import { + ConnectionStatus, + ErrorType, + ClientConnection, +} from './comms/ClientConnection'; type Plugins = Set; type PluginsArr = Array; @@ -51,7 +54,6 @@ export type ClientExport = { query: ClientQuery; }; -type ErrorType = {message: string; stacktrace: string; name: string}; type Params = { api: string; method: string; @@ -85,13 +87,6 @@ const handleError = (store: Store, device: BaseDevice, error: ErrorType) => { crashReporterPlugin.instanceApi.reportCrash(payload); }; -export interface FlipperClientConnection { - connectionStatus(): Flowable; - close(): void; - fireAndForget(payload: Payload): void; - requestResponse(payload: Payload): Single>; -} - export default class Client extends EventEmitter { connected = createState(false); id: string; @@ -100,7 +95,7 @@ export default class Client extends EventEmitter { messageIdCounter: number; plugins: Plugins; backgroundPlugins: Plugins; - connection: FlipperClientConnection | null | undefined; + connection: ClientConnection | null | undefined; store: Store; activePlugins: Set; freezeData = GK.get('flipper_frozen_data'); @@ -135,7 +130,7 @@ export default class Client extends EventEmitter { constructor( id: string, query: ClientQuery, - conn: FlipperClientConnection | null | undefined, + conn: ClientConnection | null | undefined, logger: Logger, store: Store, plugins: Plugins | null | undefined, @@ -161,18 +156,13 @@ export default class Client extends EventEmitter { const client = this; if (conn) { - conn.connectionStatus().subscribe({ - onNext(payload) { - if (payload.kind == 'ERROR' || payload.kind == 'CLOSED') { - client.connected.set(false); - } - }, - onSubscribe(subscription) { - subscription.request(Number.MAX_SAFE_INTEGER); - }, - onError(payload) { - console.error('[client] connection status error ', payload); - }, + conn.subscribeToEvents((status) => { + if ( + status === ConnectionStatus.CLOSED || + status === ConnectionStatus.ERROR + ) { + client.connected.set(false); + } }); } } @@ -531,7 +521,7 @@ export default class Client extends EventEmitter { } rawCall(method: string, fromPlugin: boolean, params?: Params): Promise { - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { const id = this.messageIdCounter++; const metadata: RequestMetadata = { method, @@ -556,7 +546,7 @@ export default class Client extends EventEmitter { if (this.sdkVersion < 1) { this.startTimingRequestResponse({method, id, params}); if (this.connection) { - this.connection.fireAndForget({data: JSON.stringify(data)}); + this.connection.send(data); } return; } @@ -573,41 +563,33 @@ export default class Client extends EventEmitter { return; } if (!fromPlugin || this.isAcceptingMessagesFromPlugin(plugin)) { - this.connection!.requestResponse({ - data: JSON.stringify(data), - }).subscribe({ - onComplete: (payload) => { - if (!fromPlugin || this.isAcceptingMessagesFromPlugin(plugin)) { - const logEventName = this.getLogEventName(data); - this.logger.trackTimeSince(mark, logEventName); - emitBytesReceived(plugin || 'unknown', payload.data.length * 2); - const response: { - success?: Object; - error?: ErrorType; - } = JSON.parse(payload.data); + try { + const response = await this.connection!.sendExpectResponse(data); + if (!fromPlugin || this.isAcceptingMessagesFromPlugin(plugin)) { + const logEventName = this.getLogEventName(data); + this.logger.trackTimeSince(mark, logEventName); + emitBytesReceived(plugin || 'unknown', response.length * 2); - this.onResponse(response, resolve, reject); + this.onResponse(response, resolve, reject); - if (isFlipperMessageDebuggingEnabled()) { - registerFlipperDebugMessage({ - device: this.deviceSync?.displayTitle(), - app: this.query.app, - flipperInternalMethod: method, - payload: response, - plugin, - pluginMethod: params?.method, - direction: 'toFlipper:response', - }); - } + if (isFlipperMessageDebuggingEnabled()) { + registerFlipperDebugMessage({ + device: this.deviceSync?.displayTitle(), + app: this.query.app, + flipperInternalMethod: method, + payload: response, + plugin, + pluginMethod: params?.method, + direction: 'toFlipper:response', + }); } - }, - onError: (e) => { - // This is only called if the connection is dead. Not in expected - // and recoverable cases like a missing receiver/method. - this.disconnect(); - reject(new Error('Connection disconnected: ' + e)); - }, - }); + } + } catch (error) { + // This is only called if the connection is dead. Not in expected + // and recoverable cases like a missing receiver/method. + this.disconnect(); + reject(new Error('Unable to send, connection error: ' + error)); + } } else { reject( new Error( @@ -699,7 +681,7 @@ export default class Client extends EventEmitter { }; console.debug(data, 'message:send'); if (this.connection) { - this.connection.fireAndForget({data: JSON.stringify(data)}); + this.connection.send(data); } if (isFlipperMessageDebuggingEnabled()) { diff --git a/desktop/app/src/comms/ClientConnection.tsx b/desktop/app/src/comms/ClientConnection.tsx new file mode 100644 index 000000000..0192a8249 --- /dev/null +++ b/desktop/app/src/comms/ClientConnection.tsx @@ -0,0 +1,37 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @format + */ + +export type ErrorType = { + message: string; + stacktrace: string; + name: string; +}; + +export type ResponseType = { + success?: Object; + error?: ErrorType; + length: number; +}; + +export enum ConnectionStatus { + ERROR = 'error', + CLOSED = 'closed', + CONNECTED = 'connected', + NOT_CONNECTED = 'not_connected', + CONNECTING = 'connecting', +} + +export type ConnectionStatusChange = (status: ConnectionStatus) => void; + +export interface ClientConnection { + subscribeToEvents(subscriber: ConnectionStatusChange): void; + close(): void; + send(data: any): void; + sendExpectResponse(data: any): Promise; +} diff --git a/desktop/app/src/comms/ServerController.tsx b/desktop/app/src/comms/ServerController.tsx index 412a696df..edefcf889 100644 --- a/desktop/app/src/comms/ServerController.tsx +++ b/desktop/app/src/comms/ServerController.tsx @@ -18,7 +18,12 @@ import CertificateProvider from '../utils/CertificateProvider'; import {RSocketServer} from 'rsocket-core'; import RSocketTCPServer from 'rsocket-tcp-server'; import Client from '../Client'; -import {FlipperClientConnection} from '../Client'; +import { + ClientConnection, + ConnectionStatus, + ConnectionStatusChange, + ResponseType, +} from './ClientConnection'; import {UninitializedClient} from '../UninitializedClient'; import {reportPlatformFailures} from '../utils/metrics'; import {EventEmitter} from 'events'; @@ -43,7 +48,7 @@ import {sideEffect} from '../utils/sideEffect'; import {destroyDevice} from '../reducers/connections'; type ClientInfo = { - connection: FlipperClientConnection | null | undefined; + connection: ClientConnection | null | undefined; client: Client; }; @@ -234,7 +239,7 @@ class ServerController extends EventEmitter { if (resolvedClient) { resolvedClient.onMessage(message); } else { - client.then((c) => c.onMessage(message)); + client.then((c) => c.onMessage(message)).catch((_) => {}); } } }); @@ -242,10 +247,12 @@ class ServerController extends EventEmitter { } case 'disconnect': { const app = message.app; - (clients[app] || Promise.resolve()).then((c) => { - this.removeConnection(c.id); - delete clients[app]; - }); + (clients[app] || Promise.resolve()) + .then((c) => { + this.removeConnection(c.id); + delete clients[app]; + }) + .catch((_) => {}); break; } } @@ -297,8 +304,58 @@ class ServerController extends EventEmitter { }); } + const clientConnection: ClientConnection = { + subscribeToEvents(subscriber: ConnectionStatusChange): void { + socket.connectionStatus().subscribe({ + onNext(payload) { + let status = ConnectionStatus.CONNECTED; + + if (payload.kind == 'ERROR') status = ConnectionStatus.ERROR; + else if (payload.kind == 'CLOSED') status = ConnectionStatus.CLOSED; + else if (payload.kind == 'CONNECTED') + status = ConnectionStatus.CONNECTED; + else if (payload.kind == 'NOT_CONNECTED') + status = ConnectionStatus.NOT_CONNECTED; + else if (payload.kind == 'CONNECTING') + status = ConnectionStatus.CONNECTING; + + subscriber(status); + }, + onSubscribe(subscription) { + subscription.request(Number.MAX_SAFE_INTEGER); + }, + onError(payload) { + console.error('[client] connection status error ', payload); + }, + }); + }, + close(): void { + socket.close(); + }, + send(data: any): void { + socket.fireAndForget({data: JSON.stringify(data)}); + }, + sendExpectResponse(data: any): Promise { + return new Promise((resolve, reject) => { + socket + .requestResponse({ + data: JSON.stringify(data), + }) + .subscribe({ + onComplete: (payload: Payload) => { + const response: ResponseType = JSON.parse(payload.data); + response.length = payload.data.length; + resolve(response); + }, + onError: (e) => { + reject(e); + }, + }); + }); + }, + }; const client: Promise = this.addConnection( - socket, + clientConnection, { app, os, @@ -316,10 +373,16 @@ class ServerController extends EventEmitter { socket.connectionStatus().subscribe({ onNext(payload) { if (payload.kind == 'ERROR' || payload.kind == 'CLOSED') { - client.then((client) => { - console.log(`Device disconnected ${client.id}`, 'server', payload); - server.removeConnection(client.id); - }); + client + .then((client) => { + console.log( + `Device disconnected ${client.id}`, + 'server', + payload, + ); + server.removeConnection(client.id); + }) + .catch((_) => {}); } }, onSubscribe(subscription) { @@ -335,9 +398,11 @@ class ServerController extends EventEmitter { if (resolvedClient) { resolvedClient.onMessage(payload.data); } else { - client.then((client) => { - client.onMessage(payload.data); - }); + client + .then((client) => { + client.onMessage(payload.data); + }) + .catch((_) => {}); } }, }; @@ -469,7 +534,7 @@ class ServerController extends EventEmitter { transformCertificateExchangeMediumToType(medium), ) .catch((e) => { - console.error(e); + console.error('Unable to process CSR. Error:', e); }); } }, @@ -495,7 +560,7 @@ class ServerController extends EventEmitter { } async addConnection( - conn: FlipperClientConnection, + conn: ClientConnection, query: ClientQuery & {medium: CertificateExchangeMedium}, csrQuery: ClientCsrQuery, ): Promise { @@ -548,31 +613,34 @@ class ServerController extends EventEmitter { connection: conn, }; - client.init().then(() => { - console.debug( - `Device client initialised: ${id}. Supported plugins: ${Array.from( - client.plugins, - ).join(', ')}`, - 'server', - ); + client + .init() + .then(() => { + console.debug( + `Device client initialised: ${id}. Supported plugins: ${Array.from( + client.plugins, + ).join(', ')}`, + 'server', + ); - /* If a device gets disconnected without being cleaned up properly, - * Flipper won't be aware until it attempts to reconnect. - * When it does we need to terminate the zombie connection. - */ - if (this.connections.has(id)) { - const connectionInfo = this.connections.get(id); - connectionInfo && - connectionInfo.connection && - connectionInfo.connection.close(); - this.removeConnection(id); - } + /* If a device gets disconnected without being cleaned up properly, + * Flipper won't be aware until it attempts to reconnect. + * When it does we need to terminate the zombie connection. + */ + if (this.connections.has(id)) { + const connectionInfo = this.connections.get(id); + connectionInfo && + connectionInfo.connection && + connectionInfo.connection.close(); + this.removeConnection(id); + } - this.connections.set(id, info); - this.emit('new-client', client); - this.emit('clients-change'); - client.emit('plugins-change'); - }); + this.connections.set(id, info); + this.emit('new-client', client); + this.emit('clients-change'); + client.emit('plugins-change'); + }) + .catch((_) => {}); return client; }); @@ -650,7 +718,7 @@ async function findDeviceForConnection( const timeout = setTimeout(() => { unsubscribe(); const error = `Timed out waiting for device ${serial} for client ${clientId}`; - console.error(error); + console.error('Unable to find device for connection. Error:', error); reject(error); }, 15000); unsubscribe = sideEffect( diff --git a/desktop/app/src/test-utils/MockFlipper.tsx b/desktop/app/src/test-utils/MockFlipper.tsx index d8e48e864..b6fe8dc11 100644 --- a/desktop/app/src/test-utils/MockFlipper.tsx +++ b/desktop/app/src/test-utils/MockFlipper.tsx @@ -11,7 +11,11 @@ import {createStore} from 'redux'; import BaseDevice from '../devices/BaseDevice'; import {createRootReducer} from '../reducers'; import {Store} from '../reducers/index'; -import Client, {ClientQuery, FlipperClientConnection} from '../Client'; +import Client, {ClientQuery} from '../Client'; +import { + ClientConnection, + ConnectionStatusChange, +} from '../comms/ClientConnection'; import {buildClientId} from '../utils/clientUtils'; import {Logger} from '../fb-interfaces/Logger'; import {PluginDefinition} from '../plugin'; @@ -235,33 +239,17 @@ export default class MockFlipper { return client; } } -function createStubConnection(): - | FlipperClientConnection - | null - | undefined { +function createStubConnection(): ClientConnection | null | undefined { return { + subscribeToEvents(_: ConnectionStatusChange) {}, close() { throw new Error('Should not be called in test'); }, - fireAndForget() { + send(_: any) { throw new Error('Should not be called in test'); }, - requestResponse() { + sendExpectResponse(_: any): Promise { throw new Error('Should not be called in test'); }, - connectionStatus() { - return { - subscribe() {}, - lift() { - throw new Error('Should not be called in test'); - }, - map() { - throw new Error('Should not be called in test'); - }, - take() { - throw new Error('Should not be called in test'); - }, - }; - }, }; } diff --git a/desktop/app/src/utils/js-client-server-utils/serverUtils.tsx b/desktop/app/src/utils/js-client-server-utils/serverUtils.tsx index c2101c004..0195e9474 100644 --- a/desktop/app/src/utils/js-client-server-utils/serverUtils.tsx +++ b/desktop/app/src/utils/js-client-server-utils/serverUtils.tsx @@ -8,19 +8,21 @@ */ import Client, {ClientQuery} from '../../Client'; -import {FlipperClientConnection} from '../../Client'; +import { + ClientConnection, + ConnectionStatus, + ConnectionStatusChange, + ResponseType, +} from '../../comms/ClientConnection'; import {ipcRenderer, remote, IpcRendererEvent} from 'electron'; import JSDevice from '../../devices/JSDevice'; import {Store} from '../../reducers'; import {Logger} from '../../fb-interfaces/Logger'; - -import {Payload, ConnectionStatus, ISubscriber} from 'rsocket-types'; -import {Flowable, Single} from 'rsocket-flowable'; import ServerController from '../../comms/ServerController'; import {buildClientId} from '../clientUtils'; import {destroyDevice} from '../../reducers/connections'; -const connections: Map> = new Map(); +const connections: Map = new Map(); const availablePlugins: Map> = new Map(); @@ -35,7 +37,7 @@ export function initJsEmulatorIPC( flipperConnections: Map< string, { - connection: FlipperClientConnection | null | undefined; + connection: ClientConnection | null | undefined; client: Client; } >, @@ -79,46 +81,47 @@ export function initJsEmulatorIPC( client: client, }); - connection.connectionStatus().subscribe({ - onNext(payload) { - if (payload.kind == 'ERROR' || payload.kind == 'CLOSED') { - console.debug(`Device disconnected ${client.id}`, 'server'); - flipperServer.removeConnection(client.id); - destroyDevice(store, logger, jsDeviceId(windowId)); - connections.delete(windowId); - availablePlugins.delete(windowId); - } - }, - onSubscribe(subscription) { - subscription.request(Number.MAX_SAFE_INTEGER); - }, + connection.subscribeToEvents((status) => { + if ( + status == ConnectionStatus.ERROR || + status == ConnectionStatus.CLOSED + ) { + console.debug(`Device disconnected ${client.id}`, 'server'); + flipperServer.removeConnection(client.id); + destroyDevice(store, logger, jsDeviceId(windowId)); + connections.delete(windowId); + availablePlugins.delete(windowId); + } }); - client.init().then(() => { - console.log(client); - flipperServer.emit('new-client', client); - flipperServer.emit('clients-change'); - client.emit('plugins-change'); + client + .init() + .then(() => { + console.log(client); + flipperServer.emit('new-client', client); + flipperServer.emit('clients-change'); + client.emit('plugins-change'); - ipcRenderer.on( - 'from-js-emulator', - (_event: IpcRendererEvent, message: any) => { - const {command, payload} = message; - if (command === 'sendFlipperObject') { - client.onMessage( - JSON.stringify({ - params: { - api: payload.api, - method: payload.method, - params: JSON.parse(payload.params), - }, - method: 'execute', - }), - ); - } - }, - ); - }); + ipcRenderer.on( + 'from-js-emulator', + (_event: IpcRendererEvent, message: any) => { + const {command, payload} = message; + if (command === 'sendFlipperObject') { + client.onMessage( + JSON.stringify({ + params: { + api: payload.api, + method: payload.method, + params: JSON.parse(payload.params), + }, + method: 'execute', + }), + ); + } + }, + ); + }) + .catch((_) => {}); }, ); } @@ -156,65 +159,53 @@ export function launchJsEmulator(url: string, height: number, width: number) { }); } -export class JSClientFlipperConnection - implements FlipperClientConnection -{ +export class JSClientFlipperConnection implements ClientConnection { webContentsId: number; - connStatusSubscribers: Set> = new Set(); + connStatusSubscribers: Set = new Set(); connStatus: ConnectionStatus; constructor(webContentsId: number) { this.webContentsId = webContentsId; - this.connStatus = {kind: 'CONNECTED'}; + this.connStatus = ConnectionStatus.CONNECTED; } - - connectionStatus(): Flowable { - return new Flowable((subscriber) => { - subscriber.onSubscribe({ - cancel: () => { - this.connStatusSubscribers.delete(subscriber); - }, - request: (_) => { - this.connStatusSubscribers.add(subscriber); - subscriber.onNext(this.connStatus); - }, - }); - }); + subscribeToEvents(subscriber: ConnectionStatusChange): void { + this.connStatusSubscribers.add(subscriber); } - - close(): void { - this.connStatus = {kind: 'CLOSED'}; - this.connStatusSubscribers.forEach((subscriber) => { - subscriber.onNext(this.connStatus); - }); - } - - fireAndForget(payload: Payload): void { + send(data: any): void { ipcRenderer.sendTo( this.webContentsId, 'message-to-plugin', - JSON.parse(payload.data != null ? payload.data : '{}'), + JSON.parse(data != null ? data : '{}'), ); } - // TODO: fully implement and return actual result - requestResponse(payload: Payload): Single> { - return new Single((subscriber) => { - const method = - payload.data != null ? JSON.parse(payload.data).method : 'not-defined'; - if (method != 'getPlugins') { - this.fireAndForget(payload); + sendExpectResponse(data: any): Promise { + return new Promise((resolve, _) => { + const method = data != null ? JSON.parse(data).method : 'not-defined'; + + if (method !== 'getPlugins') { + this.send(data); } - subscriber.onSubscribe(() => {}); - subscriber.onComplete( - method == 'getPlugins' - ? { - data: JSON.stringify({ - success: {plugins: availablePlugins.get(this.webContentsId)}, - }), - } - : {data: JSON.stringify({success: null})}, - ); + + if (method === 'getPlugins') { + resolve({ + success: { + plugins: availablePlugins.get(this.webContentsId), + }, + length: 0, + }); + } else { + resolve({ + success: undefined, + length: 0, + }); + } + }); + } + close(): void { + this.connStatus = ConnectionStatus.CLOSED; + this.connStatusSubscribers.forEach((subscriber) => { + subscriber(this.connStatus); }); } } diff --git a/desktop/app/src/utils/js-client-server-utils/websocketClientFlipperConnection.tsx b/desktop/app/src/utils/js-client-server-utils/websocketClientFlipperConnection.tsx index 56d6887bf..985ea966a 100644 --- a/desktop/app/src/utils/js-client-server-utils/websocketClientFlipperConnection.tsx +++ b/desktop/app/src/utils/js-client-server-utils/websocketClientFlipperConnection.tsx @@ -7,71 +7,48 @@ * @format */ -import {FlipperClientConnection} from '../../Client'; -import {Flowable, Single} from 'rsocket-flowable'; -import {Payload, ConnectionStatus, ISubscriber} from 'rsocket-types'; import WebSocket from 'ws'; +import { + ConnectionStatusChange, + ConnectionStatus, + ClientConnection, + ResponseType, +} from '../../comms/ClientConnection'; -export class WebsocketClientFlipperConnection - implements FlipperClientConnection -{ +export class WebsocketClientFlipperConnection implements ClientConnection { websocket: WebSocket; - connStatusSubscribers: Set> = new Set(); + connStatusSubscribers: Set = new Set(); connStatus: ConnectionStatus; app: string; plugins: string[] | undefined = undefined; constructor(ws: WebSocket, app: string, plugins: string[]) { this.websocket = ws; - this.connStatus = {kind: 'CONNECTED'}; + this.connStatus = ConnectionStatus.CONNECTED; this.app = app; this.plugins = plugins; } - - connectionStatus(): Flowable { - return new Flowable((subscriber) => { - subscriber.onSubscribe({ - cancel: () => { - this.connStatusSubscribers.delete(subscriber); - }, - request: (_) => { - this.connStatusSubscribers.add(subscriber); - subscriber.onNext(this.connStatus); - }, - }); - }); + subscribeToEvents(subscriber: ConnectionStatusChange): void { + this.connStatusSubscribers.add(subscriber); } - - close(): void { - this.connStatus = {kind: 'CLOSED'}; - this.connStatusSubscribers.forEach((subscriber) => { - subscriber.onNext(this.connStatus); - }); - this.websocket.send(JSON.stringify({type: 'disconnect', app: this.app})); - } - - fireAndForget(payload: Payload): void { + send(data: any): void { this.websocket.send( JSON.stringify({ type: 'send', app: this.app, - payload: payload.data != null ? payload.data : {}, + payload: data != null ? data : {}, }), ); } - - requestResponse(payload: Payload): Single> { - return new Single((subscriber) => { + sendExpectResponse(data: any): Promise { + return new Promise((resolve, reject) => { const {id: callId = undefined, method = undefined} = - payload.data != null ? JSON.parse(payload.data) : {}; - - subscriber.onSubscribe(() => {}); + data != null ? data : {}; if (method === 'getPlugins' && this.plugins != null) { - subscriber.onComplete({ - data: JSON.stringify({ - success: {plugins: this.plugins}, - }), + resolve({ + success: {plugins: this.plugins}, + length: 0, }); return; } @@ -80,7 +57,7 @@ export class WebsocketClientFlipperConnection JSON.stringify({ type: 'call', app: this.app, - payload: payload.data != null ? payload.data : {}, + payload: data != null ? data : {}, }), ); @@ -88,9 +65,20 @@ export class WebsocketClientFlipperConnection const {app, payload} = JSON.parse(message); if (app === this.app && payload?.id === callId) { - subscriber.onComplete({data: JSON.stringify(payload)}); + resolve(payload); } }); + this.websocket.on('error', (error: Error) => { + reject(error); + }); }); } + + close(): void { + this.connStatus = ConnectionStatus.CLOSED; + this.connStatusSubscribers.forEach((subscriber) => { + subscriber(this.connStatus); + }); + this.websocket.send(JSON.stringify({type: 'disconnect', app: this.app})); + } }