From 53857bcdd983497f20b8004cb70c6ff3b52eb34e Mon Sep 17 00:00:00 2001
From: Stephen Toub
Date: Sun, 17 Feb 2019 23:56:12 -0500
Subject: [PATCH] Use custom BlockingQueue to significantly improve F5 perf
 with SDCA without caching

---
 .../DataLoadSave/Text/BlockingQueue.cs        | 229 ++++++++++++++++++
 .../DataLoadSave/Text/TextLoaderCursor.cs     |   8 +-
 2 files changed, 233 insertions(+), 4 deletions(-)
 create mode 100644 src/Microsoft.ML.Data/DataLoadSave/Text/BlockingQueue.cs

diff --git a/src/Microsoft.ML.Data/DataLoadSave/Text/BlockingQueue.cs b/src/Microsoft.ML.Data/DataLoadSave/Text/BlockingQueue.cs
new file mode 100644
index 0000000000..511702a9ee
--- /dev/null
+++ b/src/Microsoft.ML.Data/DataLoadSave/Text/BlockingQueue.cs
@@ -0,0 +1,229 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Microsoft.ML.Data
+{
+    // NOTE:
+    // This is a temporary workaround for https://github.com/dotnet/corefx/issues/34602, or until TextLoader is rearchitected.
+    // BlockingCollection is fairly efficient for blocking producer/consumer scenarios, but it was optimized for scenarios where
+    // instances are not created and destroyed rapidly. Its CompleteAdding mechanism is implemented internally in such a way that
+    // if a taker is currently blocked when CompleteAdding is called, that taker thread incurs an OperationCanceledException that's
+    // eaten internally. If such an exception only happens rarely, it's not a big deal, but the way TextLoader uses
+    // BlockingCollections, they can end up being created and destroyed very frequently, tens of thousands of times during the
+    // course of an algorithm like SDCA (when no caching is employed). That in turn can result in tens of thousands of exceptions
+    // getting thrown and caught. While in normal processing even that number of exceptions results in an overhead that's not
+    // particularly impactful, things change when a debugger is attached, as that makes the overhead of exceptions several orders
+    // of magnitude higher (e.g. 1000x). Until either TextLoader is rearchitected to not create so many BlockingCollections in
+    // these situations, or until this implementation detail in BlockingCollection is changed, we use a replacement BlockingQueue
+    // implementation that's similar in nature to BlockingCollection (albeit without many of its bells and whistles) but that
+    // specifically doesn't rely on cancellation for its CompleteAdding mechanism, and thus doesn't incur this impactful exception.
+    // Other than the type name, this type is designed to expose the exact surface area required by the existing usage of
+    // BlockingCollection, no more, no less, making it easy to swap in and out.
+
+    /// <summary>Provides a thread-safe queue that supports blocking takes when empty and blocking adds when full.</summary>
+    /// <typeparam name="T">Specifies the type of data contained.</typeparam>
+    internal sealed class BlockingQueue<T> : IDisposable
+    {
+        /// <summary>The underlying queue storing all elements.</summary>
+        private readonly ConcurrentQueue<T> _queue;
+        /// <summary>A semaphore that can be waited on to know when an item is available for taking.</summary>
+        private readonly CompletableSemaphore _itemsAvailable;
+        /// <summary>A semaphore that can be waited on to know when space is available for adding.</summary>
+        private readonly CompletableSemaphore _spaceAvailable;
+
+        /// <summary>Initializes the blocking queue.</summary>
+        /// <param name="boundedCapacity">The maximum number of items the queue may contain.</param>
+        public BlockingQueue(int boundedCapacity)
+        {
+            Contracts.Assert(boundedCapacity > 0);
+
+            _queue = new ConcurrentQueue<T>();
+            _itemsAvailable = new CompletableSemaphore(0);
+            _spaceAvailable = new CompletableSemaphore(boundedCapacity);
+        }
+
+        /// <summary>Cleans up all resources used by the blocking collection.</summary>
+        public void Dispose()
+        {
+            // This method/IDisposable implementation is here for API compat with BlockingCollection,
+            // but there's nothing to actually dispose.
+        }
+
+        /// <summary>Adds an item to the blocking collection.</summary>
+        /// <param name="item">The item to add.</param>
+        /// <param name="millisecondsTimeout">The time to wait, in milliseconds, or -1 to wait indefinitely.</param>
+        /// <returns>
+        /// true if the item was successfully added; false if the timeout expired or if the collection was marked
+        /// as complete for adding before the item could be added.
+        /// </returns>
+        public bool TryAdd(T item, int millisecondsTimeout = 0)
+        {
+            Contracts.Assert(!_itemsAvailable.Completed);
+
+            // Wait for space to be available, then once it is, enqueue the item,
+            // and notify anyone waiting that another item is available.
+            if (_spaceAvailable.Wait(millisecondsTimeout))
+            {
+                _queue.Enqueue(item);
+                _itemsAvailable.Release();
+                return true;
+            }
+
+            return false;
+        }
+
+        /// <summary>Tries to take an item from the blocking collection.</summary>
+        /// <param name="item">The item removed, or default if none could be taken.</param>
+        /// <param name="millisecondsTimeout">The time to wait, in milliseconds, or -1 to wait indefinitely.</param>
+        /// <returns>
+        /// true if the item was successfully taken; false if the timeout expired or if the collection is empty
+        /// and has been marked as complete for adding.
+        /// </returns>
+        public bool TryTake(out T item, int millisecondsTimeout = 0)
+        {
+            // Wait for an item to be available, and once one is, dequeue it,
+            // and assuming we got one, notify anyone waiting that space is available.
+            if (_itemsAvailable.Wait(millisecondsTimeout))
+            {
+                bool gotItem = _queue.TryDequeue(out item);
+                Contracts.Assert(gotItem || _itemsAvailable.Completed);
+                if (gotItem)
+                {
+                    _spaceAvailable.Release();
+                    return true;
+                }
+            }
+
+            item = default;
+            return false;
+        }
+
+        /// <summary>
+        /// Gets an enumerable for taking all items out of the collection until
+        /// the collection has been marked as complete for adding and is empty.
+        /// </summary>
+        public IEnumerable<T> GetConsumingEnumerable()
+        {
+            // Block waiting for each additional item, yielding each as we take it,
+            // and exiting only when the collection is and will forever be empty.
+            while (TryTake(out T item, Timeout.Infinite))
+            {
+                yield return item;
+            }
+        }
+
+        /// <summary>Marks the collection as complete for adding.</summary>
+        /// <remarks>After this is called, no calls made on this queue will block.</remarks>
+        public void CompleteAdding()
+        {
+            _itemsAvailable.Complete();
+            _spaceAvailable.Complete();
+        }
+
+        /// <summary>
+        /// A basic monitor-based semaphore that, in addition to standard Wait/Release semantics,
+        /// also supports marking the semaphore as completed, in which case all waiters immediately
+        /// fail if there's no count remaining.
+        /// </summary>
+        private sealed class CompletableSemaphore
+        {
+            /// <summary>The remaining count in the semaphore.</summary>
+            private int _count;
+            /// <summary>The number of threads currently waiting in Wait.</summary>
+            private int _waiters;
+
+            /// <summary>Initializes the semaphore with the specified initial count.</summary>
+            /// <param name="initialCount">The initial count.</param>
+            public CompletableSemaphore(int initialCount)
+            {
+                Contracts.Assert(initialCount >= 0);
+                _count = initialCount;
+            }
+
+            /// <summary>Gets whether the semaphore has been marked as completed.</summary>
+ /// + /// If completed, no calls to Wait will block; if no count remains, regardless of timeout, Waits will + /// return immediately with a result of false. + /// + public bool Completed { get; private set; } + + /// Releases the semaphore once. + public void Release() + { + lock (this) + { + // Increment the count, and if anyone is waiting, notify one of them. + _count++; + if (_waiters > 0) + { + Monitor.Pulse(this); + } + } + } + + /// Blocks the current thread until it can enter the semaphore once. + /// The maximum amount of time to wait to enter the semaphore, or -1 to wait indefinitely. + /// true if the semaphore was entered; otherwise, false. + public bool Wait(int millisecondsTimeout = Timeout.Infinite) + { + lock (this) + { + while (true) + { + // If the count is greater than 0, take one, and we're done. + Contracts.Assert(_count >= 0); + if (_count > 0) + { + _count--; + return true; + } + + // If the count is 0 but we've been marked as completed, fail. + if (Completed) + { + return false; + } + + // Wait until either there's a count available or the timeout expires. + // In practice we should never have a case where the timeout occurs + // and we need to wait again, so we don't bother doing any manual + // tracking of the timeout. + _waiters++; + try + { + if (!Monitor.Wait(this, millisecondsTimeout)) + { + return false; + } + } + finally + { + _waiters--; + Contracts.Assert(_waiters >= 0); + } + } + } + } + + /// Marks the semaphore as completed, such that no further operations will block. + public void Complete() + { + lock (this) + { + // Mark the semaphore as completed and wake up all waiters. + Completed = true; + if (_waiters > 0) + { + Monitor.PulseAll(this); + } + } + } + } + } +} diff --git a/src/Microsoft.ML.Data/DataLoadSave/Text/TextLoaderCursor.cs b/src/Microsoft.ML.Data/DataLoadSave/Text/TextLoaderCursor.cs index 534497505a..566ad84cdc 100644 --- a/src/Microsoft.ML.Data/DataLoadSave/Text/TextLoaderCursor.cs +++ b/src/Microsoft.ML.Data/DataLoadSave/Text/TextLoaderCursor.cs @@ -395,7 +395,7 @@ private sealed class LineReader // The line reader can be referenced by multiple workers. This is the reference count. private int _cref; - private BlockingCollection _queue; + private BlockingQueue _queue; private Task _thdRead; private volatile bool _abort; @@ -415,7 +415,7 @@ public LineReader(IMultiStreamSource files, int batchSize, int bufSize, bool has _files = files; _cref = cref; - _queue = new BlockingCollection(bufSize); + _queue = new BlockingQueue(bufSize); _thdRead = Utils.RunOnBackgroundThread(ThreadProc); } @@ -638,7 +638,7 @@ private sealed class ParallelState : IDisposable private readonly OrderedWaiter _waiterPublish; // A small capacity blocking collection that the main cursor thread consumes. - private readonly BlockingCollection _queue; + private readonly BlockingQueue _queue; private readonly Task[] _threads; @@ -673,7 +673,7 @@ public ParallelState(Cursor curs, out RowSet rows, int cthd) // The size limit here ensures that worker threads are never writing to // a range that is being served up by the cursor. - _queue = new BlockingCollection(2); + _queue = new BlockingQueue(2); _threads = new Task[cthd]; _threadsRunning = cthd;