@@ -86,7 +86,7 @@ public async Task SetupAsync()
86
86
}
87
87
88
88
// Process the SETTINGS frame. This will send an ACK.
89
- await ProcessSettingsFrame ( frameHeader ) . ConfigureAwait ( false ) ;
89
+ ProcessSettingsFrame ( frameHeader ) ;
90
90
91
91
ProcessIncomingFrames ( ) ;
92
92
}
@@ -108,8 +108,19 @@ private async Task FlushOutgoingBytesAsync()
108
108
{
109
109
Debug . Assert ( _outgoingBuffer . ActiveSpan . Length > 0 ) ;
110
110
111
- await SendFramesAsync ( _outgoingBuffer . ActiveMemory ) . ConfigureAwait ( false ) ;
112
- _outgoingBuffer . Discard ( _outgoingBuffer . ActiveMemory . Length ) ;
111
+ try
112
+ {
113
+ await _stream . WriteAsync ( _outgoingBuffer . ActiveMemory ) . ConfigureAwait ( false ) ;
114
+ }
115
+ catch ( Exception )
116
+ {
117
+ Abort ( ) ;
118
+ throw ;
119
+ }
120
+ finally
121
+ {
122
+ _outgoingBuffer . Discard ( _outgoingBuffer . ActiveMemory . Length ) ;
123
+ }
113
124
}
114
125
115
126
private async Task < FrameHeader > ReadFrameAsync ( )
@@ -145,19 +156,19 @@ private async void ProcessIncomingFrames()
145
156
break ;
146
157
147
158
case FrameType . Data :
148
- await ProcessDataFrame ( frameHeader ) . ConfigureAwait ( false ) ;
159
+ ProcessDataFrame ( frameHeader ) ;
149
160
break ;
150
161
151
162
case FrameType . Settings :
152
- await ProcessSettingsFrame ( frameHeader ) . ConfigureAwait ( false ) ;
163
+ ProcessSettingsFrame ( frameHeader ) ;
153
164
break ;
154
165
155
166
case FrameType . Priority :
156
167
ProcessPriorityFrame ( frameHeader ) ;
157
168
break ;
158
169
159
170
case FrameType . Ping :
160
- await ProcessPingFrame ( frameHeader ) . ConfigureAwait ( false ) ;
171
+ ProcessPingFrame ( frameHeader ) ;
161
172
break ;
162
173
163
174
case FrameType . WindowUpdate :
@@ -181,7 +192,7 @@ private async void ProcessIncomingFrames()
181
192
}
182
193
catch ( Exception )
183
194
{
184
- AbortStreams ( 0 ) ;
195
+ Abort ( ) ;
185
196
}
186
197
}
187
198
@@ -205,19 +216,6 @@ private Http2Stream GetStream(int streamId)
205
216
}
206
217
}
207
218
208
- private async Task SendRstStreamFrameAsync ( int streamId , Http2ProtocolErrorCode errorCode )
209
- {
210
- WriteFrameHeader ( new FrameHeader ( FrameHeader . RstStreamLength , FrameType . RstStream , FrameFlags . None , streamId ) ) ;
211
-
212
- _outgoingBuffer . AvailableSpan [ 0 ] = ( byte ) ( ( ( int ) errorCode & 0xFF000000 ) >> 24 ) ;
213
- _outgoingBuffer . AvailableSpan [ 1 ] = ( byte ) ( ( ( int ) errorCode & 0x00FF0000 ) >> 16 ) ;
214
- _outgoingBuffer . AvailableSpan [ 2 ] = ( byte ) ( ( ( int ) errorCode & 0x0000FF00 ) >> 8 ) ;
215
- _outgoingBuffer . AvailableSpan [ 3 ] = ( byte ) ( ( int ) errorCode & 0x000000FF ) ;
216
- _outgoingBuffer . Commit ( FrameHeader . RstStreamLength ) ;
217
-
218
- await FlushOutgoingBytesAsync ( ) . ConfigureAwait ( false ) ;
219
- }
220
-
221
219
private async Task ProcessHeadersFrame ( FrameHeader frameHeader )
222
220
{
223
221
Debug . Assert ( frameHeader . Type == FrameType . Headers ) ;
@@ -229,7 +227,9 @@ private async Task ProcessHeadersFrame(FrameHeader frameHeader)
229
227
if ( http2Stream == null )
230
228
{
231
229
_incomingBuffer . Discard ( frameHeader . Length ) ;
232
- await SendRstStreamFrameAsync ( streamId , Http2ProtocolErrorCode . StreamClosed ) ;
230
+
231
+ // Don't wait for completion, which could happen asynchronously.
232
+ ValueTask ignored = SendRstStreamAsync ( streamId , Http2ProtocolErrorCode . StreamClosed ) ;
233
233
return ;
234
234
}
235
235
@@ -297,15 +297,18 @@ private ReadOnlySpan<byte> GetFrameData(ReadOnlySpan<byte> frameData, bool hasPa
297
297
return frameData ;
298
298
}
299
299
300
- private Task ProcessDataFrame ( FrameHeader frameHeader )
300
+ private void ProcessDataFrame ( FrameHeader frameHeader )
301
301
{
302
302
Debug . Assert ( frameHeader . Type == FrameType . Data ) ;
303
303
304
304
Http2Stream http2Stream = GetStream ( frameHeader . StreamId ) ;
305
305
if ( http2Stream == null )
306
306
{
307
307
_incomingBuffer . Discard ( frameHeader . Length ) ;
308
- return SendRstStreamFrameAsync ( frameHeader . StreamId , Http2ProtocolErrorCode . StreamClosed ) ;
308
+
309
+ // Don't wait for completion, which could happen asynchronously.
310
+ ValueTask ignored = SendRstStreamAsync ( frameHeader . StreamId , Http2ProtocolErrorCode . StreamClosed ) ;
311
+ return ;
309
312
}
310
313
311
314
ReadOnlySpan < byte > frameData = GetFrameData ( _incomingBuffer . ActiveSpan . Slice ( 0 , frameHeader . Length ) , hasPad : frameHeader . PaddedFlag , hasPriority : false ) ;
@@ -320,11 +323,9 @@ private Task ProcessDataFrame(FrameHeader frameHeader)
320
323
{
321
324
RemoveStream ( http2Stream ) ;
322
325
}
323
-
324
- return Task . CompletedTask ;
325
326
}
326
327
327
- private async Task ProcessSettingsFrame ( FrameHeader frameHeader )
328
+ private void ProcessSettingsFrame ( FrameHeader frameHeader )
328
329
{
329
330
Debug . Assert ( frameHeader . Type == FrameType . Settings ) ;
330
331
@@ -363,8 +364,8 @@ private async Task ProcessSettingsFrame(FrameHeader frameHeader)
363
364
_incomingBuffer . Discard ( frameHeader . Length ) ;
364
365
365
366
// Send acknowledgement
366
- WriteFrameHeader ( new FrameHeader ( 0 , FrameType . Settings , FrameFlags . Ack , 0 ) ) ;
367
- await FlushOutgoingBytesAsync ( ) . ConfigureAwait ( false ) ;
367
+ // Don't wait for completion, which could happen asynchronously.
368
+ ValueTask ignored = SendSettingsAckAsync ( ) ;
368
369
}
369
370
}
370
371
@@ -382,7 +383,7 @@ private void ProcessPriorityFrame(FrameHeader frameHeader)
382
383
_incomingBuffer . Discard ( frameHeader . Length ) ;
383
384
}
384
385
385
- private async Task ProcessPingFrame ( FrameHeader frameHeader )
386
+ private void ProcessPingFrame ( FrameHeader frameHeader )
386
387
{
387
388
Debug . Assert ( frameHeader . Type == FrameType . Ping ) ;
388
389
@@ -403,11 +404,8 @@ private async Task ProcessPingFrame(FrameHeader frameHeader)
403
404
}
404
405
405
406
// Send PING ACK
406
- WriteFrameHeader ( new FrameHeader ( FrameHeader . PingLength , FrameType . Ping , FrameFlags . Ack , 0 ) ) ;
407
- _outgoingBuffer . EnsureAvailableSpace ( FrameHeader . PingLength ) ;
408
- _incomingBuffer . ActiveSpan . Slice ( 0 , FrameHeader . PingLength ) . CopyTo ( _outgoingBuffer . AvailableSpan ) ;
409
- _outgoingBuffer . Commit ( FrameHeader . PingLength ) ;
410
- await FlushOutgoingBytesAsync ( ) . ConfigureAwait ( false ) ;
407
+ // Don't wait for completion, which could happen asynchronously.
408
+ ValueTask ignored = SendPingAckAsync ( _incomingBuffer . ActiveMemory . Slice ( 0 , FrameHeader . PingLength ) ) ;
411
409
412
410
_incomingBuffer . Discard ( frameHeader . Length ) ;
413
411
}
@@ -480,13 +478,148 @@ private void ProcessGoAwayFrame(FrameHeader frameHeader)
480
478
_incomingBuffer . Discard ( frameHeader . Length ) ;
481
479
}
482
480
481
+ private async ValueTask AcquireWriteLockAsync ( )
482
+ {
483
+ await _writerLock . WaitAsync ( ) . ConfigureAwait ( false ) ;
484
+
485
+ // If the connection has been aborted, then fail now instead of trying to send more data.
486
+ if ( IsAborted ( ) )
487
+ {
488
+ throw new IOException ( SR . net_http_invalid_response ) ;
489
+ }
490
+ }
491
+
492
+ private void ReleaseWriteLock ( )
493
+ {
494
+ // Currently, we always flush the write buffer before releasing the lock.
495
+ // If we change this in the future, we will need to revisit this assert.
496
+ Debug . Assert ( _outgoingBuffer . ActiveMemory . IsEmpty ) ;
497
+
498
+ _writerLock . Release ( ) ;
499
+ }
500
+
501
+ private async ValueTask SendSettingsAckAsync ( )
502
+ {
503
+ await AcquireWriteLockAsync ( ) . ConfigureAwait ( false ) ;
504
+ try
505
+ {
506
+ WriteFrameHeader ( new FrameHeader ( 0 , FrameType . Settings , FrameFlags . Ack , 0 ) ) ;
507
+
508
+ await FlushOutgoingBytesAsync ( ) . ConfigureAwait ( false ) ;
509
+ }
510
+ finally
511
+ {
512
+ ReleaseWriteLock ( ) ;
513
+ }
514
+ }
515
+
516
+ private async ValueTask SendPingAckAsync ( ReadOnlyMemory < byte > pingContent )
517
+ {
518
+ Debug . Assert ( pingContent . Length == FrameHeader . PingLength ) ;
519
+
520
+ await AcquireWriteLockAsync ( ) . ConfigureAwait ( false ) ;
521
+ try
522
+ {
523
+ WriteFrameHeader ( new FrameHeader ( FrameHeader . PingLength , FrameType . Ping , FrameFlags . Ack , 0 ) ) ;
524
+ _outgoingBuffer . EnsureAvailableSpace ( FrameHeader . PingLength ) ;
525
+ pingContent . CopyTo ( _outgoingBuffer . AvailableMemory ) ;
526
+ _outgoingBuffer . Commit ( FrameHeader . PingLength ) ;
527
+
528
+ await FlushOutgoingBytesAsync ( ) . ConfigureAwait ( false ) ;
529
+ }
530
+ finally
531
+ {
532
+ ReleaseWriteLock ( ) ;
533
+ }
534
+ }
535
+
536
+ private async ValueTask SendRstStreamAsync ( int streamId , Http2ProtocolErrorCode errorCode )
537
+ {
538
+ await AcquireWriteLockAsync ( ) . ConfigureAwait ( false ) ;
539
+ try
540
+ {
541
+ WriteFrameHeader ( new FrameHeader ( FrameHeader . RstStreamLength , FrameType . RstStream , FrameFlags . None , streamId ) ) ;
542
+
543
+ _outgoingBuffer . EnsureAvailableSpace ( FrameHeader . RstStreamLength ) ;
544
+ _outgoingBuffer . AvailableSpan [ 0 ] = ( byte ) ( ( ( int ) errorCode & 0xFF000000 ) >> 24 ) ;
545
+ _outgoingBuffer . AvailableSpan [ 1 ] = ( byte ) ( ( ( int ) errorCode & 0x00FF0000 ) >> 16 ) ;
546
+ _outgoingBuffer . AvailableSpan [ 2 ] = ( byte ) ( ( ( int ) errorCode & 0x0000FF00 ) >> 8 ) ;
547
+ _outgoingBuffer . AvailableSpan [ 3 ] = ( byte ) ( ( int ) errorCode & 0x000000FF ) ;
548
+ _outgoingBuffer . Commit ( FrameHeader . RstStreamLength ) ;
549
+
550
+ await FlushOutgoingBytesAsync ( ) . ConfigureAwait ( false ) ;
551
+ }
552
+ finally
553
+ {
554
+ ReleaseWriteLock ( ) ;
555
+ }
556
+ }
557
+
558
+ private ( ReadOnlyMemory < byte > first , ReadOnlyMemory < byte > rest ) SplitBufferForFraming ( ReadOnlyMemory < byte > buffer ) =>
559
+ buffer . Length > FrameHeader . MaxLength ?
560
+ ( buffer . Slice ( 0 , FrameHeader . MaxLength ) , buffer . Slice ( FrameHeader . MaxLength ) ) :
561
+ ( buffer , Memory < byte > . Empty ) ;
562
+
563
+ private async ValueTask SendStreamDataAsync ( int streamId , ReadOnlyMemory < byte > buffer )
564
+ {
565
+ ReadOnlyMemory < byte > remaining = buffer ;
566
+
567
+ while ( remaining . Length > 0 )
568
+ {
569
+ ReadOnlyMemory < byte > current ;
570
+ ( current , remaining ) = SplitBufferForFraming ( remaining ) ;
571
+
572
+ await AcquireWriteLockAsync ( ) . ConfigureAwait ( false ) ;
573
+ try
574
+ {
575
+ _outgoingBuffer . EnsureAvailableSpace ( FrameHeader . Size + current . Length ) ;
576
+ WriteFrameHeader ( new FrameHeader ( current . Length , FrameType . Data , FrameFlags . None , streamId ) ) ;
577
+ current . CopyTo ( _outgoingBuffer . AvailableMemory ) ;
578
+ _outgoingBuffer . Commit ( current . Length ) ;
579
+
580
+ await FlushOutgoingBytesAsync ( ) . ConfigureAwait ( false ) ;
581
+ }
582
+ finally
583
+ {
584
+ ReleaseWriteLock ( ) ;
585
+ }
586
+ }
587
+ }
588
+
589
+ private async ValueTask SendEndStreamAsync ( int streamId )
590
+ {
591
+ await AcquireWriteLockAsync ( ) . ConfigureAwait ( false ) ;
592
+ try
593
+ {
594
+ _outgoingBuffer . EnsureAvailableSpace ( FrameHeader . Size ) ;
595
+ WriteFrameHeader ( new FrameHeader ( 0 , FrameType . Data , FrameFlags . EndStream , streamId ) ) ;
596
+ await FlushOutgoingBytesAsync ( ) . ConfigureAwait ( false ) ;
597
+ }
598
+ finally
599
+ {
600
+ ReleaseWriteLock ( ) ;
601
+ }
602
+ }
603
+
483
604
private void WriteFrameHeader ( FrameHeader frameHeader )
484
605
{
485
606
_outgoingBuffer . EnsureAvailableSpace ( FrameHeader . Size ) ;
486
607
frameHeader . WriteTo ( _outgoingBuffer . AvailableSpan ) ;
487
608
_outgoingBuffer . Commit ( FrameHeader . Size ) ;
488
609
}
489
610
611
+ private void Abort ( )
612
+ {
613
+ // The connection has failed, e.g. failed IO or a connection-level frame error.
614
+ // Abort all streams and cause further processing to fail.
615
+ AbortStreams ( 0 ) ;
616
+ }
617
+
618
+ private bool IsAborted ( )
619
+ {
620
+ return _disposed ;
621
+ }
622
+
490
623
private void AbortStreams ( int lastValidStream )
491
624
{
492
625
lock ( _syncObject )
0 commit comments