diff --git a/coherence/doc.go b/coherence/doc.go index f350889..e7020ce 100644 --- a/coherence/doc.go +++ b/coherence/doc.go @@ -277,6 +277,19 @@ EntrySet, KeySet, Values, InvokeAll and InvokeAllFilter. // we can also do more complex filtering such as looking for people > 30 and where there name begins with 'T' ch := namedMap.EntrySetFilter(ctx, filters.Greater(age, 20).And(filters.Like(name, "T%", true))) +NOTE: With any of the functions returning a channel, if you break out of the range loop before potentially reading all +entries from the channel, you must drain the channel in a go routine. This ensures that the underlying unbuffered +channel does not block waiting to write. A simplified drain function could be the following: + + func drain[T any](ch <-chan T) { + go func() { + for range ch { + } + }() + } + +The above is a sample only and should implement your own with a timeout or using context with cancel to suit your needs. + If you want to sort the results from the EntrySetFilter command you can use the following function [EntrySetFilterWithComparator]. Due generics limitations in Go, this is not a function call off the [NamedMap] or [NamedCache] interface, but a function call that takes a [NamedMap] or [NamedCache]. diff --git a/test/e2e/standalone/named_map_test.go b/test/e2e/standalone/named_map_test.go index 2da59b2..3ad5f37 100644 --- a/test/e2e/standalone/named_map_test.go +++ b/test/e2e/standalone/named_map_test.go @@ -254,6 +254,7 @@ func TestBasicOperationsAgainstMapAndCache(t *testing.T) { {"NamedCacheRunTestEntrySetFilter", utils.GetNamedCache[int, utils.Person](g, session, "entryset-filter-cache"), RunTestEntrySetFilter}, {"NamedMapRunTestEntrySetFilterWithComparator", utils.GetNamedMap[int, utils.Person](g, session, "entryset-filter-map-comparator"), RunTestEntrySetFilterWithComparator}, {"NamedCacheRunTestEntrySetFilterWithComparator", utils.GetNamedCache[int, utils.Person](g, session, "entryset-filter-cache-comparator"), RunTestEntrySetFilterWithComparator}, + {"NamedCacheRunTestEntrySetFilterWithComparatorStream", utils.GetNamedCache[int, utils.Person](g, session, "entryset-filter-cache-comparator-stream"), RunTestEntrySetFilterWithComparatorStream}, {"NamedMapRunTestKeySetFilter", utils.GetNamedMap[int, utils.Person](g, session, "keyset-map"), RunTestKeySetFilter}, {"NamedCacheRunTestKeySetFilter", utils.GetNamedCache[int, utils.Person](g, session, "keyset-cache"), RunTestKeySetFilter}, {"NamedMapRunTestGetAll", utils.GetNamedMap[int, utils.Person](g, session, "getall-filter-map"), RunTestGetAll}, @@ -901,6 +902,92 @@ func RunTestEntrySetFilterWithComparator(t *testing.T, namedMap coherence.NamedM g.Expect(len(results)).To(gomega.Equal(len(peopleData))) } +func RunTestEntrySetFilterWithComparatorStream(t *testing.T, namedMap coherence.NamedMap[int, utils.Person]) { + var ( + g = gomega.NewWithT(t) + comparatorAscending = extractors.ExtractorComparator(extractors.Extract[int]("age"), true) + count = 0 + maxPeople = 10_000 + ) + + if namedMap.GetSession().GetProtocolVersion() == 0 { + // skip as not supported in V0 + return + } + + utils.ClearNamedMap(g, namedMap) + + // populate the cache + for i := 1; i <= maxPeople; i++ { + _, err := namedMap.Put(ctx, i, utils.Person{ + ID: i, + Name: fmt.Sprintf("name-%d", i), + Age: 10 + (i % 100), + }) + g.Expect(err).NotTo(gomega.HaveOccurred()) + + } + utils.AssertSize(g, namedMap, maxPeople) + + // only retrieve 300 entries and then skip out + ch := coherence.EntrySetFilterWithComparator(ctx, namedMap, filters.Always(), comparatorAscending) + for se := range ch { + g.Expect(se.Err).ShouldNot(gomega.HaveOccurred()) + g.Expect(se.Value).ShouldNot(gomega.BeNil()) + count++ + if count == 100 { + // exit before all entries read + break + } + } + + // because we exited potentially before all entries were read, drain the channel + drainWithIdleTimeout(ch, 2*time.Second) + + // run a go routine to access the same cache again + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + count = 0 + // tun the same query again + ch2 := coherence.EntrySetFilterWithComparator(ctx, namedMap, filters.Always(), comparatorAscending) + for se := range ch2 { + g.Expect(se.Err).ShouldNot(gomega.HaveOccurred()) + count++ + } + + g.Expect(count).To(gomega.Equal(maxPeople)) + }() + + wg.Wait() +} + +func drainWithIdleTimeout[T any](ch <-chan T, timeout time.Duration) { + go func() { + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case val, ok := <-ch: + if !ok { + return + } + if !timer.Stop() { + <-timer.C + } + timer.Reset(timeout) + _ = val // discard + + case <-timer.C: + return + } + } + }() +} + func RunTestKeySetFilter(t *testing.T, namedMap coherence.NamedMap[int, utils.Person]) { var ( g = gomega.NewWithT(t)