Skip to content

Commit 173ecaf

Browse files
glbrnttpinlin168
authored andcommitted
Don't ack pings twice (grpc#1534)
Motivation: gRPC Swift is emitting two acks per ping. NIOHTTP2 is emitting one and the keepalive handler is emitting the other. Modifications: - Don't emit ping acks from the keep alive handler; just let the H2 handler do it. Result: - No unnecessary ping acks are emitted. - Resolves grpc#1520
1 parent d0d1f52 commit 173ecaf

File tree

3 files changed

+106
-34
lines changed

3 files changed

+106
-34
lines changed

Sources/GRPC/GRPCIdleHandler.swift

+4
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,10 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
184184
case .none:
185185
()
186186

187+
case .ack:
188+
// NIO's HTTP2 handler acks for us so this is a no-op.
189+
()
190+
187191
case .cancelScheduledTimeout:
188192
self.scheduledClose?.cancel()
189193
self.scheduledClose = nil

Sources/GRPC/GRPCKeepaliveHandlers.swift

+6-6
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ struct PingHandler {
9090

9191
enum Action {
9292
case none
93+
case ack
9394
case schedulePing(delay: TimeAmount, timeout: TimeAmount)
9495
case cancelScheduledTimeout
9596
case reply(HTTP2Frame.FramePayload)
@@ -170,35 +171,34 @@ struct PingHandler {
170171
// This is a valid ping, reset our strike count and reply with a pong.
171172
self.pingStrikes = 0
172173
self.lastReceivedPingDate = self.now()
173-
return .reply(self.generatePingFrame(data: pingData, ack: true))
174+
return .ack
174175
}
175176
} else {
176177
// We don't support ping strikes. We'll just reply with a pong.
177178
//
178179
// Note: we don't need to update `pingStrikes` or `lastReceivedPingDate` as we don't
179180
// support ping strikes.
180-
return .reply(self.generatePingFrame(data: pingData, ack: true))
181+
return .ack
181182
}
182183
}
183184

184185
mutating func pingFired() -> Action {
185186
if self.shouldBlockPing {
186187
return .none
187188
} else {
188-
return .reply(self.generatePingFrame(data: self.pingData, ack: false))
189+
return .reply(self.generatePingFrame(data: self.pingData))
189190
}
190191
}
191192

192193
private mutating func generatePingFrame(
193-
data: HTTP2PingData,
194-
ack: Bool
194+
data: HTTP2PingData
195195
) -> HTTP2Frame.FramePayload {
196196
if self.activeStreams == 0 {
197197
self.sentPingsWithoutData += 1
198198
}
199199

200200
self.lastSentPingDate = self.now()
201-
return HTTP2Frame.FramePayload.ping(data, ack: ack)
201+
return HTTP2Frame.FramePayload.ping(data, ack: false)
202202
}
203203

204204
/// Returns true if, on receipt of a ping, the ping should be regarded as a ping strike.

Tests/GRPCTests/GRPCPingHandlerTests.swift

+96-28
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
@testable import GRPC
1717
import NIOCore
18+
import NIOEmbedded
1819
import NIOHTTP2
1920
import XCTest
2021

@@ -249,35 +250,23 @@ class GRPCPingHandlerTests: GRPCTestCase {
249250
pingData: HTTP2PingData(withInteger: 1),
250251
ack: false
251252
)
252-
XCTAssertEqual(
253-
response,
254-
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
255-
)
253+
XCTAssertEqual(response, .ack)
256254

257255
// Received another ping, response should be a pong (ping strikes not in effect)
258256
response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
259-
XCTAssertEqual(
260-
response,
261-
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
262-
)
257+
XCTAssertEqual(response, .ack)
263258

264259
// Received another ping, response should be a pong (ping strikes not in effect)
265260
response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
266-
XCTAssertEqual(
267-
response,
268-
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
269-
)
261+
XCTAssertEqual(response, .ack)
270262
}
271263

272264
func testPingWithoutDataResultsInPongForClient() {
273265
// Don't allow _sending_ pings when no calls are active (receiving pings should be tolerated).
274266
self.setupPingHandler(permitWithoutCalls: false)
275267

276268
let action = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
277-
XCTAssertEqual(
278-
action,
279-
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
280-
)
269+
XCTAssertEqual(action, .ack)
281270
}
282271

283272
func testPingWithoutDataResultsInPongForServer() {
@@ -291,10 +280,7 @@ class GRPCPingHandlerTests: GRPCTestCase {
291280
)
292281

293282
let action = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
294-
XCTAssertEqual(
295-
action,
296-
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
297-
)
283+
XCTAssertEqual(action, .ack)
298284
}
299285

300286
func testPingStrikesOnServer() {
@@ -312,10 +298,7 @@ class GRPCPingHandlerTests: GRPCTestCase {
312298
pingData: HTTP2PingData(withInteger: 1),
313299
ack: false
314300
)
315-
XCTAssertEqual(
316-
response,
317-
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
318-
)
301+
XCTAssertEqual(response, .ack)
319302

320303
// Received another ping, which is invalid (ping strike), response should be no action
321304
response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
@@ -326,10 +309,7 @@ class GRPCPingHandlerTests: GRPCTestCase {
326309

327310
// Received another ping, which is valid now, response should be a pong
328311
response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
329-
XCTAssertEqual(
330-
response,
331-
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
332-
)
312+
XCTAssertEqual(response, .ack)
333313

334314
// Received another ping, which is invalid (ping strike), response should be no action
335315
response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
@@ -381,6 +361,8 @@ extension PingHandler.Action: Equatable {
381361
switch (lhs, rhs) {
382362
case (.none, .none):
383363
return true
364+
case (.ack, .ack):
365+
return true
384366
case (let .schedulePing(lhsDelay, lhsTimeout), let .schedulePing(rhsDelay, rhsTimeout)):
385367
return lhsDelay == rhsDelay && lhsTimeout == rhsTimeout
386368
case (.cancelScheduledTimeout, .cancelScheduledTimeout):
@@ -401,3 +383,89 @@ extension PingHandler.Action: Equatable {
401383
}
402384
}
403385
}
386+
387+
extension GRPCPingHandlerTests {
388+
func testSingleAckIsEmittedOnPing() throws {
389+
let client = EmbeddedChannel()
390+
let _ = try client.configureHTTP2Pipeline(mode: .client) { _ in
391+
fatalError("Unexpected inbound stream")
392+
}.wait()
393+
394+
let server = EmbeddedChannel()
395+
let serverMux = try server.configureHTTP2Pipeline(mode: .server) { _ in
396+
fatalError("Unexpected inbound stream")
397+
}.wait()
398+
399+
let idleHandler = GRPCIdleHandler(
400+
idleTimeout: .minutes(5),
401+
keepalive: .init(),
402+
logger: self.serverLogger
403+
)
404+
try server.pipeline.syncOperations.addHandler(idleHandler, position: .before(serverMux))
405+
try server.connect(to: .init(unixDomainSocketPath: "/ignored")).wait()
406+
try client.connect(to: .init(unixDomainSocketPath: "/ignored")).wait()
407+
408+
func interact(client: EmbeddedChannel, server: EmbeddedChannel) throws {
409+
var didRead = true
410+
while didRead {
411+
didRead = false
412+
413+
if let data = try client.readOutbound(as: ByteBuffer.self) {
414+
didRead = true
415+
try server.writeInbound(data)
416+
}
417+
418+
if let data = try server.readOutbound(as: ByteBuffer.self) {
419+
didRead = true
420+
try client.writeInbound(data)
421+
}
422+
}
423+
}
424+
425+
try interact(client: client, server: server)
426+
427+
// Settings.
428+
let f1 = try XCTUnwrap(client.readInbound(as: HTTP2Frame.self))
429+
f1.payload.assertSettings(ack: false)
430+
431+
// Settings ack.
432+
let f2 = try XCTUnwrap(client.readInbound(as: HTTP2Frame.self))
433+
f2.payload.assertSettings(ack: true)
434+
435+
// Send a ping.
436+
let ping = HTTP2Frame(streamID: .rootStream, payload: .ping(.init(withInteger: 42), ack: false))
437+
try client.writeOutbound(ping)
438+
try interact(client: client, server: server)
439+
440+
// Ping ack.
441+
let f3 = try XCTUnwrap(client.readInbound(as: HTTP2Frame.self))
442+
f3.payload.assertPing(ack: true)
443+
444+
XCTAssertNil(try client.readInbound(as: HTTP2Frame.self))
445+
}
446+
}
447+
448+
extension HTTP2Frame.FramePayload {
449+
func assertSettings(ack: Bool, file: StaticString = #file, line: UInt = #line) {
450+
switch self {
451+
case let .settings(settings):
452+
switch settings {
453+
case .ack:
454+
XCTAssertTrue(ack, file: file, line: line)
455+
case .settings:
456+
XCTAssertFalse(ack, file: file, line: line)
457+
}
458+
default:
459+
XCTFail("Expected .settings got \(self)", file: file, line: line)
460+
}
461+
}
462+
463+
func assertPing(ack: Bool, file: StaticString = #file, line: UInt = #line) {
464+
switch self {
465+
case let .ping(_, ack: pingAck):
466+
XCTAssertEqual(pingAck, ack, file: file, line: line)
467+
default:
468+
XCTFail("Expected .ping got \(self)", file: file, line: line)
469+
}
470+
}
471+
}

0 commit comments

Comments
 (0)