Fix flipper connect / disconnect cycle

Summary: Previously we were cancelling the entire context which meant after reconnect nothing was sent. Additionally we now close / reinitiaze the channel so that any old events are not sent on reconnect

Reviewed By: lblasa

Differential Revision: D39658945

fbshipit-source-id: bb02724434aa820d811b49ab799a4643ab7e785a
This commit is contained in:
Luke De Feo
2022-09-21 07:02:48 -07:00
committed by Facebook GitHub Bot
parent cb75be7495
commit 46824f3369
2 changed files with 43 additions and 38 deletions

View File

@@ -46,6 +46,7 @@ class UIDebuggerFlipperPlugin(
@Throws(Exception::class) @Throws(Exception::class)
override fun onConnect(connection: FlipperConnection) { override fun onConnect(connection: FlipperConnection) {
Log.i(LogTag, "Connected")
this.context.connectionRef.connection = connection this.context.connectionRef.connection = connection
val rootDescriptor = val rootDescriptor =
@@ -57,15 +58,14 @@ class UIDebuggerFlipperPlugin(
InitEvent.serializer(), InitEvent(rootDescriptor.getId(context.applicationRef)))) InitEvent.serializer(), InitEvent(rootDescriptor.getId(context.applicationRef))))
context.treeObserverManager.start() context.treeObserverManager.start()
// nativeScanScheduler.start()
} }
@Throws(Exception::class) @Throws(Exception::class)
override fun onDisconnect() { override fun onDisconnect() {
this.context.connectionRef.connection = null this.context.connectionRef.connection = null
Log.e(LogTag, "disconnected") Log.i(LogTag, "Disconnected")
this.nativeScanScheduler.stop() context.treeObserverManager.stop()
} }
override fun runInBackground(): Boolean { override fun runInBackground(): Boolean {

View File

@@ -29,7 +29,8 @@ data class SubtreeUpdate(
class TreeObserverManager(val context: Context) { class TreeObserverManager(val context: Context) {
private val rootObserver = ApplicationTreeObserver(context) private val rootObserver = ApplicationTreeObserver(context)
private val treeUpdates = Channel<SubtreeUpdate>(Channel.UNLIMITED) private lateinit var treeUpdates: Channel<SubtreeUpdate>
private var job: Job? = null
private val workerScope = CoroutineScope(Dispatchers.IO) private val workerScope = CoroutineScope(Dispatchers.IO)
private val txId = AtomicInteger() private val txId = AtomicInteger()
@@ -43,52 +44,56 @@ class TreeObserverManager(val context: Context) {
*/ */
fun start() { fun start() {
treeUpdates = Channel(Channel.UNLIMITED)
rootObserver.subscribe(context.applicationRef) rootObserver.subscribe(context.applicationRef)
workerScope.launch { job =
while (isActive) { workerScope.launch {
try { while (isActive) {
try {
val treeUpdate = treeUpdates.receive() val treeUpdate = treeUpdates.receive()
val onWorkerThread = System.currentTimeMillis() val onWorkerThread = System.currentTimeMillis()
val txId = txId.getAndIncrement().toLong() val txId = txId.getAndIncrement().toLong()
val serialized = val serialized =
Json.encodeToString( Json.encodeToString(
SubtreeUpdateEvent.serializer(), SubtreeUpdateEvent.serializer(),
SubtreeUpdateEvent(txId, treeUpdate.observerType, treeUpdate.nodes)) SubtreeUpdateEvent(txId, treeUpdate.observerType, treeUpdate.nodes))
val serializationEnd = System.currentTimeMillis() val serializationEnd = System.currentTimeMillis()
context.connectionRef.connection?.send(SubtreeUpdateEvent.name, serialized) context.connectionRef.connection?.send(SubtreeUpdateEvent.name, serialized)
val socketEnd = System.currentTimeMillis() val socketEnd = System.currentTimeMillis()
Log.i(LogTag, "Sent event for ${treeUpdate.observerType} nodes ${treeUpdate.nodes.size}") Log.i(
LogTag,
"Sent event for ${treeUpdate.observerType} nodes ${treeUpdate.nodes.size}")
val perfStats = val perfStats =
PerfStatsEvent( PerfStatsEvent(
txId = txId, txId = txId,
observerType = treeUpdate.observerType, observerType = treeUpdate.observerType,
start = treeUpdate.startTime, start = treeUpdate.startTime,
traversalComplete = treeUpdate.traversalCompleteTime, traversalComplete = treeUpdate.traversalCompleteTime,
queuingComplete = onWorkerThread, queuingComplete = onWorkerThread,
serializationComplete = serializationEnd, serializationComplete = serializationEnd,
socketComplete = socketEnd, socketComplete = socketEnd,
nodesCount = treeUpdate.nodes.size) nodesCount = treeUpdate.nodes.size)
context.connectionRef.connection?.send( context.connectionRef.connection?.send(
PerfStatsEvent.name, Json.encodeToString(PerfStatsEvent.serializer(), perfStats)) PerfStatsEvent.name, Json.encodeToString(PerfStatsEvent.serializer(), perfStats))
} catch (e: java.lang.Exception) { } catch (e: CancellationException) {} catch (e: java.lang.Exception) {
Log.e(LogTag, "Error in channel ", e) Log.e(LogTag, "Unexpected Error in channel ", e)
}
}
Log.i(LogTag, "Shutting down worker")
} }
}
Log.i(LogTag, "shutting down worker")
}
} }
fun stop() { fun stop() {
rootObserver.cleanUpRecursive() rootObserver.cleanUpRecursive()
treeUpdates.close() job?.cancel()
workerScope.cancel() treeUpdates.cancel()
} }
} }