Fix potential race conditions for starting/stopping server add-ons

Reviewed By: mweststrate

Differential Revision: D34301593

fbshipit-source-id: 2950de8a8567318cd3e87eff176657df5ba8fd1b
This commit is contained in:
Andrey Goncharov
2022-02-28 03:50:34 -08:00
committed by Facebook GitHub Bot
parent bdbf79e3e1
commit 81d0057a8d
6 changed files with 317 additions and 123 deletions

View File

@@ -425,20 +425,23 @@ export class FlipperServerImpl implements FlipperServer {
'plugins-server-add-on-stop': async (pluginName, owner) =>
this.pluginManager.stopServerAddOn(pluginName, owner),
'plugins-server-add-on-request-response': async (payload) => {
const serverAddOn = this.pluginManager.getServerAddOnForMessage(payload);
if (serverAddOn) {
return await serverAddOn.connection.sendExpectResponse(payload);
try {
const serverAddOn =
this.pluginManager.getServerAddOnForMessage(payload);
assertNotNull(serverAddOn);
return await serverAddOn.sendExpectResponse(payload);
} catch {
return {
length: 0,
error: {
message: `Server add-on for message '${JSON.stringify(
payload,
)} is no longer running.`,
name: 'SERVER_ADDON_STOPPED',
stacktrace: '',
},
};
}
return {
length: 0,
error: {
message: `Server add-on for message '${JSON.stringify(
payload,
)} is no longer running.`,
name: 'SERVER_ADDON_STOPPED',
stacktrace: '',
},
};
},
'doctor-get-healthchecks': getHealthChecks,
'doctor-run-healthcheck': runHealthcheck,

View File

@@ -33,7 +33,7 @@ import {
getInstalledPlugin,
installPluginFromNpm,
} from 'flipper-plugin-lib';
import {ServerAddOn} from './ServerAddOn';
import {ServerAddOnManager} from './ServerAddManager';
const maxInstalledPluginVersionsToKeep = 2;
@@ -50,7 +50,7 @@ const isExecuteMessage = (message: object): message is ExecuteMessage =>
(message as ExecuteMessage).method === 'execute';
export class PluginManager {
private readonly serverAddOns = new Map<string, ServerAddOn>();
private readonly serverAddOns = new Map<string, ServerAddOnManager>();
constructor(private readonly flipperServer: FlipperServerForServerAddOn) {}
@@ -178,43 +178,89 @@ export class PluginManager {
pluginName: string,
details: ServerAddOnStartDetails,
owner: string,
) {
): Promise<void> {
console.debug('PluginManager.startServerAddOn', pluginName);
const existingServerAddOn = this.serverAddOns.get(pluginName);
if (existingServerAddOn) {
if (existingServerAddOn.state.is('stopping')) {
console.debug(
'PluginManager.startServerAddOn -> currently stropping',
pluginName,
owner,
existingServerAddOn.state.currentState,
);
await existingServerAddOn.state.wait(['inactive', 'zombie']);
return this.startServerAddOn(pluginName, details, owner);
}
console.debug(
'PluginManager.startServerAddOn -> already started, adding an owner',
'PluginManager.startServerAddOn -> already started',
pluginName,
owner,
existingServerAddOn.state.currentState,
);
existingServerAddOn.addOwner(owner);
await existingServerAddOn.addOwner(owner);
return;
}
const newServerAddOn = await ServerAddOn.start(
const newServerAddOn = new ServerAddOnManager(
pluginName,
details,
owner,
() => this.serverAddOns.delete(pluginName),
this.flipperServer,
);
this.serverAddOns.set(pluginName, newServerAddOn);
newServerAddOn.state.once(['fatal', 'zombie', 'inactive'], () => {
this.serverAddOns.delete(pluginName);
});
await newServerAddOn.state.wait(['active', 'fatal']);
if (newServerAddOn.state.is('fatal')) {
this.serverAddOns.delete(pluginName);
throw newServerAddOn.state.error;
}
}
stopServerAddOn(pluginName: string, owner: string) {
async stopServerAddOn(pluginName: string, owner: string): Promise<void> {
console.debug('PluginManager.stopServerAddOn', pluginName);
const serverAddOn = this.serverAddOns.get(pluginName);
if (!serverAddOn) {
console.debug('PluginManager.stopServerAddOn -> not started', pluginName);
console.warn('PluginManager.stopServerAddOn -> not started', pluginName);
return;
}
serverAddOn.removeOwner(owner);
try {
await serverAddOn.removeOwner(owner);
} catch (e) {
console.error(
'PluginManager.stopServerAddOn -> error',
pluginName,
owner,
e,
);
this.serverAddOns.delete(pluginName);
throw e;
}
}
stopAllServerAddOns(owner: string) {
console.debug('PluginManager.stopAllServerAddOns');
this.serverAddOns.forEach((serverAddOn) => {
serverAddOn.removeOwner(owner);
console.debug('PluginManager.stopAllServerAddOns', owner);
this.serverAddOns.forEach(async (serverAddOnPromise) => {
try {
const serverAddOn = await serverAddOnPromise;
serverAddOn.removeOwner(owner);
} catch (e) {
// It is OK to use a debug level here because any failure would be logged in "stopServerAddOn"
console.debug(
'PluginManager.stopAllServerAddOns -> failed to remove owner',
owner,
e,
);
}
});
}
}

View File

@@ -0,0 +1,147 @@
/**
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @format
*/
import {
ClientResponseType,
ExecuteMessage,
FlipperServerForServerAddOn,
ServerAddOnStartDetails,
} from 'flipper-common';
import {assertNotNull} from '../comms/Utilities';
import {StateMachine} from '../utils/StateMachine';
import {ServerAddOn} from './ServerAddOn';
type TState =
| 'inactive'
| 'starting'
| 'active'
| 'fatal'
| 'stopping'
| 'zombie';
export class ServerAddOnManager {
public readonly state = new StateMachine<TState, 'fatal'>('inactive');
private _serverAddOn?: ServerAddOn;
constructor(
public readonly pluginName: string,
details: ServerAddOnStartDetails,
initialOwner: string,
flipperServer: FlipperServerForServerAddOn,
) {
this.startServerAddOn(details, initialOwner, flipperServer);
}
sendExpectResponse(message: ExecuteMessage): Promise<ClientResponseType> {
if (!this.state.is('active')) {
console.info(
'StateAddOnManager.sendExpectResponse -> error: server add-on is not active, Current state:',
this.state.currentState,
);
throw new Error(
'StateAddOnManager.sendExpectResponse -> error: server add-on is not active',
);
}
assertNotNull(
this._serverAddOn,
'StateAddOnManager.sendExpectResponse -> _serverAddOn is undefined',
);
return this._serverAddOn.connection.sendExpectResponse(message);
}
async addOwner(owner: string) {
if (this.state.is('starting')) {
await this.state.wait(['active', 'fatal']);
}
if (!this.state.is('active')) {
console.info(
'StateAddOnManager.addOwner -> error: server add-on is not active, Current state:',
this.state.currentState,
);
throw new Error(
'StateAddOnManager.addOwner -> error: server add-on is not active',
);
}
assertNotNull(
this._serverAddOn,
'StateAddOnManager.addOwner -> _serverAddOn is undefined',
);
this._serverAddOn.addOwner(owner);
}
async removeOwner(owner: string) {
if (this.state.is(['stopping', 'inactive'])) {
return this.state.wait(['zombie', 'inactive']);
}
if (this.state.is('starting')) {
await this.state.wait(['active', 'fatal']);
}
if (!this.state.is('active')) {
console.debug(
'StateAddOnManager.removeOwner -> error: server add-on failed to start, Current state:',
this.state.currentState,
);
return;
}
assertNotNull(
this._serverAddOn,
'StateAddOnManager.addOwner -> _serverAddOn is undefined',
);
const stopping = this._serverAddOn.removeOwner(owner);
if (stopping) {
this.state.set('stopping');
try {
await stopping;
this.state.set('inactive');
} catch (e) {
this.state.set('zombie');
console.error(
'ServerAddOnManager.removeOwner -> server add-on failed to clean up',
this.pluginName,
e,
);
throw e;
}
}
}
private async startServerAddOn(
details: ServerAddOnStartDetails,
initialOwner: string,
flipperServer: FlipperServerForServerAddOn,
) {
try {
this.state.set('starting');
this._serverAddOn = await ServerAddOn.start(
this.pluginName,
details,
initialOwner,
flipperServer,
);
this.state.set('active');
} catch (e) {
this.state.set('fatal', e);
console.error(
'StateAddOnManager.startServerAddOn -> error',
this.pluginName,
details,
initialOwner,
e,
);
}
}
}

View File

@@ -49,7 +49,6 @@ const loadPlugin = (
return serverAddOnModule;
};
// TODO: Fix potential race conditions when starting/stopping concurrently
export class ServerAddOn {
private owners: Set<string>;
@@ -66,7 +65,6 @@ export class ServerAddOn {
pluginName: string,
details: ServerAddOnStartDetails,
initialOwner: string,
onStop: () => void,
flipperServer: FlipperServerForServerAddOn,
): Promise<ServerAddOn> {
console.info('ServerAddOn.start', pluginName, details);
@@ -89,11 +87,6 @@ export class ServerAddOn {
`ServerAddOn ${pluginName} must return a clean up function, instead it returned ${typeof cleanup}.`,
);
const onStopCombined = async () => {
onStop();
await cleanup();
};
const desktopToModuleConnection = new ServerAddOnDesktopToModuleConnection(
serverAddOnModuleToDesktopConnection,
flipperServer,
@@ -101,7 +94,7 @@ export class ServerAddOn {
return new ServerAddOn(
pluginName,
onStopCombined,
cleanup,
desktopToModuleConnection,
initialOwner,
);
@@ -115,22 +108,12 @@ export class ServerAddOn {
const ownerExisted = this.owners.delete(owner);
if (!this.owners.size && ownerExisted) {
this.stop().catch((e) => {
console.error(
'ServerAddOn.removeOwner -> failed to stop automatically when no owners left',
this.pluginName,
e,
);
});
return this.stop();
}
}
private async stop() {
async stop() {
console.info('ServerAddOn.stop', this.pluginName);
try {
await this.cleanup();
} catch (e) {
console.error('ServerAddOn.stop -> failed to clean up', this.pluginName);
}
await this.cleanup();
}
}

View File

@@ -7,9 +7,9 @@
* @format
*/
import EventEmitter from 'events';
import {sleep} from 'flipper-common';
import {assertNotNull} from '../comms/Utilities';
import {StateMachine} from './StateMachine';
export const RESTART_CNT = 3;
const RESTART_SLEEP = 100;
@@ -21,84 +21,13 @@ export type DeviceLogListenerState =
| 'inactive'
| 'fatal'
| 'zombie';
class State {
private _currentState: DeviceLogListenerState = 'inactive';
private _error?: Error;
private valueEmitter = new EventEmitter();
get error() {
return this._error;
}
get currentState() {
return this._currentState;
}
set<T extends DeviceLogListenerState>(
...[newState, error]: T extends 'fatal' | 'zombie' ? [T, Error] : [T]
) {
this._currentState = newState;
this._error = error;
this.valueEmitter.emit(newState);
}
once(
state: DeviceLogListenerState | DeviceLogListenerState[],
cb: () => void,
): () => void {
return this.subscribe(state, cb, {once: true});
}
on(
state: DeviceLogListenerState | DeviceLogListenerState[],
cb: () => void,
): () => void {
return this.subscribe(state, cb);
}
is(targetState: DeviceLogListenerState | DeviceLogListenerState[]) {
if (!Array.isArray(targetState)) {
targetState = [targetState];
}
return targetState.includes(this._currentState);
}
private subscribe(
state: DeviceLogListenerState | DeviceLogListenerState[],
cb: () => void,
{once}: {once?: boolean} = {},
): () => void {
const statesNormalized = Array.isArray(state) ? state : [state];
if (statesNormalized.includes(this._currentState)) {
cb();
return () => {};
}
let executed = false;
const wrappedCb = () => {
if (!executed) {
executed = true;
cb();
}
};
const fn = once ? 'once' : 'on';
statesNormalized.forEach((item) => {
this.valueEmitter[fn](item, wrappedCb);
});
return () => {
statesNormalized.forEach((item) => {
this.valueEmitter.off(item, wrappedCb);
});
};
}
}
export abstract class DeviceListener {
private name: string = this.constructor.name;
protected _state = new State();
protected _state = new StateMachine<
DeviceLogListenerState,
'fatal' | 'zombie'
>('inactive');
private stopLogListener?: () => Promise<void> | void;

View File

@@ -0,0 +1,86 @@
/**
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @format
*/
import {EventEmitter} from 'events';
export class StateMachine<TState extends string, TError extends TState> {
private _error?: Error;
private valueEmitter = new EventEmitter();
constructor(private _currentState: TState) {}
get error() {
return this._error;
}
get currentState() {
return this._currentState;
}
set<T extends TState>(
...[newState, error]: T extends TError ? [T, Error] : [T]
) {
this._currentState = newState as TState;
this._error = error;
this.valueEmitter.emit(newState as TState);
}
wait<T extends TState | TState[]>(state: T): Promise<void> {
return new Promise((resolve) => {
this.once(state, resolve);
});
}
once(state: TState | TState[], cb: () => void): () => void {
return this.subscribe(state, cb, {once: true});
}
on(state: TState | TState[], cb: () => void): () => void {
return this.subscribe(state, cb);
}
is(targetState: TState | TState[]) {
if (!Array.isArray(targetState)) {
targetState = [targetState];
}
return targetState.includes(this._currentState);
}
private subscribe(
state: TState | TState[],
cb: () => void,
{once}: {once?: boolean} = {},
): () => void {
const statesNormalized = Array.isArray(state) ? state : [state];
if (statesNormalized.includes(this._currentState)) {
cb();
return () => {};
}
let executed = false;
const wrappedCb = () => {
if (!executed) {
executed = true;
cb();
}
};
const fn = once ? 'once' : 'on';
statesNormalized.forEach((item) => {
this.valueEmitter[fn](item, wrappedCb);
});
return () => {
statesNormalized.forEach((item) => {
this.valueEmitter.off(item, wrappedCb);
});
};
}
}