@@ -529,29 +529,13 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
529
529
// Feed the request message into the state machine:
530
530
let result = self . stateMachine. sendRequest (
531
531
request. message,
532
- compressed: request. compressed
532
+ compressed: request. compressed,
533
+ promise: promise
533
534
)
534
- switch result {
535
- case let . success( ( buffer, maybeBuffer) ) :
536
- let frame1 = HTTP2Frame . FramePayload. data ( . init( data: . byteBuffer( buffer) ) )
537
- self . logger. trace ( " writing HTTP2 frame " , metadata: [
538
- MetadataKey . h2Payload: " DATA " ,
539
- MetadataKey . h2DataBytes: " \( buffer. readableBytes) " ,
540
- MetadataKey . h2EndStream: " false " ,
541
- ] )
542
- // If there's a second buffer, attach the promise to the second write.
543
- let promise1 = maybeBuffer == nil ? promise : nil
544
- context. write ( self . wrapOutboundOut ( frame1) , promise: promise1)
545
535
546
- if let actuallyBuffer = maybeBuffer {
547
- let frame2 = HTTP2Frame . FramePayload. data ( . init( data: . byteBuffer( actuallyBuffer) ) )
548
- self . logger. trace ( " writing HTTP2 frame " , metadata: [
549
- MetadataKey . h2Payload: " DATA " ,
550
- MetadataKey . h2DataBytes: " \( actuallyBuffer. readableBytes) " ,
551
- MetadataKey . h2EndStream: " false " ,
552
- ] )
553
- context. write ( self . wrapOutboundOut ( frame2) , promise: promise)
554
- }
536
+ switch result {
537
+ case . success:
538
+ ( )
555
539
556
540
case let . failure( writeError) :
557
541
switch writeError {
@@ -572,13 +556,37 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
572
556
}
573
557
574
558
case . end:
559
+ // About to send end: write any outbound messages first.
560
+ while let ( result, promise) = self . stateMachine. nextRequest ( ) {
561
+ switch result {
562
+ case let . success( buffer) :
563
+ let framePayload : HTTP2Frame . FramePayload = . data(
564
+ . init( data: . byteBuffer( buffer) , endStream: false )
565
+ )
566
+
567
+ self . logger. trace ( " writing HTTP2 frame " , metadata: [
568
+ MetadataKey . h2Payload: " DATA " ,
569
+ MetadataKey . h2DataBytes: " \( buffer. readableBytes) " ,
570
+ MetadataKey . h2EndStream: " false " ,
571
+ ] )
572
+ context. write ( self . wrapOutboundOut ( framePayload) , promise: promise)
573
+
574
+ case let . failure( error) :
575
+ context. fireErrorCaught ( error)
576
+ promise? . fail ( error)
577
+ return
578
+ }
579
+ }
580
+
575
581
// Okay: can we close the request stream?
576
582
switch self . stateMachine. sendEndOfRequestStream ( ) {
577
583
case . success:
578
584
// We can. Send an empty DATA frame with end-stream set.
579
585
let empty = context. channel. allocator. buffer ( capacity: 0 )
580
- let framePayload = HTTP2Frame . FramePayload
581
- . data ( . init( data: . byteBuffer( empty) , endStream: true ) )
586
+ let framePayload : HTTP2Frame . FramePayload = . data(
587
+ . init( data: . byteBuffer( empty) , endStream: true )
588
+ )
589
+
582
590
self . logger. trace ( " writing HTTP2 frame " , metadata: [
583
591
MetadataKey . h2Payload: " DATA " ,
584
592
MetadataKey . h2DataBytes: " 0 " ,
@@ -605,4 +613,30 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
605
613
}
606
614
}
607
615
}
616
+
617
+ func flush( context: ChannelHandlerContext ) {
618
+ // Drain any requests.
619
+ while let ( result, promise) = self . stateMachine. nextRequest ( ) {
620
+ switch result {
621
+ case let . success( buffer) :
622
+ let framePayload : HTTP2Frame . FramePayload = . data(
623
+ . init( data: . byteBuffer( buffer) , endStream: false )
624
+ )
625
+
626
+ self . logger. trace ( " writing HTTP2 frame " , metadata: [
627
+ MetadataKey . h2Payload: " DATA " ,
628
+ MetadataKey . h2DataBytes: " \( buffer. readableBytes) " ,
629
+ MetadataKey . h2EndStream: " false " ,
630
+ ] )
631
+ context. write ( self . wrapOutboundOut ( framePayload) , promise: promise)
632
+
633
+ case let . failure( error) :
634
+ context. fireErrorCaught ( error)
635
+ promise? . fail ( error)
636
+ return
637
+ }
638
+ }
639
+
640
+ context. flush ( )
641
+ }
608
642
}
0 commit comments