diff --git a/desktop/app/src/Client.tsx b/desktop/app/src/Client.tsx index 5c24f2954..81d60dc06 100644 --- a/desktop/app/src/Client.tsx +++ b/desktop/app/src/Client.tsx @@ -23,14 +23,15 @@ import {registerPlugins} from './reducers/plugins'; import createTableNativePlugin from './plugins/TableNativePlugin'; import {EventEmitter} from 'events'; import invariant from 'invariant'; -import {flipperRecorderAddEvent} from './utils/pluginStateRecorder'; import { getPluginKey, defaultEnabledBackgroundPlugins, } from './utils/pluginUtils'; -import {processMessageLater} from './utils/messageQueue'; +import {processMessagesLater} from './utils/messageQueue'; import {sideEffect} from './utils/sideEffect'; import {emitBytesReceived} from './dispatcher/tracking'; +import {debounce} from 'lodash'; +import {batch} from 'react-redux'; type Plugins = Array; @@ -126,6 +127,13 @@ export default class Client extends EventEmitter { logger: Logger; lastSeenDeviceList: Array; broadcastCallbacks: Map>>; + messageBuffer: Record< + string /*pluginKey*/, + { + plugin: typeof FlipperPlugin | typeof FlipperDevicePlugin; + messages: Params[]; + } + > = {}; requestCallbacks: Map< number, @@ -398,8 +406,15 @@ export default class Client extends EventEmitter { {serial: this.query.device_id}, params.api, ); - flipperRecorderAddEvent(pluginKey, params.method, params.params); - processMessageLater(this.store, pluginKey, persistingPlugin, params); + if (!this.messageBuffer[pluginKey]) { + this.messageBuffer[pluginKey] = { + plugin: persistingPlugin, + messages: [params], + }; + } else { + this.messageBuffer[pluginKey].messages.push(params); + } + this.flushMessageBufferDebounced(); } const apiCallbacks = this.broadcastCallbacks.get(params.api); if (!apiCallbacks) { @@ -545,6 +560,26 @@ export default class Client extends EventEmitter { }); } + flushMessageBuffer = () => { + // batch to make sure that Redux collapsed the dispatches + batch(() => { + for (const pluginKey in this.messageBuffer) { + processMessagesLater( + this.store, + pluginKey, + this.messageBuffer[pluginKey].plugin, + this.messageBuffer[pluginKey].messages, + ); + } + this.messageBuffer = {}; + }); + }; + + flushMessageBufferDebounced = debounce(this.flushMessageBuffer, 200, { + leading: true, + trailing: true, + }); + startTimingRequestResponse(data: RequestMetadata) { performance.mark(this.getPerformanceMark(data)); } diff --git a/desktop/app/src/reducers/pluginMessageQueue.tsx b/desktop/app/src/reducers/pluginMessageQueue.tsx index e146997fc..06f71a2d1 100644 --- a/desktop/app/src/reducers/pluginMessageQueue.tsx +++ b/desktop/app/src/reducers/pluginMessageQueue.tsx @@ -14,7 +14,7 @@ export const DEFAULT_MAX_QUEUE_SIZE = 10000; export type Message = { method: string; - params: any; + params?: any; }; export type State = { @@ -23,11 +23,12 @@ export type State = { export type Action = | { - type: 'QUEUE_MESSAGE'; + type: 'QUEUE_MESSAGES'; payload: { pluginKey: string; // client + plugin maxQueueSize: number; - } & Message; + messages: Message[]; + }; } | { type: 'CLEAR_MESSAGE_QUEUE'; @@ -48,17 +49,17 @@ export default function reducer( action: Action, ): State { switch (action.type) { - case 'QUEUE_MESSAGE': { - const {pluginKey, method, params, maxQueueSize} = action.payload; + case 'QUEUE_MESSAGES': { + const {pluginKey, messages, maxQueueSize} = action.payload; // this is hit very often, so try to do it a bit optimal const currentMessages = state[pluginKey] || []; - const newMessages = - currentMessages.length < maxQueueSize - ? currentMessages.slice() - : // throw away first 10% of the queue if it gets too full - (console.log(`Dropping events for plugin ${pluginKey}`), - currentMessages.slice(Math.floor(maxQueueSize / 10))); - newMessages.push({method, params}); + let newMessages = currentMessages.concat(messages); + if (newMessages.length > maxQueueSize) { + // only keep last 90% of max queue size + newMessages = newMessages.slice( + newMessages.length - 1 - Math.ceil(maxQueueSize * 0.9), + ); + } return { ...state, [pluginKey]: newMessages, @@ -97,17 +98,15 @@ export default function reducer( } } -export const queueMessage = ( +export const queueMessages = ( pluginKey: string, - method: string, - params: any, + messages: Message[], maxQueueSize: number | undefined, ): Action => ({ - type: 'QUEUE_MESSAGE', + type: 'QUEUE_MESSAGES', payload: { pluginKey, - method, - params, + messages, maxQueueSize: maxQueueSize || DEFAULT_MAX_QUEUE_SIZE, }, }); diff --git a/desktop/app/src/utils/__tests__/messageQueue.node.tsx b/desktop/app/src/utils/__tests__/messageQueue.node.tsx index ccb9a85b9..7975aa9ac 100644 --- a/desktop/app/src/utils/__tests__/messageQueue.node.tsx +++ b/desktop/app/src/utils/__tests__/messageQueue.node.tsx @@ -7,9 +7,9 @@ * @format */ -import {FlipperPlugin} from '../../plugin'; +import {FlipperPlugin, FlipperDevicePlugin} from '../../plugin'; import {createMockFlipperWithPlugin} from '../../test-utils/createMockFlipperWithPlugin'; -import {Store, Client} from '../../'; +import {Store, Client, sleep} from '../../'; import { selectPlugin, starPlugin, @@ -21,8 +21,9 @@ import {getPluginKey} from '../pluginUtils'; import {TestIdler} from '../Idler'; import pluginMessageQueue, { State, - queueMessage, + queueMessages, } from '../../reducers/pluginMessageQueue'; +import {registerPlugins} from '../../reducers/plugins'; interface PersistedState { count: 1; @@ -104,7 +105,7 @@ test('queue - events are processed immediately if plugin is selected', async () ); }); -test('queue - events are NOT processed immediately if plugin is NOT selected (but starred)', async () => { +test('queue - events are NOT processed immediately if plugin is NOT selected (but enabled)', async () => { await createMockFlipperWithPlugin( TestPlugin, async ({client, device, store, sendMessage}) => { @@ -115,30 +116,53 @@ test('queue - events are NOT processed immediately if plugin is NOT selected (bu sendMessage('inc', {}); sendMessage('inc', {delta: 2}); + sendMessage('inc', {delta: 3}); expect(store.getState().pluginStates).toMatchInlineSnapshot(`Object {}`); + // the first message is already visible cause of the leading debounce expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot(` - Object { - "TestApp#Android#MockAndroidDevice#serial#TestPlugin": Array [ - Object { - "method": "inc", - "params": Object {}, + Object { + "TestApp#Android#MockAndroidDevice#serial#TestPlugin": Array [ + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object {}, + }, + ], + } + `); + client.flushMessageBuffer(); + expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot(` + Object { + "TestApp#Android#MockAndroidDevice#serial#TestPlugin": Array [ + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object {}, + }, + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object { + "delta": 2, }, - Object { - "method": "inc", - "params": Object { - "delta": 2, - }, + }, + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object { + "delta": 3, }, - ], - } - `); + }, + ], + } + `); // process the message const pluginKey = getPluginKey(client.id, device, TestPlugin.id); await processMessageQueue(TestPlugin, pluginKey, store); expect(store.getState().pluginStates).toEqual({ [pluginKey]: { - count: 3, + count: 6, }, }); @@ -150,16 +174,18 @@ test('queue - events are NOT processed immediately if plugin is NOT selected (bu starTestPlugin(store, client); selectTestPlugin(store, client); sendMessage('inc', {delta: 3}); + client.flushMessageBuffer(); // active, immediately processed expect(store.getState().pluginStates).toEqual({ [pluginKey]: { - count: 6, + count: 9, }, }); // different plugin, and not starred, message will never arrive selectDeviceLogs(store); sendMessage('inc', {delta: 4}); + client.flushMessageBuffer(); expect(store.getState().pluginMessageQueue).toEqual({ [pluginKey]: [], }); @@ -167,9 +193,10 @@ test('queue - events are NOT processed immediately if plugin is NOT selected (bu // star again, plugin still not selected, message is queued starTestPlugin(store, client); sendMessage('inc', {delta: 5}); + client.flushMessageBuffer(); expect(store.getState().pluginMessageQueue).toEqual({ - [pluginKey]: [{method: 'inc', params: {delta: 5}}], + [pluginKey]: [{api: 'TestPlugin', method: 'inc', params: {delta: 5}}], }); }, ); @@ -191,11 +218,11 @@ test('queue - events are queued for plugins that are favorite when app is not se // as the plugin was enabled already on the first client as well sendMessage('inc', {delta: 2}); expect(store.getState().pluginStates).toMatchInlineSnapshot(`Object {}`); - expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot( - ` + expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot(` Object { "TestApp#Android#MockAndroidDevice#serial#TestPlugin": Array [ Object { + "api": "TestPlugin", "method": "inc", "params": Object { "delta": 2, @@ -203,8 +230,7 @@ test('queue - events are queued for plugins that are favorite when app is not se }, ], } - `, - ); + `); }, ); }); @@ -227,11 +253,11 @@ test('queue - events are queued for plugins that are favorite when app is select // as the plugin was enabled already on the first client as well sendMessage('inc', {delta: 2}); expect(store.getState().pluginStates).toMatchInlineSnapshot(`Object {}`); - expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot( - ` + expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot(` Object { "TestApp#Android#MockAndroidDevice#serial#TestPlugin": Array [ Object { + "api": "TestPlugin", "method": "inc", "params": Object { "delta": 2, @@ -239,8 +265,7 @@ test('queue - events are queued for plugins that are favorite when app is select }, ], } - `, - ); + `); }, ); }); @@ -254,6 +279,7 @@ test('queue - events processing will be paused', async () => { sendMessage('inc', {}); sendMessage('inc', {delta: 3}); sendMessage('inc', {delta: 5}); + client.flushMessageBuffer(); // process the message const pluginKey = getPluginKey(client.id, device, TestPlugin.id); @@ -276,7 +302,7 @@ test('queue - events processing will be paused', async () => { }); expect(store.getState().pluginMessageQueue).toEqual({ - [pluginKey]: [{method: 'inc', params: {delta: 5}}], + [pluginKey]: [{api: 'TestPlugin', method: 'inc', params: {delta: 5}}], }); await idler.next(); @@ -306,6 +332,7 @@ test('queue - messages that arrive during processing will be queued', async () = sendMessage('inc', {}); sendMessage('inc', {delta: 2}); sendMessage('inc', {delta: 3}); + client.flushMessageBuffer(); // process the message const pluginKey = getPluginKey(client.id, device, TestPlugin.id); @@ -336,6 +363,7 @@ test('queue - messages that arrive during processing will be queued', async () = expect(store.getState().connections.selectedPlugin).toBe('TestPlugin'); sendMessage('inc', {delta: 4}); + client.flushMessageBuffer(); // should not be processed yet expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(2); expect(store.getState().pluginStates[pluginKey].count).toBe(3); @@ -361,6 +389,7 @@ test('queue - processing can be cancelled', async () => { sendMessage('inc', {delta: 3}); sendMessage('inc', {delta: 4}); sendMessage('inc', {delta: 5}); + client.flushMessageBuffer(); // process the message const pluginKey = getPluginKey(client.id, device, TestPlugin.id); @@ -397,6 +426,7 @@ test('queue - make sure resetting plugin state clears the message queue', async sendMessage('inc', {}); sendMessage('inc', {delta: 2}); + client.flushMessageBuffer(); const pluginKey = getPluginKey(client.id, device, TestPlugin.id); @@ -420,7 +450,7 @@ test('queue will be cleaned up when it exceeds maximum size', () => { for (i = 0; i < queueSize; i++) { state = pluginMessageQueue( state, - queueMessage(pluginKey, 'test', {i}, queueSize), + queueMessages(pluginKey, [{method: 'test', params: {i}}], queueSize), ); } // almost full @@ -433,7 +463,7 @@ test('queue will be cleaned up when it exceeds maximum size', () => { state = pluginMessageQueue( state, - queueMessage(pluginKey, 'test', {i: ++i}, queueSize), + queueMessages(pluginKey, [{method: 'test', params: {i: ++i}}], queueSize), ); const newLength = Math.ceil(0.9 * queueSize) + 1; // ~4500 @@ -447,3 +477,172 @@ test('queue will be cleaned up when it exceeds maximum size', () => { params: {i: i}, // ~50001 }); }); + +test('client - incoming messages are buffered and flushed together', async () => { + class StubDeviceLogs extends FlipperDevicePlugin { + static id = 'DevicePlugin'; + + static supportsDevice() { + return true; + } + + static persistedStateReducer = jest.fn(); + } + + await createMockFlipperWithPlugin( + TestPlugin, + async ({client, store, device, sendMessage}) => { + selectDeviceLogs(store); + + store.dispatch(registerPlugins([StubDeviceLogs])); + sendMessage('inc', {}); + sendMessage('inc', {delta: 2}); + sendMessage('inc', {delta: 3}); + + // send a message to device logs + client.onMessage( + JSON.stringify({ + method: 'execute', + params: { + api: 'DevicePlugin', + method: 'log', + params: {line: 'suff'}, + }, + }), + ); + + expect(store.getState().pluginStates).toMatchInlineSnapshot(`Object {}`); + // the first message is already visible cause of the leading debounce + expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot(` + Object { + "TestApp#Android#MockAndroidDevice#serial#TestPlugin": Array [ + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object {}, + }, + ], + } + `); + expect(client.messageBuffer).toMatchInlineSnapshot(` + Object { + "TestApp#Android#MockAndroidDevice#serial#DevicePlugin": Object { + "messages": Array [ + Object { + "api": "DevicePlugin", + "method": "log", + "params": Object { + "line": "suff", + }, + }, + ], + "plugin": [Function], + }, + "TestApp#Android#MockAndroidDevice#serial#TestPlugin": Object { + "messages": Array [ + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object { + "delta": 2, + }, + }, + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object { + "delta": 3, + }, + }, + ], + "plugin": [Function], + }, + } + `); + + await sleep(500); + expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot(` + Object { + "TestApp#Android#MockAndroidDevice#serial#DevicePlugin": Array [ + Object { + "api": "DevicePlugin", + "method": "log", + "params": Object { + "line": "suff", + }, + }, + ], + "TestApp#Android#MockAndroidDevice#serial#TestPlugin": Array [ + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object {}, + }, + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object { + "delta": 2, + }, + }, + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object { + "delta": 3, + }, + }, + ], + } + `); + expect(client.messageBuffer).toMatchInlineSnapshot(`Object {}`); + expect( + StubDeviceLogs.persistedStateReducer.mock.calls, + ).toMatchInlineSnapshot(`Array []`); + + // tigger processing the queue + const pluginKey = getPluginKey(client.id, device, StubDeviceLogs.id); + await processMessageQueue(StubDeviceLogs, pluginKey, store); + + expect(StubDeviceLogs.persistedStateReducer.mock.calls) + .toMatchInlineSnapshot(` + Array [ + Array [ + Object {}, + "log", + Object { + "line": "suff", + }, + ], + ] + `); + + expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot(` + Object { + "TestApp#Android#MockAndroidDevice#serial#DevicePlugin": Array [], + "TestApp#Android#MockAndroidDevice#serial#TestPlugin": Array [ + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object {}, + }, + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object { + "delta": 2, + }, + }, + Object { + "api": "TestPlugin", + "method": "inc", + "params": Object { + "delta": 3, + }, + }, + ], + } + `); + }, + ); +}); diff --git a/desktop/app/src/utils/messageQueue.tsx b/desktop/app/src/utils/messageQueue.tsx index 824d8f769..0440f6141 100644 --- a/desktop/app/src/utils/messageQueue.tsx +++ b/desktop/app/src/utils/messageQueue.tsx @@ -13,7 +13,7 @@ import {setPluginState} from '../reducers/pluginStates'; import {flipperRecorderAddEvent} from './pluginStateRecorder'; import { clearMessageQueue, - queueMessage, + queueMessages, Message, } from '../reducers/pluginMessageQueue'; import {Idler, BaseIdler} from './Idler'; @@ -143,7 +143,7 @@ function processMessage( id: string; persistedStateReducer: PersistedStateReducer | null; }, - message: {method: string; params?: any}, + message: Message, ): State { const reducerStartTime = Date.now(); flipperRecorderAddEvent(pluginKey, message.method, message.params); @@ -161,7 +161,7 @@ function processMessage( } } -export function processMessageImmediately( +export function processMessagesImmediately( store: MiddlewareAPI, pluginKey: string, plugin: { @@ -169,15 +169,13 @@ export function processMessageImmediately( id: string; persistedStateReducer: PersistedStateReducer | null; }, - message: {method: string; params?: any}, + messages: Message[], ) { const persistedState = getCurrentPluginState(store, plugin, pluginKey); - const newPluginState = processMessage( - persistedState, - pluginKey, - plugin, - message, - ); + let newPluginState: any; + messages.forEach((message) => { + newPluginState = processMessage(persistedState, pluginKey, plugin, message); + }); if (persistedState !== newPluginState) { store.dispatch( setPluginState({ @@ -188,7 +186,7 @@ export function processMessageImmediately( } } -export function processMessageLater( +export function processMessagesLater( store: MiddlewareAPI, pluginKey: string, plugin: { @@ -197,30 +195,24 @@ export function processMessageLater( persistedStateReducer: PersistedStateReducer | null; maxQueueSize?: number; }, - message: {method: string; params?: any}, + messages: Message[], ) { const isSelected = pluginKey === getSelectedPluginKey(store.getState().connections); switch (true) { case plugin.id === 'Navigation': // Navigation events are always processed, to make sure the navbar stays up to date case isSelected && getPendingMessages(store, pluginKey).length === 0: - processMessageImmediately(store, pluginKey, plugin, message); + processMessagesImmediately(store, pluginKey, plugin, messages); break; case isSelected: case plugin instanceof FlipperDevicePlugin: + case (plugin as any).prototype instanceof FlipperDevicePlugin: case pluginIsStarred( store.getState().connections.userStarredPlugins, deconstructPluginKey(pluginKey).client, plugin.id, ): - store.dispatch( - queueMessage( - pluginKey, - message.method, - message.params, - plugin.maxQueueSize, - ), - ); + store.dispatch(queueMessages(pluginKey, messages, plugin.maxQueueSize)); break; default: // In all other cases, messages will be dropped...