Batch subtree updates sent on the same frame

Summary:
This is needed in preperation for the next diff where we will introduce an observer per litho view. Without batching we end up with really poor performance for a few reasons:
1. There are some operations on the desktop plugin that are o(nodes) so even sending small batches
2. Flipper isnt really a high performance message bus, it seems to prefer fewer larger messages
3. Queuing time on the client builds up as you spend more time waiting on the socket

In a future diff will address:
The name of subtree update event. It should probably be called something like FrameUpdate since they are always full frames
The performance monitoring, will more to timing methods and summing the result rather than the current appraoch of time markers

Reviewed By: lblasa

Differential Revision: D42453229

fbshipit-source-id: eda9830b4420e82874717cc69b241e1689f20029
This commit is contained in:
Luke De Feo
2023-01-25 04:47:11 -08:00
committed by Facebook GitHub Bot
parent 3b65994ca6
commit b5392fb818

View File

@@ -12,6 +12,7 @@ import android.graphics.Bitmap
import android.util.Base64
import android.util.Base64OutputStream
import android.util.Log
import android.view.Choreographer
import com.facebook.flipper.plugins.uidebugger.LogTag
import com.facebook.flipper.plugins.uidebugger.common.BitmapPool
import com.facebook.flipper.plugins.uidebugger.core.Context
@@ -28,8 +29,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.serialization.json.Json
sealed interface Update
data class SubtreeUpdate(
val observerType: String,
val rootId: Id,
@@ -40,17 +39,26 @@ data class SubtreeUpdate(
val snapshot: BitmapPool.ReusableBitmap?
)
data class BatchedUpdate(val updates: List<SubtreeUpdate>, val frameTimeMs: Long)
/** Holds the root observer and manages sending updates to desktop */
class TreeObserverManager(val context: Context) {
private val rootObserver = ApplicationTreeObserver(context)
private lateinit var updates: Channel<SubtreeUpdate>
private lateinit var batchedUpdates: Channel<BatchedUpdate>
private val subtreeUpdateBuffer = SubtreeUpdateBuffer(this::enqueueBatch)
private var job: Job? = null
private val workerScope = CoroutineScope(Dispatchers.IO)
private val txId = AtomicInteger()
fun enqueueUpdate(update: SubtreeUpdate) {
updates.trySend(update)
subtreeUpdateBuffer.bufferUpdate(update)
}
private fun enqueueBatch(batchedUpdate: BatchedUpdate) {
batchedUpdates.trySend(batchedUpdate)
}
/**
@@ -60,15 +68,15 @@ class TreeObserverManager(val context: Context) {
@SuppressLint("NewApi")
fun start() {
updates = Channel(Channel.UNLIMITED)
batchedUpdates = Channel(Channel.UNLIMITED)
rootObserver.subscribe(context.applicationRef)
job =
workerScope.launch {
while (isActive) {
try {
val update = updates.receive()
sendSubtreeUpdate(update)
val update = batchedUpdates.receive()
sendBatchedUpdate(update)
} catch (e: CancellationException) {} catch (e: java.lang.Exception) {
Log.e(LogTag, "Unexpected Error in channel ", e)
}
@@ -77,60 +85,51 @@ class TreeObserverManager(val context: Context) {
}
}
private fun sendMetadata() {
val metadata = MetadataRegister.extractPendingMetadata()
if (metadata.size > 0) {
context.connectionRef.connection?.send(
MetadataUpdateEvent.name,
Json.encodeToString(MetadataUpdateEvent.serializer(), MetadataUpdateEvent(metadata)))
}
fun stop() {
rootObserver.cleanUpRecursive()
job?.cancel()
batchedUpdates.cancel()
}
private fun sendSubtreeUpdate(treeUpdate: SubtreeUpdate) {
val onWorkerThread = System.currentTimeMillis()
val txId = txId.getAndIncrement().toLong()
private fun sendBatchedUpdate(batchedUpdate: BatchedUpdate) {
val serialized: String?
val nodes = treeUpdate.deferredNodes.map { it.value() }
val onWorkerThread = System.currentTimeMillis()
val nodes = batchedUpdate.updates.flatMap { it.deferredNodes.map { it.value() } }
val snapshotUpdate = batchedUpdate.updates.find { it.snapshot != null }
val deferredComptationComplete = System.currentTimeMillis()
// send metadata needs to occur after the deferred metadata extraction since inside the deferred
// computation we may create some fresh metadata
sendMetadata()
if (treeUpdate.snapshot == null) {
serialized =
Json.encodeToString(
SubtreeUpdateEvent.serializer(),
SubtreeUpdateEvent(txId, treeUpdate.observerType, treeUpdate.rootId, nodes))
} else {
var snapshot: String? = null
if (snapshotUpdate?.snapshot != null) {
val stream = ByteArrayOutputStream()
val base64Stream = Base64OutputStream(stream, Base64.DEFAULT)
treeUpdate.snapshot.bitmap?.compress(Bitmap.CompressFormat.PNG, 100, base64Stream)
val snapshot = stream.toString()
serialized =
Json.encodeToString(
SubtreeUpdateEvent.serializer(),
SubtreeUpdateEvent(txId, treeUpdate.observerType, treeUpdate.rootId, nodes, snapshot))
treeUpdate.snapshot.readyForReuse()
snapshotUpdate.snapshot.bitmap?.compress(Bitmap.CompressFormat.PNG, 100, base64Stream)
snapshot = stream.toString()
snapshotUpdate.snapshot.readyForReuse()
}
sendMetadata()
val serialized =
Json.encodeToString(
SubtreeUpdateEvent.serializer(),
SubtreeUpdateEvent(
batchedUpdate.frameTimeMs, "batched", snapshotUpdate?.rootId ?: 1, nodes, snapshot))
val serializationEnd = System.currentTimeMillis()
context.connectionRef.connection?.send(SubtreeUpdateEvent.name, serialized)
val socketEnd = System.currentTimeMillis()
Log.i(
LogTag,
"Sent event for ${treeUpdate.observerType} root ID ${treeUpdate.rootId} nodes ${nodes.size}")
Log.i(LogTag, "Sent event for batched subtree update with nodes with ${nodes.size}")
val perfStats =
PerfStatsEvent(
txId = txId,
observerType = treeUpdate.observerType,
start = treeUpdate.startTime,
traversalComplete = treeUpdate.traversalCompleteTime,
snapshotComplete = treeUpdate.snapshotComplete,
txId = batchedUpdate.frameTimeMs,
observerType = "batched",
start = batchedUpdate.updates.minOf { it.startTime },
traversalComplete = batchedUpdate.updates.maxOf { it.traversalCompleteTime },
snapshotComplete = batchedUpdate.updates.maxOf { it.snapshotComplete },
queuingComplete = onWorkerThread,
deferredComputationComplete = deferredComptationComplete,
serializationComplete = serializationEnd,
@@ -141,9 +140,31 @@ class TreeObserverManager(val context: Context) {
PerfStatsEvent.name, Json.encodeToString(PerfStatsEvent.serializer(), perfStats))
}
fun stop() {
rootObserver.cleanUpRecursive()
job?.cancel()
updates.cancel()
private fun sendMetadata() {
val metadata = MetadataRegister.extractPendingMetadata()
if (metadata.isNotEmpty()) {
context.connectionRef.connection?.send(
MetadataUpdateEvent.name,
Json.encodeToString(MetadataUpdateEvent.serializer(), MetadataUpdateEvent(metadata)))
}
}
}
/** Buffers up subtree updates untill the frame is complete, should only be called on main thread */
private class SubtreeUpdateBuffer(private val onBatchReady: (BatchedUpdate) -> Unit) {
private val bufferedSubtreeUpdates = mutableListOf<SubtreeUpdate>()
fun bufferUpdate(update: SubtreeUpdate) {
if (bufferedSubtreeUpdates.isEmpty()) {
Choreographer.getInstance().postFrameCallback { frameTime ->
val updatesCopy = bufferedSubtreeUpdates.toList()
bufferedSubtreeUpdates.clear()
onBatchReady(BatchedUpdate(updatesCopy, frameTime / 1000000))
}
}
bufferedSubtreeUpdates.add(update)
}
}