
Commit 288413b

Automatically restart push channel (#127)
* feat: latest go-graphsync
* feat: auto-restart connection for push data channels
* refactor: simplify push channel monitor config
* fix: more granular interval checking of data rates
* refactor: simplify push channel monitor naming
1 parent b945240 commit 288413b

File tree: 7 files changed, +922 -27 lines

go.mod (+1 -1)

@@ -13,7 +13,7 @@ require (
 	github.com/ipfs/go-cid v0.0.7
 	github.com/ipfs/go-datastore v0.4.5
 	github.com/ipfs/go-ds-badger v0.2.3
-	github.com/ipfs/go-graphsync v0.5.0
+	github.com/ipfs/go-graphsync v0.5.2
 	github.com/ipfs/go-ipfs-blockstore v1.0.1
 	github.com/ipfs/go-ipfs-blocksutil v0.0.1
 	github.com/ipfs/go-ipfs-chunker v0.0.5

go.sum (+2 -2)

@@ -228,8 +228,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
 github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
 github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
 github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
-github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM=
-github.com/ipfs/go-graphsync v0.5.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
+github.com/ipfs/go-graphsync v0.5.2 h1:USD+daaSC+7pLHCxROThSaF6SF7WYXF03sjrta0rCfA=
+github.com/ipfs/go-graphsync v0.5.2/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
 github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
 github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
 github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=

impl/impl.go (+59 -15)

@@ -24,27 +24,30 @@ import (
 	"github.com/filecoin-project/go-data-transfer/encoding"
 	"github.com/filecoin-project/go-data-transfer/message"
 	"github.com/filecoin-project/go-data-transfer/network"
+	"github.com/filecoin-project/go-data-transfer/pushchannelmonitor"
 	"github.com/filecoin-project/go-data-transfer/registry"
 )

 var log = logging.Logger("dt-impl")

 type manager struct {
-	dataTransferNetwork  network.DataTransferNetwork
-	validatedTypes       *registry.Registry
-	resultTypes          *registry.Registry
-	revalidators         *registry.Registry
-	transportConfigurers *registry.Registry
-	pubSub               *pubsub.PubSub
-	readySub             *pubsub.PubSub
-	channels             *channels.Channels
-	peerID               peer.ID
-	transport            datatransfer.Transport
-	storedCounter        *storedcounter.StoredCounter
-	channelRemoveTimeout time.Duration
-	reconnectsLk         sync.RWMutex
-	reconnects           map[datatransfer.ChannelID]chan struct{}
-	cidLists             cidlists.CIDLists
+	dataTransferNetwork   network.DataTransferNetwork
+	validatedTypes        *registry.Registry
+	resultTypes           *registry.Registry
+	revalidators          *registry.Registry
+	transportConfigurers  *registry.Registry
+	pubSub                *pubsub.PubSub
+	readySub              *pubsub.PubSub
+	channels              *channels.Channels
+	peerID                peer.ID
+	transport             datatransfer.Transport
+	storedCounter         *storedcounter.StoredCounter
+	channelRemoveTimeout  time.Duration
+	reconnectsLk          sync.RWMutex
+	reconnects            map[datatransfer.ChannelID]chan struct{}
+	cidLists              cidlists.CIDLists
+	pushChannelMonitor    *pushchannelmonitor.Monitor
+	pushChannelMonitorCfg *pushchannelmonitor.Config
 }

 type internalEvent struct {
@@ -88,6 +91,28 @@ func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
 	}
 }

+// PushChannelRestartConfig sets the configuration options for automatically
+// restarting push channels
+// - interval is the time over which minBytesSent must have been sent
+// - checksPerInterval is the number of times to check per interval
+// - minBytesSent is the minimum amount of data that must have been sent over the interval
+// - restartBackoff is the time to wait before checking again for restarts
+func PushChannelRestartConfig(
+	interval time.Duration,
+	checksPerInterval uint32,
+	minBytesSent uint64,
+	restartBackoff time.Duration,
+) DataTransferOption {
+	return func(m *manager) {
+		m.pushChannelMonitorCfg = &pushchannelmonitor.Config{
+			Interval:          interval,
+			ChecksPerInterval: checksPerInterval,
+			MinBytesSent:      minBytesSent,
+			RestartBackoff:    restartBackoff,
+		}
+	}
+}
+
 const defaultChannelRemoveTimeout = 1 * time.Hour

 // NewDataTransfer initializes a new instance of a data transfer manager
@@ -106,6 +131,7 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw
 		channelRemoveTimeout: defaultChannelRemoveTimeout,
 		reconnects:           make(map[datatransfer.ChannelID]chan struct{}),
 	}
+
 	cidLists, err := cidlists.NewCIDLists(cidListsDir)
 	if err != nil {
 		return nil, err
@@ -116,9 +142,17 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw
 		return nil, err
 	}
 	m.channels = channels
+
+	// Apply config options
 	for _, option := range options {
 		option(m)
 	}
+
+	// Start push channel monitor after applying config options as the config
+	// options may apply to the monitor
+	m.pushChannelMonitor = pushchannelmonitor.NewMonitor(m, m.pushChannelMonitorCfg)
+	m.pushChannelMonitor.Start()
+
 	return m, nil
 }

@@ -161,6 +195,7 @@ func (m *manager) OnReady(ready datatransfer.ReadyFunc) {

 // Stop terminates all data transfers and ends processing
 func (m *manager) Stop(ctx context.Context) error {
+	m.pushChannelMonitor.Shutdown()
	return m.transport.Shutdown(ctx)
 }

@@ -196,11 +231,20 @@ func (m *manager) OpenPushDataChannel(ctx context.Context, requestTo peer.ID, vo
 		transportConfigurer(chid, voucher, m.transport)
 	}
 	m.dataTransferNetwork.Protect(requestTo, chid.String())
+	monitoredChan := m.pushChannelMonitor.AddChannel(chid)
 	if err := m.dataTransferNetwork.SendMessage(ctx, requestTo, req); err != nil {
 		err = fmt.Errorf("Unable to send request: %w", err)
 		_ = m.channels.Error(chid, err)
+
+		// If push channel monitoring is enabled, shutdown the monitor as it
+		// wasn't possible to start the data transfer
+		if monitoredChan != nil {
+			monitoredChan.Shutdown()
+		}
+
 		return chid, err
 	}
+
 	return chid, nil
 }

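
For context, callers opt in to auto-restart by passing the new option to NewDataTransfer. Below is a minimal wiring sketch: PushChannelRestartConfig and the NewDataTransfer signature come from the diff above, while the helper name, the threshold values, and the exact import paths are illustrative assumptions.

package example

import (
	"time"

	datatransfer "github.com/filecoin-project/go-data-transfer"
	dtimpl "github.com/filecoin-project/go-data-transfer/impl"
	"github.com/filecoin-project/go-data-transfer/network"
	"github.com/filecoin-project/go-storedcounter"
	"github.com/ipfs/go-datastore"
)

// newMonitoredManager is a hypothetical helper: it builds a manager whose
// push channels are automatically restarted when throughput stalls. All
// parameters are assumed to be constructed as elsewhere in this repo; only
// the option itself is taken from this commit.
func newMonitoredManager(
	ds datastore.Batching,
	cidListsDir string,
	dtNet network.DataTransferNetwork,
	transport datatransfer.Transport,
	sc *storedcounter.StoredCounter,
) (datatransfer.Manager, error) {
	// Restart a push channel whenever fewer than 1 MiB were sent over any
	// 5-minute window, sampling the data rate 10 times per window, and wait
	// 30s after a restart before checking again. The values are illustrative.
	restart := dtimpl.PushChannelRestartConfig(
		5*time.Minute,  // interval
		10,             // checksPerInterval
		1<<20,          // minBytesSent
		30*time.Second, // restartBackoff
	)
	return dtimpl.NewDataTransfer(ds, cidListsDir, dtNet, transport, sc, restart)
}

Push channels opened via the returned manager's OpenPushDataChannel are then registered with the monitor, as shown in the OpenPushDataChannel hunk above.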

impl/integration_test.go (+101 -6)

@@ -209,7 +209,7 @@ func TestRoundTrip(t *testing.T) {
 	for opens < 2 || completes < 2 || len(sentIncrements) < 21 || len(receivedIncrements) < 21 {
 		select {
 		case <-ctx.Done():
-			t.Fatal("Did not complete succcessful data transfer")
+			t.Fatal("Did not complete successful data transfer")
 		case <-finished:
 			completes++
 		case <-opened:
@@ -343,7 +343,7 @@ func TestMultipleRoundTripMultipleStores(t *testing.T) {
 	for opens < 2*data.requestCount || completes < 2*data.requestCount {
 		select {
 		case <-ctx.Done():
-			t.Fatal("Did not complete succcessful data transfer")
+			t.Fatal("Did not complete successful data transfer")
 		case <-finished:
 			completes++
 		case <-opened:
@@ -481,7 +481,7 @@ func TestManyReceiversAtOnce(t *testing.T) {
 	for opens < 2*data.receiverCount || completes < 2*data.receiverCount {
 		select {
 		case <-ctx.Done():
-			t.Fatal("Did not complete succcessful data transfer")
+			t.Fatal("Did not complete successful data transfer")
 		case <-finished:
 			completes++
 		case <-opened:
@@ -497,6 +497,101 @@ func TestManyReceiversAtOnce(t *testing.T) {
 	}
 }

+// TestPushRequestAutoRestart tests that if the connection for a push request
+// goes down, it will automatically restart (given the right config options)
+func TestPushRequestAutoRestart(t *testing.T) {
+	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+
+	gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
+	netRetry := network.RetryParameters(time.Second, time.Second, 5, 1)
+	gsData.DtNet1 = network.NewFromLibp2pHost(gsData.Host1, netRetry)
+	host1 := gsData.Host1 // initiator, data sender
+	host2 := gsData.Host2 // data recipient
+
+	tp1 := gsData.SetupGSTransportHost1()
+	tp2 := gsData.SetupGSTransportHost2()
+
+	restartConf := PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond)
+	dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1, restartConf)
+	require.NoError(t, err)
+	testutil.StartAndWaitForReady(ctx, t, dt1)
+	dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
+	require.NoError(t, err)
+	testutil.StartAndWaitForReady(ctx, t, dt2)
+
+	received := make(chan struct{})
+	finished := make(chan struct{}, 2)
+	var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
+		//t.Logf("%s: %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
+
+		if event.Code == datatransfer.DataReceived {
+			received <- struct{}{}
+		}
+
+		if channelState.Status() == datatransfer.Completed {
+			finished <- struct{}{}
+		}
+	}
+	dt1.SubscribeToEvents(subscriber)
+	dt2.SubscribeToEvents(subscriber)
+	voucher := testutil.FakeDTType{Data: "applesauce"}
+	sv := testutil.NewStubbedValidator()
+
+	sourceDagService := gsData.DagService1
+	destDagService := gsData.DagService2
+
+	root, origBytes := testutil.LoadUnixFSFile(ctx, t, sourceDagService, loremFile)
+	rootCid := root.(cidlink.Link).Cid
+
+	require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
+	require.NoError(t, dt2.RegisterVoucherType(&testutil.FakeDTType{}, sv))
+	chid, err := dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
+	require.NoError(t, err)
+
+	// Wait for a block to be received
+	<-received
+
+	// Break connection
+	t.Logf("Breaking connection to peer")
+	require.NoError(t, gsData.Mn.UnlinkPeers(host1.ID(), host2.ID()))
+	require.NoError(t, gsData.Mn.DisconnectPeers(host1.ID(), host2.ID()))
+
+	t.Logf("Sleep for a second")
+	time.Sleep(1 * time.Second)
+
+	// Restore connection
+	t.Logf("Restore connection")
+	require.NoError(t, gsData.Mn.LinkAll())
+	time.Sleep(200 * time.Millisecond)
+	conn, err := gsData.Mn.ConnectPeers(host1.ID(), host2.ID())
+	require.NoError(t, err)
+	require.NotNil(t, conn)
+
+	t.Logf("Waiting for auto-restart on push channel %s", chid)
+
+	(func() {
+		finishedCount := 0
+		for {
+			select {
+			case <-ctx.Done():
+				t.Fatal("Did not complete successful data transfer")
+				return
+			case <-received:
+			case <-finished:
+				finishedCount++
+				if finishedCount == 2 {
+					return
+				}
+			}
+		}
+	})()
+
+	// Verify that the file was transferred to the destination node
+	testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes)
+}
+
 func TestRoundTripCancelledRequest(t *testing.T) {
 	ctx := context.Background()
 	testCases := map[string]struct {
@@ -751,7 +846,7 @@ func TestSimulatedRetrievalFlow(t *testing.T) {
 	for providerFinished != nil || clientFinished != nil {
 		select {
 		case <-ctx.Done():
-			t.Fatal("Did not complete succcessful data transfer")
+			t.Fatal("Did not complete successful data transfer")
 		case <-providerFinished:
 			providerFinished = nil
 		case <-clientFinished:
@@ -868,7 +963,7 @@ func TestPauseAndResume(t *testing.T) {
 		pauseInitiators < 1 || pauseResponders < 1 || resumeInitiators < 1 || resumeResponders < 1 {
 		select {
 		case <-ctx.Done():
-			t.Fatal("Did not complete succcessful data transfer")
+			t.Fatal("Did not complete successful data transfer")
 		case <-finished:
 			completes++
 		case <-opened:
@@ -968,7 +1063,7 @@ func TestUnrecognizedVoucherRoundTrip(t *testing.T) {
 	for opens < 1 || finishes < 1 {
 		select {
 		case <-ctx.Done():
-			t.Fatal("Did not complete succcessful data transfer")
+			t.Fatal("Did not complete successful data transfer")
 		case <-finished:
 			finishes++
 		case <-opened:
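
To make the restartConf values in TestPushRequestAutoRestart concrete: Interval 100ms with ChecksPerInterval 1 means the monitor samples once per 100ms window, and MinBytesSent 10 means a window with fewer than 10 bytes sent flags the channel for restart. A standalone sketch of that arithmetic follows; the Config type mirrors pushchannelmonitor.Config as populated in impl/impl.go above, but the needsRestart predicate is a hypothetical illustration, since the monitor's internals are not part of this diff.

package main

import (
	"fmt"
	"time"
)

// Config mirrors pushchannelmonitor.Config as populated by
// PushChannelRestartConfig in impl/impl.go above.
type Config struct {
	Interval          time.Duration
	ChecksPerInterval uint32
	MinBytesSent      uint64
	RestartBackoff    time.Duration
}

// needsRestart is a hypothetical illustration of the documented semantics:
// the channel is considered stalled when fewer than MinBytesSent bytes were
// sent over the last full Interval.
func needsRestart(cfg Config, bytesSentInLastInterval uint64) bool {
	return bytesSentInLastInterval < cfg.MinBytesSent
}

func main() {
	// The values passed to PushChannelRestartConfig in TestPushRequestAutoRestart.
	cfg := Config{
		Interval:          100 * time.Millisecond,
		ChecksPerInterval: 1,
		MinBytesSent:      10,
		RestartBackoff:    200 * time.Millisecond,
	}

	// The data rate is sampled ChecksPerInterval times per Interval.
	checkEvery := cfg.Interval / time.Duration(cfg.ChecksPerInterval)
	fmt.Println(checkEvery)            // 100ms
	fmt.Println(needsRestart(cfg, 0))  // true: connection is down, nothing sent
	fmt.Println(needsRestart(cfg, 64)) // false: enough data is flowing
}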

network/libp2p_impl.go (+3 -3)

@@ -113,7 +113,7 @@ func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.I
 		}

 		d := b.Duration()
-		log.Warnf("failed to open stream to %s on attempt %g of %g, waiting %s to try again, err: %w",
+		log.Warnf("failed to open stream to %s on attempt %g of %g, waiting %s to try again, err: %s",
 			id, nAttempts, impl.maxStreamOpenAttempts, d, err)

 		select {
@@ -183,14 +183,14 @@ func (dtnet *libp2pDataTransferNetwork) handleNewStream(s network.Stream) {
 		if err != io.EOF {
 			s.Reset() // nolint: errcheck,gosec
 			go dtnet.receiver.ReceiveError(err)
-			log.Debugf("graphsync net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
+			log.Debugf("net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
 		}
 		return
 	}

 	p := s.Conn().RemotePeer()
 	ctx := context.Background()
-	log.Debugf("graphsync net handleNewStream from %s", s.Conn().RemotePeer())
+	log.Debugf("net handleNewStream from %s", s.Conn().RemotePeer())

 	if received.IsRequest() {
 		receivedRequest, ok := received.(datatransfer.Request)
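
The Debugf changes drop a stale "graphsync" prefix from this network-layer logger, and the Warnf change replaces %w with %s because %w is only meaningful to fmt.Errorf; in Sprintf-style formatting, which loggers like log.Warnf use under the hood, it renders as a noisy placeholder. A small demonstration using only the standard library (not code from this repo):

package main

import (
	"errors"
	"fmt"
)

func main() {
	err := errors.New("connection refused")

	// %w is only recognized by fmt.Errorf, where it wraps the error so that
	// errors.Is / errors.As can later unwrap it.
	wrapped := fmt.Errorf("failed to open stream: %w", err)
	fmt.Println(wrapped)                 // failed to open stream: connection refused
	fmt.Println(errors.Is(wrapped, err)) // true

	// Plain formatting has no %w verb, so it prints a placeholder instead of
	// the error text. Hence the fix to %s.
	fmt.Printf("err: %w\n", err) // err: %!w(*errors.errorString=&{connection refused})
	fmt.Printf("err: %s\n", err) // err: connection refused
}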
