From 707bbe895880271843dd0ead0ed7f26f231e0f88 Mon Sep 17 00:00:00 2001
From: Martijn Laarman
Date: Tue, 4 Sep 2018 10:56:16 +0200
Subject: [PATCH] Split all the BulkAll and ScrollAll related tests into their
 own files and classes making it easier to read what they are doing

---
 .../Multiple/BulkAll/BulkAllApiTests.cs       | 321 ------------------
 .../Multiple/BulkAll/BulkAllApiTestsBase.cs   |  52 +++
 .../BulkAllCancellationTokenApiTests.cs       |  63 ++++
 .../BulkAll/BulkAllDeallocationApiTests.cs    |  58 ++++
 .../BulkAll/BulkAllDisposeApiTests.cs         |  62 ++++
 .../BulkAll/BulkAllExceptionApiTests.cs       |  52 +++
 .../BulkAll/BulkAllForEachAsyncApiTests.cs    |  46 +++
 .../Multiple/BulkAll/BulkAndScrollApiTests.cs |  77 +++++
 8 files changed, 410 insertions(+), 321 deletions(-)
 delete mode 100644 src/Tests/Tests/Document/Multiple/BulkAll/BulkAllApiTests.cs
 create mode 100644 src/Tests/Tests/Document/Multiple/BulkAll/BulkAllApiTestsBase.cs
 create mode 100644 src/Tests/Tests/Document/Multiple/BulkAll/BulkAllCancellationTokenApiTests.cs
 create mode 100644 src/Tests/Tests/Document/Multiple/BulkAll/BulkAllDeallocationApiTests.cs
 create mode 100644 src/Tests/Tests/Document/Multiple/BulkAll/BulkAllDisposeApiTests.cs
 create mode 100644 src/Tests/Tests/Document/Multiple/BulkAll/BulkAllExceptionApiTests.cs
 create mode 100644 src/Tests/Tests/Document/Multiple/BulkAll/BulkAllForEachAsyncApiTests.cs
 create mode 100644 src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs

diff --git a/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllApiTests.cs b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllApiTests.cs
deleted file mode 100644
index a5f4fb53e9d..00000000000
--- a/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllApiTests.cs
+++ /dev/null
@@ -1,321 +0,0 @@
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Linq;
-using System.Reactive.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-using Elastic.Xunit.Sdk;
-using Elastic.Xunit.XunitPlumbing;
-using FluentAssertions;
-using Nest;
-using Tests.Core.Extensions;
-using Tests.Core.ManagedElasticsearch.Clusters;
-using Tests.Core.Xunit;
-using Tests.Framework;
-using Tests.Framework.Integration;
-using Tests.Framework.ManagedElasticsearch.Clusters;
-using Xunit;
-
-namespace Tests.Document.Multiple.BulkAll
-{
-	public class BulkAllApiTests : IClusterFixture<IntrusiveOperationCluster>, IClassFixture<EndpointUsage>
-	{
-		private class SmallObject
-		{
-			public int Id { get; set; }
-		}
-
-		private readonly IElasticClient _client;
-
-		private static string CreateIndexName() => $"project-copy-{Guid.NewGuid().ToString("N").Substring(8)}";
-
-		private IEnumerable<SmallObject> CreateLazyStreamOfDocuments(int count)
-		{
-			for (var i = 0; i < count; i++)
-				yield return new SmallObject() { Id = i };
-		}
-
-		public BulkAllApiTests(IntrusiveOperationCluster cluster, EndpointUsage usage)
-		{
-			this._client = cluster.Client;
-		}
-
-		[I] public async Task BulkAllAndScrollAll()
-		{
-			var index = CreateIndexName();
-
-			const int size = 1000, pages = 100, numberOfDocuments = size * pages, numberOfShards = 10;
-			var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
-
-			await this.CreateIndexAsync(index, numberOfShards);
-
-			this.BulkAll(index, documents, size, pages, numberOfDocuments);
-			this.ScrollAll(index, numberOfShards, numberOfDocuments);
-		}
-
-		private void ScrollAll(string index, int numberOfShards, int numberOfDocuments)
-		{
-			var seenDocuments = 0;
-			var seenSlices = new ConcurrentBag<int>();
-			var scrollObserver = this._client.ScrollAll<SmallObject>("1m", numberOfShards, s => s
-				.MaxDegreeOfParallelism(numberOfShards / 2)
-				.Search(search => search
-					.Index(index)
-					.AllTypes()
-					.MatchAll()
-				)
-			).Wait(TimeSpan.FromMinutes(5), r =>
-			{
-				seenSlices.Add(r.Slice);
-				Interlocked.Add(ref seenDocuments, r.SearchResponse.Hits.Count);
-			});
-
-			seenDocuments.Should().Be(numberOfDocuments);
-			var groups = seenSlices.GroupBy(s => s).ToList();
-			groups.Count().Should().Be(numberOfShards);
-			groups.Should().OnlyContain(g => g.Count() > 1);
-		}
-
-		private void BulkAll(string index, IEnumerable<SmallObject> documents, int size, int pages, int numberOfDocuments)
-		{
-			var seenPages = 0;
-			//first we setup our cold observable
-			var observableBulk = this._client.BulkAll(documents, f => f
-				.MaxDegreeOfParallelism(8)
-				.BackOffTime(TimeSpan.FromSeconds(10))
-				.BackOffRetries(2)
-				.Size(size)
-				.RefreshOnCompleted()
-				.Index(index)
-			);
-			//we set up an observer
-			var bulkObserver = observableBulk.Wait(TimeSpan.FromMinutes(5), b => Interlocked.Increment(ref seenPages));
-
-			seenPages.Should().Be(pages);
-			var count = this._client.Count<SmallObject>(f => f.Index(index));
-			count.Count.Should().Be(numberOfDocuments);
-			bulkObserver.TotalNumberOfFailedBuffers.Should().Be(0);
-		}
-
-		private async Task CreateIndexAsync(string indexName, int numberOfShards)
-		{
-			var result = await this._client.CreateIndexAsync(indexName, s => s
-				.Settings(settings => settings
-					.NumberOfShards(numberOfShards)
-					.NumberOfReplicas(0)
-				)
-			);
-			result.Should().NotBeNull();
-			result.ShouldBeValid();
-		}
-
-		private static void OnError(ref Exception ex, Exception e, EventWaitHandle handle)
-		{
-			ex = e;
-			handle.Set();
-			throw e;
-		}
-
-		[I, SkipOnTeamCity]
-		public void DisposingObservableCancelsBulkAll()
-		{
-			var index = CreateIndexName();
-			var handle = new ManualResetEvent(false);
-
-			var size = 1000;
-			var pages = 1000;
-			var seenPages = 0;
-			var numberOfDocuments = size * pages;
-			var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
-
-			//first we setup our cold observable
-			var observableBulk = this._client.BulkAll(documents, f => f
-				.MaxDegreeOfParallelism(8)
-				.BackOffTime(TimeSpan.FromSeconds(10))
-				.BackOffRetries(2)
-				.Size(size)
-				.RefreshOnCompleted()
-				.Index(index)
-			);
-			//we set up an observer
-			Exception ex = null;
-			var bulkObserver = new BulkAllObserver(
-				onError: (e) => OnError(ref ex, e, handle),
-				onCompleted: () => handle.Set(),
-				onNext: (b) => Interlocked.Increment(ref seenPages)
-			);
-
-			//when we subscribe the observable becomes hot
-			observableBulk.Subscribe(bulkObserver);
-
-			//we wait N seconds to see some bulks
-			handle.WaitOne(TimeSpan.FromSeconds(3));
-			observableBulk.Dispose();
-			//we wait N seconds to give in flight request a chance to cancel
-			handle.WaitOne(TimeSpan.FromSeconds(3));
-			if (ex != null && !(ex is TaskCanceledException) && !(ex is OperationCanceledException)) throw ex;
-
-			seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
-			var count = this._client.Count<SmallObject>(f => f.Index(index));
-			count.Count.Should().BeLessThan(numberOfDocuments).And.BeGreaterThan(0);
-			bulkObserver.TotalNumberOfFailedBuffers.Should().Be(0);
-		}
-
-		[I, SkipOnTeamCity]
-		public void CancelBulkAll()
-		{
-			var index = CreateIndexName();
-			var handle = new ManualResetEvent(false);
-
-			var size = 1000;
-			var pages = 1000;
-			var seenPages = 0;
-			var numberOfDocuments = size * pages;
-			var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
-
-			//first we setup our cold observable
-			var tokenSource = new CancellationTokenSource();
-			var observableBulk = this._client.BulkAll(documents, f => f
-				.MaxDegreeOfParallelism(8)
-				.BackOffTime(TimeSpan.FromSeconds(10))
-				.BackOffRetries(2)
-				.Size(size)
-				.RefreshOnCompleted()
-				.Index(index)
-			, tokenSource.Token);
-
-			//we set up an observer
-			Exception ex = null;
-			var bulkObserver = new BulkAllObserver(
-				onError: (e) => OnError(ref ex, e, handle),
-				onNext: (b) => Interlocked.Increment(ref seenPages)
-			);
-			//when we subscribe the observable becomes hot
-			observableBulk.Subscribe(bulkObserver);
-
-			//we wait Nseconds to see some bulks
-			handle.WaitOne(TimeSpan.FromSeconds(3));
-			tokenSource.Cancel();
-			//we wait Nseconds to give in flight request a chance to cancel
-			handle.WaitOne(TimeSpan.FromSeconds(3));
-			if (ex != null && !(ex is TaskCanceledException) && !(ex is OperationCanceledException)) throw ex;
-
-			seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
-			var count = this._client.Count<SmallObject>(f => f.Index(index));
-			count.Count.Should().BeLessThan(numberOfDocuments).And.BeGreaterThan(0);
-			bulkObserver.TotalNumberOfFailedBuffers.Should().Be(0);
-			bulkObserver.TotalNumberOfRetries.Should().Be(0);
-		}
-
-		[I] public async Task AwaitBulkAll()
-		{
-			var index = CreateIndexName();
-
-			var size = 1000;
-			var pages = 10;
-			var seenPages = 0;
-			var numberOfDocuments = size * pages;
-			var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
-
-			var tokenSource = new CancellationTokenSource();
-			var observableBulk = this._client.BulkAll(documents, f => f
-				.MaxDegreeOfParallelism(8)
-				.BackOffTime(TimeSpan.FromSeconds(10))
-				.BackOffRetries(2)
-				.Size(size)
-				.RefreshOnCompleted()
-				.Index(index)
-				.BufferToBulk((r, buffer) => r.IndexMany(buffer))
-			, tokenSource.Token);
-
-
-			await observableBulk
-				.ForEachAsync(x => Interlocked.Increment(ref seenPages), tokenSource.Token);
-
-			seenPages.Should().Be(pages);
-			var count = this._client.Count<SmallObject>(f => f.Index(index));
-			count.Count.Should().Be(numberOfDocuments);
-		}
-
-		[I] public void WaitBulkAllThrowsAndIsCaught()
-		{
-			var index = CreateIndexName();
-
-			var size = 1000;
-			var pages = 10;
-			var seenPages = 0;
-			var numberOfDocuments = size * pages;
-			var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
-
-			var tokenSource = new CancellationTokenSource();
-			var observableBulk = this._client.BulkAll(documents, f => f
-				.MaxDegreeOfParallelism(4)
-				.BackOffTime(TimeSpan.FromSeconds(10))
-				.BackOffRetries(2)
-				.Size(size)
-				.RefreshOnCompleted()
-				.Index(index)
-				.BufferToBulk((r, buffer) => r.IndexMany(buffer))
-			, tokenSource.Token);
-
-			try
-			{
-				observableBulk.Wait(TimeSpan.FromSeconds(30), b =>
-				{
-					if (seenPages == 8) throw new Exception("boom");
-					Interlocked.Increment(ref seenPages);
-
-				});
-			}
-			catch (Exception e)
-			{
-				seenPages.Should().Be(8);
-				e.Message.Should().Be("boom");
-			}
-		}
-
-		[I] public void ForEachAsyncReleasesProcessedItemsInMemory()
-		{
-			WeakReference<SmallObject> deallocReference = null;
-			SmallObject obj = null;
-
-			var lazyCollection = GetLazyCollection(
-				weakRef => deallocReference = weakRef,
-				delegate { },//...
-				delegate { },//Making sure that all of the objects have gone through pipeline
-				delegate { },//so that the first one can be deallocated
-				delegate { },//Various GC roots prevent several of previous (2 or 3)
-				delegate { },//items in the lazy Enumerable from deallocation during forced GC
-				delegate { },//...
-				delegate {
-					GC.Collect(2, GCCollectionMode.Forced, true);
-					deallocReference.TryGetTarget(out obj);
-				}
-			);
-
-			var index = CreateIndexName();
-			var observableBulk = this._client.BulkAll(lazyCollection, f => f
-				.MaxDegreeOfParallelism(1)
-				.Size(1)
-				.Index(index)
-				.BufferToBulk((r, buffer) => r.IndexMany(buffer)));
-
-			observableBulk.Wait(TimeSpan.FromSeconds(30), delegate { });
-
-			deallocReference.Should().NotBeNull();
-			obj.Should().BeNull();
-		}
-
-		private IEnumerable<SmallObject> GetLazyCollection(params Action<WeakReference<SmallObject>>[] getFirstObjectCallBack)
-		{
-			var counter = 0;
-			foreach (var callback in getFirstObjectCallBack)
-			{
-				var obj = new SmallObject { Id = ++counter };
-				callback(new WeakReference<SmallObject>(obj));
-				yield return obj;
-			}
-		}
-	}
-}
diff --git a/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllApiTestsBase.cs b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllApiTestsBase.cs
new file mode 100644
index 00000000000..9a033f3c54b
--- /dev/null
+++ b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllApiTestsBase.cs
@@ -0,0 +1,52 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Elastic.Xunit.XunitPlumbing;
+using FluentAssertions;
+using Nest;
+using Tests.Core.Extensions;
+using Tests.Core.ManagedElasticsearch.Clusters;
+using Tests.Framework.Integration;
+using Xunit;
+
+namespace Tests.Document.Multiple.BulkAll
+{
+	public abstract class BulkAllApiTestsBase : IClusterFixture<IntrusiveOperationCluster>, IClassFixture<EndpointUsage>
+	{
+		protected BulkAllApiTestsBase(IntrusiveOperationCluster cluster, EndpointUsage usage) => this.Client = cluster.Client;
+
+		protected class SmallObject
+		{
+			public int Id { get; set; }
+		}
+
+		protected IElasticClient Client { get; }
+
+		protected static string CreateIndexName() => $"project-copy-{Guid.NewGuid().ToString("N").Substring(8)}";
+
+		protected IEnumerable<SmallObject> CreateLazyStreamOfDocuments(int count)
+		{
+			for (var i = 0; i < count; i++)
+				yield return new SmallObject() { Id = i };
+		}
+
+		protected async Task CreateIndexAsync(string indexName, int numberOfShards)
+		{
+			var result = await this.Client.CreateIndexAsync(indexName, s => s
+				.Settings(settings => settings
+					.NumberOfShards(numberOfShards)
+					.NumberOfReplicas(0)
+				)
+			);
+			result.Should().NotBeNull();
+			result.ShouldBeValid();
+		}
+		protected static void OnError(ref Exception ex, Exception e, EventWaitHandle handle)
+		{
+			ex = e;
+			handle.Set();
+			throw e;
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllCancellationTokenApiTests.cs b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllCancellationTokenApiTests.cs
new file mode 100644
index 00000000000..4ec2c43ba1a
--- /dev/null
+++ b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllCancellationTokenApiTests.cs
@@ -0,0 +1,63 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Elastic.Xunit.XunitPlumbing;
+using FluentAssertions;
+using Nest;
+using Tests.Core.ManagedElasticsearch.Clusters;
+using Tests.Core.Xunit;
+using Tests.Framework.Integration;
+
+namespace Tests.Document.Multiple.BulkAll
+{
+	public class BulkAllCancellationTokenApiTests : BulkAllApiTestsBase
+	{
+		public BulkAllCancellationTokenApiTests(IntrusiveOperationCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
+
+		[I, SkipOnTeamCity]
+		public void CancelBulkAll()
+		{
+			var index = CreateIndexName();
+			var handle = new ManualResetEvent(false);
+
+			var size = 1000;
+			var pages = 1000;
+			var seenPages = 0;
+			var numberOfDocuments = size * pages;
+			var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
+
+			//first we setup our cold observable
+			var tokenSource = new CancellationTokenSource();
+			var observableBulk = this.Client.BulkAll(documents, f => f
+				.MaxDegreeOfParallelism(8)
+				.BackOffTime(TimeSpan.FromSeconds(10))
+				.BackOffRetries(2)
+				.Size(size)
+				.RefreshOnCompleted()
+				.Index(index)
+			, tokenSource.Token);
+
+			//we set up an observer
+			Exception ex = null;
+			var bulkObserver = new BulkAllObserver(
+				onError: (e) => OnError(ref ex, e, handle),
+				onNext: (b) => Interlocked.Increment(ref seenPages)
+			);
+			//when we subscribe the observable becomes hot
+			observableBulk.Subscribe(bulkObserver);
+
+			//we wait N seconds to see some bulks
+			handle.WaitOne(TimeSpan.FromSeconds(3));
+			tokenSource.Cancel();
+			//we wait N seconds to give in flight request a chance to cancel
+			handle.WaitOne(TimeSpan.FromSeconds(3));
+			if (ex != null && !(ex is TaskCanceledException) && !(ex is OperationCanceledException)) throw ex;
+
+			seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
+			var count = this.Client.Count<SmallObject>(f => f.Index(index));
+			count.Count.Should().BeLessThan(numberOfDocuments).And.BeGreaterThan(0);
+			bulkObserver.TotalNumberOfFailedBuffers.Should().Be(0);
+			bulkObserver.TotalNumberOfRetries.Should().Be(0);
+		}
+	}
+}
diff --git a/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllDeallocationApiTests.cs b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllDeallocationApiTests.cs
new file mode 100644
index 00000000000..49512acf854
--- /dev/null
+++ b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllDeallocationApiTests.cs
@@ -0,0 +1,58 @@
+using System;
+using System.Collections.Generic;
+using Elastic.Xunit.XunitPlumbing;
+using FluentAssertions;
+using Nest;
+using Tests.Core.ManagedElasticsearch.Clusters;
+using Tests.Framework.Integration;
+
+namespace Tests.Document.Multiple.BulkAll
+{
+	public class BulkAllDeallocationApiTests : BulkAllApiTestsBase
+	{
+		public BulkAllDeallocationApiTests(IntrusiveOperationCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
+
+		[I] public void ForEachAsyncReleasesProcessedItemsInMemory()
+		{
+			WeakReference<SmallObject> deallocReference = null;
+			SmallObject obj = null;
+
+			var lazyCollection = this.GetLazyCollection(
+				weakRef => deallocReference = weakRef,
+				delegate { },//...
+				delegate { },//Making sure that all of the objects have gone through pipeline
+				delegate { },//so that the first one can be deallocated
+				delegate { },//Various GC roots prevent several of previous (2 or 3)
+				delegate { },//items in the lazy Enumerable from deallocation during forced GC
+				delegate { },//...
+				delegate {
+					GC.Collect(2, GCCollectionMode.Forced, true);
+					deallocReference.TryGetTarget(out obj);
+				}
+			);
+
+			var index = CreateIndexName();
+			var observableBulk = this.Client.BulkAll(lazyCollection, f => f
+				.MaxDegreeOfParallelism(1)
+				.Size(1)
+				.Index(index)
+				.BufferToBulk((r, buffer) => r.IndexMany(buffer)));
+
+			observableBulk.Wait(TimeSpan.FromSeconds(30), delegate { });
+
+			deallocReference.Should().NotBeNull();
+			obj.Should().BeNull();
+		}
+
+		private IEnumerable<SmallObject> GetLazyCollection(params Action<WeakReference<SmallObject>>[] getFirstObjectCallBack)
+		{
+			var counter = 0;
+			foreach (var callback in getFirstObjectCallBack)
+			{
+				var obj = new SmallObject { Id = ++counter };
+				callback(new WeakReference<SmallObject>(obj));
+				yield return obj;
+			}
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllDisposeApiTests.cs b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllDisposeApiTests.cs
new file mode 100644
index 00000000000..431b376bb5f
--- /dev/null
+++ b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllDisposeApiTests.cs
@@ -0,0 +1,62 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Elastic.Xunit.XunitPlumbing;
+using FluentAssertions;
+using Nest;
+using Tests.Core.ManagedElasticsearch.Clusters;
+using Tests.Core.Xunit;
+using Tests.Framework.Integration;
+
+namespace Tests.Document.Multiple.BulkAll
+{
+	public class BulkAllDisposeApiTests : BulkAllApiTestsBase
+	{
+		public BulkAllDisposeApiTests(IntrusiveOperationCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
+
+		[I, SkipOnTeamCity]
+		public void DisposingObservableCancelsBulkAll()
+		{
+			var index = CreateIndexName();
+			var handle = new ManualResetEvent(false);
+
+			var size = 1000;
+			var pages = 1000;
+			var seenPages = 0;
+			var numberOfDocuments = size * pages;
+			var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
+
+			//first we setup our cold observable
+			var observableBulk = this.Client.BulkAll(documents, f => f
+				.MaxDegreeOfParallelism(8)
+				.BackOffTime(TimeSpan.FromSeconds(10))
+				.BackOffRetries(2)
+				.Size(size)
+				.RefreshOnCompleted()
+				.Index(index)
+			);
+			//we set up an observer
+			Exception ex = null;
+			var bulkObserver = new BulkAllObserver(
+				onError: (e) => OnError(ref ex, e, handle),
+				onCompleted: () => handle.Set(),
+				onNext: (b) => Interlocked.Increment(ref seenPages)
+			);
+
+			//when we subscribe the observable becomes hot
+			observableBulk.Subscribe(bulkObserver);
+
+			//we wait N seconds to see some bulks
+			handle.WaitOne(TimeSpan.FromSeconds(3));
+			observableBulk.Dispose();
+			//we wait N seconds to give in flight request a chance to cancel
+			handle.WaitOne(TimeSpan.FromSeconds(3));
+			if (ex != null && !(ex is TaskCanceledException) && !(ex is OperationCanceledException)) throw ex;
+
+			seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
+			var count = this.Client.Count<SmallObject>(f => f.Index(index));
+			count.Count.Should().BeLessThan(numberOfDocuments).And.BeGreaterThan(0);
+			bulkObserver.TotalNumberOfFailedBuffers.Should().Be(0);
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllExceptionApiTests.cs b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllExceptionApiTests.cs
new file mode 100644
index 00000000000..c2f36aaf75e
--- /dev/null
+++ b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllExceptionApiTests.cs
@@ -0,0 +1,52 @@
+using System;
+using System.Threading;
+using Elastic.Xunit.XunitPlumbing;
+using FluentAssertions;
+using Nest;
+using Tests.Core.ManagedElasticsearch.Clusters;
+using Tests.Framework.Integration;
+
+namespace Tests.Document.Multiple.BulkAll
+{
+	public class BulkAllExceptionApiTests : BulkAllApiTestsBase
+	{
+		public BulkAllExceptionApiTests(IntrusiveOperationCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
+
+		[I] public void WaitBulkAllThrowsAndIsCaught()
+		{
+			var index = CreateIndexName();
+
+			var size = 1000;
+			var pages = 10;
+			var seenPages = 0;
+			var numberOfDocuments = size * pages;
+			var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
+
+			var tokenSource = new CancellationTokenSource();
+			var observableBulk = this.Client.BulkAll(documents, f => f
+				.MaxDegreeOfParallelism(4)
+				.BackOffTime(TimeSpan.FromSeconds(10))
+				.BackOffRetries(2)
+				.Size(size)
+				.RefreshOnCompleted()
+				.Index(index)
+				.BufferToBulk((r, buffer) => r.IndexMany(buffer))
+			, tokenSource.Token);
+
+			try
+			{
+				observableBulk.Wait(TimeSpan.FromSeconds(30), b =>
+				{
+					if (seenPages == 8) throw new Exception("boom");
+					Interlocked.Increment(ref seenPages);
+
+				});
+			}
+			catch (Exception e)
+			{
+				seenPages.Should().Be(8);
+				e.Message.Should().Be("boom");
+			}
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllForEachAsyncApiTests.cs b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllForEachAsyncApiTests.cs
new file mode 100644
index 00000000000..e80f89e674e
--- /dev/null
+++ b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAllForEachAsyncApiTests.cs
@@ -0,0 +1,46 @@
+using System;
+using System.Reactive.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Elastic.Xunit.XunitPlumbing;
+using FluentAssertions;
+using Tests.Core.ManagedElasticsearch.Clusters;
+using Tests.Framework.Integration;
+
+namespace Tests.Document.Multiple.BulkAll
+{
+	public class BulkAllForEachAsyncApiTests : BulkAllApiTestsBase
+	{
+		public BulkAllForEachAsyncApiTests(IntrusiveOperationCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
+
+		[I] public async Task AwaitBulkAll()
+		{
+			var index = CreateIndexName();
+
+			var size = 1000;
+			var pages = 10;
+			var seenPages = 0;
+			var numberOfDocuments = size * pages;
+			var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
+
+			var tokenSource = new CancellationTokenSource();
+			var observableBulk = this.Client.BulkAll(documents, f => f
+				.MaxDegreeOfParallelism(8)
+				.BackOffTime(TimeSpan.FromSeconds(10))
+				.BackOffRetries(2)
+				.Size(size)
+				.RefreshOnCompleted()
+				.Index(index)
+				.BufferToBulk((r, buffer) => r.IndexMany(buffer))
+			, tokenSource.Token);
+
+
+			await observableBulk
+				.ForEachAsync(x => Interlocked.Increment(ref seenPages), tokenSource.Token);
+
+			seenPages.Should().Be(pages);
+			var count = this.Client.Count<SmallObject>(f => f.Index(index));
+			count.Count.Should().Be(numberOfDocuments);
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs
new file mode 100644
index 00000000000..35b8af9eefa
--- /dev/null
+++ b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs
@@ -0,0 +1,77 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Elastic.Xunit.XunitPlumbing;
+using FluentAssertions;
+using Nest;
+using Tests.Core.ManagedElasticsearch.Clusters;
+using Tests.Framework.Integration;
+
+namespace Tests.Document.Multiple.BulkAll
+{
+	public class BulkAndScrollApiTests : BulkAllApiTestsBase
+	{
+		public BulkAndScrollApiTests(IntrusiveOperationCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
+
+		[I] public async Task BulkAllAndScrollAll()
+		{
+			var index = CreateIndexName();
+
+			const int size = 1000, pages = 100, numberOfDocuments = size * pages, numberOfShards = 10;
+			var documents = this.CreateLazyStreamOfDocuments(numberOfDocuments);
+
+			await this.CreateIndexAsync(index, numberOfShards);
+
+			this.BulkAll(index, documents, size, pages, numberOfDocuments);
+			this.ScrollAll(index, size, numberOfShards, numberOfDocuments);
+		}
+
+		private void ScrollAll(string index, int size, int numberOfShards, int numberOfDocuments)
+		{
+			var seenDocuments = 0;
+			var seenSlices = new ConcurrentBag<int>();
+			var scrollObserver = this.Client.ScrollAll<SmallObject>("1m", numberOfShards, s => s
+				.MaxDegreeOfParallelism(numberOfShards / 2)
+				.Search(search => search
+					.Size(size / 2)
+					.Index(index)
+					.AllTypes()
+					.MatchAll()
+				)
+			).Wait(TimeSpan.FromMinutes(5), r =>
+			{
+				seenSlices.Add(r.Slice);
+				Interlocked.Add(ref seenDocuments, r.SearchResponse.Hits.Count);
+			});
+
+			seenDocuments.Should().Be(numberOfDocuments);
+			var groups = seenSlices.GroupBy(s => s).ToList();
+			groups.Count().Should().Be(numberOfShards);
+			groups.Should().OnlyContain(g => g.Count() > 1);
+		}
+
+		private void BulkAll(string index, IEnumerable<SmallObject> documents, int size, int pages, int numberOfDocuments)
+		{
+			var seenPages = 0;
+			//first we setup our cold observable
+			var observableBulk = this.Client.BulkAll(documents, f => f
+				.MaxDegreeOfParallelism(8)
+				.BackOffTime(TimeSpan.FromSeconds(10))
+				.BackOffRetries(2)
+				.Size(size)
+				.RefreshOnCompleted()
+				.Index(index)
+			);
+			//we set up an observer
+			var bulkObserver = observableBulk.Wait(TimeSpan.FromMinutes(5), b => Interlocked.Increment(ref seenPages));
+
+			seenPages.Should().Be(pages);
+			var count = this.Client.Count<SmallObject>(f => f.Index(index));
+			count.Count.Should().Be(numberOfDocuments);
+			bulkObserver.TotalNumberOfFailedBuffers.Should().Be(0);
+		}
+	}
+}