@@ -350,12 +350,30 @@ func UseStore(store ipld.LinkSystem) datatransfer.TransportOption {
350
350
}
351
351
}
352
352
353
+ // MaxLinks sets the maximum number of links for this channelID
354
+ func MaxLinks (maxLinks uint64 ) datatransfer.TransportOption {
355
+ return func (channelID datatransfer.ChannelID , transport datatransfer.Transport ) error {
356
+ gsTransport , ok := transport .(* Transport )
357
+ if ! ok {
358
+ return datatransfer .ErrUnsupported
359
+ }
360
+ gsTransport .MaxLinks (channelID , maxLinks )
361
+ return nil
362
+ }
363
+ }
364
+
353
365
// UseStore tells the graphsync transport to use the given loader and storer for this channelID
354
366
func (t * Transport ) UseStore (channelID datatransfer.ChannelID , lsys ipld.LinkSystem ) error {
355
367
ch := t .trackDTChannel (channelID )
356
368
return ch .useStore (lsys )
357
369
}
358
370
371
+ // MaxLinks sets the maximum number of links for this channelID
372
+ func (t * Transport ) MaxLinks (channelID datatransfer.ChannelID , maxLinks uint64 ) {
373
+ ch := t .trackDTChannel (channelID )
374
+ ch .setMaxLinks (maxLinks )
375
+ }
376
+
359
377
// ChannelGraphsyncRequests describes any graphsync request IDs associated with a given channel
360
378
type ChannelGraphsyncRequests struct {
361
379
// Current is the current request ID for the transfer
@@ -905,8 +923,9 @@ type dtChannel struct {
905
923
906
924
opened chan graphsync.RequestID
907
925
908
- storeLk sync.RWMutex
926
+ optionsLk sync.RWMutex
909
927
storeRegistered bool
928
+ maxLinksOption uint64
910
929
}
911
930
912
931
// Info needed to monitor an ongoing graphsync request
@@ -1012,6 +1031,7 @@ func (c *dtChannel) gsReqOpened(requestID graphsync.RequestID, hookActions graph
1012
1031
if c .hasStore () {
1013
1032
hookActions .UsePersistenceOption ("data-transfer-" + c .channelID .String ())
1014
1033
}
1034
+ hookActions .MaxLinks (c .maxLinks ())
1015
1035
log .Infow ("outgoing graphsync request" , "peer" , c .channelID .OtherParty (c .t .peerID ), "graphsync request id" , requestID , "data transfer channel id" , c .channelID )
1016
1036
// Save a mapping from the graphsync key to the channel ID so that
1017
1037
// subsequent graphsync callbacks are associated with this channel
@@ -1043,6 +1063,8 @@ func (c *dtChannel) gsDataRequestRcvd(requestID graphsync.RequestID, hookActions
1043
1063
hookActions .UsePersistenceOption ("data-transfer-" + c .channelID .String ())
1044
1064
}
1045
1065
1066
+ hookActions .MaxLinks (c .maxLinks ())
1067
+
1046
1068
// Save a mapping from the graphsync key to the channel ID so that
1047
1069
// subsequent graphsync callbacks are associated with this channel
1048
1070
c .requestID = & requestID
@@ -1139,17 +1161,31 @@ func (c *dtChannel) onRequesterCancelled() {
1139
1161
}
1140
1162
1141
1163
func (c * dtChannel ) hasStore () bool {
1142
- c .storeLk .RLock ()
1143
- defer c .storeLk .RUnlock ()
1164
+ c .optionsLk .RLock ()
1165
+ defer c .optionsLk .RUnlock ()
1144
1166
1145
1167
return c .storeRegistered
1146
1168
}
1147
1169
1170
+ func (c * dtChannel ) maxLinks () uint64 {
1171
+ c .optionsLk .Lock ()
1172
+ defer c .optionsLk .Unlock ()
1173
+
1174
+ return c .maxLinksOption
1175
+ }
1176
+
1177
+ func (c * dtChannel ) setMaxLinks (maxLinks uint64 ) {
1178
+ c .optionsLk .Lock ()
1179
+ defer c .optionsLk .Unlock ()
1180
+
1181
+ c .maxLinksOption = maxLinks
1182
+ }
1183
+
1148
1184
// Use the given loader and storer to get / put blocks for the data-transfer.
1149
1185
// Note that each data-transfer channel uses a separate blockstore.
1150
1186
func (c * dtChannel ) useStore (lsys ipld.LinkSystem ) error {
1151
- c .storeLk .Lock ()
1152
- defer c .storeLk .Unlock ()
1187
+ c .optionsLk .Lock ()
1188
+ defer c .optionsLk .Unlock ()
1153
1189
1154
1190
// Register the channel's store with graphsync
1155
1191
err := c .t .gs .RegisterPersistenceOption ("data-transfer-" + c .channelID .String (), lsys )
0 commit comments