Refactor server implementation for WebSockets

Summary:
Standardize WS implementation for JS environments.

Why do we need a separate server implementation for browsers?
Browser targets cannot authenticate via the default certificate exchange flow. For browser targets we verify the origin instead.
Moreover, for already forgotten reasons the initial implementation of the WS server for browsers used a different kind of message structure and added extra `connect`/`disconnect` messages. After examination, it seems the `connect`/`disconnect` flow is redundant.

Major changes:
1. Updated class hierarchy for WS server implementations.
2. Updated browser WS server to support the modern and the legacy protocols.
3. Now a websocket connection with the device is closed on error. The idea is it is highly unlikely to handle any subsequent messages properly once we observe an error. It is better to bail and reconnect. What do you think?

Reviewed By: mweststrate

Differential Revision: D31532172

fbshipit-source-id: f86aa63a40efe4d5263353cc124fac8c63b80e45
This commit is contained in:
Andrey Goncharov
2021-10-21 03:30:41 -07:00
committed by Facebook GitHub Bot
parent 6bd4a07286
commit 37498ad5a9
24 changed files with 1584 additions and 725 deletions

View File

@@ -7,198 +7,224 @@
* @format
*/
import ServerWebSocketBase from './ServerWebSocketBase';
import WebSocket from 'ws';
import ws from 'ws';
import {SecureClientQuery, ServerEventsListener} from './ServerAdapter';
import querystring from 'querystring';
import {
ClientConnection,
ConnectionStatus,
ConnectionStatusChange,
} from './ClientConnection';
import {IncomingMessage} from 'http';
import ServerAdapter from './ServerAdapter';
import WebSocket, {
AddressInfo,
Server as WSServer,
VerifyClientCallbackSync,
} from 'ws';
import {createServer as createHttpsServer} from 'https';
import {createServer as createHttpServer} from 'http';
import querystring from 'querystring';
import {ClientQuery} from 'flipper-common';
import {
ClientDescription,
ClientErrorType,
ClientQuery,
DeviceOS,
} from 'flipper-common';
import {cloneClientQuerySafeForLogging} from './Utilities';
assertNotNull,
parseClientQuery,
parseMessageToJson,
verifyClientQueryComesFromCertExchangeSupportedOS,
} from './Utilities';
import {SecureServerConfig} from '../utils/CertificateProvider';
import {Server} from 'net';
import {serializeError} from 'serialize-error';
import {WSCloseCode} from '../utils/WSCloseCode';
export interface ConnectionCtx {
clientQuery?: ClientQuery;
ws: WebSocket;
request: IncomingMessage;
}
/**
* WebSocket-based server.
* It serves as a base class for WebSocket based servers. It delegates the 'connection'
* event to subclasses as a customisation point.
*/
class ServerWebSocket extends ServerWebSocketBase {
constructor(listener: ServerEventsListener) {
super(listener);
class ServerWebSocket extends ServerAdapter {
protected wsServer?: WSServer;
private httpServer?: Server;
async start(port: number, sslConfig?: SecureServerConfig): Promise<number> {
const assignedPort = await new Promise<number>((resolve, reject) => {
const server = sslConfig
? createHttpsServer(sslConfig)
: createHttpServer();
const wsServer = new WSServer({
server,
verifyClient: this.verifyClient(),
});
// We do not need to listen to http server's `error` because it is propagated to WS
// https://github.com/websockets/ws/blob/a3a22e4ed39c1a3be8e727e9c630dd440edc61dd/lib/websocket-server.js#L109
const onConnectionError = (error: Error) => {
console.error(`[conn] Unable to start server at port ${port}`, error);
this.listener.onError(error);
reject(
new Error(
`Unable to start server at port ${port} due to ${JSON.stringify(
serializeError(error),
)}`,
),
);
};
wsServer.once('error', onConnectionError);
server.listen(port, () => {
console.debug(
`${sslConfig ? 'Secure' : 'Insecure'} server started on port ${port}`,
'server',
);
// Unsubscribe connection error listener. We'll attach a permanent error listener later
wsServer.off('error', onConnectionError);
this.listener.onListening(port);
this.wsServer = wsServer;
this.httpServer = server;
resolve((server.address() as AddressInfo).port);
});
});
assertNotNull(this.wsServer);
assertNotNull(this.httpServer);
this.wsServer.on(
'connection',
(ws: WebSocket, request: IncomingMessage) => {
ws.on('error', (error) => {
console.error('[conn] WS connection error:', error);
this.listener.onError(error);
});
try {
this.onConnection(ws, request);
} catch (error) {
// If an exception is thrown, an `error` event is not emitted automatically.
// We need to explicitly handle the error and emit an error manually.
// If we leave it unhanled, the process just dies
// https://replit.com/@aigoncharov/WS-error-handling#index.js
ws.emit('error', error);
// TODO: Investigate if we need to close the socket in the `error` listener
// DRI: @aigoncharov
ws.close(WSCloseCode.InternalError);
}
},
);
this.wsServer.on('error', (error) => {
console.error('[conn] WS server error:', error);
this.listener.onError(error);
});
return assignedPort;
}
/**
* Client verification is not necessary. The connected client has
* already been verified using its certificate signed by the server.
* @returns
*/
verifyClient(): ws.VerifyClientCallbackSync {
return (_info: {origin: string; req: IncomingMessage; secure: boolean}) => {
return true;
};
async stop(): Promise<void> {
if (!this.wsServer) {
return;
}
await new Promise<void>((resolve, reject) => {
console.info('[conn] Stopping WS server');
assertNotNull(this.wsServer);
this.wsServer.close((err) => {
if (err) {
reject(err);
return;
}
resolve();
});
});
await new Promise<void>((resolve, reject) => {
console.info('[conn] Stopping HTTP server');
assertNotNull(this.httpServer);
this.httpServer.close((err) => {
if (err) {
reject(err);
return;
}
resolve();
});
});
}
/**
* A connection has been established between the server and a client. Only ever used for
* certificate exchange.
*
* @param ws An active WebSocket.
* @param message Incoming request message.
* @param request Incoming request message.
*/
onConnection(ws: WebSocket, message: any): void {
const query = querystring.decode(message.url.split('?')[1]);
const clientQuery = this._parseClientQuery(query);
onConnection(ws: WebSocket, request: IncomingMessage): void {
const ctx: ConnectionCtx = {ws, request};
this.handleClientQuery(ctx);
this.handleConnectionAttempt(ctx);
ws.on('message', async (message: unknown) => {
try {
const parsedMessage = this.handleMessageDeserialization(message);
// Successful deserialization is a proof that the message is a string
this.handleMessage(ctx, parsedMessage, message as string);
} catch (error) {
// See the reasoning in the error handler for a `connection` event
ws.emit('error', error);
ws.close(WSCloseCode.InternalError);
}
});
}
protected handleClientQuery(ctx: ConnectionCtx): void {
const {request} = ctx;
const query = querystring.decode(request.url!.split('?')[1]);
const clientQuery = this.parseClientQuery(query);
if (!clientQuery) {
console.warn(
console.error(
'[conn] Unable to extract the client query from the request URL.',
);
throw new Error(
'[conn] Unable to extract the client query from the request URL.',
);
ws.close();
return;
}
ctx.clientQuery = clientQuery;
}
protected handleConnectionAttempt(ctx: ConnectionCtx): void {
const {clientQuery} = ctx;
assertNotNull(clientQuery);
console.info(
`[conn] Insecure websocket connection attempt: ${clientQuery.app} on ${clientQuery.device_id}.`,
clientQuery,
);
this.listener.onConnectionAttempt(clientQuery);
ws.on('message', async (message: any) => {
const json = JSON.parse(message.toString());
const response = await this._onHandleUntrustedMessage(clientQuery, json);
if (response) {
ws.send(response);
}
});
}
/**
* A secure connection has been established between the server and a client. Once a client
* has a valid certificate, it can use a secure connection with Flipper and start exchanging
* messages.
* @param _ws An active WebSocket.
* @param message Incoming request message.
*/
onSecureConnection(ws: WebSocket, message: any): void {
const query = querystring.decode(message.url.split('?')[1]);
const clientQuery = this._parseSecureClientQuery(query);
if (!clientQuery) {
console.warn(
'[conn] Unable to extract the client query from the request URL.',
);
ws.close();
return;
protected handleMessageDeserialization(message: unknown): object {
const parsedMessage = parseMessageToJson(message);
if (!parsedMessage) {
console.error('[conn] Failed to parse message', message);
// TODO: Create custom DeserializationError
throw new Error(`[conn] Failed to parse message`);
}
console.info(
`[conn] Secure websocket connection attempt: ${clientQuery.app} on ${clientQuery.device_id}. Medium ${clientQuery.medium}. CSR: ${clientQuery.csr_path}`,
cloneClientQuerySafeForLogging(clientQuery),
);
this.listener.onSecureConnectionAttempt(clientQuery);
const pendingRequests: Map<
number,
{
resolve: (data: any) => void;
reject: (err: Error) => void;
}
> = new Map();
const clientConnection: ClientConnection = {
subscribeToEvents(subscriber: ConnectionStatusChange): void {
ws.on('close', () => subscriber(ConnectionStatus.CLOSED));
ws.on('error', () => subscriber(ConnectionStatus.ERROR));
},
close(): void {
ws.close();
},
send(data: any): void {
ws.send(JSON.stringify(data));
},
sendExpectResponse(data: any): Promise<any> {
return new Promise((resolve, reject) => {
pendingRequests.set(data.id, {reject, resolve});
ws.send(JSON.stringify(data));
});
},
};
let resolvedClient: ClientDescription | undefined;
const client: Promise<ClientDescription> =
this.listener.onConnectionCreated(clientQuery, clientConnection);
client
.then((client) => (resolvedClient = client))
.catch((e) => {
console.error(
`[conn] Failed to resolve client ${clientQuery.app} on ${clientQuery.device_id} medium ${clientQuery.medium}`,
e,
);
});
ws.on('message', (message: any) => {
let json: any | undefined;
try {
json = JSON.parse(message);
} catch (err) {
console.warn(`Invalid JSON: ${message}`, 'clientMessage');
return;
}
const data: {
id?: number;
success?: Object | undefined;
error?: ClientErrorType | undefined;
} = json;
if (data.hasOwnProperty('id') && data.id !== undefined) {
const callbacks = pendingRequests.get(data.id);
if (!callbacks) {
return;
}
pendingRequests.delete(data.id);
if (data.success) {
callbacks.resolve && callbacks.resolve(data);
} else if (data.error) {
callbacks.reject && callbacks.reject(data.error);
}
} else {
if (resolvedClient) {
this.listener.onClientMessage(resolvedClient.id, message);
} else {
client &&
client
.then((client) => {
this.listener.onClientMessage(client.id, message);
})
.catch((e) => {
console.warn(
'Could not deliver message, client did not resolve. ',
e,
);
});
}
}
});
return parsedMessage;
}
/**
* Validates a string as being one of those defined as valid OS.
* @param str An input string.
*/
private isOS(str: string): str is DeviceOS {
return (
str === 'iOS' ||
str === 'Android' ||
str === 'Metro' ||
str === 'Windows' ||
str === 'MacOS'
protected async handleMessage(
ctx: ConnectionCtx,
parsedMessage: object,
// Not used in this method, but left as a reference for overriding classes
_rawMessage: string,
) {
const {clientQuery, ws} = ctx;
assertNotNull(clientQuery);
const response = await this._onHandleUntrustedMessage(
clientQuery,
parsedMessage,
);
if (response) {
ws.send(response);
}
}
/**
@@ -206,95 +232,29 @@ class ServerWebSocket extends ServerWebSocketBase {
* data will be contained in the message url query string.
* @param message An incoming web socket message.
*/
private _parseClientQuery(
protected parseClientQuery(
query: querystring.ParsedUrlQuery,
): ClientQuery | undefined {
/** Any required arguments to construct a ClientQuery come
* embedded in the query string.
*/
let device_id: string | undefined;
if (typeof query.device_id === 'string') {
device_id = query.device_id;
} else {
return;
}
let device: string | undefined;
if (typeof query.device === 'string') {
device = query.device;
} else {
return;
}
let app: string | undefined;
if (typeof query.app === 'string') {
app = query.app;
} else {
return;
}
let os: DeviceOS | undefined;
if (typeof query.os === 'string' && this.isOS(query.os)) {
os = query.os;
} else {
return;
}
const clientQuery: ClientQuery = {
device_id,
device,
app,
os,
};
if (typeof query.sdk_version === 'string') {
const sdk_version = parseInt(query.sdk_version, 10);
if (sdk_version) {
// TODO: allocate new object, kept now as is to keep changes minimal
(clientQuery as any).sdk_version = sdk_version;
}
}
return clientQuery;
return verifyClientQueryComesFromCertExchangeSupportedOS(
parseClientQuery(query),
);
}
/**
* Parse and extract a SecureClientQuery instance from a message. The ClientQuery
* data will be contained in the message url query string.
* @param message An incoming web socket message.
* WebSocket client verification. Usually used to validate the origin.
*
* Base implementation simply returns true, but this can be overriden by subclasses
* that require verification.
*
* @returns Return true if the client was successfully verified, otherwise
* returns false.
*/
private _parseSecureClientQuery(
query: querystring.ParsedUrlQuery,
): SecureClientQuery | undefined {
/** Any required arguments to construct a SecureClientQuery come
* embedded in the query string.
*/
const clientQuery = this._parseClientQuery(query);
if (!clientQuery) {
return;
}
let csr: string | undefined;
if (typeof query.csr === 'string') {
const buffer = Buffer.from(query.csr, 'base64');
if (buffer) {
csr = buffer.toString('ascii');
}
}
let csr_path: string | undefined;
if (typeof query.csr_path === 'string') {
csr_path = query.csr_path;
}
let medium: number | undefined;
if (typeof query.medium === 'string') {
medium = parseInt(query.medium, 10);
}
if (medium !== undefined && (medium < 1 || medium > 3)) {
throw new Error('Unsupported exchange medium: ' + medium);
}
return {...clientQuery, csr, csr_path, medium: medium as any};
protected verifyClient(): VerifyClientCallbackSync {
return (_info: {origin: string; req: IncomingMessage; secure: boolean}) => {
// Client verification is not necessary. The connected client has
// already been verified using its certificate signed by the server.
return true;
};
}
}