-
Notifications
You must be signed in to change notification settings - Fork 68
/
Copy pathTestEventStreamReader.ts
90 lines (80 loc) · 3.06 KB
/
TestEventStreamReader.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
//===----------------------------------------------------------------------===//
//
// This source file is part of the VS Code Swift open source project
//
// Copyright (c) 2024 Apple Inc. and the VS Code Swift project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of VS Code Swift project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import * as fs from "fs";
import * as net from "net";
import { Readable } from "stream";
/**
* Common contract for the platform-specific pipe readers in this file
* (`WindowsNamedPipeReader` and `UnixNamedPipeReader`).
*/
export interface INamedPipeReader {
/**
* Starts reading from the pipe and forwards what is read into `readable`.
* The implementations in this file call `readable.push(chunk)` for each chunk
* and `readable.push(null)` when the pipe signals end of input.
*
* @param readable Destination stream that receives the pipe contents.
* @returns A promise that settles once the reader has been set up; it does
* not wait for the pipe to be fully consumed.
*/
start(readable: Readable): Promise<void>;
}
/**
 * Reads from a named pipe on Windows and forwards data to a `Readable` stream.
 * Note that the path must be in the Windows named pipe format of `\\.\pipe\pipename`.
 *
 * The reader creates a `net.Server` listening on the pipe path; the process
 * producing the data is expected to connect to it as a client.
 */
export class WindowsNamedPipeReader implements INamedPipeReader {
    constructor(private path: string) {}

    /**
     * Starts listening on the pipe and forwards everything a connected client
     * writes into `readable`. When the client ends the connection, `null` is
     * pushed to signal end of input and the server is closed.
     *
     * @param readable Destination stream that receives the pipe contents.
     * @returns A promise that resolves once the server is listening, and
     * rejects if the server cannot be created or fails to listen on the path.
     */
    public async start(readable: Readable): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            try {
                const server = net.createServer(stream => {
                    stream.on("data", data => readable.push(data));
                    stream.on("error", () => server.close());
                    stream.on("end", () => {
                        readable.push(null);
                        server.close();
                    });
                });

                // `listen()` reports failures (path in use, invalid path, ...)
                // asynchronously through the 'error' event rather than by
                // throwing, so the surrounding try/catch never sees them.
                // Without this listener the event would surface as an uncaught
                // exception and the promise would never settle. Once the
                // promise has resolved, later calls to `reject` are no-ops.
                server.on("error", reject);

                server.listen(this.path, () => resolve());
            } catch (error) {
                reject(error);
            }
        });
    }
}
/**
 * Reads from a unix FIFO pipe and forwards data to a `Readable` stream.
 * Note that the pipe at the supplied path should be created with `mkfifo`
 * before calling `start()`.
 */
export class UnixNamedPipeReader implements INamedPipeReader {
    constructor(private path: string) {}

    /**
     * Opens the pipe read-only and forwards everything read from it into
     * `readable`, pushing `null` once the pipe reaches end of input.
     *
     * To honour backpressure this method wraps `readable._read` so that the
     * pipe is resumed whenever the destination asks for more data.
     *
     * @param readable Destination stream that receives the pipe contents.
     * @returns A promise that resolves once the pipe is open and the forwarding
     * handlers are attached, and rejects if the pipe cannot be opened or the
     * read stream cannot be created.
     */
    public async start(readable: Readable): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            fs.open(this.path, fs.constants.O_RDONLY, (err, fd) => {
                if (err) {
                    return reject(err);
                }
                try {
                    // Create our own readable stream that handles backpressure.
                    // Using a net.Socket to read the pipe has an 8kb internal buffer,
                    // meaning we couldn't read from writes that were > 8kb.
                    // The path argument is ignored because an `fd` is supplied.
                    const pipe = fs.createReadStream("", { fd });

                    // A Readable never emits 'drain' (that is a Writable event).
                    // Its signal that the consumer wants more data is a call to
                    // `_read()`, so hook that to resume a paused pipe and then
                    // delegate to the destination's own implementation.
                    const destinationRead = readable._read.bind(readable);
                    readable._read = size => {
                        if (!pipe.destroyed && pipe.isPaused()) {
                            pipe.resume();
                        }
                        destinationRead(size);
                    };

                    pipe.on("data", chunk => {
                        // `push` returning false means the destination's buffer
                        // is full; stop reading until `_read` is called again.
                        if (!readable.push(chunk)) {
                            pipe.pause();
                        }
                    });
                    pipe.on("error", () => pipe.close());
                    pipe.on("end", () => {
                        // The read stream closes `fd` itself (`autoClose`
                        // defaults to true), so closing it here as well would
                        // be a double close that can fail with EBADF or hit a
                        // descriptor that has since been reused.
                        readable.push(null);
                    });
                    resolve();
                } catch (error) {
                    fs.close(fd, () => reject(error));
                }
            });
        });
    }
}