Skip to content

Add additional test and clarification for channels doc #130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions coherence/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,19 @@ EntrySet, KeySet, Values, InvokeAll and InvokeAllFilter.
// we can also do more complex filtering such as looking for people > 30 and where there name begins with 'T'
ch := namedMap.EntrySetFilter(ctx, filters.Greater(age, 20).And(filters.Like(name, "T%", true)))

NOTE: With any of the functions returning a channel, if you break out of the range loop before potentially reading all
entries from the channel, you must drain the channel in a go routine. This ensures that the underlying unbuffered
channel does not block waiting to write. A simplified drain function could be the following:

func drain[T any](ch <-chan T) {
go func() {
for range ch {
}
}()
}

The above is a sample only and should implement your own with a timeout or using context with cancel to suit your needs.

If you want to sort the results from the EntrySetFilter command you can use the following function
[EntrySetFilterWithComparator]. Due generics limitations in Go, this is not a function call off the [NamedMap]
or [NamedCache] interface, but a function call that takes a [NamedMap] or [NamedCache].
Expand Down
87 changes: 87 additions & 0 deletions test/e2e/standalone/named_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func TestBasicOperationsAgainstMapAndCache(t *testing.T) {
{"NamedCacheRunTestEntrySetFilter", utils.GetNamedCache[int, utils.Person](g, session, "entryset-filter-cache"), RunTestEntrySetFilter},
{"NamedMapRunTestEntrySetFilterWithComparator", utils.GetNamedMap[int, utils.Person](g, session, "entryset-filter-map-comparator"), RunTestEntrySetFilterWithComparator},
{"NamedCacheRunTestEntrySetFilterWithComparator", utils.GetNamedCache[int, utils.Person](g, session, "entryset-filter-cache-comparator"), RunTestEntrySetFilterWithComparator},
{"NamedCacheRunTestEntrySetFilterWithComparatorStream", utils.GetNamedCache[int, utils.Person](g, session, "entryset-filter-cache-comparator-stream"), RunTestEntrySetFilterWithComparatorStream},
{"NamedMapRunTestKeySetFilter", utils.GetNamedMap[int, utils.Person](g, session, "keyset-map"), RunTestKeySetFilter},
{"NamedCacheRunTestKeySetFilter", utils.GetNamedCache[int, utils.Person](g, session, "keyset-cache"), RunTestKeySetFilter},
{"NamedMapRunTestGetAll", utils.GetNamedMap[int, utils.Person](g, session, "getall-filter-map"), RunTestGetAll},
Expand Down Expand Up @@ -901,6 +902,92 @@ func RunTestEntrySetFilterWithComparator(t *testing.T, namedMap coherence.NamedM
g.Expect(len(results)).To(gomega.Equal(len(peopleData)))
}

func RunTestEntrySetFilterWithComparatorStream(t *testing.T, namedMap coherence.NamedMap[int, utils.Person]) {
var (
g = gomega.NewWithT(t)
comparatorAscending = extractors.ExtractorComparator(extractors.Extract[int]("age"), true)
count = 0
maxPeople = 10_000
)

if namedMap.GetSession().GetProtocolVersion() == 0 {
// skip as not supported in V0
return
}

utils.ClearNamedMap(g, namedMap)

// populate the cache
for i := 1; i <= maxPeople; i++ {
_, err := namedMap.Put(ctx, i, utils.Person{
ID: i,
Name: fmt.Sprintf("name-%d", i),
Age: 10 + (i % 100),
})
g.Expect(err).NotTo(gomega.HaveOccurred())

}
utils.AssertSize(g, namedMap, maxPeople)

// only retrieve 300 entries and then skip out
ch := coherence.EntrySetFilterWithComparator(ctx, namedMap, filters.Always(), comparatorAscending)
for se := range ch {
g.Expect(se.Err).ShouldNot(gomega.HaveOccurred())
g.Expect(se.Value).ShouldNot(gomega.BeNil())
count++
if count == 100 {
// exit before all entries read
break
}
}

// because we exited potentially before all entries were read, drain the channel
drainWithIdleTimeout(ch, 2*time.Second)

// run a go routine to access the same cache again
var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
count = 0
// tun the same query again
ch2 := coherence.EntrySetFilterWithComparator(ctx, namedMap, filters.Always(), comparatorAscending)
for se := range ch2 {
g.Expect(se.Err).ShouldNot(gomega.HaveOccurred())
count++
}

g.Expect(count).To(gomega.Equal(maxPeople))
}()

wg.Wait()
}

func drainWithIdleTimeout[T any](ch <-chan T, timeout time.Duration) {
go func() {
timer := time.NewTimer(timeout)
defer timer.Stop()

for {
select {
case val, ok := <-ch:
if !ok {
return
}
if !timer.Stop() {
<-timer.C
}
timer.Reset(timeout)
_ = val // discard

case <-timer.C:
return
}
}
}()
}

func RunTestKeySetFilter(t *testing.T, namedMap coherence.NamedMap[int, utils.Person]) {
var (
g = gomega.NewWithT(t)
Expand Down
Loading