-
Notifications
You must be signed in to change notification settings - Fork 301
/
Copy pathByteBuffer.cs
335 lines (292 loc) · 11.1 KB
/
ByteBuffer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
using System.Buffers;
using System.Diagnostics;
namespace k8s
{
// There may be already an async implementation that we can use:
// https://github.com/StephenCleary/AsyncEx/wiki/AsyncProducerConsumerQueue
// However, they focus on individual objects and may not be a good choice for use with fixed-with byte buffers
/// <summary>
/// Represents a bounded buffer. A dedicated thread can send bytes to this buffer (the producer); while another thread can
/// read bytes from this buffer (the consumer).
/// </summary>
/// <remarks>
/// This is a producer-consumer problem (or bounded-buffer problem), see https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
/// </remarks>
public class ByteBuffer : IDisposable
{
private const int DefaultBufferSize = 4 * 1024; // 4 KB
private const int DefaultMaximumSize = 40 * 1024 * 1024; // 40 MB
private readonly int maximumSize;
private readonly AutoResetEvent dataAvailable = new AutoResetEvent(false);
private readonly object lockObject = new object();
private byte[] buffer;
private int bytesWritten;
private int bytesRead;
/// <summary>
/// Used by a writer to indicate the end of the file. When set, the reader will be notified that no
/// more data is available.
/// </summary>
private bool endOfFile;
private bool disposedValue;
/// <summary>
/// Initializes a new instance of the <see cref="ByteBuffer"/> class using the default buffer size and limit.
/// </summary>
public ByteBuffer()
: this(DefaultBufferSize, DefaultMaximumSize)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="ByteBuffer"/> class.
/// </summary>
/// <param name="bufferSize">
/// The initial buffer size.
/// </param>
/// <param name="maximumSize">
/// The maximum buffer size.
/// </param>
public ByteBuffer(int bufferSize, int maximumSize)
{
this.maximumSize = maximumSize;
buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
endOfFile = false;
}
/// <summary>
/// Gets the current buffer size.
/// </summary>
public int Size
{
get { return buffer.Length; }
}
/// <summary>
/// Gets the maximum allowed size of the buffer.
/// </summary>
public int MaximumSize
{
get { return maximumSize; }
}
/// <summary>
/// Gets the offset from which the next byte will be read. Increased every time a caller reads data.
/// </summary>
public int ReadWaterMark { get; private set; }
/// <summary>
/// Gets the offset to which the next byte will be written. Increased every time a caller writes data.
/// </summary>
public int WriteWaterMark { get; private set; }
/// <summary>
/// Gets the amount of bytes availble for reading.
/// </summary>
public int AvailableReadableBytes
{
get
{
lock (lockObject)
{
if (ReadWaterMark == WriteWaterMark)
{
return 0;
}
else if (ReadWaterMark < WriteWaterMark)
{
return WriteWaterMark - ReadWaterMark;
}
else
{
return
// Bytes available at the end of the array
buffer.Length - ReadWaterMark
// Bytes available at the start of the array
+ WriteWaterMark;
}
}
}
}
/// <summary>
/// Gets the amount of bytes available for writing.
/// </summary>
public int AvailableWritableBytes
{
get
{
lock (lockObject)
{
if (WriteWaterMark > ReadWaterMark)
{
return
/* Available bytes at the end of the buffer */
buffer.Length - WriteWaterMark
/* Available bytes at the start of the buffer */
+ ReadWaterMark;
}
else if (WriteWaterMark == ReadWaterMark)
{
return buffer.Length;
}
else
{
return ReadWaterMark - WriteWaterMark;
}
}
}
}
/// <summary>
/// Writes bytes to the buffer.
/// </summary>
/// <param name="data">
/// The source byte array from which to read the bytes.
/// </param>
/// <param name="offset">
/// The offset of the first byte to copy.
/// </param>
/// <param name="length">
/// The amount of bytes to copy.
/// </param>
public void Write(byte[] data, int offset, int length)
{
lock (lockObject)
{
// Does the data fit?
// We must make sure that ReadWaterMark != WriteWaterMark; that would indicate 'all data has been read' instead
// instead of 'all data must be read'
if (AvailableWritableBytes <= length)
{
// Grow the buffer
Grow(buffer.Length + length - AvailableWritableBytes + 1);
}
// Write the data; first the data that fits between the write watermark and the end of the buffer
var availableBeforeWrapping = buffer.Length - WriteWaterMark;
Array.Copy(data, offset, buffer, WriteWaterMark, Math.Min(availableBeforeWrapping, length));
WriteWaterMark += Math.Min(availableBeforeWrapping, length);
if (length > availableBeforeWrapping)
{
Array.Copy(data, offset + availableBeforeWrapping, buffer, 0,
length - availableBeforeWrapping);
WriteWaterMark = length - availableBeforeWrapping;
}
bytesWritten += length;
Debug.Assert(bytesRead + AvailableReadableBytes == bytesWritten);
}
dataAvailable.Set();
}
/// <summary>
/// Stops writing data to the buffer, indicating that the end of file has been reached.
/// </summary>
public void WriteEnd()
{
lock (lockObject)
{
endOfFile = true;
dataAvailable.Set();
}
}
/// <summary>
/// Reads bytes from the buffer.
/// </summary>
/// <param name="data">
/// The byte array into which to read the data.
/// </param>
/// <param name="offset">
/// The offset at which to start writing the bytes.
/// </param>
/// <param name="count">
/// The amount of bytes to read.
/// </param>
/// <returns>
/// The total number of bytes read.
/// </returns>
public int Read(byte[] data, int offset, int count)
{
while (AvailableReadableBytes == 0 && !endOfFile)
{
dataAvailable.WaitOne();
}
var toRead = 0;
lock (lockObject)
{
// Signal the end of file to the caller.
if (AvailableReadableBytes == 0 && endOfFile)
{
return 0;
}
toRead = Math.Min(AvailableReadableBytes, count);
var availableBeforeWrapping = buffer.Length - ReadWaterMark;
Array.Copy(buffer, ReadWaterMark, data, offset, Math.Min(availableBeforeWrapping, toRead));
ReadWaterMark += Math.Min(availableBeforeWrapping, toRead);
if (toRead > availableBeforeWrapping)
{
Array.Copy(buffer, 0, data, offset + availableBeforeWrapping,
toRead - availableBeforeWrapping);
ReadWaterMark = toRead - availableBeforeWrapping;
}
bytesRead += toRead;
Debug.Assert(bytesRead + AvailableReadableBytes == bytesWritten);
}
return toRead;
}
/// <summary>
/// The event which is raised when the buffer is resized.
/// </summary>
public event EventHandler OnResize;
/// <summary>
/// Increases the buffer size. Any call to this method must be protected with a lock.
/// </summary>
/// <param name="size">
/// The new buffer size.
/// </param>
private void Grow(int size)
{
if (size > maximumSize)
{
throw new OutOfMemoryException();
}
var newBuffer = ArrayPool<byte>.Shared.Rent(size);
if (WriteWaterMark < ReadWaterMark)
{
// Copy the data at the start
Array.Copy(buffer, 0, newBuffer, 0, WriteWaterMark);
var trailingDataLength = buffer.Length - ReadWaterMark;
Array.Copy(
buffer,
ReadWaterMark,
newBuffer,
newBuffer.Length - trailingDataLength,
trailingDataLength);
ReadWaterMark += newBuffer.Length - buffer.Length;
}
else
{
// ... [Read WM] ... [Write WM] ... [newly available space]
Array.Copy(buffer, 0, newBuffer, 0, buffer.Length);
}
ArrayPool<byte>.Shared.Return(buffer);
buffer = newBuffer;
Debug.Assert(bytesRead + AvailableReadableBytes == bytesWritten);
OnResize?.Invoke(this, EventArgs.Empty);
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
ArrayPool<byte>.Shared.Return(buffer);
dataAvailable.Dispose();
}
// TODO: free unmanaged resources (unmanaged objects) and override finalizer
// TODO: set large fields to null
disposedValue = true;
}
}
// // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources
// ~ByteBuffer()
// {
// // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
// Dispose(disposing: false);
// }
public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(true);
GC.SuppressFinalize(this);
}
}
}