ServerController switch to use the refactored components

Summary:
This is the change that will pull the previous diffs together by making ServerController use the refactored components.

There are no logical changes.

The only additions are the ServerEventsListener interface implementation that re-use logic that was previously scattered around this same type.

Reviewed By: passy

Differential Revision: D29989270

fbshipit-source-id: 9ed7ff84ae8cd8ca46fad0db0737456eb705241c
This commit is contained in:
Lorenzo Blasa
2021-07-30 05:58:51 -07:00
committed by Facebook GitHub Bot
parent c850113193
commit 2a39738ece

View File

@@ -7,49 +7,33 @@
* @format
*/
import {
SecureServerConfig,
CertificateExchangeMedium,
} from '../utils/CertificateProvider';
import {CertificateExchangeMedium} from '../utils/CertificateProvider';
import {Logger} from '../fb-interfaces/Logger';
import {ClientQuery} from '../Client';
import {Store, State} from '../reducers/index';
import CertificateProvider from '../utils/CertificateProvider';
import {RSocketServer} from 'rsocket-core';
import RSocketTCPServer from 'rsocket-tcp-server';
import Client from '../Client';
import {
ClientConnection,
ConnectionStatus,
ConnectionStatusChange,
ResponseType,
} from './ClientConnection';
import {ClientConnection, ConnectionStatus} from './ClientConnection';
import {UninitializedClient} from '../UninitializedClient';
import {reportPlatformFailures} from '../utils/metrics';
import {EventEmitter} from 'events';
import invariant from 'invariant';
import tls from 'tls';
import net, {Socket} from 'net';
import {Responder, Payload, ReactiveSocket} from 'rsocket-types';
import constants from '../fb-stubs/constants';
import GK from '../fb-stubs/GK';
import {initJsEmulatorIPC} from '../utils/js-client-server-utils/serverUtils';
import {buildClientId} from '../utils/clientUtils';
import {Single} from 'rsocket-flowable';
import WebSocket from 'ws';
import JSDevice from '../devices/JSDevice';
import {BrowserClientFlipperConnection} from './BrowserClientFlipperConnection';
import querystring from 'querystring';
import {IncomingMessage} from 'http';
import ws from 'ws';
import DummyDevice from '../devices/DummyDevice';
import BaseDevice from '../devices/BaseDevice';
import {sideEffect} from '../utils/sideEffect';
import {destroyDevice} from '../reducers/connections';
import {
appNameWithUpdateHint,
transformCertificateExchangeMediumToType,
} from './Utilities';
import ServerAdapter, {
SecureClientQuery,
ServerEventsListener,
} from './ServerAdapter';
import {createBrowserServer, createServer} from './ServerFactory';
type ClientInfo = {
connection: ClientConnection | null | undefined;
@@ -67,16 +51,30 @@ declare interface ServerController {
on(event: 'clients-change', callback: () => void): this;
}
class ServerController extends EventEmitter {
/**
* Responsible of creating and managing the actual underlying servers:
* - Insecure (used for certificate exchange)
* - Secure (used for secure communication between the client and server)
* - Browser (only ever used between Desktop and a local Browser)
*
* Additionally, it manages client connections.
*/
class ServerController extends EventEmitter implements ServerEventsListener {
connections: Map<string, ClientInfo>;
secureServer: Promise<RSocketServer<any, any>> | null;
insecureServer: Promise<RSocketServer<any, any>> | null;
initialized: Promise<void> | null;
secureServer: Promise<ServerAdapter> | null;
insecureServer: Promise<ServerAdapter> | null;
browserServer: Promise<ServerAdapter> | null;
certificateProvider: CertificateProvider;
connectionTracker: ConnectionTracker;
logger: Logger;
store: Store;
initialisePromise: Promise<void> | null;
timeHandler: NodeJS.Timeout | undefined;
constructor(logger: Logger, store: Store) {
super();
this.logger = logger;
@@ -85,257 +83,63 @@ class ServerController extends EventEmitter {
this.connectionTracker = new ConnectionTracker(logger);
this.secureServer = null;
this.insecureServer = null;
this.initialisePromise = null;
this.browserServer = null;
this.initialized = null;
this.store = store;
this.timeHandler = undefined;
}
/**
* Loads the secure server configuration and starts any necessary servers.
* Initialisation is complete once the initialized promise is fullfilled at
* which point Flipper is accepting connections.
*/
init() {
const {insecure, secure} = this.store.getState().application.serverPorts;
this.initialisePromise = this.certificateProvider
this.initialized = this.certificateProvider
.loadSecureServerConfig()
.then(
(options) => (this.secureServer = this.startServer(secure, options)),
(options) => (this.secureServer = createServer(secure, this, options)),
)
.then(() => {
this.insecureServer = this.startServer(insecure);
this.insecureServer = createServer(insecure, this);
return;
});
if (GK.get('comet_enable_flipper_connection')) {
this.startWsServer(8333);
this.browserServer = createBrowserServer(8333, this);
}
reportPlatformFailures(this.initialisePromise, 'initializeServer');
reportPlatformFailures(this.initialized, 'initializeServer');
if (GK.get('flipper_js_client_emulator')) {
initJsEmulatorIPC(this.store, this.logger, this, this.connections);
}
return this.initialisePromise;
return this.initialized;
}
startServer(
port: number,
sslConfig?: SecureServerConfig,
): Promise<RSocketServer<any, any>> {
const server = this;
return new Promise((resolve, reject) => {
let rsServer: RSocketServer<any, any> | undefined; // eslint-disable-line prefer-const
const serverFactory = (onConnect: (socket: Socket) => void) => {
const transportServer = sslConfig
? tls.createServer(sslConfig, (socket) => {
onConnect(socket);
})
: net.createServer(onConnect);
transportServer
.on('error', (err) => {
server.emit('error', err);
console.error(`Error opening server on port ${port}`, 'server');
reject(err);
})
.on('listening', () => {
console.debug(
`${
sslConfig ? 'Secure' : 'Certificate'
} server started on port ${port}`,
'server',
);
server.emit('listening', port);
resolve(rsServer!);
});
return transportServer;
};
rsServer = new RSocketServer({
getRequestHandler: sslConfig
? this._trustedRequestHandler
: this._untrustedRequestHandler,
transport: new RSocketTCPServer({
port: port,
serverFactory: serverFactory,
}),
});
rsServer && rsServer.start();
});
}
startWsServer(port: number) {
const wss = new ws.Server({
host: 'localhost',
port,
verifyClient: (info: {
origin: string;
req: IncomingMessage;
secure: boolean;
}) => {
return constants.VALID_WEB_SOCKET_REQUEST_ORIGIN_PREFIXES.some(
(validPrefix) => info.origin.startsWith(validPrefix),
);
},
});
wss.on('connection', (ws: WebSocket, message: any) => {
const clients: {[app: string]: Promise<Client>} = {};
const query = querystring.decode(message.url.split('?')[1]);
const deviceId: string =
typeof query.deviceId === 'string' ? query.deviceId : 'webbrowser';
const device =
typeof query.device === 'string' ? query.device : 'WebSocket';
this.store.dispatch({
type: 'REGISTER_DEVICE',
payload: new JSDevice(deviceId, device, 1),
});
const cleanup = () => {
Object.values(clients).map((p) =>
p.then((c) => this.removeConnection(c.id)),
);
destroyDevice(this.store, this.logger, deviceId);
};
ws.on('message', (rawMessage: any) => {
const message = JSON.parse(rawMessage.toString());
switch (message.type) {
case 'connect': {
const app = message.app;
const plugins = message.plugins;
let resolvedClient: Client | null = null;
const client = this.addConnection(
new BrowserClientFlipperConnection(ws, app, plugins),
{
app,
os: 'JSWebApp',
device: 'device',
device_id: deviceId,
// if plugins != null -> we are using old api, where we send the list of plugins with connect message
sdk_version: plugins == null ? 4 : 1,
medium: 'FS_ACCESS',
},
{},
).then((c) => (resolvedClient = c));
clients[app] = client;
ws.on('message', (m: any) => {
const parsed = JSON.parse(m.toString());
// non-null payload id means response to prev request, it's handled in connection
if (parsed.app === app && parsed.payload?.id == null) {
const message = JSON.stringify(parsed.payload);
if (resolvedClient) {
resolvedClient.onMessage(message);
} else {
client.then((c) => c.onMessage(message)).catch((_) => {});
}
}
});
break;
}
case 'disconnect': {
const app = message.app;
(clients[app] || Promise.resolve())
.then((c) => {
this.removeConnection(c.id);
delete clients[app];
})
.catch((_) => {});
break;
}
}
});
ws.on('close', () => {
cleanup();
});
ws.on('error', (error) => {
console.error('[server] ws connection error ', error);
cleanup();
});
});
wss.on('error', (_ws: WebSocket, e: any) => {
console.error('error from wss' + e);
});
}
_trustedRequestHandler = (
socket: ReactiveSocket<string, any>,
payload: Payload<string, any>,
): Partial<Responder<string, any>> => {
const server = this;
if (!payload.data) {
return {};
/**
* If initialized, it stops any started servers.
*/
async close() {
if (this.initialized && (await this.initialized)) {
await Promise.all([
this.insecureServer && (await this.insecureServer).stop(),
this.secureServer && (await this.secureServer).stop(),
this.browserServer && (await this.browserServer).stop(),
]);
}
if (this.timeHandler) {
clearTimeout(this.timeHandler);
}
const clientData: ClientQuery &
ClientCsrQuery & {medium: number | undefined} = JSON.parse(payload.data);
this.logger.track('usage', 'trusted-request-handler-called', {
app: clientData.app,
os: clientData.os,
device: clientData.device,
device_id: clientData.device_id,
medium: clientData.medium,
});
this.connectionTracker.logConnectionAttempt(clientData);
}
onConnectionCreated(
clientQuery: SecureClientQuery,
clientConnection: ClientConnection,
): Promise<Client> {
const {app, os, device, device_id, sdk_version, csr, csr_path, medium} =
clientData;
clientQuery;
const transformedMedium = transformCertificateExchangeMediumToType(medium);
if (transformedMedium === 'WWW') {
this.store.dispatch({
type: 'REGISTER_DEVICE',
payload: new DummyDevice(device_id, app + ' Server Exchanged', os),
});
}
const clientConnection: ClientConnection = {
subscribeToEvents(subscriber: ConnectionStatusChange): void {
socket.connectionStatus().subscribe({
onNext(payload) {
let status = ConnectionStatus.CONNECTED;
if (payload.kind == 'ERROR') status = ConnectionStatus.ERROR;
else if (payload.kind == 'CLOSED') status = ConnectionStatus.CLOSED;
else if (payload.kind == 'CONNECTED')
status = ConnectionStatus.CONNECTED;
else if (payload.kind == 'NOT_CONNECTED')
status = ConnectionStatus.NOT_CONNECTED;
else if (payload.kind == 'CONNECTING')
status = ConnectionStatus.CONNECTING;
subscriber(status);
},
onSubscribe(subscription) {
subscription.request(Number.MAX_SAFE_INTEGER);
},
onError(payload) {
console.error('[client] connection status error ', payload);
},
});
},
close(): void {
socket.close();
},
send(data: any): void {
socket.fireAndForget({data: JSON.stringify(data)});
},
sendExpectResponse(data: any): Promise<any> {
return new Promise<any>((resolve, reject) => {
socket
.requestResponse({
data: JSON.stringify(data),
})
.subscribe({
onComplete: (payload: Payload<any, any>) => {
const response: ResponseType = JSON.parse(payload.data);
response.length = payload.data.length;
resolve(response);
},
onError: (e) => {
reject(e);
},
});
});
},
};
const client: Promise<Client> = this.addConnection(
return this.addConnection(
clientConnection,
{
app,
@@ -346,202 +150,124 @@ class ServerController extends EventEmitter {
medium: transformedMedium,
},
{csr, csr_path},
).then((client) => {
return (resolvedClient = client);
});
let resolvedClient: Client | undefined;
);
}
socket.connectionStatus().subscribe({
onNext(payload) {
if (payload.kind == 'ERROR' || payload.kind == 'CLOSED') {
client
.then((client) => {
console.log(
`Device disconnected ${client.id}`,
'server',
payload,
);
server.removeConnection(client.id);
})
.catch((_) => {});
}
},
onSubscribe(subscription) {
subscription.request(Number.MAX_SAFE_INTEGER);
},
onError(error) {
console.error('[server] connection status error ', error);
},
});
onConnectionClosed(clientId: string) {
this.removeConnection(clientId);
}
return {
fireAndForget: (payload: {data: string}) => {
if (resolvedClient) {
resolvedClient.onMessage(payload.data);
} else {
client
.then((client) => {
client.onMessage(payload.data);
})
.catch((_) => {});
}
},
};
};
onListening(port: number): void {
this.emit('listening', port);
}
_untrustedRequestHandler = (
_socket: ReactiveSocket<string, any>,
payload: Payload<string, any>,
): Partial<Responder<string, any>> => {
if (!payload.data) {
return {};
onSecureConnectionAttempt(clientQuery: SecureClientQuery): void {
this.logger.track('usage', 'trusted-request-handler-called', clientQuery);
this.connectionTracker.logConnectionAttempt(clientQuery);
if (this.timeHandler) {
clearTimeout(this.timeHandler);
}
const clientData: ClientQuery = JSON.parse(payload.data);
this.logger.track('usage', 'untrusted-request-handler-called', clientData);
this.connectionTracker.logConnectionAttempt(clientData);
const transformedMedium = transformCertificateExchangeMediumToType(
clientQuery.medium,
);
if (transformedMedium === 'WWW') {
this.store.dispatch({
type: 'REGISTER_DEVICE',
payload: new DummyDevice(
clientQuery.device_id,
clientQuery.app + ' Server Exchanged',
clientQuery.os,
),
});
}
}
onConnectionAttempt(clientQuery: ClientQuery): void {
this.logger.track('usage', 'untrusted-request-handler-called', clientQuery);
this.connectionTracker.logConnectionAttempt(clientQuery);
const client: UninitializedClient = {
os: clientData.os,
deviceName: clientData.device,
appName: appNameWithUpdateHint(clientData),
os: clientQuery.os,
deviceName: clientQuery.device,
appName: appNameWithUpdateHint(clientQuery),
};
this.emit('start-client-setup', client);
return {
requestResponse: (
payload: Payload<string, any>,
): Single<Payload<string, any>> => {
if (typeof payload.data !== 'string') {
return new Single((_) => {});
}
let rawData;
try {
rawData = JSON.parse(payload.data);
} catch (err) {
console.error(
`Invalid JSON: ${payload.data}`,
'clientMessage',
'server',
);
return new Single((_) => {});
}
const json: {
method: 'signCertificate';
csr: string;
destination: string;
medium: number | undefined; // OSS's older Client SDK might not send medium information. This is not an issue for internal FB users, as Flipper release is insync with client SDK through launcher.
} = rawData;
if (json.method === 'signCertificate') {
console.debug('CSR received from device', 'server');
const {csr, destination, medium} = json;
return new Single((subscriber) => {
subscriber.onSubscribe(undefined);
reportPlatformFailures(
this.certificateProvider.processCertificateSigningRequest(
csr,
clientData.os,
destination,
transformCertificateExchangeMediumToType(medium),
),
'processCertificateSigningRequest',
)
.then((result) => {
subscriber.onComplete({
data: JSON.stringify({
deviceId: result.deviceId,
}),
metadata: '',
});
this.timeHandler = setTimeout(() => {
// Fire notification
this.emit('client-unresponsive-error', {
client,
medium: transformCertificateExchangeMediumToType(medium),
deviceID: result.deviceId,
});
}, 30 * 1000);
this.emit('finish-client-setup', {
client,
deviceId: result.deviceId,
});
})
.catch((e) => {
subscriber.onError(e);
this.emit('client-setup-error', {client, error: e});
});
});
}
return new Single((_) => {});
},
// Leaving this here for a while for backwards compatibility,
// but for up to date SDKs it will no longer used.
// We can delete it after the SDK change has been using requestResponse for a few weeks.
fireAndForget: (payload: Payload<string, any>) => {
if (typeof payload.data !== 'string') {
return;
}
let json:
| {
method: 'signCertificate';
csr: string;
destination: string;
medium: number | undefined;
}
| undefined;
try {
json = JSON.parse(payload.data);
} catch (err) {
console.error(`Invalid JSON: ${payload.data}`, 'server');
return;
}
if (json && json.method === 'signCertificate') {
console.debug('CSR received from device', 'server');
const {csr, destination, medium} = json;
this.certificateProvider
.processCertificateSigningRequest(
csr,
clientData.os,
destination,
transformCertificateExchangeMediumToType(medium),
)
.catch((e) => {
console.error('Unable to process CSR. Error:', e);
});
}
},
};
};
close(): Promise<void> {
if (this.initialisePromise) {
return this.initialisePromise.then((_) => {
return Promise.all([
this.secureServer &&
this.secureServer.then((server) => server.stop()),
this.insecureServer &&
this.insecureServer.then((server) => server.stop()),
]).then(() => undefined);
if (clientQuery.os === 'JSWebApp') {
this.store.dispatch({
type: 'REGISTER_DEVICE',
payload: new JSDevice(clientQuery.device_id, clientQuery.app, 1),
});
}
return Promise.resolve();
}
onProcessCSR(
unsanitizedCSR: string,
clientQuery: ClientQuery,
appDirectory: string,
medium: CertificateExchangeMedium,
): Promise<{deviceId: string}> {
return new Promise((resolve, reject) => {
reportPlatformFailures(
this.certificateProvider.processCertificateSigningRequest(
unsanitizedCSR,
clientQuery.os,
appDirectory,
medium,
),
'processCertificateSigningRequest',
)
.then((response) => {
const client: UninitializedClient = {
os: clientQuery.os,
deviceName: clientQuery.device,
appName: appNameWithUpdateHint(clientQuery),
};
// TODO: if multiple clients are establishing a connection
// at the same time, then this unresponsive timeout can potentially
// lead to errors. For example, client A starts connectiving followed
// by client B. Client B timeHandler will override client A, thus, if
// client A takes longer, then the unresponsive timeout will not be called
// for it.
this.timeHandler = setTimeout(() => {
this.emit('client-unresponsive-error', {
client,
medium,
deviceID: response.deviceId,
});
}, 30 * 1000);
this.emit('finish-client-setup', {
client,
deviceId: response.deviceId,
});
resolve(response);
})
.catch((error) => {
reject(error);
});
});
}
onError(error: Error): void {
this.emit('error', error);
}
toJSON() {
return null;
}
/**
* Creates a Client and sets the underlying connection.
* @param connection A client connection to communicate between server and client.
* @param query The client query created from the initial handshake.
* @param csrQuery The CSR query which contains CSR related information.
*/
async addConnection(
conn: ClientConnection,
connection: ClientConnection,
query: ClientQuery & {medium: CertificateExchangeMedium},
csrQuery: ClientCsrQuery,
): Promise<Client> {
@@ -550,81 +276,90 @@ class ServerController extends EventEmitter {
// try to get id by comparing giving `csr` to file from `csr_path`
// otherwise, use given device_id
const {csr_path, csr} = csrQuery;
// For iOS we do not need to confirm the device id, as it never changes unlike android.
return (
csr_path && csr && query.os != 'iOS'
? this.certificateProvider
.extractAppNameFromCSR(csr)
.then((appName) => {
return this.certificateProvider.getTargetDeviceId(
query.os,
appName,
csr_path,
csr,
);
})
: Promise.resolve(query.device_id)
).then(async (csrId) => {
query.device_id = csrId;
query.app = appNameWithUpdateHint(query);
const id = buildClientId({
app: query.app,
os: query.os,
device: query.device,
device_id: csrId,
});
console.debug(`Device connected: ${id}`, 'server');
const device =
getDeviceBySerial(this.store.getState(), query.device_id) ??
(await findDeviceForConnection(this.store, query.app, query.device_id));
const client = new Client(
id,
query,
conn,
this.logger,
this.store,
undefined,
device,
if (csr_path && csr && query.os != 'iOS') {
const app_name = await this.certificateProvider.extractAppNameFromCSR(
csr,
);
query.device_id = await this.certificateProvider.getTargetDeviceId(
query.os,
app_name,
csr_path,
csr,
);
}
const info = {
client,
connection: conn,
};
query.app = appNameWithUpdateHint(query);
client
.init()
.then(() => {
console.debug(
`Device client initialised: ${id}. Supported plugins: ${Array.from(
client.plugins,
).join(', ')}`,
'server',
);
/* If a device gets disconnected without being cleaned up properly,
* Flipper won't be aware until it attempts to reconnect.
* When it does we need to terminate the zombie connection.
*/
if (this.connections.has(id)) {
const connectionInfo = this.connections.get(id);
connectionInfo &&
connectionInfo.connection &&
connectionInfo.connection.close();
this.removeConnection(id);
}
this.connections.set(id, info);
this.emit('new-client', client);
this.emit('clients-change');
client.emit('plugins-change');
})
.catch((_) => {});
return client;
const id = buildClientId({
app: query.app,
os: query.os,
device: query.device,
device_id: query.device_id,
});
console.debug(`Device connected: ${id}`, 'server');
const device =
getDeviceBySerial(this.store.getState(), query.device_id) ??
(await findDeviceForConnection(this.store, query.app, query.device_id));
const client = new Client(
id,
query,
connection,
this.logger,
this.store,
undefined,
device,
);
const info = {
client,
connection: connection,
};
await client.init();
connection.subscribeToEvents((status: ConnectionStatus) => {
if (
status === ConnectionStatus.CLOSED ||
status === ConnectionStatus.ERROR
) {
this.onConnectionClosed(client.id);
}
});
console.debug(
`Device client initialized: ${id}. Supported plugins: ${Array.from(
client.plugins,
).join(', ')}`,
'server',
);
/* If a device gets disconnected without being cleaned up properly,
* Flipper won't be aware until it attempts to reconnect.
* When it does we need to terminate the zombie connection.
*/
if (this.connections.has(id)) {
const connectionInfo = this.connections.get(id);
if (connectionInfo) {
if (
connectionInfo.connection &&
connectionInfo.connection !== connection
) {
connectionInfo.connection.close();
this.removeConnection(id);
}
}
}
this.connections.set(id, info);
this.emit('new-client', client);
this.emit('clients-change');
client.emit('plugins-change');
return client;
}
attachFakeClient(client: Client) {
@@ -634,6 +369,11 @@ class ServerController extends EventEmitter {
});
}
/**
* Removes a client connection by disconnecting it, if still connected
* and then deleting it from the tracked connections.
* @param id The client connection identifier.
*/
removeConnection(id: string) {
const info = this.connections.get(id);
if (info) {