Skip to content

Commit 0d5ba9e

Browse files
authored
test: check total blocks sent when theres a restart (#140)
1 parent 745720a commit 0d5ba9e

11 files changed

+352
-79
lines changed

channels/channels.go

+81-8
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/ipfs/go-cid"
99
"github.com/ipfs/go-datastore"
10+
"github.com/ipfs/go-datastore/namespace"
1011
"github.com/ipld/go-ipld-prime"
1112
peer "github.com/libp2p/go-libp2p-core/peer"
1213
cbg "github.com/whyrusleeping/cbor-gen"
@@ -20,6 +21,7 @@ import (
2021
"github.com/filecoin-project/go-data-transfer/channels/internal"
2122
"github.com/filecoin-project/go-data-transfer/channels/internal/migrations"
2223
"github.com/filecoin-project/go-data-transfer/cidlists"
24+
"github.com/filecoin-project/go-data-transfer/cidsets"
2325
"github.com/filecoin-project/go-data-transfer/encoding"
2426
)
2527

@@ -53,6 +55,7 @@ type Channels struct {
5355
stateMachines fsm.Group
5456
migrateStateMachines func(context.Context) error
5557
cidLists cidlists.CIDLists
58+
seenCIDs *cidsets.CIDSetManager
5659
}
5760

5861
// ChannelEnvironment -- just a proxy for DTNetwork for now
@@ -71,8 +74,11 @@ func New(ds datastore.Batching,
7174
voucherResultDecoder DecoderByTypeFunc,
7275
env ChannelEnvironment,
7376
selfPeer peer.ID) (*Channels, error) {
77+
78+
seenCIDsDS := namespace.Wrap(ds, datastore.NewKey("seencids"))
7479
c := &Channels{
7580
cidLists: cidLists,
81+
seenCIDs: cidsets.NewCIDSetManager(seenCIDsDS),
7682
notifier: notifier,
7783
voucherDecoder: voucherDecoder,
7884
voucherResultDecoder: voucherResultDecoder,
@@ -117,6 +123,19 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
117123
}
118124

119125
c.notifier(evt, fromInternalChannelState(realChannel, c.voucherDecoder, c.voucherResultDecoder, c.cidLists.ReadList))
126+
127+
// When the channel has been cleaned up, remove the caches of seen cids
128+
if evt.Code == datatransfer.CleanupComplete {
129+
chid := datatransfer.ChannelID{
130+
Initiator: realChannel.Initiator,
131+
Responder: realChannel.Responder,
132+
ID: realChannel.TransferID,
133+
}
134+
err := c.removeSeenCIDCaches(chid)
135+
if err != nil {
136+
log.Errorf("failed to clean up channel %s: %s", err)
137+
}
138+
}
120139
}
121140

122141
// CreateNew creates a new channel id and channel state and saves to channels.
@@ -206,20 +225,21 @@ func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error {
206225
return c.send(chid, datatransfer.CompleteCleanupOnRestart)
207226
}
208227

209-
func (c *Channels) DataSent(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
210-
return c.send(chid, datatransfer.DataSent, delta)
228+
func (c *Channels) DataSent(chid datatransfer.ChannelID, k cid.Cid, delta uint64) error {
229+
return c.fireProgressEvent(chid, datatransfer.DataSent, datatransfer.DataSentProgress, k, delta)
211230
}
212231

213-
func (c *Channels) DataQueued(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
214-
return c.send(chid, datatransfer.DataQueued, delta)
232+
func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint64) error {
233+
return c.fireProgressEvent(chid, datatransfer.DataQueued, datatransfer.DataQueuedProgress, k, delta)
215234
}
216235

217-
func (c *Channels) DataReceived(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
218-
err := c.cidLists.AppendList(chid, cid)
236+
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64) error {
237+
err := c.cidLists.AppendList(chid, k)
219238
if err != nil {
220239
return err
221240
}
222-
return c.send(chid, datatransfer.DataReceived, delta)
241+
242+
return c.fireProgressEvent(chid, datatransfer.DataReceived, datatransfer.DataReceivedProgress, k, delta)
223243
}
224244

225245
// PauseInitiator pauses the initator of this channel
@@ -304,7 +324,60 @@ func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
304324
return c.stateMachines.Has(chid)
305325
}
306326

327+
// removeSeenCIDCaches cleans up the caches of "seen" blocks, ie
328+
// blocks that have already been queued / sent / received
329+
func (c *Channels) removeSeenCIDCaches(chid datatransfer.ChannelID) error {
330+
progressStates := []datatransfer.EventCode{
331+
datatransfer.DataQueuedProgress,
332+
datatransfer.DataSentProgress,
333+
datatransfer.DataReceivedProgress,
334+
}
335+
for _, evt := range progressStates {
336+
sid := cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
337+
err := c.seenCIDs.DeleteSet(sid)
338+
if err != nil {
339+
return err
340+
}
341+
}
342+
return nil
343+
}
344+
345+
// onProgress fires an event indicating progress has been made in
346+
// queuing / sending / receiving blocks.
347+
// These events are fired only for new blocks (not for example if
348+
// a block is resent)
349+
func (c *Channels) fireProgressEvent(chid datatransfer.ChannelID, evt datatransfer.EventCode, progressEvt datatransfer.EventCode, k cid.Cid, delta uint64) error {
350+
if err := c.checkChannelExists(chid, evt); err != nil {
351+
return err
352+
}
353+
354+
// Check if the block has already been seen
355+
sid := cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
356+
seen, err := c.seenCIDs.InsertSetCID(sid, k)
357+
if err != nil {
358+
return err
359+
}
360+
361+
// If the block has not been seen before, fire the progress event
362+
if !seen {
363+
if err := c.stateMachines.Send(chid, progressEvt, delta); err != nil {
364+
return err
365+
}
366+
}
367+
368+
// Fire the regular event
369+
return c.stateMachines.Send(chid, evt)
370+
}
371+
307372
func (c *Channels) send(chid datatransfer.ChannelID, code datatransfer.EventCode, args ...interface{}) error {
373+
err := c.checkChannelExists(chid, code)
374+
if err != nil {
375+
return err
376+
}
377+
return c.stateMachines.Send(chid, code, args...)
378+
}
379+
380+
func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatransfer.EventCode) error {
308381
has, err := c.stateMachines.Has(chid)
309382
if err != nil {
310383
return err
@@ -313,5 +386,5 @@ func (c *Channels) send(chid datatransfer.ChannelID, code datatransfer.EventCode
313386
return xerrors.Errorf("cannot send FSM event %s to data-transfer channel %s: %w",
314387
datatransfer.Events[code], chid, NewErrNotFound(chid))
315388
}
316-
return c.stateMachines.Send(chid, code, args...)
389+
return nil
317390
}

channels/channels_fsm.go

+28-33
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,16 @@ import (
1212

1313
var log = logging.Logger("data-transfer")
1414

15+
var transferringStates = []fsm.StateKey{
16+
datatransfer.Requested,
17+
datatransfer.Ongoing,
18+
datatransfer.InitiatorPaused,
19+
datatransfer.ResponderPaused,
20+
datatransfer.BothPaused,
21+
datatransfer.ResponderCompleted,
22+
datatransfer.ResponderFinalizing,
23+
}
24+
1525
// ChannelEvents describe the events taht can
1626
var ChannelEvents = fsm.Events{
1727
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),
@@ -23,40 +33,25 @@ var ChannelEvents = fsm.Events{
2333

2434
fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling),
2535

26-
fsm.Event(datatransfer.DataReceived).FromMany(
27-
datatransfer.Requested,
28-
datatransfer.Ongoing,
29-
datatransfer.InitiatorPaused,
30-
datatransfer.ResponderPaused,
31-
datatransfer.BothPaused,
32-
datatransfer.ResponderCompleted,
33-
datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64) error {
34-
chst.Received += delta
35-
return nil
36-
}),
36+
fsm.Event(datatransfer.DataReceived).FromMany(transferringStates...).ToNoChange(),
37+
fsm.Event(datatransfer.DataReceivedProgress).FromMany(transferringStates...).ToNoChange().
38+
Action(func(chst *internal.ChannelState, delta uint64) error {
39+
chst.Received += delta
40+
return nil
41+
}),
3742

38-
fsm.Event(datatransfer.DataSent).FromMany(
39-
datatransfer.Requested,
40-
datatransfer.Ongoing,
41-
datatransfer.InitiatorPaused,
42-
datatransfer.ResponderPaused,
43-
datatransfer.BothPaused,
44-
datatransfer.ResponderCompleted,
45-
datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64) error {
46-
chst.Sent += delta
47-
return nil
48-
}),
49-
fsm.Event(datatransfer.DataQueued).FromMany(
50-
datatransfer.Requested,
51-
datatransfer.Ongoing,
52-
datatransfer.InitiatorPaused,
53-
datatransfer.ResponderPaused,
54-
datatransfer.BothPaused,
55-
datatransfer.ResponderCompleted,
56-
datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64) error {
57-
chst.Queued += delta
58-
return nil
59-
}),
43+
fsm.Event(datatransfer.DataSent).FromMany(transferringStates...).ToNoChange(),
44+
fsm.Event(datatransfer.DataSentProgress).FromMany(transferringStates...).ToNoChange().
45+
Action(func(chst *internal.ChannelState, delta uint64) error {
46+
chst.Sent += delta
47+
return nil
48+
}),
49+
fsm.Event(datatransfer.DataQueued).FromMany(transferringStates...).ToNoChange(),
50+
fsm.Event(datatransfer.DataQueuedProgress).FromMany(transferringStates...).ToNoChange().
51+
Action(func(chst *internal.ChannelState, delta uint64) error {
52+
chst.Queued += delta
53+
return nil
54+
}),
6055
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
6156
chst.Message = datatransfer.ErrDisconnected.Error()
6257
return nil

channels/channels_test.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -146,13 +146,15 @@ func TestChannels(t *testing.T) {
146146

147147
err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50)
148148
require.NoError(t, err)
149+
_ = checkEvent(ctx, t, received, datatransfer.DataReceivedProgress)
149150
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
150151
require.Equal(t, uint64(50), state.Received())
151152
require.Equal(t, uint64(0), state.Sent())
152153
require.Equal(t, []cid.Cid{cids[0]}, state.ReceivedCids())
153154

154155
err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 100)
155156
require.NoError(t, err)
157+
_ = checkEvent(ctx, t, received, datatransfer.DataSentProgress)
156158
state = checkEvent(ctx, t, received, datatransfer.DataSent)
157159
require.Equal(t, uint64(50), state.Received())
158160
require.Equal(t, uint64(100), state.Sent())
@@ -167,6 +169,7 @@ func TestChannels(t *testing.T) {
167169

168170
err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 50)
169171
require.NoError(t, err)
172+
_ = checkEvent(ctx, t, received, datatransfer.DataReceivedProgress)
170173
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
171174
require.Equal(t, uint64(100), state.Received())
172175
require.Equal(t, uint64(100), state.Sent())
@@ -176,14 +179,14 @@ func TestChannels(t *testing.T) {
176179
require.NoError(t, err)
177180
state = checkEvent(ctx, t, received, datatransfer.DataSent)
178181
require.Equal(t, uint64(100), state.Received())
179-
require.Equal(t, uint64(125), state.Sent())
182+
require.Equal(t, uint64(100), state.Sent())
180183
require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())
181184

182185
err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50)
183186
require.NoError(t, err)
184187
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
185-
require.Equal(t, uint64(150), state.Received())
186-
require.Equal(t, uint64(125), state.Sent())
188+
require.Equal(t, uint64(100), state.Received())
189+
require.Equal(t, uint64(100), state.Sent())
187190
require.Equal(t, []cid.Cid{cids[0], cids[1], cids[0]}, state.ReceivedCids())
188191
})
189192

cidsets/cidsets.go

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package cidsets
2+
3+
import (
4+
"sync"
5+
6+
"github.com/ipfs/go-cid"
7+
"github.com/ipfs/go-datastore"
8+
"github.com/ipfs/go-datastore/namespace"
9+
"github.com/ipfs/go-datastore/query"
10+
)
11+
12+
// SetID is a unique ID for a CID set
13+
type SetID string
14+
15+
// CIDSetManager keeps track of several CID sets, by SetID
16+
type CIDSetManager struct {
17+
ds datastore.Datastore
18+
lk sync.Mutex
19+
sets map[SetID]*cidSet
20+
}
21+
22+
func NewCIDSetManager(ds datastore.Datastore) *CIDSetManager {
23+
return &CIDSetManager{ds: ds, sets: make(map[SetID]*cidSet)}
24+
}
25+
26+
// InsertSetCID inserts a CID into a CID set.
27+
// Returns true if the set already contained the CID.
28+
func (mgr *CIDSetManager) InsertSetCID(sid SetID, c cid.Cid) (exists bool, err error) {
29+
return mgr.getSet(sid).Insert(c)
30+
}
31+
32+
// DeleteSet deletes a CID set
33+
func (mgr *CIDSetManager) DeleteSet(sid SetID) error {
34+
return mgr.getSet(sid).Truncate()
35+
}
36+
37+
// getSet gets the cidSet for the given SetID
38+
func (mgr *CIDSetManager) getSet(sid SetID) *cidSet {
39+
mgr.lk.Lock()
40+
defer mgr.lk.Unlock()
41+
42+
s, ok := mgr.sets[sid]
43+
if !ok {
44+
s = NewCIDSet(mgr.getSetDS(sid))
45+
mgr.sets[sid] = s
46+
}
47+
return s
48+
}
49+
50+
// getSetDS gets the wrapped datastore for the given SetID
51+
func (mgr *CIDSetManager) getSetDS(sid SetID) datastore.Batching {
52+
setDSKey := datastore.NewKey(string(sid) + "/cids")
53+
return namespace.Wrap(mgr.ds, setDSKey)
54+
}
55+
56+
// cidSet persists a set of CIDs
57+
type cidSet struct {
58+
lk sync.Mutex
59+
ds datastore.Batching
60+
}
61+
62+
func NewCIDSet(ds datastore.Batching) *cidSet {
63+
return &cidSet{ds: ds}
64+
}
65+
66+
// Insert a CID into the set.
67+
// Returns true if the the CID was already in the set.
68+
func (s *cidSet) Insert(c cid.Cid) (exists bool, err error) {
69+
s.lk.Lock()
70+
defer s.lk.Unlock()
71+
72+
k := datastore.NewKey(c.String())
73+
has, err := s.ds.Has(k)
74+
if err != nil {
75+
return false, err
76+
}
77+
if has {
78+
return true, nil
79+
}
80+
return false, s.ds.Put(k, nil)
81+
}
82+
83+
// Truncate removes all CIDs in the set
84+
func (s *cidSet) Truncate() error {
85+
s.lk.Lock()
86+
defer s.lk.Unlock()
87+
88+
res, err := s.ds.Query(query.Query{KeysOnly: true})
89+
if err != nil {
90+
return err
91+
}
92+
93+
entries, err := res.Rest()
94+
if err != nil {
95+
return err
96+
}
97+
98+
batched, err := s.ds.Batch()
99+
if err != nil {
100+
return err
101+
}
102+
103+
for _, entry := range entries {
104+
err := batched.Delete(datastore.NewKey(entry.Key))
105+
if err != nil {
106+
return err
107+
}
108+
}
109+
110+
return batched.Commit()
111+
}

0 commit comments

Comments
 (0)