-
Notifications
You must be signed in to change notification settings - Fork 301
/
Copy pathMuxedStream.cs
104 lines (91 loc) · 3.22 KB
/
MuxedStream.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
namespace k8s
{
    /// <summary>
    /// A <see cref="Stream"/> which reads/writes from a specific channel using a <see cref="StreamDemuxer" />.
    /// </summary>
    /// <remarks>
    /// The stream is readable only when an input buffer was supplied and writable only when an output
    /// channel index was supplied. Seeking, <see cref="Length"/> and <see cref="Position"/> are not supported.
    /// </remarks>
    public class MuxedStream : Stream
    {
        private readonly ByteBuffer inputBuffer;
        private readonly byte? outputIndex;
        private readonly StreamDemuxer muxer;

        /// <summary>
        /// Initializes a new instance of the <see cref="MuxedStream"/> class.
        /// </summary>
        /// <param name="muxer">
        /// The <see cref="StreamDemuxer"/> to use to read from/write to the underlying stream.
        /// Required only when <paramref name="outputIndex"/> is specified.
        /// </param>
        /// <param name="inputBuffer">
        /// The <see cref="ByteBuffer"/> to read from, or <see langword="null"/> for a write-only stream.
        /// </param>
        /// <param name="outputIndex">
        /// The index of the channel to which to write, or <see langword="null"/> for a read-only stream.
        /// </param>
        /// <exception cref="ArgumentException">
        /// Both <paramref name="inputBuffer"/> and <paramref name="outputIndex"/> are <see langword="null"/>.
        /// </exception>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="outputIndex"/> is specified but <paramref name="muxer"/> is <see langword="null"/>.
        /// </exception>
        public MuxedStream(StreamDemuxer muxer, ByteBuffer inputBuffer, byte? outputIndex)
        {
            // A stream that can neither read nor write is useless, so reject it up front.
            if (inputBuffer == null && outputIndex == null)
            {
                throw new ArgumentException("You must specify at least inputBuffer or outputIndex");
            }

            this.inputBuffer = inputBuffer;
            this.outputIndex = outputIndex;

            // The muxer is only used by Write, so it is only required (and stored) for writable streams.
            if (outputIndex != null)
            {
                this.muxer = muxer ?? throw new ArgumentNullException(nameof(muxer));
            }
        }

        /// <inheritdoc/>
        public override bool CanRead => inputBuffer != null;

        /// <inheritdoc/>
        public override bool CanSeek => false;

        /// <inheritdoc/>
        public override bool CanWrite => outputIndex != null;

        /// <inheritdoc/>
        public override long Length => throw new NotSupportedException();

        /// <inheritdoc/>
        public override long Position
        {
            get => throw new NotSupportedException();
            set => throw new NotSupportedException();
        }

        /// <inheritdoc/>
        /// <exception cref="InvalidOperationException">
        /// The stream was created without an output channel index and therefore cannot be written to.
        /// </exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            if (outputIndex == null)
            {
                throw new InvalidOperationException(
                    "This stream was created without an output channel index and cannot be written to.");
            }

            // Stream.Write is synchronous, so block until the awaitable returned by muxer.Write completes.
            muxer.Write(outputIndex.Value, buffer, offset, count).GetAwaiter().GetResult();
        }

        /// <inheritdoc/>
        /// <exception cref="InvalidOperationException">
        /// The stream was created without an input buffer and therefore cannot be read from.
        /// </exception>
        public override int Read(byte[] buffer, int offset, int count)
        {
            if (inputBuffer == null)
            {
                throw new InvalidOperationException(
                    "This stream was created without an input buffer and cannot be read from.");
            }

            return inputBuffer.Read(buffer, offset, count);
        }

        /// <inheritdoc/>
        public override void Flush()
        {
            // Whenever we call muxer.Write, a message is immediately sent over the wire, so we don't need/support flushing.
            // Implement flushing as a no-op operation as opposed to throwing a NotSupportedException.
        }

        /// <inheritdoc/>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        /// <inheritdoc/>
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }
    }
}