FlipperClientConnection no longer redefines the RSocket Socket interface

Summary:
FlipperClientConnection used to define an interface which closely matched the Socket interface defined in RSocket.

Presumably it tried to 'decouple' RSocket from consumers of the client socket connection. It also limited the amount of actions that could be triggered on said socket connection.

This diff does two things:
- Renames FlipperClientConnection to ClientConnection.
- Changes the interface so that it no longer matches the RSocket Socket interface. The new interface doesn't use RSocket reactive types either.

As a result, current implementations of the interface will act as a proxy to the RSocket Socket type. The change allows the usage of other implementations such as WebSocket.

Reviewed By: fabiomassimo

Differential Revision: D29934090

fbshipit-source-id: 02742e50cd6e801310698969c81b3bf1ef0fa2a2
This commit is contained in:
Lorenzo Blasa
2021-07-27 08:28:37 -07:00
committed by Facebook GitHub Bot
parent 8bb47a38a1
commit c1496f621e
6 changed files with 307 additions and 253 deletions

View File

@@ -11,8 +11,6 @@ import {PluginDefinition} from './plugin';
import BaseDevice, {OS} from './devices/BaseDevice'; import BaseDevice, {OS} from './devices/BaseDevice';
import {Logger} from './fb-interfaces/Logger'; import {Logger} from './fb-interfaces/Logger';
import {Store} from './reducers/index'; import {Store} from './reducers/index';
import {Payload, ConnectionStatus} from 'rsocket-types';
import {Flowable, Single} from 'rsocket-flowable';
import {performance} from 'perf_hooks'; import {performance} from 'perf_hooks';
import {reportPluginFailures} from './utils/metrics'; import {reportPluginFailures} from './utils/metrics';
import {default as isProduction} from './utils/isProduction'; import {default as isProduction} from './utils/isProduction';
@@ -34,6 +32,11 @@ import {
isFlipperMessageDebuggingEnabled, isFlipperMessageDebuggingEnabled,
registerFlipperDebugMessage, registerFlipperDebugMessage,
} from './chrome/FlipperMessages'; } from './chrome/FlipperMessages';
import {
ConnectionStatus,
ErrorType,
ClientConnection,
} from './comms/ClientConnection';
type Plugins = Set<string>; type Plugins = Set<string>;
type PluginsArr = Array<string>; type PluginsArr = Array<string>;
@@ -51,7 +54,6 @@ export type ClientExport = {
query: ClientQuery; query: ClientQuery;
}; };
type ErrorType = {message: string; stacktrace: string; name: string};
type Params = { type Params = {
api: string; api: string;
method: string; method: string;
@@ -85,13 +87,6 @@ const handleError = (store: Store, device: BaseDevice, error: ErrorType) => {
crashReporterPlugin.instanceApi.reportCrash(payload); crashReporterPlugin.instanceApi.reportCrash(payload);
}; };
export interface FlipperClientConnection<D, M> {
connectionStatus(): Flowable<ConnectionStatus>;
close(): void;
fireAndForget(payload: Payload<D, M>): void;
requestResponse(payload: Payload<D, M>): Single<Payload<D, M>>;
}
export default class Client extends EventEmitter { export default class Client extends EventEmitter {
connected = createState(false); connected = createState(false);
id: string; id: string;
@@ -100,7 +95,7 @@ export default class Client extends EventEmitter {
messageIdCounter: number; messageIdCounter: number;
plugins: Plugins; plugins: Plugins;
backgroundPlugins: Plugins; backgroundPlugins: Plugins;
connection: FlipperClientConnection<any, any> | null | undefined; connection: ClientConnection | null | undefined;
store: Store; store: Store;
activePlugins: Set<string>; activePlugins: Set<string>;
freezeData = GK.get('flipper_frozen_data'); freezeData = GK.get('flipper_frozen_data');
@@ -135,7 +130,7 @@ export default class Client extends EventEmitter {
constructor( constructor(
id: string, id: string,
query: ClientQuery, query: ClientQuery,
conn: FlipperClientConnection<any, any> | null | undefined, conn: ClientConnection | null | undefined,
logger: Logger, logger: Logger,
store: Store, store: Store,
plugins: Plugins | null | undefined, plugins: Plugins | null | undefined,
@@ -161,18 +156,13 @@ export default class Client extends EventEmitter {
const client = this; const client = this;
if (conn) { if (conn) {
conn.connectionStatus().subscribe({ conn.subscribeToEvents((status) => {
onNext(payload) { if (
if (payload.kind == 'ERROR' || payload.kind == 'CLOSED') { status === ConnectionStatus.CLOSED ||
status === ConnectionStatus.ERROR
) {
client.connected.set(false); client.connected.set(false);
} }
},
onSubscribe(subscription) {
subscription.request(Number.MAX_SAFE_INTEGER);
},
onError(payload) {
console.error('[client] connection status error ', payload);
},
}); });
} }
} }
@@ -531,7 +521,7 @@ export default class Client extends EventEmitter {
} }
rawCall<T>(method: string, fromPlugin: boolean, params?: Params): Promise<T> { rawCall<T>(method: string, fromPlugin: boolean, params?: Params): Promise<T> {
return new Promise((resolve, reject) => { return new Promise(async (resolve, reject) => {
const id = this.messageIdCounter++; const id = this.messageIdCounter++;
const metadata: RequestMetadata = { const metadata: RequestMetadata = {
method, method,
@@ -556,7 +546,7 @@ export default class Client extends EventEmitter {
if (this.sdkVersion < 1) { if (this.sdkVersion < 1) {
this.startTimingRequestResponse({method, id, params}); this.startTimingRequestResponse({method, id, params});
if (this.connection) { if (this.connection) {
this.connection.fireAndForget({data: JSON.stringify(data)}); this.connection.send(data);
} }
return; return;
} }
@@ -573,18 +563,12 @@ export default class Client extends EventEmitter {
return; return;
} }
if (!fromPlugin || this.isAcceptingMessagesFromPlugin(plugin)) { if (!fromPlugin || this.isAcceptingMessagesFromPlugin(plugin)) {
this.connection!.requestResponse({ try {
data: JSON.stringify(data), const response = await this.connection!.sendExpectResponse(data);
}).subscribe({
onComplete: (payload) => {
if (!fromPlugin || this.isAcceptingMessagesFromPlugin(plugin)) { if (!fromPlugin || this.isAcceptingMessagesFromPlugin(plugin)) {
const logEventName = this.getLogEventName(data); const logEventName = this.getLogEventName(data);
this.logger.trackTimeSince(mark, logEventName); this.logger.trackTimeSince(mark, logEventName);
emitBytesReceived(plugin || 'unknown', payload.data.length * 2); emitBytesReceived(plugin || 'unknown', response.length * 2);
const response: {
success?: Object;
error?: ErrorType;
} = JSON.parse(payload.data);
this.onResponse(response, resolve, reject); this.onResponse(response, resolve, reject);
@@ -600,14 +584,12 @@ export default class Client extends EventEmitter {
}); });
} }
} }
}, } catch (error) {
onError: (e) => {
// This is only called if the connection is dead. Not in expected // This is only called if the connection is dead. Not in expected
// and recoverable cases like a missing receiver/method. // and recoverable cases like a missing receiver/method.
this.disconnect(); this.disconnect();
reject(new Error('Connection disconnected: ' + e)); reject(new Error('Unable to send, connection error: ' + error));
}, }
});
} else { } else {
reject( reject(
new Error( new Error(
@@ -699,7 +681,7 @@ export default class Client extends EventEmitter {
}; };
console.debug(data, 'message:send'); console.debug(data, 'message:send');
if (this.connection) { if (this.connection) {
this.connection.fireAndForget({data: JSON.stringify(data)}); this.connection.send(data);
} }
if (isFlipperMessageDebuggingEnabled()) { if (isFlipperMessageDebuggingEnabled()) {

View File

@@ -0,0 +1,37 @@
/**
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @format
*/
export type ErrorType = {
message: string;
stacktrace: string;
name: string;
};
export type ResponseType = {
success?: Object;
error?: ErrorType;
length: number;
};
export enum ConnectionStatus {
ERROR = 'error',
CLOSED = 'closed',
CONNECTED = 'connected',
NOT_CONNECTED = 'not_connected',
CONNECTING = 'connecting',
}
export type ConnectionStatusChange = (status: ConnectionStatus) => void;
export interface ClientConnection {
subscribeToEvents(subscriber: ConnectionStatusChange): void;
close(): void;
send(data: any): void;
sendExpectResponse(data: any): Promise<ResponseType>;
}

View File

@@ -18,7 +18,12 @@ import CertificateProvider from '../utils/CertificateProvider';
import {RSocketServer} from 'rsocket-core'; import {RSocketServer} from 'rsocket-core';
import RSocketTCPServer from 'rsocket-tcp-server'; import RSocketTCPServer from 'rsocket-tcp-server';
import Client from '../Client'; import Client from '../Client';
import {FlipperClientConnection} from '../Client'; import {
ClientConnection,
ConnectionStatus,
ConnectionStatusChange,
ResponseType,
} from './ClientConnection';
import {UninitializedClient} from '../UninitializedClient'; import {UninitializedClient} from '../UninitializedClient';
import {reportPlatformFailures} from '../utils/metrics'; import {reportPlatformFailures} from '../utils/metrics';
import {EventEmitter} from 'events'; import {EventEmitter} from 'events';
@@ -43,7 +48,7 @@ import {sideEffect} from '../utils/sideEffect';
import {destroyDevice} from '../reducers/connections'; import {destroyDevice} from '../reducers/connections';
type ClientInfo = { type ClientInfo = {
connection: FlipperClientConnection<any, any> | null | undefined; connection: ClientConnection | null | undefined;
client: Client; client: Client;
}; };
@@ -234,7 +239,7 @@ class ServerController extends EventEmitter {
if (resolvedClient) { if (resolvedClient) {
resolvedClient.onMessage(message); resolvedClient.onMessage(message);
} else { } else {
client.then((c) => c.onMessage(message)); client.then((c) => c.onMessage(message)).catch((_) => {});
} }
} }
}); });
@@ -242,10 +247,12 @@ class ServerController extends EventEmitter {
} }
case 'disconnect': { case 'disconnect': {
const app = message.app; const app = message.app;
(clients[app] || Promise.resolve()).then((c) => { (clients[app] || Promise.resolve())
.then((c) => {
this.removeConnection(c.id); this.removeConnection(c.id);
delete clients[app]; delete clients[app];
}); })
.catch((_) => {});
break; break;
} }
} }
@@ -297,8 +304,58 @@ class ServerController extends EventEmitter {
}); });
} }
const clientConnection: ClientConnection = {
subscribeToEvents(subscriber: ConnectionStatusChange): void {
socket.connectionStatus().subscribe({
onNext(payload) {
let status = ConnectionStatus.CONNECTED;
if (payload.kind == 'ERROR') status = ConnectionStatus.ERROR;
else if (payload.kind == 'CLOSED') status = ConnectionStatus.CLOSED;
else if (payload.kind == 'CONNECTED')
status = ConnectionStatus.CONNECTED;
else if (payload.kind == 'NOT_CONNECTED')
status = ConnectionStatus.NOT_CONNECTED;
else if (payload.kind == 'CONNECTING')
status = ConnectionStatus.CONNECTING;
subscriber(status);
},
onSubscribe(subscription) {
subscription.request(Number.MAX_SAFE_INTEGER);
},
onError(payload) {
console.error('[client] connection status error ', payload);
},
});
},
close(): void {
socket.close();
},
send(data: any): void {
socket.fireAndForget({data: JSON.stringify(data)});
},
sendExpectResponse(data: any): Promise<any> {
return new Promise<any>((resolve, reject) => {
socket
.requestResponse({
data: JSON.stringify(data),
})
.subscribe({
onComplete: (payload: Payload<any, any>) => {
const response: ResponseType = JSON.parse(payload.data);
response.length = payload.data.length;
resolve(response);
},
onError: (e) => {
reject(e);
},
});
});
},
};
const client: Promise<Client> = this.addConnection( const client: Promise<Client> = this.addConnection(
socket, clientConnection,
{ {
app, app,
os, os,
@@ -316,10 +373,16 @@ class ServerController extends EventEmitter {
socket.connectionStatus().subscribe({ socket.connectionStatus().subscribe({
onNext(payload) { onNext(payload) {
if (payload.kind == 'ERROR' || payload.kind == 'CLOSED') { if (payload.kind == 'ERROR' || payload.kind == 'CLOSED') {
client.then((client) => { client
console.log(`Device disconnected ${client.id}`, 'server', payload); .then((client) => {
console.log(
`Device disconnected ${client.id}`,
'server',
payload,
);
server.removeConnection(client.id); server.removeConnection(client.id);
}); })
.catch((_) => {});
} }
}, },
onSubscribe(subscription) { onSubscribe(subscription) {
@@ -335,9 +398,11 @@ class ServerController extends EventEmitter {
if (resolvedClient) { if (resolvedClient) {
resolvedClient.onMessage(payload.data); resolvedClient.onMessage(payload.data);
} else { } else {
client.then((client) => { client
.then((client) => {
client.onMessage(payload.data); client.onMessage(payload.data);
}); })
.catch((_) => {});
} }
}, },
}; };
@@ -469,7 +534,7 @@ class ServerController extends EventEmitter {
transformCertificateExchangeMediumToType(medium), transformCertificateExchangeMediumToType(medium),
) )
.catch((e) => { .catch((e) => {
console.error(e); console.error('Unable to process CSR. Error:', e);
}); });
} }
}, },
@@ -495,7 +560,7 @@ class ServerController extends EventEmitter {
} }
async addConnection( async addConnection(
conn: FlipperClientConnection<any, any>, conn: ClientConnection,
query: ClientQuery & {medium: CertificateExchangeMedium}, query: ClientQuery & {medium: CertificateExchangeMedium},
csrQuery: ClientCsrQuery, csrQuery: ClientCsrQuery,
): Promise<Client> { ): Promise<Client> {
@@ -548,7 +613,9 @@ class ServerController extends EventEmitter {
connection: conn, connection: conn,
}; };
client.init().then(() => { client
.init()
.then(() => {
console.debug( console.debug(
`Device client initialised: ${id}. Supported plugins: ${Array.from( `Device client initialised: ${id}. Supported plugins: ${Array.from(
client.plugins, client.plugins,
@@ -572,7 +639,8 @@ class ServerController extends EventEmitter {
this.emit('new-client', client); this.emit('new-client', client);
this.emit('clients-change'); this.emit('clients-change');
client.emit('plugins-change'); client.emit('plugins-change');
}); })
.catch((_) => {});
return client; return client;
}); });
@@ -650,7 +718,7 @@ async function findDeviceForConnection(
const timeout = setTimeout(() => { const timeout = setTimeout(() => {
unsubscribe(); unsubscribe();
const error = `Timed out waiting for device ${serial} for client ${clientId}`; const error = `Timed out waiting for device ${serial} for client ${clientId}`;
console.error(error); console.error('Unable to find device for connection. Error:', error);
reject(error); reject(error);
}, 15000); }, 15000);
unsubscribe = sideEffect( unsubscribe = sideEffect(

View File

@@ -11,7 +11,11 @@ import {createStore} from 'redux';
import BaseDevice from '../devices/BaseDevice'; import BaseDevice from '../devices/BaseDevice';
import {createRootReducer} from '../reducers'; import {createRootReducer} from '../reducers';
import {Store} from '../reducers/index'; import {Store} from '../reducers/index';
import Client, {ClientQuery, FlipperClientConnection} from '../Client'; import Client, {ClientQuery} from '../Client';
import {
ClientConnection,
ConnectionStatusChange,
} from '../comms/ClientConnection';
import {buildClientId} from '../utils/clientUtils'; import {buildClientId} from '../utils/clientUtils';
import {Logger} from '../fb-interfaces/Logger'; import {Logger} from '../fb-interfaces/Logger';
import {PluginDefinition} from '../plugin'; import {PluginDefinition} from '../plugin';
@@ -235,33 +239,17 @@ export default class MockFlipper {
return client; return client;
} }
} }
function createStubConnection(): function createStubConnection(): ClientConnection | null | undefined {
| FlipperClientConnection<any, any>
| null
| undefined {
return { return {
subscribeToEvents(_: ConnectionStatusChange) {},
close() { close() {
throw new Error('Should not be called in test'); throw new Error('Should not be called in test');
}, },
fireAndForget() { send(_: any) {
throw new Error('Should not be called in test'); throw new Error('Should not be called in test');
}, },
requestResponse() { sendExpectResponse(_: any): Promise<ResponseType> {
throw new Error('Should not be called in test'); throw new Error('Should not be called in test');
}, },
connectionStatus() {
return {
subscribe() {},
lift() {
throw new Error('Should not be called in test');
},
map() {
throw new Error('Should not be called in test');
},
take() {
throw new Error('Should not be called in test');
},
};
},
}; };
} }

View File

@@ -8,19 +8,21 @@
*/ */
import Client, {ClientQuery} from '../../Client'; import Client, {ClientQuery} from '../../Client';
import {FlipperClientConnection} from '../../Client'; import {
ClientConnection,
ConnectionStatus,
ConnectionStatusChange,
ResponseType,
} from '../../comms/ClientConnection';
import {ipcRenderer, remote, IpcRendererEvent} from 'electron'; import {ipcRenderer, remote, IpcRendererEvent} from 'electron';
import JSDevice from '../../devices/JSDevice'; import JSDevice from '../../devices/JSDevice';
import {Store} from '../../reducers'; import {Store} from '../../reducers';
import {Logger} from '../../fb-interfaces/Logger'; import {Logger} from '../../fb-interfaces/Logger';
import {Payload, ConnectionStatus, ISubscriber} from 'rsocket-types';
import {Flowable, Single} from 'rsocket-flowable';
import ServerController from '../../comms/ServerController'; import ServerController from '../../comms/ServerController';
import {buildClientId} from '../clientUtils'; import {buildClientId} from '../clientUtils';
import {destroyDevice} from '../../reducers/connections'; import {destroyDevice} from '../../reducers/connections';
const connections: Map<number, JSClientFlipperConnection<any>> = new Map(); const connections: Map<number, JSClientFlipperConnection> = new Map();
const availablePlugins: Map<number, Array<string>> = new Map(); const availablePlugins: Map<number, Array<string>> = new Map();
@@ -35,7 +37,7 @@ export function initJsEmulatorIPC(
flipperConnections: Map< flipperConnections: Map<
string, string,
{ {
connection: FlipperClientConnection<any, any> | null | undefined; connection: ClientConnection | null | undefined;
client: Client; client: Client;
} }
>, >,
@@ -79,22 +81,22 @@ export function initJsEmulatorIPC(
client: client, client: client,
}); });
connection.connectionStatus().subscribe({ connection.subscribeToEvents((status) => {
onNext(payload) { if (
if (payload.kind == 'ERROR' || payload.kind == 'CLOSED') { status == ConnectionStatus.ERROR ||
status == ConnectionStatus.CLOSED
) {
console.debug(`Device disconnected ${client.id}`, 'server'); console.debug(`Device disconnected ${client.id}`, 'server');
flipperServer.removeConnection(client.id); flipperServer.removeConnection(client.id);
destroyDevice(store, logger, jsDeviceId(windowId)); destroyDevice(store, logger, jsDeviceId(windowId));
connections.delete(windowId); connections.delete(windowId);
availablePlugins.delete(windowId); availablePlugins.delete(windowId);
} }
},
onSubscribe(subscription) {
subscription.request(Number.MAX_SAFE_INTEGER);
},
}); });
client.init().then(() => { client
.init()
.then(() => {
console.log(client); console.log(client);
flipperServer.emit('new-client', client); flipperServer.emit('new-client', client);
flipperServer.emit('clients-change'); flipperServer.emit('clients-change');
@@ -118,7 +120,8 @@ export function initJsEmulatorIPC(
} }
}, },
); );
}); })
.catch((_) => {});
}, },
); );
} }
@@ -156,65 +159,53 @@ export function launchJsEmulator(url: string, height: number, width: number) {
}); });
} }
export class JSClientFlipperConnection<M> export class JSClientFlipperConnection implements ClientConnection {
implements FlipperClientConnection<string, M>
{
webContentsId: number; webContentsId: number;
connStatusSubscribers: Set<ISubscriber<ConnectionStatus>> = new Set(); connStatusSubscribers: Set<ConnectionStatusChange> = new Set();
connStatus: ConnectionStatus; connStatus: ConnectionStatus;
constructor(webContentsId: number) { constructor(webContentsId: number) {
this.webContentsId = webContentsId; this.webContentsId = webContentsId;
this.connStatus = {kind: 'CONNECTED'}; this.connStatus = ConnectionStatus.CONNECTED;
} }
subscribeToEvents(subscriber: ConnectionStatusChange): void {
connectionStatus(): Flowable<ConnectionStatus> {
return new Flowable<ConnectionStatus>((subscriber) => {
subscriber.onSubscribe({
cancel: () => {
this.connStatusSubscribers.delete(subscriber);
},
request: (_) => {
this.connStatusSubscribers.add(subscriber); this.connStatusSubscribers.add(subscriber);
subscriber.onNext(this.connStatus);
},
});
});
} }
send(data: any): void {
close(): void {
this.connStatus = {kind: 'CLOSED'};
this.connStatusSubscribers.forEach((subscriber) => {
subscriber.onNext(this.connStatus);
});
}
fireAndForget(payload: Payload<string, M>): void {
ipcRenderer.sendTo( ipcRenderer.sendTo(
this.webContentsId, this.webContentsId,
'message-to-plugin', 'message-to-plugin',
JSON.parse(payload.data != null ? payload.data : '{}'), JSON.parse(data != null ? data : '{}'),
); );
} }
// TODO: fully implement and return actual result
sendExpectResponse(data: any): Promise<ResponseType> {
return new Promise((resolve, _) => {
const method = data != null ? JSON.parse(data).method : 'not-defined';
if (method !== 'getPlugins') {
this.send(data);
}
// TODO: fully implement and return actual result if (method === 'getPlugins') {
requestResponse(payload: Payload<string, M>): Single<Payload<string, M>> { resolve({
return new Single((subscriber) => { success: {
const method = plugins: availablePlugins.get(this.webContentsId),
payload.data != null ? JSON.parse(payload.data).method : 'not-defined'; },
if (method != 'getPlugins') { length: 0,
this.fireAndForget(payload); });
} else {
resolve({
success: undefined,
length: 0,
});
} }
subscriber.onSubscribe(() => {}); });
subscriber.onComplete(
method == 'getPlugins'
? {
data: JSON.stringify({
success: {plugins: availablePlugins.get(this.webContentsId)},
}),
} }
: {data: JSON.stringify({success: null})}, close(): void {
); this.connStatus = ConnectionStatus.CLOSED;
this.connStatusSubscribers.forEach((subscriber) => {
subscriber(this.connStatus);
}); });
} }
} }

View File

@@ -7,71 +7,48 @@
* @format * @format
*/ */
import {FlipperClientConnection} from '../../Client';
import {Flowable, Single} from 'rsocket-flowable';
import {Payload, ConnectionStatus, ISubscriber} from 'rsocket-types';
import WebSocket from 'ws'; import WebSocket from 'ws';
import {
ConnectionStatusChange,
ConnectionStatus,
ClientConnection,
ResponseType,
} from '../../comms/ClientConnection';
export class WebsocketClientFlipperConnection<M> export class WebsocketClientFlipperConnection implements ClientConnection {
implements FlipperClientConnection<string, M>
{
websocket: WebSocket; websocket: WebSocket;
connStatusSubscribers: Set<ISubscriber<ConnectionStatus>> = new Set(); connStatusSubscribers: Set<ConnectionStatusChange> = new Set();
connStatus: ConnectionStatus; connStatus: ConnectionStatus;
app: string; app: string;
plugins: string[] | undefined = undefined; plugins: string[] | undefined = undefined;
constructor(ws: WebSocket, app: string, plugins: string[]) { constructor(ws: WebSocket, app: string, plugins: string[]) {
this.websocket = ws; this.websocket = ws;
this.connStatus = {kind: 'CONNECTED'}; this.connStatus = ConnectionStatus.CONNECTED;
this.app = app; this.app = app;
this.plugins = plugins; this.plugins = plugins;
} }
subscribeToEvents(subscriber: ConnectionStatusChange): void {
connectionStatus(): Flowable<ConnectionStatus> {
return new Flowable<ConnectionStatus>((subscriber) => {
subscriber.onSubscribe({
cancel: () => {
this.connStatusSubscribers.delete(subscriber);
},
request: (_) => {
this.connStatusSubscribers.add(subscriber); this.connStatusSubscribers.add(subscriber);
subscriber.onNext(this.connStatus);
},
});
});
} }
send(data: any): void {
close(): void {
this.connStatus = {kind: 'CLOSED'};
this.connStatusSubscribers.forEach((subscriber) => {
subscriber.onNext(this.connStatus);
});
this.websocket.send(JSON.stringify({type: 'disconnect', app: this.app}));
}
fireAndForget(payload: Payload<string, M>): void {
this.websocket.send( this.websocket.send(
JSON.stringify({ JSON.stringify({
type: 'send', type: 'send',
app: this.app, app: this.app,
payload: payload.data != null ? payload.data : {}, payload: data != null ? data : {},
}), }),
); );
} }
sendExpectResponse(data: any): Promise<ResponseType> {
requestResponse(payload: Payload<string, M>): Single<Payload<string, M>> { return new Promise((resolve, reject) => {
return new Single((subscriber) => {
const {id: callId = undefined, method = undefined} = const {id: callId = undefined, method = undefined} =
payload.data != null ? JSON.parse(payload.data) : {}; data != null ? data : {};
subscriber.onSubscribe(() => {});
if (method === 'getPlugins' && this.plugins != null) { if (method === 'getPlugins' && this.plugins != null) {
subscriber.onComplete({ resolve({
data: JSON.stringify({
success: {plugins: this.plugins}, success: {plugins: this.plugins},
}), length: 0,
}); });
return; return;
} }
@@ -80,7 +57,7 @@ export class WebsocketClientFlipperConnection<M>
JSON.stringify({ JSON.stringify({
type: 'call', type: 'call',
app: this.app, app: this.app,
payload: payload.data != null ? payload.data : {}, payload: data != null ? data : {},
}), }),
); );
@@ -88,9 +65,20 @@ export class WebsocketClientFlipperConnection<M>
const {app, payload} = JSON.parse(message); const {app, payload} = JSON.parse(message);
if (app === this.app && payload?.id === callId) { if (app === this.app && payload?.id === callId) {
subscriber.onComplete({data: JSON.stringify(payload)}); resolve(payload);
} }
}); });
this.websocket.on('error', (error: Error) => {
reject(error);
});
}); });
} }
close(): void {
this.connStatus = ConnectionStatus.CLOSED;
this.connStatusSubscribers.forEach((subscriber) => {
subscriber(this.connStatus);
});
this.websocket.send(JSON.stringify({type: 'disconnect', app: this.app}));
}
} }