@@ -266,7 +266,7 @@ public actor WebPushManager: Sendable {
266
266
case . httpClient( let httpClient, let privateKeyProvider) :
267
267
var logger = logger ?? backgroundActivityLogger
268
268
logger [ metadataKey: " message " ] = " .data( \( message. base64URLEncodedString ( ) ) ) "
269
- try await execute (
269
+ try await encryptPushMessage (
270
270
httpClient: httpClient,
271
271
privateKeyProvider: privateKeyProvider,
272
272
data: message,
@@ -458,7 +458,7 @@ public actor WebPushManager: Sendable {
458
458
logger [ metadataKey: " message " ] = " \( message) "
459
459
switch executor {
460
460
case . httpClient( let httpClient, let privateKeyProvider) :
461
- try await execute (
461
+ try await encryptPushMessage (
462
462
httpClient: httpClient,
463
463
privateKeyProvider: privateKeyProvider,
464
464
data: message. data,
@@ -482,14 +482,14 @@ public actor WebPushManager: Sendable {
482
482
/// Send a message via HTTP Client, mocked or otherwise, encrypting it on the way.
483
483
/// - Parameters:
484
484
/// - httpClient: The protocol implementing HTTP-like functionality.
485
- /// - applicationServerECDHPrivateKey : The private key to use for the key exchange. If nil, one will be generated.
485
+ /// - privateKeyProvider : The private key to use for the key exchange. If nil, one will be generated.
486
486
/// - message: The message to send as raw data.
487
487
/// - subscriber: The subscriber to sign the message against.
488
488
/// - deduplicationTopic: The topic to use when deduplicating messages stored on a Push Service.
489
489
/// - expiration: The expiration of the message.
490
490
/// - urgency: The urgency of the message.
491
491
/// - logger: The logger to use for status updates.
492
- func execute (
492
+ func encryptPushMessage (
493
493
httpClient: some HTTPClientProtocol ,
494
494
privateKeyProvider: Executor . KeyProvider ,
495
495
data message: some DataProtocol ,
@@ -499,6 +499,9 @@ public actor WebPushManager: Sendable {
499
499
urgency: Urgency ,
500
500
logger: Logger
501
501
) async throws {
502
+ let clock = ContinuousClock ( )
503
+ let startTime = clock. now
504
+
502
505
var logger = logger
503
506
logger [ metadataKey: " subscriber " ] = [
504
507
" vapidKeyID " : " \( subscriber. vapidKeyID) " ,
@@ -508,6 +511,11 @@ public actor WebPushManager: Sendable {
508
511
logger [ metadataKey: " urgency " ] = " \( urgency) "
509
512
logger [ metadataKey: " origin " ] = " \( subscriber. endpoint. origin) "
510
513
logger [ metadataKey: " messageSize " ] = " \( message. count) "
514
+ logger [ metadataKey: " topic " ] = " \( topic? . description ?? " nil " ) "
515
+
516
+ /// Force a random topic so any retries don't get duplicated.
517
+ // let topic = topic ?? Topic()
518
+ // logger[metadataKey: "resolvedTopic"] = "\(topic)"
511
519
logger. trace ( " Sending notification " )
512
520
513
521
guard let signingKey = vapidKeyLookup [ subscriber. vapidKeyID] else {
@@ -589,8 +597,60 @@ public actor WebPushManager: Sendable {
589
597
logger. warning ( " The message expiration should be less than \( Expiration . recommendedMaximum) seconds. " )
590
598
}
591
599
600
+ let expirationDeadline : ContinuousClock . Instant ? = if expiration == . dropIfUndeliverable || expiration == . recommendedMaximum {
601
+ nil
602
+ } else {
603
+ startTime. advanced ( by: . seconds( max ( expiration, . dropIfUndeliverable) . seconds) )
604
+ }
605
+
606
+ let retryDurations : [ Duration ] = [ . milliseconds( 500 ) , . seconds( 2 ) , . seconds( 10 ) ]
607
+
608
+ /// Build and send the request.
609
+ try await executeRequest (
610
+ httpClient: httpClient,
611
+ endpointURLString: subscriber. endpoint. absoluteURL. absoluteString,
612
+ authorization: authorization,
613
+ expiration: expiration,
614
+ urgency: urgency,
615
+ topic: topic,
616
+ requestContent: requestContent,
617
+ clock: clock,
618
+ expirationDeadline: expirationDeadline,
619
+ retryDurations: retryDurations [ ... ] ,
620
+ logger: logger
621
+ )
622
+ }
623
+
624
+ func executeRequest(
625
+ httpClient: some HTTPClientProtocol ,
626
+ endpointURLString: String ,
627
+ authorization: String ,
628
+ expiration: Expiration ,
629
+ urgency: Urgency ,
630
+ topic: Topic ? ,
631
+ requestContent: [ UInt8 ] ,
632
+ clock: ContinuousClock ,
633
+ expirationDeadline: ContinuousClock . Instant ? ,
634
+ retryDurations: ArraySlice < Duration > ,
635
+ logger: Logger
636
+ ) async throws {
637
+ var logger = logger
638
+ logger [ metadataKey: " retryDurationsRemaining " ] = . array( retryDurations. map { " \( $0. components. seconds) seconds " } )
639
+
640
+ var expiration = expiration
641
+ var requestDeadline = NIODeadline . distantFuture
642
+ if let expirationDeadline {
643
+ let remainingDuration = clock. now. duration ( to: expirationDeadline)
644
+ expiration = Expiration ( seconds: Int ( remainingDuration. components. seconds) )
645
+ requestDeadline = . now( ) + TimeAmount( remainingDuration)
646
+ logger [ metadataKey: " resolvedExpiration " ] = " \( expiration) "
647
+ logger [ metadataKey: " expirationDeadline " ] = " \( expirationDeadline) "
648
+ }
649
+
650
+ logger. trace ( " Preparing to send push message. " )
651
+
592
652
/// Add the VAPID authorization and corrent content encoding and type.
593
- var request = HTTPClientRequest ( url: subscriber . endpoint . absoluteURL . absoluteString )
653
+ var request = HTTPClientRequest ( url: endpointURLString )
594
654
request. method = . POST
595
655
request. headers. add ( name: " Authorization " , value: authorization)
596
656
request. headers. add ( name: " Content-Encoding " , value: " aes128gcm " )
@@ -603,10 +663,10 @@ public actor WebPushManager: Sendable {
603
663
request. body = . bytes( ByteBuffer ( bytes: requestContent) )
604
664
605
665
/// Send the request to the push endpoint.
606
- let response = try await httpClient. execute ( request, deadline: . distantFuture , logger: logger)
666
+ let response = try await httpClient. execute ( request, deadline: requestDeadline , logger: logger)
607
667
logger [ metadataKey: " response " ] = " \( response) "
608
668
logger [ metadataKey: " statusCode " ] = " \( response. status) "
609
- logger. trace ( " Sent notification " )
669
+ logger. trace ( " Sent push message. " )
610
670
611
671
/// Check the response and determine if the subscription should be removed from our records, or if the notification should just be skipped.
612
672
switch response. status {
@@ -615,10 +675,31 @@ public actor WebPushManager: Sendable {
615
675
case . payloadTooLarge:
616
676
logger. error ( " The encrypted payload was too large and was rejected by the push service. " )
617
677
throw MessageTooLargeError ( )
618
- // TODO: 429 too many requests, 500 internal server error, 503 server shutting down - check config and perform a retry after a delay?
678
+ case . tooManyRequests, . internalServerError, . serviceUnavailable:
679
+ /// 429 too many requests, 500 internal server error, 503 server shutting down are all opportunities to just retry if we can, otherwise throw the error
680
+ guard let retryDuration = retryDurations. first else {
681
+ logger. trace ( " Message was rejected, no retries remaining. " )
682
+ throw PushServiceError ( response: response)
683
+ }
684
+ logger. trace ( " Message was rejected, but can be retried. " )
685
+
686
+ try await Task . sleep ( for: retryDuration)
687
+ try await executeRequest (
688
+ httpClient: httpClient,
689
+ endpointURLString: endpointURLString,
690
+ authorization: authorization,
691
+ expiration: expiration,
692
+ urgency: urgency,
693
+ topic: topic,
694
+ requestContent: requestContent,
695
+ clock: clock,
696
+ expirationDeadline: expirationDeadline,
697
+ retryDurations: retryDurations. dropFirst ( ) ,
698
+ logger: logger
699
+ )
619
700
default : throw PushServiceError ( response: response)
620
701
}
621
- logger. trace ( " Successfully sent notification " )
702
+ logger. trace ( " Successfully sent push message. " )
622
703
}
623
704
}
624
705
0 commit comments