Skip to content

Commit 75d8843

Browse files
Correct data transfer Sent stats (#106)
* correct data transfer stats * feat(channels): add queued event Co-authored-by: hannahhoward <[email protected]>
1 parent 1bde983 commit 75d8843

14 files changed

+132
-40
lines changed

channels/channel_state.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type channelState struct {
3434
status datatransfer.Status
3535
// isPull indicates if this is a push or pull request
3636
isPull bool
37+
// total bytes read from this node and queued for sending (0 if receiver)
38+
queued uint64
3739
// total bytes sent from this node (0 if receiver)
3840
sent uint64
3941
// total bytes received by this node (0 if sender)
@@ -56,6 +58,9 @@ var EmptyChannelState = channelState{}
5658
// Status is the current status of this channel
5759
func (c channelState) Status() datatransfer.Status { return c.status }
5860

61+
// Received returns the number of bytes received
62+
func (c channelState) Queued() uint64 { return c.queued }
63+
5964
// Sent returns the number of bytes sent
6065
func (c channelState) Sent() uint64 { return c.sent }
6166

@@ -171,6 +176,7 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
171176
recipient: c.Recipient,
172177
totalSize: c.TotalSize,
173178
status: c.Status,
179+
queued: c.Queued,
174180
sent: c.Sent,
175181
received: c.Received,
176182
message: c.Message,

channels/channels.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ func (c *Channels) DataSent(chid datatransfer.ChannelID, cid cid.Cid, delta uint
190190
return c.send(chid, datatransfer.DataSent, delta, cid)
191191
}
192192

193+
func (c *Channels) DataQueued(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
194+
return c.send(chid, datatransfer.DataQueued, delta, cid)
195+
}
196+
193197
func (c *Channels) DataReceived(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
194198
return c.send(chid, datatransfer.DataReceived, delta, cid)
195199
}

channels/channels_fsm.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,17 @@ var ChannelEvents = fsm.Events{
4848
chst.Sent += delta
4949
return nil
5050
}),
51-
51+
fsm.Event(datatransfer.DataQueued).FromMany(
52+
datatransfer.Requested,
53+
datatransfer.Ongoing,
54+
datatransfer.InitiatorPaused,
55+
datatransfer.ResponderPaused,
56+
datatransfer.BothPaused,
57+
datatransfer.ResponderCompleted,
58+
datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64, c cid.Cid) error {
59+
chst.Queued += delta
60+
return nil
61+
}),
5262
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
5363
chst.Message = datatransfer.ErrDisconnected.Error()
5464
return nil

channels/internal/internalchannel.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ type ChannelState struct {
4848
TotalSize uint64
4949
// current status of this deal
5050
Status datatransfer.Status
51+
// total bytes read from this node and queued for sending (0 if receiver)
52+
Queued uint64
5153
// total bytes sent from this node (0 if receiver)
5254
Sent uint64
5355
// total bytes received by this node (0 if sender)

channels/internal/internalchannel_cbor_gen.go

Lines changed: 32 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

events.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ const (
7070
// CompleteCleanupOnRestart is emitted when a data transfer channel is restarted to signal
7171
// that channels that were cleaning up should finish cleanup
7272
CompleteCleanupOnRestart
73+
74+
// DataQueued is emmited is read and queued for sending to the remote peer
75+
DataQueued
7376
)
7477

7578
// Events are human readable names for data transfer events
@@ -93,6 +96,7 @@ var Events = map[EventCode]string{
9396
BeginFinalizing: "BeginFinalizing",
9497
Complete: "Complete",
9598
CompleteCleanupOnRestart: "CompleteCleanupOnRestart",
99+
DataQueued: "DataQueued",
96100
}
97101

98102
// Event is a struct containing information about a data transfer event

impl/events.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,8 @@ func (m *manager) OnDataReceived(chid datatransfer.ChannelID, link ipld.Link, si
5858
return nil
5959
}
6060

61-
func (m *manager) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size uint64) (datatransfer.Message, error) {
62-
err := m.channels.DataSent(chid, link.(cidlink.Link).Cid, size)
63-
if err != nil {
61+
func (m *manager) OnDataQueued(chid datatransfer.ChannelID, link ipld.Link, size uint64) (datatransfer.Message, error) {
62+
if err := m.channels.DataQueued(chid, link.(cidlink.Link).Cid, size); err != nil {
6463
return nil, err
6564
}
6665
if chid.Initiator != m.peerID {
@@ -79,9 +78,14 @@ func (m *manager) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size u
7978
return m.processRevalidationResult(chid, result, err)
8079
}
8180
}
81+
8282
return nil, nil
8383
}
8484

85+
func (m *manager) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size uint64) error {
86+
return m.channels.DataSent(chid, link.(cidlink.Link).Cid, size)
87+
}
88+
8589
func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatransfer.Request) (datatransfer.Response, error) {
8690
if request.IsRestart() {
8791
return m.receiveRestartRequest(chid, request)

impl/integration_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,9 @@ func TestRoundTrip(t *testing.T) {
117117
sent := make(chan uint64, 21)
118118
received := make(chan uint64, 21)
119119
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
120-
if event.Code == datatransfer.DataSent {
121-
if channelState.Sent() > 0 {
122-
sent <- channelState.Sent()
120+
if event.Code == datatransfer.DataQueued {
121+
if channelState.Queued() > 0 {
122+
sent <- channelState.Queued()
123123
}
124124
}
125125

@@ -776,7 +776,7 @@ func TestPauseAndResume(t *testing.T) {
776776
}
777777
for testCase, isPull := range testCases {
778778
t.Run(testCase, func(t *testing.T) {
779-
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
779+
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
780780
defer cancel()
781781

782782
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
@@ -805,9 +805,9 @@ func TestPauseAndResume(t *testing.T) {
805805
resumeResponder := make(chan struct{}, 2)
806806
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
807807

808-
if event.Code == datatransfer.DataSent {
809-
if channelState.Sent() > 0 {
810-
sent <- channelState.Sent()
808+
if event.Code == datatransfer.DataQueued {
809+
if channelState.Queued() > 0 {
810+
sent <- channelState.Queued()
811811
}
812812
}
813813

impl/responding_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ func TestDataTransferResponding(t *testing.T) {
395395
datatransfer.Open,
396396
datatransfer.NewVoucherResult,
397397
datatransfer.Accept,
398-
datatransfer.DataSent,
398+
datatransfer.DataQueued,
399399
datatransfer.NewVoucherResult,
400400
datatransfer.PauseResponder,
401401
datatransfer.NewVoucher,
@@ -415,7 +415,7 @@ func TestDataTransferResponding(t *testing.T) {
415415
verify: func(t *testing.T, h *receiverHarness) {
416416
_, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest)
417417
require.NoError(t, err)
418-
msg, err := h.transport.EventHandler.OnDataSent(
418+
msg, err := h.transport.EventHandler.OnDataQueued(
419419
channelID(h.id, h.peers),
420420
cidlink.Link{Cid: testutil.GenerateCids(1)[0]},
421421
12345)

impl/restart_integration_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,9 @@ func TestRestartPush(t *testing.T) {
101101
disConnChan := make(chan struct{})
102102

103103
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
104-
if event.Code == datatransfer.DataSent {
105-
if channelState.Sent() > 0 {
106-
sent <- channelState.Sent()
104+
if event.Code == datatransfer.DataQueued {
105+
if channelState.Queued() > 0 {
106+
sent <- channelState.Queued()
107107
}
108108
}
109109

@@ -289,9 +289,9 @@ func TestRestartPull(t *testing.T) {
289289
disConnChan := make(chan struct{})
290290

291291
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
292-
if event.Code == datatransfer.DataSent {
293-
if channelState.Sent() > 0 {
294-
sent <- channelState.Sent()
292+
if event.Code == datatransfer.DataQueued {
293+
if channelState.Queued() > 0 {
294+
sent <- channelState.Queued()
295295
}
296296
}
297297

transport.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,19 @@ type EventsHandler interface {
2626
// - error = cancel this request
2727
// - err == ErrPause - pause this request
2828
OnDataReceived(chid ChannelID, link ipld.Link, size uint64) error
29-
// OnDataSent is called when we send data for the given channel ID
29+
30+
// OnDataQueued is called when data is queued for sending for the given channel ID
3031
// return values are:
3132
// message = data transfer message along with data
3233
// err = error
3334
// - nil = proceed with sending data
3435
// - error = cancel this request
3536
// - err == ErrPause - pause this request
36-
OnDataSent(chid ChannelID, link ipld.Link, size uint64) (Message, error)
37+
OnDataQueued(chid ChannelID, link ipld.Link, size uint64) (Message, error)
38+
39+
// OnDataSent is called when we send data for the given channel ID
40+
OnDataSent(chid ChannelID, link ipld.Link, size uint64) error
41+
3742
// OnRequestReceived is called when we receive a new request to send data
3843
// for the given channel ID
3944
// return values are:

transport/graphsync/graphsync.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,9 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error {
297297
t.gs.RegisterCompletedResponseListener(t.gsCompletedResponseListener)
298298
t.gs.RegisterIncomingBlockHook(t.gsIncomingBlockHook)
299299
t.gs.RegisterOutgoingBlockHook(t.gsOutgoingBlockHook)
300+
301+
t.gs.RegisterBlockSentListener(t.gsBlockSentHook)
302+
300303
t.gs.RegisterOutgoingRequestHook(t.gsOutgoingRequestHook)
301304
t.gs.RegisterIncomingResponseHook(t.gsIncomingResponseHook)
302305
t.gs.RegisterRequestUpdatedHook(t.gsRequestUpdatedHook)
@@ -374,6 +377,19 @@ func (t *Transport) gsIncomingBlockHook(p peer.ID, response graphsync.ResponseDa
374377
}
375378
}
376379

380+
func (t *Transport) gsBlockSentHook(p peer.ID, request graphsync.RequestData, block graphsync.BlockData) {
381+
t.dataLock.RLock()
382+
chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}]
383+
t.dataLock.RUnlock()
384+
if !ok {
385+
return
386+
}
387+
388+
if err := t.events.OnDataSent(chid, block.Link(), block.BlockSize()); err != nil {
389+
log.Errorf("failed to process data sent: %+v", err)
390+
}
391+
}
392+
377393
func (t *Transport) gsOutgoingBlockHook(p peer.ID, request graphsync.RequestData, block graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
378394
t.dataLock.RLock()
379395
chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}]
@@ -389,7 +405,7 @@ func (t *Transport) gsOutgoingBlockHook(p peer.ID, request graphsync.RequestData
389405
}
390406
rp.maximumSent = rp.currentSent
391407

392-
msg, err := t.events.OnDataSent(chid, block.Link(), block.BlockSize())
408+
msg, err := t.events.OnDataQueued(chid, block.Link(), block.BlockSize())
393409
if err != nil && err != datatransfer.ErrPause {
394410
hookActions.TerminateWithError(err)
395411
return

0 commit comments

Comments
 (0)