Support receiving messages in Sandy plugins

Summary: This diffs adds the capability to listen to messages in Sandy plugins. Although API wise it looks more like the old `this.subscribe`, semantically it behaves like the `persistedStateReducer`; messages are queued if the plugin is enabled but not active.

Reviewed By: nikoant

Differential Revision: D22282711

fbshipit-source-id: 885faa702fe779ac8d593c1d224b2be13e688d47
This commit is contained in:
Michel Weststrate
2020-07-01 08:58:40 -07:00
committed by Facebook GitHub Bot
parent 6c79408b0f
commit bb0c8e0df0
8 changed files with 841 additions and 59 deletions

View File

@@ -7,27 +7,27 @@
* @format
*/
import {
PersistedStateReducer,
FlipperDevicePlugin,
isSandyPlugin,
} from '../plugin';
import {PersistedStateReducer, FlipperDevicePlugin} from '../plugin';
import {State, MiddlewareAPI} from '../reducers/index';
import {setPluginState} from '../reducers/pluginStates';
import {flipperRecorderAddEvent} from './pluginStateRecorder';
import {
flipperRecorderAddEvent,
isRecordingEvents,
} from './pluginStateRecorder';
import {
clearMessageQueue,
queueMessages,
Message,
DEFAULT_MAX_QUEUE_SIZE,
} from '../reducers/pluginMessageQueue';
import {Idler, BaseIdler} from './Idler';
import {pluginIsStarred, getSelectedPluginKey} from '../reducers/connections';
import {deconstructPluginKey} from './clientUtils';
import {defaultEnabledBackgroundPlugins} from './pluginUtils';
import {SandyPluginDefinition} from 'flipper-plugin';
import {SandyPluginInstance} from 'flipper-plugin';
import {addBackgroundStat} from './pluginStats';
function processMessage(
function processMessageClassic(
state: State,
pluginKey: string,
plugin: {
@@ -52,28 +52,57 @@ function processMessage(
}
}
function processMessagesSandy(
pluginKey: string,
plugin: SandyPluginInstance,
messages: Message[],
) {
const reducerStartTime = Date.now();
if (isRecordingEvents(pluginKey)) {
messages.forEach((message) => {
flipperRecorderAddEvent(pluginKey, message.method, message.params);
});
}
try {
plugin.receiveMessages(messages);
addBackgroundStat(plugin.definition.id, Date.now() - reducerStartTime);
} catch (e) {
console.error(
`Failed to process event for plugin ${plugin.definition.id}`,
e,
);
}
}
export function processMessagesImmediately(
store: MiddlewareAPI,
pluginKey: string,
plugin: {
defaultPersistedState: any;
id: string;
persistedStateReducer: PersistedStateReducer | null;
},
plugin:
| {
defaultPersistedState: any;
id: string;
persistedStateReducer: PersistedStateReducer | null;
}
| SandyPluginInstance,
messages: Message[],
) {
const persistedState = getCurrentPluginState(store, plugin, pluginKey);
const newPluginState = messages.reduce(
(state, message) => processMessage(state, pluginKey, plugin, message),
persistedState,
);
if (persistedState !== newPluginState) {
store.dispatch(
setPluginState({
pluginKey,
state: newPluginState,
}),
if (plugin instanceof SandyPluginInstance) {
processMessagesSandy(pluginKey, plugin, messages);
} else {
const persistedState = getCurrentPluginState(store, plugin, pluginKey);
const newPluginState = messages.reduce(
(state, message) =>
processMessageClassic(state, pluginKey, plugin, message),
persistedState,
);
if (persistedState !== newPluginState) {
store.dispatch(
setPluginState({
pluginKey,
state: newPluginState,
}),
);
}
}
}
@@ -87,54 +116,61 @@ export function processMessagesLater(
persistedStateReducer: PersistedStateReducer | null;
maxQueueSize?: number;
}
| SandyPluginDefinition,
| SandyPluginInstance,
messages: Message[],
) {
// @ts-ignore
if (isSandyPlugin(plugin)) {
// TODO:
throw new Error(
'Receiving messages is not yet supported for Sandy plugins',
);
}
const pluginId =
plugin instanceof SandyPluginInstance ? plugin.definition.id : plugin.id;
const isSelected =
pluginKey === getSelectedPluginKey(store.getState().connections);
switch (true) {
case plugin.id === 'Navigation': // Navigation events are always processed, to make sure the navbar stays up to date
case pluginId === 'Navigation': // Navigation events are always processed, to make sure the navbar stays up to date
case isSelected && getPendingMessages(store, pluginKey).length === 0:
processMessagesImmediately(store, pluginKey, plugin, messages);
break;
// TODO: support SandyDevicePlugin T68738317
case isSelected:
case plugin instanceof SandyPluginInstance:
case plugin instanceof FlipperDevicePlugin:
case (plugin as any).prototype instanceof FlipperDevicePlugin:
case pluginIsStarred(
store.getState().connections.userStarredPlugins,
deconstructPluginKey(pluginKey).client,
plugin.id,
pluginId,
):
store.dispatch(queueMessages(pluginKey, messages, plugin.maxQueueSize));
store.dispatch(
queueMessages(
pluginKey,
messages,
plugin instanceof SandyPluginInstance
? DEFAULT_MAX_QUEUE_SIZE
: plugin.maxQueueSize,
),
);
break;
default:
// In all other cases, messages will be dropped...
if (!defaultEnabledBackgroundPlugins.includes(plugin.id))
if (!defaultEnabledBackgroundPlugins.includes(pluginId))
console.warn(
`Received message for disabled plugin ${plugin.id}, dropping..`,
`Received message for disabled plugin ${pluginId}, dropping..`,
);
}
}
export async function processMessageQueue(
plugin: {
defaultPersistedState: any;
id: string;
persistedStateReducer: PersistedStateReducer | null;
},
plugin:
| {
defaultPersistedState: any;
id: string;
persistedStateReducer: PersistedStateReducer | null;
}
| SandyPluginInstance,
pluginKey: string,
store: MiddlewareAPI,
progressCallback?: (progress: {current: number; total: number}) => void,
idler: BaseIdler = new Idler(),
): Promise<boolean> {
if (!plugin.persistedStateReducer) {
if (!SandyPluginInstance.is(plugin) && !plugin.persistedStateReducer) {
return true;
}
const total = getPendingMessages(store, pluginKey).length;
@@ -145,16 +181,24 @@ export async function processMessageQueue(
break;
}
// there are messages to process! lets do so until we have to idle
const persistedState = getCurrentPluginState(store, plugin, pluginKey);
// persistedState is irrelevant for SandyPlugins, as they store state locally
const persistedState = SandyPluginInstance.is(plugin)
? undefined
: getCurrentPluginState(store, plugin, pluginKey);
let offset = 0;
let newPluginState = persistedState;
do {
newPluginState = processMessage(
newPluginState,
pluginKey,
plugin,
messages[offset],
);
if (SandyPluginInstance.is(plugin)) {
// Optimization: we could send a batch of messages here
processMessagesSandy(pluginKey, plugin, [messages[offset]]);
} else {
newPluginState = processMessageClassic(
newPluginState,
pluginKey,
plugin,
messages[offset],
);
}
offset++;
progress++;
@@ -168,7 +212,7 @@ export async function processMessageQueue(
// resistent to kicking off this process twice; grabbing, processing messages, saving state is done synchronosly
// until the idler has to break
store.dispatch(clearMessageQueue(pluginKey, offset));
if (newPluginState !== persistedState) {
if (!SandyPluginInstance.is(plugin) && newPluginState !== persistedState) {
store.dispatch(
setPluginState({
pluginKey,