Skip to content

Commit a95720c

Browse files
fix: Close stream after sink (libp2p#23)
* Close stream after sink * Update src/index.ts Co-authored-by: Alex Potsides <[email protected]> * Update src/index.ts Co-authored-by: Alex Potsides <[email protected]> * Update src/index.ts Co-authored-by: Alex Potsides <[email protected]> * Update test/browser.ts Co-authored-by: Alex Potsides <[email protected]> * Update test/browser.ts Co-authored-by: Alex Potsides <[email protected]> * Set type for err * Don't use errCode * Wait for pong before closing write side * Update go-libp2p for tests * Undo implementing ping again * go mod tidy * Fix go.mod - don't reuire newer quic-go * Revert "Undo implementing ping again" This reverts commit e67b51f4b3bd33f876cb3f987b6498138b532cc4. * Update comment * Make aegir lint happy Co-authored-by: Alex Potsides <[email protected]>
1 parent 97de951 commit a95720c

File tree

5 files changed

+120
-18
lines changed

5 files changed

+120
-18
lines changed

go-libp2p-webtransport-server/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ module github.com/libp2p/js-libp2p-webtransport/go-libp2p-webtransport-server/m/
33
go 1.19
44

55
require (
6-
github.com/libp2p/go-libp2p v0.23.2
7-
github.com/multiformats/go-multiaddr v0.7.0
6+
github.com/libp2p/go-libp2p v0.23.4
7+
github.com/multiformats/go-multiaddr v0.8.0
88
)
99

1010
require (

go-libp2p-webtransport-server/go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,8 @@ github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38y
252252
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
253253
github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
254254
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
255-
github.com/libp2p/go-libp2p v0.23.2 h1:yqyTeKQJyofWXxEv/eEVUvOrGdt/9x+0PIQ4N1kaxmE=
256-
github.com/libp2p/go-libp2p v0.23.2/go.mod h1:s9DEa5NLR4g+LZS+md5uGU4emjMWFiqkZr6hBTY8UxI=
255+
github.com/libp2p/go-libp2p v0.23.4 h1:hWi9XHSOVFR1oDWRk7rigfyA4XNMuYL20INNybP9LP8=
256+
github.com/libp2p/go-libp2p v0.23.4/go.mod h1:s9DEa5NLR4g+LZS+md5uGU4emjMWFiqkZr6hBTY8UxI=
257257
github.com/libp2p/go-libp2p-asn-util v0.2.0 h1:rg3+Os8jbnO5DxkC7K/Utdi+DkY3q/d1/1q+8WeNAsw=
258258
github.com/libp2p/go-libp2p-asn-util v0.2.0/go.mod h1:WoaWxbHKBymSN41hWSq/lGKJEca7TNm58+gGJi2WsLI=
259259
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
@@ -320,8 +320,8 @@ github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ8
320320
github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM=
321321
github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo=
322322
github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4=
323-
github.com/multiformats/go-multiaddr v0.7.0 h1:gskHcdaCyPtp9XskVwtvEeQOG465sCohbQIirSyqxrc=
324-
github.com/multiformats/go-multiaddr v0.7.0/go.mod h1:Fs50eBDWvZu+l3/9S6xAE7ZYj6yhxlvaVZjakWN7xRs=
323+
github.com/multiformats/go-multiaddr v0.8.0 h1:aqjksEcqK+iD/Foe1RRFsGZh8+XFiGo7FgUCZlpv3LU=
324+
github.com/multiformats/go-multiaddr v0.8.0/go.mod h1:Fs50eBDWvZu+l3/9S6xAE7ZYj6yhxlvaVZjakWN7xRs=
325325
github.com/multiformats/go-multiaddr-dns v0.3.1 h1:QgQgR+LQVt3NPTjbrLLpsaT2ufAA2y0Mkk+QRVJbW3A=
326326
github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTdB37AMA5AMOVuO6NY3JwTk=
327327
github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E=

go-libp2p-webtransport-server/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package main
22

33
import (
44
"fmt"
5+
"io"
56
"os"
67
"os/signal"
78
"time"
89

910
"github.com/libp2p/go-libp2p"
11+
"github.com/libp2p/go-libp2p/core/network"
1012
webtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
1113
"github.com/multiformats/go-multiaddr"
1214
)
@@ -22,6 +24,11 @@ func main() {
2224
panic(err)
2325
}
2426

27+
h.SetStreamHandler("echo", func(s network.Stream) {
28+
io.Copy(s, s)
29+
s.Close()
30+
})
31+
2532
for _, a := range h.Addrs() {
2633
withP2p := a.Encapsulate(multiaddr.StringCast("/p2p/" + h.ID().String()))
2734
fmt.Printf("addr=%s\n", withP2p.String())

src/index.ts

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import type { StreamMuxerFactory, StreamMuxerInit, StreamMuxer } from '@libp2p/i
1212
import { Uint8ArrayList } from 'uint8arraylist'
1313

1414
const log = logger('libp2p:webtransport')
15-
1615
declare global {
1716
interface Window {
1817
WebTransport: any
@@ -63,7 +62,13 @@ async function webtransportBiDiStreamToStream (bidiStream: any, streamId: string
6362
let writerClosed = false
6463
let readerClosed = false;
6564
(async function () {
66-
await (writer.closed.then((ok: any) => ({ ok })).catch((err: any) => ({ err })))
65+
const err: Error | undefined = await writer.closed.catch((err: Error) => err)
66+
if (err != null) {
67+
const msg = err.message
68+
if (!(msg.includes('aborted by the remote server') || msg.includes('STOP_SENDING'))) {
69+
log.error(`WebTransport writer closed unexpectedly: streamId=${streamId} err=${err.message}`)
70+
}
71+
}
6772
writerClosed = true
6873
if (writerClosed && readerClosed) {
6974
cleanupStreamFromActiveStreams()
@@ -73,7 +78,10 @@ async function webtransportBiDiStreamToStream (bidiStream: any, streamId: string
7378
});
7479

7580
(async function () {
76-
await (reader.closed.then((ok: any) => ({ ok })).catch((err: any) => ({ err })))
81+
const err: Error | undefined = await reader.closed.catch((err: Error) => err)
82+
if (err != null) {
83+
log.error(`WebTransport reader closed unexpectedly: streamId=${streamId} err=${err.message}`)
84+
}
7785
readerClosed = true
7886
if (writerClosed && readerClosed) {
7987
cleanupStreamFromActiveStreams()
@@ -82,6 +90,7 @@ async function webtransportBiDiStreamToStream (bidiStream: any, streamId: string
8290
log.error('WebTransport failed to cleanup closed stream')
8391
})
8492

93+
let sinkSunk = false
8594
const stream: Stream = {
8695
id: streamId,
8796
abort (_err: Error) {
@@ -148,14 +157,22 @@ async function webtransportBiDiStreamToStream (bidiStream: any, streamId: string
148157
}
149158
})(),
150159
sink: async function (source: Source<Uint8Array | Uint8ArrayList>) {
151-
for await (const chunks of source) {
152-
if (chunks.constructor === Uint8Array) {
153-
await writer.write(chunks)
154-
} else {
155-
for (const buf of chunks) {
156-
await writer.write(buf)
160+
if (sinkSunk) {
161+
throw new Error('sink already called on stream')
162+
}
163+
sinkSunk = true
164+
try {
165+
for await (const chunks of source) {
166+
if (chunks instanceof Uint8Array) {
167+
await writer.write(chunks)
168+
} else {
169+
for (const buf of chunks) {
170+
await writer.write(buf)
171+
}
157172
}
158173
}
174+
} finally {
175+
stream.closeWrite()
159176
}
160177
}
161178
}

test/browser.ts

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,46 @@ describe('libp2p-webtransport', () => {
2424
})
2525

2626
await node.start()
27-
const res = await node.ping(ma)
28-
console.log('Ping ', res)
29-
expect(res).to.greaterThan(0)
27+
28+
// Ping many times
29+
for (let index = 0; index < 100; index++) {
30+
const now = Date.now()
31+
32+
// Note we're re-implementing the ping protocol here because as of this
33+
// writing, go-libp2p will reset the stream instead of close it. The next
34+
// version of go-libp2p v0.24.0 will have this fix. When that's released
35+
// we can use the builtin ping system
36+
const stream = await node.dialProtocol(ma, '/ipfs/ping/1.0.0')
37+
38+
const data = new Uint8Array(32)
39+
globalThis.crypto.getRandomValues(data)
40+
41+
const pong = new Promise<void>((resolve, reject) => {
42+
(async () => {
43+
for await (const chunk of stream.source) {
44+
const v = chunk.subarray()
45+
const byteMatches: boolean = v.every((byte: number, i: number) => byte === data[i])
46+
if (byteMatches) {
47+
resolve()
48+
} else {
49+
reject(new Error('Wrong pong'))
50+
}
51+
}
52+
})().catch(reject)
53+
})
54+
55+
let res = -1
56+
await stream.sink((async function * () {
57+
yield data
58+
// Wait for the pong before we close the write side
59+
await pong
60+
res = Date.now() - now
61+
})())
62+
63+
await stream.close()
64+
65+
expect(res).to.be.greaterThan(-1)
66+
}
3067

3168
await node.stop()
3269
const conns = node.connectionManager.getConnections()
@@ -48,6 +85,47 @@ describe('libp2p-webtransport', () => {
4885

4986
const err = await expect(node.dial(ma)).to.eventually.be.rejected()
5087
expect(err.errors[0].toString()).to.contain('WebTransportError: Opening handshake failed.')
88+
89+
await node.stop()
90+
})
91+
92+
it('Closes writes of streams after they have sunk a source', async () => {
93+
// This is the behavor of stream muxers: (see mplex, yamux and compliance tests: https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interface-stream-muxer-compliance-tests/src/close-test.ts)
94+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
95+
const maStr: string = process.env.serverAddr!
96+
const ma = multiaddr(maStr)
97+
const node = await createLibp2p({
98+
transports: [webTransport()],
99+
connectionEncryption: [() => new Noise()]
100+
})
101+
102+
async function * gen () {
103+
yield new Uint8Array([0])
104+
yield new Uint8Array([1, 2, 3, 4])
105+
yield new Uint8Array([5, 6, 7])
106+
yield new Uint8Array([8, 9, 10, 11])
107+
yield new Uint8Array([12, 13, 14, 15])
108+
}
109+
110+
await node.start()
111+
const stream = await node.dialProtocol(ma, 'echo')
112+
113+
await stream.sink(gen())
114+
115+
let expectedNextNumber = 0
116+
for await (const chunk of stream.source) {
117+
for (const byte of chunk.subarray()) {
118+
expect(byte).to.equal(expectedNextNumber++)
119+
}
120+
}
121+
expect(expectedNextNumber).to.equal(16)
122+
123+
// Close read, we've should have closed the write side during sink
124+
stream.closeRead()
125+
126+
expect(stream.stat.timeline.close).to.be.greaterThan(0)
127+
128+
await node.stop()
51129
})
52130
})
53131

0 commit comments

Comments
 (0)