batch for more efficient message processing

Summary: `unstablebatched_updates` should be used whenever a non-react originating event might affect multiple components, to make sure that React batches them optimally. Applied it to the most import events that handle incoming device events

Reviewed By: nikoant

Differential Revision: D25052937

fbshipit-source-id: b2c783fb9c43be371553db39969280f9d7c3e260
This commit is contained in:
Michel Weststrate
2020-11-18 08:49:30 -08:00
committed by Facebook GitHub Bot
parent 375a612dff
commit 2e5b52d247
7 changed files with 182 additions and 147 deletions

View File

@@ -391,111 +391,113 @@ export default class Client extends EventEmitter {
return;
}
let rawData;
try {
rawData = JSON.parse(msg);
} catch (err) {
console.error(`Invalid JSON: ${msg}`, 'clientMessage');
return;
}
const data: {
id?: number;
method?: string;
params?: Params;
success?: Object;
error?: ErrorType;
} = rawData;
const {id, method} = data;
if (
data.params?.api != 'flipper-messages' &&
flipperMessagesClientPlugin.isConnected()
) {
flipperMessagesClientPlugin.newMessage({
device: this.deviceSync?.displayTitle(),
app: this.query.app,
flipperInternalMethod: method,
plugin: data.params?.api,
pluginMethod: data.params?.method,
payload: data.params?.params,
direction: 'toFlipper:message',
});
}
if (id == null) {
const {error} = data;
if (error != null) {
console.error(
`Error received from device ${
method ? `when calling ${method}` : ''
}: ${error.message} + \nDevice Stack Trace: ${error.stacktrace}`,
'deviceError',
);
handleError(this.store, this.deviceSync, error);
} else if (method === 'refreshPlugins') {
this.refreshPlugins();
} else if (method === 'execute') {
invariant(data.params, 'expected params');
const params: Params = data.params;
const bytes = msg.length * 2; // string lengths are measured in UTF-16 units (not characters), so 2 bytes per char
emitBytesReceived(params.api, bytes);
const persistingPlugin: PluginDefinition | undefined =
this.store.getState().plugins.clientPlugins.get(params.api) ||
this.store.getState().plugins.devicePlugins.get(params.api);
let handled = false; // This is just for analysis
if (
persistingPlugin &&
((persistingPlugin as any).persistedStateReducer ||
// only send messages to enabled sandy plugins
this.sandyPluginStates.has(params.api))
) {
handled = true;
const pluginKey = getPluginKey(
this.id,
{serial: this.query.device_id},
params.api,
);
if (!this.messageBuffer[pluginKey]) {
this.messageBuffer[pluginKey] = {
plugin: (this.sandyPluginStates.get(params.api) ??
persistingPlugin) as any,
messages: [params],
};
} else {
this.messageBuffer[pluginKey].messages.push(params);
}
this.flushMessageBufferDebounced();
}
const apiCallbacks = this.broadcastCallbacks.get(params.api);
if (apiCallbacks) {
const methodCallbacks = apiCallbacks.get(params.method);
if (methodCallbacks) {
for (const callback of methodCallbacks) {
handled = true;
callback(params.params);
}
}
}
if (!handled) {
console.warn(`Unhandled message ${params.api}.${params.method}`);
}
}
return; // method === 'execute'
}
if (this.sdkVersion < 1) {
const callbacks = this.requestCallbacks.get(id);
if (!callbacks) {
batch(() => {
let rawData;
try {
rawData = JSON.parse(msg);
} catch (err) {
console.error(`Invalid JSON: ${msg}`, 'clientMessage');
return;
}
this.requestCallbacks.delete(id);
this.finishTimingRequestResponse(callbacks.metadata);
this.onResponse(data, callbacks.resolve, callbacks.reject);
}
const data: {
id?: number;
method?: string;
params?: Params;
success?: Object;
error?: ErrorType;
} = rawData;
const {id, method} = data;
if (
data.params?.api != 'flipper-messages' &&
flipperMessagesClientPlugin.isConnected()
) {
flipperMessagesClientPlugin.newMessage({
device: this.deviceSync?.displayTitle(),
app: this.query.app,
flipperInternalMethod: method,
plugin: data.params?.api,
pluginMethod: data.params?.method,
payload: data.params?.params,
direction: 'toFlipper:message',
});
}
if (id == null) {
const {error} = data;
if (error != null) {
console.error(
`Error received from device ${
method ? `when calling ${method}` : ''
}: ${error.message} + \nDevice Stack Trace: ${error.stacktrace}`,
'deviceError',
);
handleError(this.store, this.deviceSync, error);
} else if (method === 'refreshPlugins') {
this.refreshPlugins();
} else if (method === 'execute') {
invariant(data.params, 'expected params');
const params: Params = data.params;
const bytes = msg.length * 2; // string lengths are measured in UTF-16 units (not characters), so 2 bytes per char
emitBytesReceived(params.api, bytes);
const persistingPlugin: PluginDefinition | undefined =
this.store.getState().plugins.clientPlugins.get(params.api) ||
this.store.getState().plugins.devicePlugins.get(params.api);
let handled = false; // This is just for analysis
if (
persistingPlugin &&
((persistingPlugin as any).persistedStateReducer ||
// only send messages to enabled sandy plugins
this.sandyPluginStates.has(params.api))
) {
handled = true;
const pluginKey = getPluginKey(
this.id,
{serial: this.query.device_id},
params.api,
);
if (!this.messageBuffer[pluginKey]) {
this.messageBuffer[pluginKey] = {
plugin: (this.sandyPluginStates.get(params.api) ??
persistingPlugin) as any,
messages: [params],
};
} else {
this.messageBuffer[pluginKey].messages.push(params);
}
this.flushMessageBufferDebounced();
}
const apiCallbacks = this.broadcastCallbacks.get(params.api);
if (apiCallbacks) {
const methodCallbacks = apiCallbacks.get(params.method);
if (methodCallbacks) {
for (const callback of methodCallbacks) {
handled = true;
callback(params.params);
}
}
}
if (!handled) {
console.warn(`Unhandled message ${params.api}.${params.method}`);
}
}
return; // method === 'execute'
}
if (this.sdkVersion < 1) {
const callbacks = this.requestCallbacks.get(id);
if (!callbacks) {
return;
}
this.requestCallbacks.delete(id);
this.finishTimingRequestResponse(callbacks.metadata);
this.onResponse(data, callbacks.resolve, callbacks.reject);
}
});
}
onResponse(

View File

@@ -24,7 +24,7 @@ import {Idler, BaseIdler} from './Idler';
import {pluginIsStarred, getSelectedPluginKey} from '../reducers/connections';
import {deconstructPluginKey} from './clientUtils';
import {defaultEnabledBackgroundPlugins} from './pluginUtils';
import {_SandyPluginInstance} from 'flipper-plugin';
import {batch, _SandyPluginInstance} from 'flipper-plugin';
import {addBackgroundStat} from './pluginStats';
function processMessageClassic(
@@ -187,39 +187,44 @@ export async function processMessageQueue(
: getCurrentPluginState(store, plugin, pluginKey);
let offset = 0;
let newPluginState = persistedState;
do {
if (_SandyPluginInstance.is(plugin)) {
// Optimization: we could send a batch of messages here
processMessagesSandy(pluginKey, plugin, [messages[offset]]);
} else {
newPluginState = processMessageClassic(
newPluginState,
pluginKey,
plugin,
messages[offset],
batch(() => {
do {
if (_SandyPluginInstance.is(plugin)) {
// Optimization: we could send a batch of messages here
processMessagesSandy(pluginKey, plugin, [messages[offset]]);
} else {
newPluginState = processMessageClassic(
newPluginState,
pluginKey,
plugin,
messages[offset],
);
}
offset++;
progress++;
progressCallback?.({
total: Math.max(total, progress),
current: progress,
});
} while (offset < messages.length && !idler.shouldIdle());
// save progress
// by writing progress away first and then idling, we make sure this logic is
// resistent to kicking off this process twice; grabbing, processing messages, saving state is done synchronosly
// until the idler has to break
store.dispatch(clearMessageQueue(pluginKey, offset));
if (
!_SandyPluginInstance.is(plugin) &&
newPluginState !== persistedState
) {
store.dispatch(
setPluginState({
pluginKey,
state: newPluginState,
}),
);
}
offset++;
progress++;
progressCallback?.({
total: Math.max(total, progress),
current: progress,
});
} while (offset < messages.length && !idler.shouldIdle());
// save progress
// by writing progress away first and then idling, we make sure this logic is
// resistent to kicking off this process twice; grabbing, processing messages, saving state is done synchronosly
// until the idler has to break
store.dispatch(clearMessageQueue(pluginKey, offset));
if (!_SandyPluginInstance.is(plugin) && newPluginState !== persistedState) {
store.dispatch(
setPluginState({
pluginKey,
state: newPluginState,
}),
);
}
});
if (idler.isCancelled()) {
return false;