Summary:
Added a new type, Tail. It observes changes made to a file and notifies the subscriber.

It is useful and used to tail flipper-server logs, for example.

Reviewed By: passy

Differential Revision: D36473529

fbshipit-source-id: b0f905f4f8419d9e13386440d2d2cdabd528295b
This commit is contained in:
Lorenzo Blasa
2022-05-26 09:44:30 -07:00
committed by Facebook GitHub Bot
parent 41f49c8ad7
commit 49eb026769
3 changed files with 146 additions and 0 deletions

View File

@@ -13,6 +13,7 @@
"eventemitter3": "^4.0.7",
"flipper-common": "0.0.0",
"flipper-plugin": "0.0.0",
"fs-extra": "^10.1.0",
"immer": "^9.0.12",
"js-base64": "^3.7.2",
"p-map": "^5.3.0",

View File

@@ -15,3 +15,4 @@ export * from './globalObject';
export * from './plugins';
export * from './flipperLibImplementation';
export * from './client/FlipperServerClient';
export * from './utils/tail';

View File

@@ -0,0 +1,144 @@
/**
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @format
*/
import EventEmitter from 'eventemitter3';
import fs, {FSWatcher} from 'fs-extra';
import path from 'path';
export type TailOptions = {
separator: RegExp;
encoding: BufferEncoding;
fromBeginning: boolean;
};
type TailBlock = {
start: number;
end: number;
};
export class Tail extends EventEmitter {
absPath: string;
dispatcher = new EventEmitter();
buffer = '';
queue: TailBlock[] = [];
isWatching = false;
currentCursorPosition = 0;
watcher: FSWatcher | undefined = undefined;
constructor(
private readonly filename: string,
private readonly options: TailOptions = {
separator: /[\r]{0,1}\n/,
fromBeginning: true,
encoding: 'utf8',
},
) {
super();
this.absPath = path.dirname(this.filename);
this.dispatcher.on('next', () => {
this.read_();
});
}
async watch() {
if (this.isWatching) {
return;
}
this.isWatching = true;
let startingCursor = 0;
if (!this.options.fromBeginning) {
startingCursor = await this.latestPosition_();
}
this.watch_(startingCursor, this.options.fromBeginning);
}
unwatch() {
if (this.watcher) {
this.watcher.close();
}
this.isWatching = false;
}
private async latestPosition_(): Promise<number> {
return (await fs.stat(this.filename)).size;
}
private read_() {
if (this.queue.length >= 1) {
const block = this.queue[0];
if (block.end > block.start) {
const stream = fs.createReadStream(this.filename, {
start: block.start,
end: block.end - 1,
encoding: this.options.encoding,
});
stream.on('error', (error) => {
this.emit('error', error);
});
stream.on('end', () => {
this.queue.shift();
if (this.queue.length > 0) {
this.dispatcher.emit('next');
}
if (this.buffer.length > 0) {
this.emit('line', this.buffer);
this.buffer = '';
}
});
stream.on('data', (d) => {
this.buffer += d;
const parts = this.buffer.split(this.options.separator);
// The last part may be an incomplete chunk, so
// push that back into the buffer. Otherwise, reset buffer.
this.buffer = parts.pop() || '';
for (const chunk of parts) {
this.emit('line', chunk);
}
});
}
}
}
private async fileUpdated_() {
const latestPosition = await this.latestPosition_();
// Case where text is not appended but it's actually a w+
if (latestPosition < this.currentCursorPosition) {
this.currentCursorPosition = latestPosition;
} else if (latestPosition > this.currentCursorPosition) {
this.queue.push({start: this.currentCursorPosition, end: latestPosition});
this.currentCursorPosition = latestPosition;
// Only emit if the queue was empty and now is not.
if (this.queue.length == 1) {
this.dispatcher.emit('next');
}
}
}
private watch_(cursor: number, readPrevious: boolean) {
this.currentCursorPosition = cursor;
if (readPrevious) {
this.fileUpdated_();
}
this.watcher = fs.watch(this.filename, {}, (eventType, filename) => {
this.watchEvent_(eventType, filename);
});
}
private watchEvent_(eventType: string, _filename: string) {
try {
if (eventType === 'change') {
this.fileUpdated_();
}
} catch (err) {}
}
}