From 49eb02676983a2b480cf69195b86d9a70cf473c2 Mon Sep 17 00:00:00 2001 From: Lorenzo Blasa Date: Thu, 26 May 2022 09:44:30 -0700 Subject: [PATCH] Tail Summary: Added a new type, Tail. It observes changes made to a file and notifies the subscriber. It is useful and used to tail flipper-server logs, for example. Reviewed By: passy Differential Revision: D36473529 fbshipit-source-id: b0f905f4f8419d9e13386440d2d2cdabd528295b --- desktop/flipper-frontend-core/package.json | 1 + desktop/flipper-frontend-core/src/index.tsx | 1 + .../flipper-frontend-core/src/utils/tail.tsx | 144 ++++++++++++++++++ 3 files changed, 146 insertions(+) create mode 100644 desktop/flipper-frontend-core/src/utils/tail.tsx diff --git a/desktop/flipper-frontend-core/package.json b/desktop/flipper-frontend-core/package.json index 5ce9753a5..09cb6ae72 100644 --- a/desktop/flipper-frontend-core/package.json +++ b/desktop/flipper-frontend-core/package.json @@ -13,6 +13,7 @@ "eventemitter3": "^4.0.7", "flipper-common": "0.0.0", "flipper-plugin": "0.0.0", + "fs-extra": "^10.1.0", "immer": "^9.0.12", "js-base64": "^3.7.2", "p-map": "^5.3.0", diff --git a/desktop/flipper-frontend-core/src/index.tsx b/desktop/flipper-frontend-core/src/index.tsx index 2beca1acb..641e8ce9b 100644 --- a/desktop/flipper-frontend-core/src/index.tsx +++ b/desktop/flipper-frontend-core/src/index.tsx @@ -15,3 +15,4 @@ export * from './globalObject'; export * from './plugins'; export * from './flipperLibImplementation'; export * from './client/FlipperServerClient'; +export * from './utils/tail'; diff --git a/desktop/flipper-frontend-core/src/utils/tail.tsx b/desktop/flipper-frontend-core/src/utils/tail.tsx new file mode 100644 index 000000000..a861ad56a --- /dev/null +++ b/desktop/flipper-frontend-core/src/utils/tail.tsx @@ -0,0 +1,144 @@ +/** + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @format + */ + +import EventEmitter from 'eventemitter3'; +import fs, {FSWatcher} from 'fs-extra'; +import path from 'path'; + +export type TailOptions = { + separator: RegExp; + encoding: BufferEncoding; + fromBeginning: boolean; +}; + +type TailBlock = { + start: number; + end: number; +}; + +export class Tail extends EventEmitter { + absPath: string; + dispatcher = new EventEmitter(); + buffer = ''; + queue: TailBlock[] = []; + isWatching = false; + currentCursorPosition = 0; + watcher: FSWatcher | undefined = undefined; + + constructor( + private readonly filename: string, + private readonly options: TailOptions = { + separator: /[\r]{0,1}\n/, + fromBeginning: true, + encoding: 'utf8', + }, + ) { + super(); + + this.absPath = path.dirname(this.filename); + this.dispatcher.on('next', () => { + this.read_(); + }); + } + + async watch() { + if (this.isWatching) { + return; + } + this.isWatching = true; + + let startingCursor = 0; + if (!this.options.fromBeginning) { + startingCursor = await this.latestPosition_(); + } + + this.watch_(startingCursor, this.options.fromBeginning); + } + + unwatch() { + if (this.watcher) { + this.watcher.close(); + } + this.isWatching = false; + } + + private async latestPosition_(): Promise { + return (await fs.stat(this.filename)).size; + } + + private read_() { + if (this.queue.length >= 1) { + const block = this.queue[0]; + if (block.end > block.start) { + const stream = fs.createReadStream(this.filename, { + start: block.start, + end: block.end - 1, + encoding: this.options.encoding, + }); + stream.on('error', (error) => { + this.emit('error', error); + }); + stream.on('end', () => { + this.queue.shift(); + if (this.queue.length > 0) { + this.dispatcher.emit('next'); + } + if (this.buffer.length > 0) { + this.emit('line', this.buffer); + this.buffer = ''; + } + }); + stream.on('data', (d) => { + this.buffer += d; + const parts = this.buffer.split(this.options.separator); + // The last part may be an incomplete chunk, so + // push that back into the buffer. Otherwise, reset buffer. + this.buffer = parts.pop() || ''; + for (const chunk of parts) { + this.emit('line', chunk); + } + }); + } + } + } + + private async fileUpdated_() { + const latestPosition = await this.latestPosition_(); + // Case where text is not appended but it's actually a w+ + if (latestPosition < this.currentCursorPosition) { + this.currentCursorPosition = latestPosition; + } else if (latestPosition > this.currentCursorPosition) { + this.queue.push({start: this.currentCursorPosition, end: latestPosition}); + this.currentCursorPosition = latestPosition; + // Only emit if the queue was empty and now is not. + if (this.queue.length == 1) { + this.dispatcher.emit('next'); + } + } + } + + private watch_(cursor: number, readPrevious: boolean) { + this.currentCursorPosition = cursor; + if (readPrevious) { + this.fileUpdated_(); + } + + this.watcher = fs.watch(this.filename, {}, (eventType, filename) => { + this.watchEvent_(eventType, filename); + }); + } + + private watchEvent_(eventType: string, _filename: string) { + try { + if (eventType === 'change') { + this.fileUpdated_(); + } + } catch (err) {} + } +}