From d2a2e2ab75976306b1a137e0c4f029ebdc5f2315 Mon Sep 17 00:00:00 2001 From: Michel Weststrate Date: Thu, 2 Jan 2020 07:12:06 -0800 Subject: [PATCH] Introduce async message queuing Summary: This diff introduces the logic for queueing incoming messages rather then directly processing them you are behind the `flipper_event_queue` GK. The reason the queue processing is a bit complicated is to make the queue can be processed non-blocking, can be cancelled, and is safe to concurrency issues. The idea here is that the queue is processed when we switch to a plugin, report it's progress, and abort the process when switching to another plugin without loosing any work. This diff does not include [x] updates to the UI (**SO DON"T LAND IN ISOLATION**) [x] metrics to see the effect The effect of the changes can be seen when profiling the application, before this change there are very regular CPU spikes (see the small yellow bar on the top): https://pxl.cl/TQtl These go away when the events are no longer processed https://pxl.cl/TQtp Reviewed By: nikoant Differential Revision: D19095564 fbshipit-source-id: 0b8c3421acc4a4f240bf2aab5c1743132f69aa6e --- package.json | 2 + src/Client.tsx | 104 ++---- src/devices/BaseDevice.tsx | 4 +- src/plugin.tsx | 15 +- src/reducers/connections.tsx | 21 +- src/reducers/index.tsx | 7 + src/reducers/pluginMessageQueue.tsx | 108 ++++++ src/reducers/pluginStates.tsx | 2 +- .../createMockFlipperWithPlugin.tsx | 2 +- src/utils/Idler.tsx | 44 ++- src/utils/__tests__/Idler.node.js | 3 + src/utils/__tests__/messageQueue.node.tsx | 344 ++++++++++++++++++ src/utils/messageQueue.tsx | 215 +++++++++++ types/globals.tsx | 11 - yarn.lock | 20 +- 15 files changed, 783 insertions(+), 119 deletions(-) create mode 100644 src/reducers/pluginMessageQueue.tsx create mode 100644 src/utils/__tests__/messageQueue.node.tsx create mode 100644 src/utils/messageQueue.tsx diff --git a/package.json b/package.json index 4da8fc22b..c572b928e 100644 --- a/package.json +++ b/package.json @@ -76,6 +76,8 @@ "@types/react-redux": "^7.1.5", "@types/react-virtualized-auto-sizer": "^1.0.0", "@types/react-window": "^1.8.1", + "@types/redux-persist": "^4.3.1", + "@types/requestidlecallback": "^0.3.1", "@types/rsocket-core": "^0.0.3", "@types/testing-library__react": "^9.1.2", "@types/tmp": "^0.1.0", diff --git a/src/Client.tsx b/src/Client.tsx index 3001fa4c7..5484e1b57 100644 --- a/src/Client.tsx +++ b/src/Client.tsx @@ -25,48 +25,11 @@ import EventEmitter from 'events'; import invariant from 'invariant'; import {flipperRecorderAddEvent} from './utils/pluginStateRecorder'; import {getPluginKey} from './utils/pluginUtils'; - -const MAX_BACKGROUND_TASK_TIME = 25; - -const pluginBackgroundStats = new Map< - string, - { - cpuTime: number; // Total time spend in persisted Reducer - messages: number; // amount of message received for this plugin - maxTime: number; // maximum time spend in a single reducer call - } ->(); - -if (window) { - // @ts-ignore - window.flipperPrintPluginBackgroundStats = () => { - console.table( - Array.from(pluginBackgroundStats.entries()).map( - ([plugin, {cpuTime, messages, maxTime}]) => ({ - plugin, - cpuTime, - messages, - maxTime, - }), - ), - ); - }; -} - -function addBackgroundStat(plugin: string, cpuTime: number) { - if (!pluginBackgroundStats.has(plugin)) { - pluginBackgroundStats.set(plugin, {cpuTime: 0, messages: 0, maxTime: 0}); - } - const stat = pluginBackgroundStats.get(plugin)!; - stat.cpuTime += cpuTime; - stat.messages += 1; - stat.maxTime = Math.max(stat.maxTime, cpuTime); - if (cpuTime > MAX_BACKGROUND_TASK_TIME) { - console.warn( - `Plugin ${plugin} took too much time while doing background: ${cpuTime}ms. Handling background messages should take less than ${MAX_BACKGROUND_TASK_TIME}ms.`, - ); - } -} +import { + processMessageImmediately, + processMessageLater, +} from './utils/messageQueue'; +import GK from './fb-stubs/GK'; type Plugins = Array; @@ -351,8 +314,6 @@ export default class Client extends EventEmitter { error?: ErrorType; } = rawData; - console.debug(data, 'message:receive'); - const {id, method} = data; if (id == null) { @@ -371,39 +332,33 @@ export default class Client extends EventEmitter { const params: Params = data.params as Params; invariant(params, 'expected params'); - const statName = `${params.api}.${params.method}`; const persistingPlugin: | typeof FlipperPlugin | typeof FlipperDevicePlugin | undefined = this.store.getState().plugins.clientPlugins.get(params.api) || this.store.getState().plugins.devicePlugins.get(params.api); + if (persistingPlugin && persistingPlugin.persistedStateReducer) { - let pluginKey = getPluginKey(this.id, null, params.api); - if (persistingPlugin.prototype instanceof FlipperDevicePlugin) { - // For device plugins, we are just using the device id instead of client id as the prefix. - this.deviceSerial().then( - serial => (pluginKey = `${serial}#${params.api}`), - ); - } - const persistedState = { - ...persistingPlugin.defaultPersistedState, - ...this.store.getState().pluginStates[pluginKey], - }; - const reducerStartTime = Date.now(); - flipperRecorderAddEvent(pluginKey, params.method, params.params); - const newPluginState = persistingPlugin.persistedStateReducer( - persistedState, - params.method, - params.params, + const pluginKey = getPluginKey( + this.id, + this.getDeviceSync(), + params.api, ); - addBackgroundStat(statName, Date.now() - reducerStartTime); - if (persistedState !== newPluginState) { - this.store.dispatch( - setPluginState({ - pluginKey, - state: newPluginState, - }), + flipperRecorderAddEvent(pluginKey, params.method, params.params); + if (GK.get('flipper_event_queue')) { + processMessageLater( + this.store, + pluginKey, + persistingPlugin, + params, + ); + } else { + processMessageImmediately( + this.store, + pluginKey, + persistingPlugin, + params, ); } } @@ -546,6 +501,17 @@ export default class Client extends EventEmitter { }); } + getDeviceSync(): BaseDevice { + let device: BaseDevice | undefined; + this.device.then(d => { + device = d; + }); + if (!device) { + throw new Error('Device not ready yet'); + } + return device!; + } + startTimingRequestResponse(data: RequestMetadata) { performance.mark(this.getPerformanceMark(data)); } diff --git a/src/devices/BaseDevice.tsx b/src/devices/BaseDevice.tsx index 4de942deb..433a1770a 100644 --- a/src/devices/BaseDevice.tsx +++ b/src/devices/BaseDevice.tsx @@ -144,7 +144,7 @@ export default class BaseDevice { this.logListeners.delete(id); } - navigateToLocation(location: string) { + navigateToLocation(_location: string) { throw new Error('unimplemented'); } @@ -162,7 +162,7 @@ export default class BaseDevice { return false; } - async startScreenCapture(destination: string) { + async startScreenCapture(_destination: string) { throw new Error('startScreenCapture not implemented on BaseDevice '); } diff --git a/src/plugin.tsx b/src/plugin.tsx index 41c71e967..a28e39f51 100644 --- a/src/plugin.tsx +++ b/src/plugin.tsx @@ -19,6 +19,7 @@ import {serialize, deserialize} from './utils/serialization'; import {Idler} from './utils/Idler'; import {StaticView} from './reducers/connections'; import {State as ReduxState} from './reducers'; +import {PersistedState} from './plugins/layout'; type Parameters = any; // This function is intended to be called from outside of the plugin. @@ -69,6 +70,12 @@ export type BaseAction = { type: string; }; +export type PersistedStateReducer = ( + persistedState: StaticPersistedState, + method: string, + data: any, +) => StaticPersistedState; + type StaticPersistedState = any; export abstract class FlipperBasePlugin< @@ -90,13 +97,7 @@ export abstract class FlipperBasePlugin< static keyboardActions: KeyboardActions | null; static screenshot: string | null; static defaultPersistedState: any; - static persistedStateReducer: - | (( - persistedState: StaticPersistedState, - method: string, - data: any, - ) => StaticPersistedState) - | null; + static persistedStateReducer: PersistedStateReducer | null; static metricsReducer: | ((persistedState: StaticPersistedState) => Promise) | null; diff --git a/src/reducers/connections.tsx b/src/reducers/connections.tsx index 6180e3d95..c88427379 100644 --- a/src/reducers/connections.tsx +++ b/src/reducers/connections.tsx @@ -208,19 +208,15 @@ const reducer = (state: State = INITAL_STATE, action: Actions): State => { } case 'UNREGISTER_DEVICES': { - const {payload} = action; - const devices = state.devices.filter((device: BaseDevice) => { - if (payload.has(device.serial)) { - return false; - } else { - return true; - } - }); + const deviceSerials = action.payload; - return updateSelection({ - ...state, - devices, - }); + return updateSelection( + produce(state, draft => { + draft.devices = draft.devices.filter( + device => !deviceSerials.has(device.serial), + ); + }), + ); } case 'SELECT_PLUGIN': { const {payload} = action; @@ -380,6 +376,7 @@ const reducer = (state: State = INITAL_STATE, action: Actions): State => { errors, }; } + default: return state; } diff --git a/src/reducers/index.tsx b/src/reducers/index.tsx index 619c47106..c98f14876 100644 --- a/src/reducers/index.tsx +++ b/src/reducers/index.tsx @@ -20,6 +20,10 @@ import pluginStates, { State as PluginStatesState, Action as PluginStatesAction, } from './pluginStates'; +import pluginMessageQueue, { + State as PluginMessageQueueState, + Action as PluginMessageQueueAction, +} from './pluginMessageQueue'; import notifications, { State as NotificationsState, Action as NotificationsAction, @@ -65,6 +69,7 @@ export type Actions = | ApplicationAction | DevicesAction | PluginStatesAction + | PluginMessageQueueAction | NotificationsAction | PluginsAction | UserAction @@ -79,6 +84,7 @@ export type State = { application: ApplicationState; connections: DevicesState & PersistPartial; pluginStates: PluginStatesState; + pluginMessageQueue: PluginMessageQueueState; notifications: NotificationsState & PersistPartial; plugins: PluginsState; user: UserState & PersistPartial; @@ -120,6 +126,7 @@ export default combineReducers({ connections, ), pluginStates, + pluginMessageQueue: pluginMessageQueue as any, notifications: persistReducer( { key: 'notifications', diff --git a/src/reducers/pluginMessageQueue.tsx b/src/reducers/pluginMessageQueue.tsx new file mode 100644 index 000000000..40547aab6 --- /dev/null +++ b/src/reducers/pluginMessageQueue.tsx @@ -0,0 +1,108 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @format + */ + +import produce from 'immer'; +import {deconstructPluginKey} from '../utils/clientUtils'; + +export type Message = { + method: string; + params: any; +}; + +export type State = { + [pluginKey: string]: Message[]; +}; + +export type Action = + | { + type: 'QUEUE_MESSAGE'; + payload: { + pluginKey: string; // client + plugin + } & Message; + } + | { + type: 'CLEAR_MESSAGE_QUEUE'; + payload: { + pluginKey: string; // client + plugin + amount: number; + }; + } + | { + type: 'CLEAR_PLUGIN_STATE'; + payload: {clientId: string; devicePlugins: Set}; + }; + +const INITIAL_STATE: State = {}; + +export default function reducer( + state: State | undefined = INITIAL_STATE, + action: Action, +): State { + switch (action.type) { + case 'QUEUE_MESSAGE': { + const {pluginKey, method, params} = action.payload; + return produce(state, draft => { + if (!draft[pluginKey]) { + draft[pluginKey] = []; + } + draft[pluginKey].push({ + method, + params, + }); + }); + } + + case 'CLEAR_MESSAGE_QUEUE': { + const {pluginKey, amount} = action.payload; + return produce(state, draft => { + const messages = draft[pluginKey]; + if (messages) { + messages.splice(0, amount); + } + }); + } + + case 'CLEAR_PLUGIN_STATE': { + const {payload} = action; + return Object.keys(state).reduce((newState: State, pluginKey) => { + // Only add the pluginState, if its from a plugin other than the one that + // was removed. pluginKeys are in the form of ${clientID}#${pluginID}. + const plugin = deconstructPluginKey(pluginKey); + const clientId = plugin.client; + const pluginId = plugin.pluginName; + if ( + clientId !== payload.clientId || + (pluginId && payload.devicePlugins.has(pluginId)) + ) { + newState[pluginKey] = state[pluginKey]; + } + return newState; + }, {}); + } + default: + return state; + } +} + +export const queueMessage = ( + pluginKey: string, + method: string, + params: any, +): Action => ({ + type: 'QUEUE_MESSAGE', + payload: {pluginKey, method, params}, +}); + +export const clearMessageQueue = ( + pluginKey: string, + amount: number, +): Action => ({ + type: 'CLEAR_MESSAGE_QUEUE', + payload: {pluginKey, amount}, +}); diff --git a/src/reducers/pluginStates.tsx b/src/reducers/pluginStates.tsx index 52e1c9b4f..6c390f3d9 100644 --- a/src/reducers/pluginStates.tsx +++ b/src/reducers/pluginStates.tsx @@ -11,7 +11,7 @@ import {Actions} from '.'; import {deconstructPluginKey} from '../utils/clientUtils'; export type State = { - [pluginKey: string]: Object; + [pluginKey: string]: any; }; export const pluginKey = (serial: string, pluginName: string): string => { diff --git a/src/test-utils/createMockFlipperWithPlugin.tsx b/src/test-utils/createMockFlipperWithPlugin.tsx index c15843716..218cfddd2 100644 --- a/src/test-utils/createMockFlipperWithPlugin.tsx +++ b/src/test-utils/createMockFlipperWithPlugin.tsx @@ -73,7 +73,7 @@ export async function createMockFlipperWithPlugin( // yikes client._deviceSet = true; - // client.getDeviceSync = () => device; + client.getDeviceSync = () => device; client.device = { then() { return device; diff --git a/src/utils/Idler.tsx b/src/utils/Idler.tsx index 588d63fa3..f7bf678aa 100644 --- a/src/utils/Idler.tsx +++ b/src/utils/Idler.tsx @@ -14,12 +14,13 @@ export interface BaseIdler { shouldIdle(): boolean; idle(): Promise; cancel(): void; + isCancelled(): boolean; } export class Idler implements BaseIdler { - lastIdle = performance.now(); - interval = 16; - kill = false; + private lastIdle = performance.now(); + private interval = 16; + private kill = false; shouldIdle(): boolean { return this.kill || performance.now() - this.lastIdle > this.interval; @@ -32,7 +33,15 @@ export class Idler implements BaseIdler { const now = performance.now(); if (now - this.lastIdle > this.interval) { this.lastIdle = now; - return new Promise(resolve => setTimeout(resolve, 0)); + return new Promise(resolve => { + if (typeof requestIdleCallback !== 'undefined') { + requestIdleCallback(() => { + resolve(); + }); + } else { + setTimeout(resolve, 0); + } + }); } return undefined; } @@ -40,14 +49,18 @@ export class Idler implements BaseIdler { cancel() { this.kill = true; } + + isCancelled() { + return this.kill; + } } // This smills like we should be using generators :) export class TestIdler implements BaseIdler { - resolver?: () => void; - kill = false; - autoRun = false; - hasProgressed = false; + private resolver?: () => void; + private kill = false; + private autoRun = false; + private hasProgressed = false; shouldIdle() { if (this.kill) { @@ -56,9 +69,12 @@ export class TestIdler implements BaseIdler { if (this.autoRun) { return false; } - // In turn we signal idle is needed and that it isn't - this.hasProgressed = !this.hasProgressed; - return !this.hasProgressed; + // In turns we signal that idling is needed and that it isn't + if (!this.hasProgressed) { + this.hasProgressed = true; + return false; + } + return true; } async idle() { @@ -74,7 +90,7 @@ export class TestIdler implements BaseIdler { return new Promise(resolve => { this.resolver = () => { this.resolver = undefined; - // this.hasProgressed = false; + this.hasProgressed = false; resolve(); }; }); @@ -102,4 +118,8 @@ export class TestIdler implements BaseIdler { this.resolver?.(); this.autoRun = true; } + + isCancelled() { + return this.kill; + } } diff --git a/src/utils/__tests__/Idler.node.js b/src/utils/__tests__/Idler.node.js index 7ff29978c..d2238c25b 100644 --- a/src/utils/__tests__/Idler.node.js +++ b/src/utils/__tests__/Idler.node.js @@ -18,6 +18,7 @@ test('Idler should interrupt', async () => { if (i == 100) { expect(idler.shouldIdle()).toBe(false); idler.cancel(); + expect(idler.isCancelled()).toBe(true); expect(idler.shouldIdle()).toBe(true); } await idler.idle(); @@ -57,7 +58,9 @@ test('TestIdler can be controlled', async () => { idler.idle(); await idler.next(); + expect(idler.isCancelled()).toBe(false); idler.cancel(); + expect(idler.isCancelled()).toBe(true); expect(idler.shouldIdle()).toBe(true); let threw = false; diff --git a/src/utils/__tests__/messageQueue.node.tsx b/src/utils/__tests__/messageQueue.node.tsx new file mode 100644 index 000000000..f94251db5 --- /dev/null +++ b/src/utils/__tests__/messageQueue.node.tsx @@ -0,0 +1,344 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @format + */ + +import {FlipperPlugin} from '../../plugin'; +import {createMockFlipperWithPlugin} from '../../test-utils/createMockFlipperWithPlugin'; +import {GK} from 'flipper'; +import {selectPlugin} from '../../reducers/connections'; +import {processMessageQueue} from '../messageQueue'; +import {getPluginKey} from '../pluginUtils'; +import {TestIdler} from '../Idler'; + +interface PersistedState { + count: 1; +} + +class TestPlugin extends FlipperPlugin { + static id = 'TestPlugin'; + + static defaultPersistedState = { + count: 0, + }; + + static persistedStateReducer( + persistedState: PersistedState, + method: string, + payload: {delta?: number}, + ) { + if (method === 'inc') { + return Object.assign({}, persistedState, { + count: persistedState.count + ((payload && payload?.delta) || 1), + }); + } + return persistedState; + } + + render() { + return null; + } +} + +test('will process event with GK disabled', async () => { + await createMockFlipperWithPlugin( + TestPlugin, + async ({store, sendMessage}) => { + expect(store.getState().connections.selectedPlugin).toBe('TestPlugin'); + sendMessage('inc', {}); + expect(store.getState().pluginStates).toMatchInlineSnapshot(` + Object { + "TestApp#Android#unit_test#serial#TestPlugin": Object { + "count": 1, + }, + } + `); + }, + ); +}); + +test('queue - events are processed immediately if plugin is selected', async () => { + await createMockFlipperWithPlugin( + TestPlugin, + async ({store, sendMessage}) => { + await GK.withWhitelistedGK('flipper_event_queue', () => { + expect(store.getState().connections.selectedPlugin).toBe('TestPlugin'); + sendMessage('inc', {}); + expect(store.getState().pluginStates).toMatchInlineSnapshot(` + Object { + "TestApp#Android#unit_test#serial#TestPlugin": Object { + "count": 1, + }, + } + `); + expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot( + `Object {}`, + ); + }); + }, + ); +}); + +test('queue - events are NOT processed immediately if plugin is NOT selected', async () => { + await createMockFlipperWithPlugin( + TestPlugin, + async ({client, device, store, sendMessage}) => { + await GK.withWhitelistedGK('flipper_event_queue', async () => { + store.dispatch( + selectPlugin({ + selectedPlugin: 'DeviceLogs', + selectedApp: null, + deepLinkPayload: null, + }), + ); + expect(store.getState().connections.selectedPlugin).not.toBe( + 'TestPlugin', + ); + + sendMessage('inc', {}); + sendMessage('inc', {delta: 2}); + expect(store.getState().pluginStates).toMatchInlineSnapshot( + `Object {}`, + ); + expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot(` + Object { + "TestApp#Android#unit_test#serial#TestPlugin": Array [ + Object { + "method": "inc", + "params": Object {}, + }, + Object { + "method": "inc", + "params": Object { + "delta": 2, + }, + }, + ], + } + `); + + // process the message + const pluginKey = getPluginKey(client.id, device, TestPlugin.id); + await processMessageQueue(client, TestPlugin, pluginKey, store); + expect(store.getState().pluginStates).toEqual({ + [pluginKey]: { + count: 3, + }, + }); + + expect(store.getState().pluginMessageQueue).toEqual({ + [pluginKey]: [], + }); + }); + }, + ); +}); + +test('queue - events processing will be paused', async () => { + await createMockFlipperWithPlugin( + TestPlugin, + async ({client, device, store, sendMessage}) => { + await GK.withWhitelistedGK('flipper_event_queue', async () => { + // select a different plugin + store.dispatch( + selectPlugin({ + selectedPlugin: 'DeviceLogs', + selectedApp: null, + deepLinkPayload: null, + }), + ); + + sendMessage('inc', {}); + sendMessage('inc', {delta: 3}); + sendMessage('inc', {delta: 5}); + + // process the message + const pluginKey = getPluginKey(client.id, device, TestPlugin.id); + + // controlled idler will signal and and off that idling is needed + const idler = new TestIdler(); + + const p = processMessageQueue( + client, + TestPlugin, + pluginKey, + store, + undefined, + idler, + ); + + expect(store.getState().pluginStates).toEqual({ + [pluginKey]: { + count: 4, + }, + }); + + expect(store.getState().pluginMessageQueue).toEqual({ + [pluginKey]: [{method: 'inc', params: {delta: 5}}], + }); + + await idler.next(); + expect(store.getState().pluginStates).toEqual({ + [pluginKey]: { + count: 9, + }, + }); + + expect(store.getState().pluginMessageQueue).toEqual({ + [pluginKey]: [], + }); + + // don't idle anymore + idler.run(); + await p; + }); + }, + ); +}); + +test('queue - messages that arrive during processing will be queued', async () => { + await createMockFlipperWithPlugin( + TestPlugin, + async ({client, device, store, sendMessage}) => { + await GK.withWhitelistedGK('flipper_event_queue', async () => { + // select a different plugin + store.dispatch( + selectPlugin({ + selectedPlugin: 'DeviceLogs', + selectedApp: null, + deepLinkPayload: null, + }), + ); + + sendMessage('inc', {}); + sendMessage('inc', {delta: 2}); + sendMessage('inc', {delta: 3}); + + // process the message + const pluginKey = getPluginKey(client.id, device, TestPlugin.id); + + const idler = new TestIdler(); + + const p = processMessageQueue( + client, + TestPlugin, + pluginKey, + store, + undefined, + idler, + ); + + // first message is consumed + expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(1); + expect(store.getState().pluginStates[pluginKey].count).toBe(3); + + // Select the current plugin as active, still, messages should end up in the queue + store.dispatch( + selectPlugin({ + selectedPlugin: TestPlugin.id, + selectedApp: client.id, + deepLinkPayload: null, + }), + ); + expect(store.getState().connections.selectedPlugin).toBe('TestPlugin'); + + sendMessage('inc', {delta: 4}); + // should not be processed yet + expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(2); + expect(store.getState().pluginStates[pluginKey].count).toBe(3); + + await idler.next(); + expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(0); + expect(store.getState().pluginStates[pluginKey].count).toBe(10); + + idler.run(); + await p; + }); + }, + ); +}); + +test('queue - processing can be cancelled', async () => { + await createMockFlipperWithPlugin( + TestPlugin, + async ({client, device, store, sendMessage}) => { + await GK.withWhitelistedGK('flipper_event_queue', async () => { + // select a different plugin + store.dispatch( + selectPlugin({ + selectedPlugin: 'DeviceLogs', + selectedApp: null, + deepLinkPayload: null, + }), + ); + + sendMessage('inc', {}); + sendMessage('inc', {delta: 2}); + sendMessage('inc', {delta: 3}); + sendMessage('inc', {delta: 4}); + sendMessage('inc', {delta: 5}); + + // process the message + const pluginKey = getPluginKey(client.id, device, TestPlugin.id); + + const idler = new TestIdler(); + + const p = processMessageQueue( + client, + TestPlugin, + pluginKey, + store, + undefined, + idler, + ); + + // first message is consumed + await idler.next(); + expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(1); + expect(store.getState().pluginStates[pluginKey].count).toBe(10); + + idler.cancel(); + + expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(1); + expect(store.getState().pluginStates[pluginKey].count).toBe(10); + await p; + }); + }, + ); +}); + +test('queue - make sure resetting plugin state clears the message queue', async () => { + await createMockFlipperWithPlugin( + TestPlugin, + async ({client, device, store, sendMessage}) => { + await GK.withWhitelistedGK('flipper_event_queue', async () => { + // select a different plugin + store.dispatch( + selectPlugin({ + selectedPlugin: 'DeviceLogs', + selectedApp: null, + deepLinkPayload: null, + }), + ); + + sendMessage('inc', {}); + sendMessage('inc', {delta: 2}); + + const pluginKey = getPluginKey(client.id, device, TestPlugin.id); + + expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(2); + + store.dispatch({ + type: 'CLEAR_PLUGIN_STATE', + payload: {clientId: client.id, devicePlugins: new Set()}, + }); + + expect(store.getState().pluginMessageQueue[pluginKey]).toBe(undefined); + }); + }, + ); +}); diff --git a/src/utils/messageQueue.tsx b/src/utils/messageQueue.tsx new file mode 100644 index 000000000..23ea09638 --- /dev/null +++ b/src/utils/messageQueue.tsx @@ -0,0 +1,215 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @format + */ + +import {PersistedStateReducer} from '../plugin'; +import {Store, State} from '../reducers/index'; +import {setPluginState} from '../reducers/pluginStates'; +import {flipperRecorderAddEvent} from './pluginStateRecorder'; +import { + clearMessageQueue, + queueMessage, + Message, +} from '../reducers/pluginMessageQueue'; +import {Idler, BaseIdler} from './Idler'; +import Client from '../Client'; +import {getPluginKey} from './pluginUtils'; + +const MAX_BACKGROUND_TASK_TIME = 25; + +const pluginBackgroundStats = new Map< + string, + { + cpuTime: number; // Total time spend in persisted Reducer + messages: number; // amount of message received for this plugin + maxTime: number; // maximum time spend in a single reducer call + } +>(); + +if (window) { + // @ts-ignore + window.flipperPrintPluginBackgroundStats = () => { + console.table( + Array.from(pluginBackgroundStats.entries()).map( + ([plugin, {cpuTime, messages, maxTime}]) => ({ + plugin, + cpuTime, + messages, + maxTime, + }), + ), + ); + }; +} + +function addBackgroundStat(plugin: string, cpuTime: number) { + if (!pluginBackgroundStats.has(plugin)) { + pluginBackgroundStats.set(plugin, {cpuTime: 0, messages: 0, maxTime: 0}); + } + const stat = pluginBackgroundStats.get(plugin)!; + stat.cpuTime += cpuTime; + stat.messages += 1; + stat.maxTime = Math.max(stat.maxTime, cpuTime); + if (cpuTime > MAX_BACKGROUND_TASK_TIME) { + console.warn( + `Plugin ${plugin} took too much time while doing background: ${cpuTime}ms. Handling background messages should take less than ${MAX_BACKGROUND_TASK_TIME}ms.`, + ); + } +} + +function processMessage( + state: State, + pluginKey: string, + plugin: { + name: string; + persistedStateReducer: PersistedStateReducer | null; + }, + message: {method: string; params?: any}, +): State { + const statName = `${plugin.name}.${message.method}`; + const reducerStartTime = Date.now(); + flipperRecorderAddEvent(pluginKey, message.method, message.params); + const newPluginState = plugin.persistedStateReducer!( + state, + message.method, + message.params, + ); + addBackgroundStat(statName, Date.now() - reducerStartTime); + return newPluginState; +} + +export function processMessageImmediately( + store: Store, + pluginKey: string, + plugin: { + defaultPersistedState: any; + name: string; + persistedStateReducer: PersistedStateReducer | null; + }, + message: {method: string; params?: any}, +) { + const persistedState: any = + store.getState().pluginStates[pluginKey] ?? + plugin.defaultPersistedState ?? + {}; + const newPluginState = processMessage( + persistedState, + pluginKey, + plugin, + message, + ); + if (persistedState !== newPluginState) { + store.dispatch( + setPluginState({ + pluginKey, + state: newPluginState, + }), + ); + } +} + +export function processMessageLater( + store: Store, + pluginKey: string, + plugin: { + defaultPersistedState: any; + name: string; + persistedStateReducer: PersistedStateReducer | null; + }, + message: {method: string; params?: any}, +) { + // TODO: can we make this better? + const selection = store.getState().connections; + const selectedPlugin = + selection.selectedPlugin && + getPluginKey( + selection.selectedApp, + selection.selectedDevice, + selection.selectedPlugin, + ); + // if the plugin is active, and has no queued messaged, process the message immediately + if ( + selectedPlugin === pluginKey && + getMessages(store, pluginKey).length === 0 + ) { + processMessageImmediately(store, pluginKey, plugin, message); + } else { + // TODO: possible optimization: drop all messages for non-favorited plugins + // TODO: possible optimization: drop messages if queue is too large + store.dispatch(queueMessage(pluginKey, message.method, message.params)); + } +} + +export async function processMessageQueue( + client: Client, + plugin: { + defaultPersistedState: any; + name: string; + persistedStateReducer: PersistedStateReducer; + }, + pluginKey: string, + store: Store, + progressCallback?: (progress: string) => void, + idler: BaseIdler = new Idler(), +) { + const total = getMessages(store, pluginKey).length; + let progress = 0; + do { + const messages = getMessages(store, pluginKey); + if (!messages.length) { + break; + } + + // there are messages to process! lets do so until we have to idle + const persistedState = + store.getState().pluginStates[pluginKey] ?? + plugin.defaultPersistedState ?? + {}; + let offset = 0; + let newPluginState = persistedState; + do { + newPluginState = plugin.persistedStateReducer!( + newPluginState, + messages[offset].method, + messages[offset].params, + ); + offset++; + progress++; + + progressCallback?.( + `Processing events ${progress} / ${Math.max( + total, + progress, + )} (${Math.min(100, 100 * (progress / total))}%)`, + ); + } while (offset < messages.length && !idler.shouldIdle()); + // save progress + // by writing progress away first and then idling, we make sure this logic is + // resistent to kicking off this process twice; grabbing, processing messages, saving state is done synchronosly + // until the idler has to break + store.dispatch(clearMessageQueue(pluginKey, offset)); + if (newPluginState !== persistedState) { + store.dispatch( + setPluginState({ + pluginKey, + state: newPluginState, + }), + ); + } + + if (idler.isCancelled()) { + return; + } + await idler.idle(); + // new messages might have arrived, so keep looping + } while (getMessages(store, pluginKey).length); +} + +function getMessages(store: Store, pluginKey: string): Message[] { + return store.getState().pluginMessageQueue[pluginKey] || []; +} diff --git a/types/globals.tsx b/types/globals.tsx index d1940a6a5..c0a59ab44 100644 --- a/types/globals.tsx +++ b/types/globals.tsx @@ -30,17 +30,6 @@ declare global { init: () => void; }; - // rIC not supportedin TS: https://github.com/Microsoft/TypeScript/issues/21309 - requestIdleCallback: ( - callback: (deadline: { - didTimeout: boolean; - timeRemaining: () => number; - }) => void, - opts?: { - timeout: number; - }, - ) => RequestIdleHandle; - cancelIdleCallback: (handle: RequestIdleHandle) => void; ResizeObserver: ResizeObserver; } } diff --git a/yarn.lock b/yarn.lock index 5530eb131..566c16125 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1304,9 +1304,9 @@ integrity sha512-MGuvYJrPU0HUwqF7LqvIj50RZUX23Z+m583KBygKYUZLlZ88n6w28XRNJRJgsHukLEnLz6w6SvxZoLgbr5wLqQ== "@types/node@^12.12.20": - version "12.12.20" - resolved "https://registry.yarnpkg.com/@types/node/-/node-12.12.20.tgz#7b693038ce661fe57a7ffa4679440b5e7c5e8b99" - integrity sha512-VAe+DiwpnC/g448uN+/3gRl4th0BTdrR9gSLIOHA+SUQskaYZQDOHG7xmjiE7JUhjbXnbXytf6Ih+/pA6CtMFQ== + version "12.12.21" + resolved "https://registry.yarnpkg.com/@types/node/-/node-12.12.21.tgz#aa44a6363291c7037111c47e4661ad210aded23f" + integrity sha512-8sRGhbpU+ck1n0PGAUgVrWrWdjSW2aqNeyC15W88GRsMpSwzv6RJGlLhE7s2RhVSOdyDmxbqlWSeThq4/7xqlA== "@types/promise-retry@^1.1.3": version "1.1.3" @@ -1402,6 +1402,18 @@ dependencies: redux "^4.0.0" +"@types/redux-persist@^4.3.1": + version "4.3.1" + resolved "https://registry.yarnpkg.com/@types/redux-persist/-/redux-persist-4.3.1.tgz#aa4c876859e0bea5155e5f7980e5b8c4699dc2e6" + integrity sha1-qkyHaFngvqUVXl95gOW4xGmdwuY= + dependencies: + redux-persist "*" + +"@types/requestidlecallback@^0.3.1": + version "0.3.1" + resolved "https://registry.yarnpkg.com/@types/requestidlecallback/-/requestidlecallback-0.3.1.tgz#34bb89753b1cdc72d0547522527b1cb0f02b5ec4" + integrity sha512-BnnRkgWYijCIndUn+LgoqKHX/hNpJC5G03B9y7mZya/C2gUQTSn75fEj3ZP1/Rl2E6EYeXh2/7/8UNEZ4X7HuQ== + "@types/retry@*": version "0.12.0" resolved "https://registry.yarnpkg.com/@types/retry/-/retry-0.12.0.tgz#2b35eccfcee7d38cd72ad99232fbd58bffb3c84d" @@ -7789,7 +7801,7 @@ redux-mock-store@^1.5.3: dependencies: lodash.isplainobject "^4.0.6" -redux-persist@^6.0.0: +redux-persist@*, redux-persist@^6.0.0: version "6.0.0" resolved "https://registry.yarnpkg.com/redux-persist/-/redux-persist-6.0.0.tgz#b4d2972f9859597c130d40d4b146fecdab51b3a8" integrity sha512-71LLMbUq2r02ng2We9S215LtPu3fY0KgaGE0k8WRgl6RkqxtGfl7HUozz1Dftwsb0D/5mZ8dwAaPbtnzfvbEwQ==