Skip to content

Results.Close should return error #230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions keytransform/keytransform.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error)
}
return r, true
},
Close: func() {
cqr.Close()
Close: func() error {
return cqr.Close()
},
})
return dsq.NaiveQueryApply(nq, qr), nil
Expand Down
3 changes: 2 additions & 1 deletion mount/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,12 @@ func (h *querySet) Pop() interface{} {
return last
}

func (h *querySet) close() {
func (h *querySet) close() error {
for _, qr := range h.heads {
qr.results.Close()
}
h.heads = nil
return nil
}

func (h *querySet) addResults(mount ds.Key, results query.Results) {
Expand Down
14 changes: 8 additions & 6 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ type Results interface {
Next() <-chan Result // returns a channel to wait for the next result
NextSync() (Result, bool) // blocks and waits to return the next result, second parameter returns false when results are exhausted
Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once.
Close() // client may call Close to signal early exit
Close() error // client may call Close to signal early exit
Done() <-chan struct{} // signals that Results is closed
}

Expand Down Expand Up @@ -187,9 +187,10 @@ func (r *results) Rest() ([]Entry, error) {
return es, nil
}

func (r *results) Close() {
func (r *results) Close() error {
r.cancel()
<-r.closed
return nil
}

func (r *results) Query() Query {
Expand Down Expand Up @@ -273,17 +274,17 @@ func ResultsFromIterator(q Query, iter Iterator) Results {
}
}

func noopClose() {}
func noopClose() error { return nil }

type Iterator struct {
Next func() (Result, bool)
Close func() // note: might be called more than once
Close func() error // note: might be called more than once
}

type resultsIter struct {
query Query
next func() (Result, bool)
close func()
close func() error
results *results
}

Expand Down Expand Up @@ -318,14 +319,15 @@ func (r *resultsIter) Rest() ([]Entry, error) {
return es, nil
}

func (r *resultsIter) Close() {
func (r *resultsIter) Close() error {
if r.results != nil {
// Close results collector. It will call r.close().
r.results.Close()
} else {
// Call r.close() since there is no collector to call it when closed.
r.close()
}
return nil
}

func (r *resultsIter) Query() Query {
Expand Down
14 changes: 7 additions & 7 deletions query/query_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func NaiveFilter(qr Results, filter Filter) Results {
}
}
},
Close: func() {
qr.Close()
Close: func() error {
return qr.Close()
},
})
}
Expand All @@ -43,12 +43,12 @@ func NaiveLimit(qr Results, limit int) Results {
limit--
return qr.NextSync()
},
Close: func() {
Close: func() error {
if closed {
return
return nil
}
closed = true
qr.Close()
return qr.Close()
},
})
}
Expand All @@ -65,8 +65,8 @@ func NaiveOffset(qr Results, offset int) Results {
}
return qr.NextSync()
},
Close: func() {
qr.Close()
Close: func() error {
return qr.Close()
},
})
}
Expand Down
5 changes: 3 additions & 2 deletions query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestResultsFromIteratorNoClose(t *testing.T) {
testResultsFromIterator(t, getKeysViaChan, nil)
}

func testResultsFromIterator(t *testing.T, getKeys func(rs Results) []string, close func()) {
func testResultsFromIterator(t *testing.T, getKeys func(rs Results) []string, close func() error) {
i := 0
results := ResultsFromIterator(Query{}, Iterator{
Next: func() (Result, bool) {
Expand All @@ -221,8 +221,9 @@ func testResultsFromIterator(t *testing.T, getKeys func(rs Results) []string, cl

func testResultsFromIteratorWClose(t *testing.T, getKeys func(rs Results) []string) {
closeCalled := 0
testResultsFromIterator(t, getKeys, func() {
testResultsFromIterator(t, getKeys, func() error {
closeCalled++
return nil
})
if closeCalled != 1 {
t.Errorf("close called %d times, expect it to be called just once", closeCalled)
Expand Down