Skip to content

Commit ae8a908

Browse files
committed
feat!(requestmanager): remove request allocation backpressure
Closes: #241 Ref: 9171ce6
1 parent 1697d47 commit ae8a908

File tree

5 files changed

+12
-74
lines changed

5 files changed

+12
-74
lines changed

impl/graphsync.go

+1-35
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,11 @@ type GraphSync struct {
7070
ctx context.Context
7171
cancel context.CancelFunc
7272
responseAllocator *allocator.Allocator
73-
requestAllocator *allocator.Allocator
7473
}
7574

7675
type graphsyncConfigOptions struct {
7776
totalMaxMemoryResponder uint64
7877
maxMemoryPerPeerResponder uint64
79-
totalMaxMemoryRequestor uint64
80-
maxMemoryPerPeerRequestor uint64
8178
maxInProgressIncomingRequests uint64
8279
maxInProgressIncomingRequestsPerPeer uint64
8380
maxInProgressOutgoingRequests uint64
@@ -116,22 +113,6 @@ func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
116113
}
117114
}
118115

119-
// MaxMemoryRequestor defines the maximum amount of memory the responder
120-
// may consume queueing up messages for a response in total
121-
func MaxMemoryRequestor(totalMaxMemory uint64) Option {
122-
return func(gs *graphsyncConfigOptions) {
123-
gs.totalMaxMemoryRequestor = totalMaxMemory
124-
}
125-
}
126-
127-
// MaxMemoryPerPeerRequestor defines the maximum amount of memory a peer
128-
// may consume queueing up messages for a response
129-
func MaxMemoryPerPeerRequestor(maxMemoryPerPeer uint64) Option {
130-
return func(gs *graphsyncConfigOptions) {
131-
gs.maxMemoryPerPeerRequestor = maxMemoryPerPeer
132-
}
133-
}
134-
135116
// MaxInProgressIncomingRequests changes the maximum number of
136117
// incoming graphsync requests that are processed in parallel (default 6)
137118
func MaxInProgressIncomingRequests(maxInProgressIncomingRequests uint64) Option {
@@ -214,8 +195,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
214195
gsConfig := &graphsyncConfigOptions{
215196
totalMaxMemoryResponder: defaultTotalMaxMemory,
216197
maxMemoryPerPeerResponder: defaultMaxMemoryPerPeer,
217-
totalMaxMemoryRequestor: defaultTotalMaxMemory,
218-
maxMemoryPerPeerRequestor: defaultMaxMemoryPerPeer,
219198
maxInProgressIncomingRequests: defaultMaxInProgressRequests,
220199
maxInProgressOutgoingRequests: defaultMaxInProgressRequests,
221200
registerDefaultValidator: true,
@@ -247,9 +226,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
247226
return messagequeue.New(ctx, p, network, responseAllocator, gsConfig.messageSendRetries, gsConfig.sendMessageTimeout)
248227
}
249228
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
250-
requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor)
251229

252-
asyncLoader := asyncloader.New(ctx, linkSystem, requestAllocator)
230+
asyncLoader := asyncloader.New(ctx, linkSystem)
253231
requestQueue := taskqueue.NewTaskQueue(ctx)
254232
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, outgoingRequestProcessingListeners, requestQueue, network.ConnectionManager(), gsConfig.maxLinksPerOutgoingRequest)
255233
requestExecutor := executor.NewExecutor(requestManager, incomingBlockHooks, asyncLoader.AsyncLoad)
@@ -313,7 +291,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
313291
ctx: ctx,
314292
cancel: cancel,
315293
responseAllocator: responseAllocator,
316-
requestAllocator: requestAllocator,
317294
}
318295

319296
requestManager.SetDelegate(peerManager)
@@ -453,7 +430,6 @@ func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.Requ
453430
// Stats produces insight on the current state of a graphsync exchange
454431
func (gs *GraphSync) Stats() graphsync.Stats {
455432
outgoingRequestStats := gs.requestQueue.Stats()
456-
incomingResponseStats := gs.requestAllocator.Stats()
457433

458434
ptqstats := gs.peerTaskQueue.Stats()
459435
incomingRequestStats := graphsync.RequestStats{
@@ -465,8 +441,6 @@ func (gs *GraphSync) Stats() graphsync.Stats {
465441

466442
return graphsync.Stats{
467443
OutgoingRequests: outgoingRequestStats,
468-
IncomingResponses: incomingResponseStats,
469-
470444
IncomingRequests: incomingRequestStats,
471445
OutgoingResponses: outgoingResponseStats,
472446
}
@@ -485,14 +459,6 @@ func (gsr *graphSyncReceiver) ReceiveMessage(
485459
sender peer.ID,
486460
incoming gsmsg.GraphSyncMessage) {
487461
gsr.graphSync().responseManager.ProcessRequests(ctx, sender, incoming.Requests())
488-
totalMemoryAllocated := uint64(0)
489-
for _, blk := range incoming.Blocks() {
490-
totalMemoryAllocated += uint64(len(blk.RawData()))
491-
}
492-
select {
493-
case <-gsr.graphSync().requestAllocator.AllocateBlockMemory(sender, totalMemoryAllocated):
494-
case <-gsr.ctx.Done():
495-
}
496462
gsr.graphSync().requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
497463
}
498464

requestmanager/asyncloader/asyncloader.go

+7-24
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,11 @@ type alternateQueue struct {
2727
loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
2828
}
2929

30-
// Allocator indicates a mechanism for tracking memory used by a given peer
31-
type Allocator interface {
32-
ReleaseBlockMemory(p peer.ID, amount uint64) error
33-
}
34-
3530
// AsyncLoader manages loading links asynchronously in as new responses
3631
// come in from the network
3732
type AsyncLoader struct {
38-
ctx context.Context
39-
cancel context.CancelFunc
40-
allocator Allocator
33+
ctx context.Context
34+
cancel context.CancelFunc
4135

4236
// this mutex protects access to the state of the async loader, which covers all data fields below below
4337
stateLk sync.Mutex
@@ -50,8 +44,8 @@ type AsyncLoader struct {
5044

5145
// New initializes a new link loading manager for asynchronous loads from the given context
5246
// and local store loading and storing function
53-
func New(ctx context.Context, linkSystem ipld.LinkSystem, allocator Allocator) *AsyncLoader {
54-
responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem, allocator)
47+
func New(ctx context.Context, linkSystem ipld.LinkSystem) *AsyncLoader {
48+
responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem)
5549
ctx, cancel := context.WithCancel(ctx)
5650
return &AsyncLoader{
5751
ctx: ctx,
@@ -61,7 +55,6 @@ func New(ctx context.Context, linkSystem ipld.LinkSystem, allocator Allocator) *
6155
alternateQueues: make(map[string]alternateQueue),
6256
responseCache: responseCache,
6357
loadAttemptQueue: loadAttemptQueue,
64-
allocator: allocator,
6558
}
6659
}
6760

@@ -73,7 +66,7 @@ func (al *AsyncLoader) RegisterPersistenceOption(name string, lsys ipld.LinkSyst
7366
if existing {
7467
return errors.New("already registerd a persistence option with this name")
7568
}
76-
responseCache, loadAttemptQueue := setupAttemptQueue(lsys, al.allocator)
69+
responseCache, loadAttemptQueue := setupAttemptQueue(lsys)
7770
al.alternateQueues[name] = alternateQueue{responseCache, loadAttemptQueue}
7871
return nil
7972
}
@@ -170,13 +163,7 @@ func (al *AsyncLoader) CleanupRequest(p peer.ID, requestID graphsync.RequestID)
170163
responseCache = al.alternateQueues[aq].responseCache
171164
delete(al.requestQueues, requestID)
172165
}
173-
toFree := responseCache.FinishRequest(requestID)
174-
if toFree > 0 {
175-
err := al.allocator.ReleaseBlockMemory(p, toFree)
176-
if err != nil {
177-
log.Infow("Error deallocating requestor memory", "p", p, "toFree", toFree, "err", err)
178-
}
179-
}
166+
responseCache.FinishRequest(requestID)
180167
}
181168

182169
func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue {
@@ -193,7 +180,7 @@ func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCac
193180
return al.alternateQueues[queue].responseCache
194181
}
195182

196-
func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {
183+
func setupAttemptQueue(lsys ipld.LinkSystem) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {
197184

198185
unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener)
199186
responseCache := responsecache.New(unverifiedBlockStore)
@@ -204,10 +191,6 @@ func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecach
204191
return types.AsyncLoadResult{Err: err, Local: false}
205192
}
206193
if data != nil {
207-
err = allocator.ReleaseBlockMemory(p, uint64(len(data)))
208-
if err != nil {
209-
log.Warningf("releasing block memory: %s", err.Error())
210-
}
211194
return types.AsyncLoadResult{Data: data, Local: false}
212195
}
213196
// fall back to local store

requestmanager/asyncloader/asyncloader_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/stretchr/testify/require"
1414

1515
"github.com/ipfs/go-graphsync"
16-
"github.com/ipfs/go-graphsync/allocator"
1716
"github.com/ipfs/go-graphsync/metadata"
1817
"github.com/ipfs/go-graphsync/requestmanager/types"
1918
"github.com/ipfs/go-graphsync/testutil"
@@ -385,8 +384,7 @@ func withLoader(st *store, exec func(ctx context.Context, asyncLoader *AsyncLoad
385384
ctx := context.Background()
386385
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
387386
defer cancel()
388-
allocator := allocator.NewAllocator(256*(1<<20), 16*(1<<20))
389-
asyncLoader := New(ctx, st.lsys, allocator)
387+
asyncLoader := New(ctx, st.lsys)
390388
exec(ctx, asyncLoader)
391389
}
392390

requestmanager/asyncloader/responsecache/responsecache.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -44,20 +44,14 @@ func New(unverifiedBlockStore UnverifiedBlockStore) *ResponseCache {
4444
// FinishRequest indicate there is no more need to track blocks tied to this
4545
// response. It returns the total number of bytes in blocks that were being
4646
// tracked but are no longer in memory
47-
func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) uint64 {
47+
func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) {
4848
rc.responseCacheLk.Lock()
4949
rc.linkTracker.FinishRequest(requestID)
5050

51-
toFree := uint64(0)
5251
rc.unverifiedBlockStore.PruneBlocks(func(link ipld.Link, amt uint64) bool {
53-
shouldPrune := rc.linkTracker.BlockRefCount(link) == 0
54-
if shouldPrune {
55-
toFree += amt
56-
}
57-
return shouldPrune
52+
return rc.linkTracker.BlockRefCount(link) == 0
5853
})
5954
rc.responseCacheLk.Unlock()
60-
return toFree
6155
}
6256

6357
// AttemptLoad attempts to laod the given block from the cache

requestmanager/asyncloader/responsecache/responsecache_test.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -134,16 +134,13 @@ func TestResponseCacheManagingLinks(t *testing.T) {
134134
require.NoError(t, err)
135135
require.Nil(t, data, "no data should be returned for unknown block")
136136

137-
toFree := responseCache.FinishRequest(requestID1)
137+
responseCache.FinishRequest(requestID1)
138138
// should remove only block 0, since it now has no refering outstanding requests
139139
require.Len(t, fubs.blocks(), len(blks)-4, "should prune block when it is orphaned")
140140
testutil.RefuteContainsBlock(t, fubs.blocks(), blks[0])
141-
require.Equal(t, toFree, uint64(len(blks[0].RawData())))
142141

143142
responseCache.FinishRequest(requestID2)
144143
// should remove last block since are no remaining references
145144
require.Len(t, fubs.blocks(), 0, "should prune block when it is orphaned")
146145
testutil.RefuteContainsBlock(t, fubs.blocks(), blks[3])
147-
require.Equal(t, toFree, uint64(len(blks[3].RawData())))
148-
149146
}

0 commit comments

Comments
 (0)