@@ -6,7 +6,7 @@ import Queue from 'p-queue'
6
6
import { createTopology } from '@libp2p/topology'
7
7
import { codes } from './errors.js'
8
8
import { PeerStreams as PeerStreamsImpl } from './peer-streams.js'
9
- import { toMessage , ensureArray , randomSeqno , noSignMsgId , msgId , toRpcMessage } from './utils.js'
9
+ import { toMessage , ensureArray , noSignMsgId , msgId , toRpcMessage , randomSeqno } from './utils.js'
10
10
import {
11
11
signMessage ,
12
12
verifySignature
@@ -17,6 +17,7 @@ import type { Connection } from '@libp2p/interface-connection'
17
17
import type { PubSub , Message , StrictNoSign , StrictSign , PubSubInit , PubSubEvents , PeerStreams , PubSubRPCMessage , PubSubRPC , PubSubRPCSubscription , SubscriptionChangeData , PublishResult } from '@libp2p/interface-pubsub'
18
18
import { PeerMap , PeerSet } from '@libp2p/peer-collections'
19
19
import { Components , Initializable } from '@libp2p/components'
20
+ import type { Uint8ArrayList } from 'uint8arraylist'
20
21
21
22
const log = logger ( 'libp2p:pubsub' )
22
23
@@ -284,7 +285,7 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
284
285
/**
285
286
* Responsible for processing each RPC message received by other peers.
286
287
*/
287
- async processMessages ( peerId : PeerId , stream : AsyncIterable < Uint8Array > , peerStreams : PeerStreams ) {
288
+ async processMessages ( peerId : PeerId , stream : AsyncIterable < Uint8ArrayList > , peerStreams : PeerStreams ) {
288
289
try {
289
290
await pipe (
290
291
stream ,
@@ -446,6 +447,10 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
446
447
const signaturePolicy = this . globalSignaturePolicy
447
448
switch ( signaturePolicy ) {
448
449
case 'StrictSign' :
450
+ if ( msg . type !== 'signed' ) {
451
+ throw errcode ( new Error ( 'Message type should be "signed" when signature policy is StrictSign but it was not' ) , codes . ERR_MISSING_SIGNATURE )
452
+ }
453
+
449
454
if ( msg . sequenceNumber == null ) {
450
455
throw errcode ( new Error ( 'Need seqno when signature policy is StrictSign but it was missing' ) , codes . ERR_MISSING_SEQNO )
451
456
}
@@ -474,19 +479,19 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
474
479
* Decode Uint8Array into an RPC object.
475
480
* This can be override to use a custom router protobuf.
476
481
*/
477
- abstract decodeRpc ( bytes : Uint8Array ) : PubSubRPC
482
+ abstract decodeRpc ( bytes : Uint8Array | Uint8ArrayList ) : PubSubRPC
478
483
479
484
/**
480
485
* Encode RPC object into a Uint8Array.
481
486
* This can be override to use a custom router protobuf.
482
487
*/
483
- abstract encodeRpc ( rpc : PubSubRPC ) : Uint8Array
488
+ abstract encodeRpc ( rpc : PubSubRPC ) : Uint8ArrayList
484
489
485
490
/**
486
491
* Encode RPC object into a Uint8Array.
487
492
* This can be override to use a custom router protobuf.
488
493
*/
489
- abstract encodeMessage ( rpc : PubSubRPCMessage ) : Uint8Array
494
+ abstract encodeMessage ( rpc : PubSubRPCMessage ) : Uint8ArrayList
490
495
491
496
/**
492
497
* Send an rpc object to a peer
@@ -523,26 +528,42 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
523
528
const signaturePolicy = this . globalSignaturePolicy
524
529
switch ( signaturePolicy ) {
525
530
case 'StrictNoSign' :
531
+ if ( message . type !== 'unsigned' ) {
532
+ throw errcode ( new Error ( 'Message type should be "unsigned" when signature policy is StrictNoSign but it was not' ) , codes . ERR_MISSING_SIGNATURE )
533
+ }
534
+
535
+ // @ts -expect-error should not be present
526
536
if ( message . signature != null ) {
527
537
throw errcode ( new Error ( 'StrictNoSigning: signature should not be present' ) , codes . ERR_UNEXPECTED_SIGNATURE )
528
538
}
539
+
540
+ // @ts -expect-error should not be present
529
541
if ( message . key != null ) {
530
542
throw errcode ( new Error ( 'StrictNoSigning: key should not be present' ) , codes . ERR_UNEXPECTED_KEY )
531
543
}
544
+
545
+ // @ts -expect-error should not be present
532
546
if ( message . sequenceNumber != null ) {
533
547
throw errcode ( new Error ( 'StrictNoSigning: seqno should not be present' ) , codes . ERR_UNEXPECTED_SEQNO )
534
548
}
535
549
break
536
550
case 'StrictSign' :
551
+ if ( message . type !== 'signed' ) {
552
+ throw errcode ( new Error ( 'Message type should be "signed" when signature policy is StrictSign but it was not' ) , codes . ERR_MISSING_SIGNATURE )
553
+ }
554
+
537
555
if ( message . signature == null ) {
538
556
throw errcode ( new Error ( 'StrictSigning: Signing required and no signature was present' ) , codes . ERR_MISSING_SIGNATURE )
539
557
}
558
+
540
559
if ( message . sequenceNumber == null ) {
541
- throw errcode ( new Error ( 'StrictSigning: Signing required and no seqno was present' ) , codes . ERR_MISSING_SEQNO )
560
+ throw errcode ( new Error ( 'StrictSigning: Signing required and no sequenceNumber was present' ) , codes . ERR_MISSING_SEQNO )
542
561
}
562
+
543
563
if ( ! ( await verifySignature ( message , this . encodeMessage . bind ( this ) ) ) ) {
544
564
throw errcode ( new Error ( 'StrictSigning: Invalid message signature' ) , codes . ERR_INVALID_SIGNATURE )
545
565
}
566
+
546
567
break
547
568
default :
548
569
throw errcode ( new Error ( 'Cannot validate message: unhandled signature policy' ) , codes . ERR_UNHANDLED_SIGNATURE_POLICY )
@@ -559,14 +580,16 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
559
580
* Normalizes the message and signs it, if signing is enabled.
560
581
* Should be used by the routers to create the message to send.
561
582
*/
562
- async buildMessage ( message : Message ) {
583
+ async buildMessage ( message : { from : PeerId , topic : string , data : Uint8Array , sequenceNumber : bigint } ) : Promise < Message > {
563
584
const signaturePolicy = this . globalSignaturePolicy
564
585
switch ( signaturePolicy ) {
565
586
case 'StrictSign' :
566
- message . sequenceNumber = randomSeqno ( )
567
587
return await signMessage ( this . components . getPeerId ( ) , message , this . encodeMessage . bind ( this ) )
568
588
case 'StrictNoSign' :
569
- return await Promise . resolve ( message )
589
+ return await Promise . resolve ( {
590
+ type : 'unsigned' ,
591
+ ...message
592
+ } )
570
593
default :
571
594
throw errcode ( new Error ( 'Cannot build message: unhandled signature policy' ) , codes . ERR_UNHANDLED_SIGNATURE_POLICY )
572
595
}
@@ -603,10 +626,11 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
603
626
throw new Error ( 'Pubsub has not started' )
604
627
}
605
628
606
- const message : Message = {
629
+ const message = {
607
630
from : this . components . getPeerId ( ) ,
608
631
topic,
609
- data : data ?? new Uint8Array ( 0 )
632
+ data : data ?? new Uint8Array ( 0 ) ,
633
+ sequenceNumber : randomSeqno ( )
610
634
}
611
635
612
636
log ( 'publish topic: %s from: %p data: %m' , topic , message . from , message . data )
0 commit comments