Introduce async message queuing

Summary:
This diff introduces the logic for queueing incoming messages rather then directly processing them you are behind the `flipper_event_queue` GK.

The reason the queue processing is a bit complicated is to make the queue can be processed non-blocking, can be cancelled, and is safe to concurrency issues.
The idea here is that the queue is processed when we switch to a plugin, report it's progress, and abort the process when switching to another plugin without loosing any work.

This diff does not include
[x] updates to the UI (**SO DON"T LAND IN ISOLATION**)
[x] metrics to see the effect

The effect of the changes can be seen when profiling the application, before this change there are very regular CPU spikes (see the small yellow bar on the top):

https://pxl.cl/TQtl

These go away when the events are no longer processed
https://pxl.cl/TQtp

Reviewed By: nikoant

Differential Revision: D19095564

fbshipit-source-id: 0b8c3421acc4a4f240bf2aab5c1743132f69aa6e
This commit is contained in:
Michel Weststrate
2020-01-02 07:12:06 -08:00
committed by Facebook Github Bot
parent c87c0edbb8
commit d2a2e2ab75
15 changed files with 783 additions and 119 deletions

View File

@@ -14,12 +14,13 @@ export interface BaseIdler {
shouldIdle(): boolean;
idle(): Promise<void>;
cancel(): void;
isCancelled(): boolean;
}
export class Idler implements BaseIdler {
lastIdle = performance.now();
interval = 16;
kill = false;
private lastIdle = performance.now();
private interval = 16;
private kill = false;
shouldIdle(): boolean {
return this.kill || performance.now() - this.lastIdle > this.interval;
@@ -32,7 +33,15 @@ export class Idler implements BaseIdler {
const now = performance.now();
if (now - this.lastIdle > this.interval) {
this.lastIdle = now;
return new Promise(resolve => setTimeout(resolve, 0));
return new Promise(resolve => {
if (typeof requestIdleCallback !== 'undefined') {
requestIdleCallback(() => {
resolve();
});
} else {
setTimeout(resolve, 0);
}
});
}
return undefined;
}
@@ -40,14 +49,18 @@ export class Idler implements BaseIdler {
cancel() {
this.kill = true;
}
isCancelled() {
return this.kill;
}
}
// This smills like we should be using generators :)
export class TestIdler implements BaseIdler {
resolver?: () => void;
kill = false;
autoRun = false;
hasProgressed = false;
private resolver?: () => void;
private kill = false;
private autoRun = false;
private hasProgressed = false;
shouldIdle() {
if (this.kill) {
@@ -56,9 +69,12 @@ export class TestIdler implements BaseIdler {
if (this.autoRun) {
return false;
}
// In turn we signal idle is needed and that it isn't
this.hasProgressed = !this.hasProgressed;
return !this.hasProgressed;
// In turns we signal that idling is needed and that it isn't
if (!this.hasProgressed) {
this.hasProgressed = true;
return false;
}
return true;
}
async idle() {
@@ -74,7 +90,7 @@ export class TestIdler implements BaseIdler {
return new Promise<void>(resolve => {
this.resolver = () => {
this.resolver = undefined;
// this.hasProgressed = false;
this.hasProgressed = false;
resolve();
};
});
@@ -102,4 +118,8 @@ export class TestIdler implements BaseIdler {
this.resolver?.();
this.autoRun = true;
}
isCancelled() {
return this.kill;
}
}

View File

@@ -18,6 +18,7 @@ test('Idler should interrupt', async () => {
if (i == 100) {
expect(idler.shouldIdle()).toBe(false);
idler.cancel();
expect(idler.isCancelled()).toBe(true);
expect(idler.shouldIdle()).toBe(true);
}
await idler.idle();
@@ -57,7 +58,9 @@ test('TestIdler can be controlled', async () => {
idler.idle();
await idler.next();
expect(idler.isCancelled()).toBe(false);
idler.cancel();
expect(idler.isCancelled()).toBe(true);
expect(idler.shouldIdle()).toBe(true);
let threw = false;

View File

@@ -0,0 +1,344 @@
/**
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @format
*/
import {FlipperPlugin} from '../../plugin';
import {createMockFlipperWithPlugin} from '../../test-utils/createMockFlipperWithPlugin';
import {GK} from 'flipper';
import {selectPlugin} from '../../reducers/connections';
import {processMessageQueue} from '../messageQueue';
import {getPluginKey} from '../pluginUtils';
import {TestIdler} from '../Idler';
interface PersistedState {
count: 1;
}
class TestPlugin extends FlipperPlugin<any, any, any> {
static id = 'TestPlugin';
static defaultPersistedState = {
count: 0,
};
static persistedStateReducer(
persistedState: PersistedState,
method: string,
payload: {delta?: number},
) {
if (method === 'inc') {
return Object.assign({}, persistedState, {
count: persistedState.count + ((payload && payload?.delta) || 1),
});
}
return persistedState;
}
render() {
return null;
}
}
test('will process event with GK disabled', async () => {
await createMockFlipperWithPlugin(
TestPlugin,
async ({store, sendMessage}) => {
expect(store.getState().connections.selectedPlugin).toBe('TestPlugin');
sendMessage('inc', {});
expect(store.getState().pluginStates).toMatchInlineSnapshot(`
Object {
"TestApp#Android#unit_test#serial#TestPlugin": Object {
"count": 1,
},
}
`);
},
);
});
test('queue - events are processed immediately if plugin is selected', async () => {
await createMockFlipperWithPlugin(
TestPlugin,
async ({store, sendMessage}) => {
await GK.withWhitelistedGK('flipper_event_queue', () => {
expect(store.getState().connections.selectedPlugin).toBe('TestPlugin');
sendMessage('inc', {});
expect(store.getState().pluginStates).toMatchInlineSnapshot(`
Object {
"TestApp#Android#unit_test#serial#TestPlugin": Object {
"count": 1,
},
}
`);
expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot(
`Object {}`,
);
});
},
);
});
test('queue - events are NOT processed immediately if plugin is NOT selected', async () => {
await createMockFlipperWithPlugin(
TestPlugin,
async ({client, device, store, sendMessage}) => {
await GK.withWhitelistedGK('flipper_event_queue', async () => {
store.dispatch(
selectPlugin({
selectedPlugin: 'DeviceLogs',
selectedApp: null,
deepLinkPayload: null,
}),
);
expect(store.getState().connections.selectedPlugin).not.toBe(
'TestPlugin',
);
sendMessage('inc', {});
sendMessage('inc', {delta: 2});
expect(store.getState().pluginStates).toMatchInlineSnapshot(
`Object {}`,
);
expect(store.getState().pluginMessageQueue).toMatchInlineSnapshot(`
Object {
"TestApp#Android#unit_test#serial#TestPlugin": Array [
Object {
"method": "inc",
"params": Object {},
},
Object {
"method": "inc",
"params": Object {
"delta": 2,
},
},
],
}
`);
// process the message
const pluginKey = getPluginKey(client.id, device, TestPlugin.id);
await processMessageQueue(client, TestPlugin, pluginKey, store);
expect(store.getState().pluginStates).toEqual({
[pluginKey]: {
count: 3,
},
});
expect(store.getState().pluginMessageQueue).toEqual({
[pluginKey]: [],
});
});
},
);
});
test('queue - events processing will be paused', async () => {
await createMockFlipperWithPlugin(
TestPlugin,
async ({client, device, store, sendMessage}) => {
await GK.withWhitelistedGK('flipper_event_queue', async () => {
// select a different plugin
store.dispatch(
selectPlugin({
selectedPlugin: 'DeviceLogs',
selectedApp: null,
deepLinkPayload: null,
}),
);
sendMessage('inc', {});
sendMessage('inc', {delta: 3});
sendMessage('inc', {delta: 5});
// process the message
const pluginKey = getPluginKey(client.id, device, TestPlugin.id);
// controlled idler will signal and and off that idling is needed
const idler = new TestIdler();
const p = processMessageQueue(
client,
TestPlugin,
pluginKey,
store,
undefined,
idler,
);
expect(store.getState().pluginStates).toEqual({
[pluginKey]: {
count: 4,
},
});
expect(store.getState().pluginMessageQueue).toEqual({
[pluginKey]: [{method: 'inc', params: {delta: 5}}],
});
await idler.next();
expect(store.getState().pluginStates).toEqual({
[pluginKey]: {
count: 9,
},
});
expect(store.getState().pluginMessageQueue).toEqual({
[pluginKey]: [],
});
// don't idle anymore
idler.run();
await p;
});
},
);
});
test('queue - messages that arrive during processing will be queued', async () => {
await createMockFlipperWithPlugin(
TestPlugin,
async ({client, device, store, sendMessage}) => {
await GK.withWhitelistedGK('flipper_event_queue', async () => {
// select a different plugin
store.dispatch(
selectPlugin({
selectedPlugin: 'DeviceLogs',
selectedApp: null,
deepLinkPayload: null,
}),
);
sendMessage('inc', {});
sendMessage('inc', {delta: 2});
sendMessage('inc', {delta: 3});
// process the message
const pluginKey = getPluginKey(client.id, device, TestPlugin.id);
const idler = new TestIdler();
const p = processMessageQueue(
client,
TestPlugin,
pluginKey,
store,
undefined,
idler,
);
// first message is consumed
expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(1);
expect(store.getState().pluginStates[pluginKey].count).toBe(3);
// Select the current plugin as active, still, messages should end up in the queue
store.dispatch(
selectPlugin({
selectedPlugin: TestPlugin.id,
selectedApp: client.id,
deepLinkPayload: null,
}),
);
expect(store.getState().connections.selectedPlugin).toBe('TestPlugin');
sendMessage('inc', {delta: 4});
// should not be processed yet
expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(2);
expect(store.getState().pluginStates[pluginKey].count).toBe(3);
await idler.next();
expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(0);
expect(store.getState().pluginStates[pluginKey].count).toBe(10);
idler.run();
await p;
});
},
);
});
test('queue - processing can be cancelled', async () => {
await createMockFlipperWithPlugin(
TestPlugin,
async ({client, device, store, sendMessage}) => {
await GK.withWhitelistedGK('flipper_event_queue', async () => {
// select a different plugin
store.dispatch(
selectPlugin({
selectedPlugin: 'DeviceLogs',
selectedApp: null,
deepLinkPayload: null,
}),
);
sendMessage('inc', {});
sendMessage('inc', {delta: 2});
sendMessage('inc', {delta: 3});
sendMessage('inc', {delta: 4});
sendMessage('inc', {delta: 5});
// process the message
const pluginKey = getPluginKey(client.id, device, TestPlugin.id);
const idler = new TestIdler();
const p = processMessageQueue(
client,
TestPlugin,
pluginKey,
store,
undefined,
idler,
);
// first message is consumed
await idler.next();
expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(1);
expect(store.getState().pluginStates[pluginKey].count).toBe(10);
idler.cancel();
expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(1);
expect(store.getState().pluginStates[pluginKey].count).toBe(10);
await p;
});
},
);
});
test('queue - make sure resetting plugin state clears the message queue', async () => {
await createMockFlipperWithPlugin(
TestPlugin,
async ({client, device, store, sendMessage}) => {
await GK.withWhitelistedGK('flipper_event_queue', async () => {
// select a different plugin
store.dispatch(
selectPlugin({
selectedPlugin: 'DeviceLogs',
selectedApp: null,
deepLinkPayload: null,
}),
);
sendMessage('inc', {});
sendMessage('inc', {delta: 2});
const pluginKey = getPluginKey(client.id, device, TestPlugin.id);
expect(store.getState().pluginMessageQueue[pluginKey].length).toBe(2);
store.dispatch({
type: 'CLEAR_PLUGIN_STATE',
payload: {clientId: client.id, devicePlugins: new Set()},
});
expect(store.getState().pluginMessageQueue[pluginKey]).toBe(undefined);
});
},
);
});

215
src/utils/messageQueue.tsx Normal file
View File

@@ -0,0 +1,215 @@
/**
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @format
*/
import {PersistedStateReducer} from '../plugin';
import {Store, State} from '../reducers/index';
import {setPluginState} from '../reducers/pluginStates';
import {flipperRecorderAddEvent} from './pluginStateRecorder';
import {
clearMessageQueue,
queueMessage,
Message,
} from '../reducers/pluginMessageQueue';
import {Idler, BaseIdler} from './Idler';
import Client from '../Client';
import {getPluginKey} from './pluginUtils';
const MAX_BACKGROUND_TASK_TIME = 25;
const pluginBackgroundStats = new Map<
string,
{
cpuTime: number; // Total time spend in persisted Reducer
messages: number; // amount of message received for this plugin
maxTime: number; // maximum time spend in a single reducer call
}
>();
if (window) {
// @ts-ignore
window.flipperPrintPluginBackgroundStats = () => {
console.table(
Array.from(pluginBackgroundStats.entries()).map(
([plugin, {cpuTime, messages, maxTime}]) => ({
plugin,
cpuTime,
messages,
maxTime,
}),
),
);
};
}
function addBackgroundStat(plugin: string, cpuTime: number) {
if (!pluginBackgroundStats.has(plugin)) {
pluginBackgroundStats.set(plugin, {cpuTime: 0, messages: 0, maxTime: 0});
}
const stat = pluginBackgroundStats.get(plugin)!;
stat.cpuTime += cpuTime;
stat.messages += 1;
stat.maxTime = Math.max(stat.maxTime, cpuTime);
if (cpuTime > MAX_BACKGROUND_TASK_TIME) {
console.warn(
`Plugin ${plugin} took too much time while doing background: ${cpuTime}ms. Handling background messages should take less than ${MAX_BACKGROUND_TASK_TIME}ms.`,
);
}
}
function processMessage(
state: State,
pluginKey: string,
plugin: {
name: string;
persistedStateReducer: PersistedStateReducer | null;
},
message: {method: string; params?: any},
): State {
const statName = `${plugin.name}.${message.method}`;
const reducerStartTime = Date.now();
flipperRecorderAddEvent(pluginKey, message.method, message.params);
const newPluginState = plugin.persistedStateReducer!(
state,
message.method,
message.params,
);
addBackgroundStat(statName, Date.now() - reducerStartTime);
return newPluginState;
}
export function processMessageImmediately(
store: Store,
pluginKey: string,
plugin: {
defaultPersistedState: any;
name: string;
persistedStateReducer: PersistedStateReducer | null;
},
message: {method: string; params?: any},
) {
const persistedState: any =
store.getState().pluginStates[pluginKey] ??
plugin.defaultPersistedState ??
{};
const newPluginState = processMessage(
persistedState,
pluginKey,
plugin,
message,
);
if (persistedState !== newPluginState) {
store.dispatch(
setPluginState({
pluginKey,
state: newPluginState,
}),
);
}
}
export function processMessageLater(
store: Store,
pluginKey: string,
plugin: {
defaultPersistedState: any;
name: string;
persistedStateReducer: PersistedStateReducer | null;
},
message: {method: string; params?: any},
) {
// TODO: can we make this better?
const selection = store.getState().connections;
const selectedPlugin =
selection.selectedPlugin &&
getPluginKey(
selection.selectedApp,
selection.selectedDevice,
selection.selectedPlugin,
);
// if the plugin is active, and has no queued messaged, process the message immediately
if (
selectedPlugin === pluginKey &&
getMessages(store, pluginKey).length === 0
) {
processMessageImmediately(store, pluginKey, plugin, message);
} else {
// TODO: possible optimization: drop all messages for non-favorited plugins
// TODO: possible optimization: drop messages if queue is too large
store.dispatch(queueMessage(pluginKey, message.method, message.params));
}
}
export async function processMessageQueue(
client: Client,
plugin: {
defaultPersistedState: any;
name: string;
persistedStateReducer: PersistedStateReducer;
},
pluginKey: string,
store: Store,
progressCallback?: (progress: string) => void,
idler: BaseIdler = new Idler(),
) {
const total = getMessages(store, pluginKey).length;
let progress = 0;
do {
const messages = getMessages(store, pluginKey);
if (!messages.length) {
break;
}
// there are messages to process! lets do so until we have to idle
const persistedState =
store.getState().pluginStates[pluginKey] ??
plugin.defaultPersistedState ??
{};
let offset = 0;
let newPluginState = persistedState;
do {
newPluginState = plugin.persistedStateReducer!(
newPluginState,
messages[offset].method,
messages[offset].params,
);
offset++;
progress++;
progressCallback?.(
`Processing events ${progress} / ${Math.max(
total,
progress,
)} (${Math.min(100, 100 * (progress / total))}%)`,
);
} while (offset < messages.length && !idler.shouldIdle());
// save progress
// by writing progress away first and then idling, we make sure this logic is
// resistent to kicking off this process twice; grabbing, processing messages, saving state is done synchronosly
// until the idler has to break
store.dispatch(clearMessageQueue(pluginKey, offset));
if (newPluginState !== persistedState) {
store.dispatch(
setPluginState({
pluginKey,
state: newPluginState,
}),
);
}
if (idler.isCancelled()) {
return;
}
await idler.idle();
// new messages might have arrived, so keep looping
} while (getMessages(store, pluginKey).length);
}
function getMessages(store: Store, pluginKey: string): Message[] {
return store.getState().pluginMessageQueue[pluginKey] || [];
}