Skip to content

Use custom BlockingQueue to significantly improve F5 perf with SDCA without caching #2595

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 229 additions & 0 deletions src/Microsoft.ML.Data/DataLoadSave/Text/BlockingQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

namespace Microsoft.ML.Data
{
// NOTE:
// This is a temporary workaround for https://github.com/dotnet/corefx/issues/34602 or until TextLoader is rearchitected.
// BlockingCollection is fairly efficient for blocking producer/consumer scenarios, but it was optimized for scenarios where
// they're not created/destroyed quickly. Its CompleteAdding mechanism is implemented internally in such a way that if a
// taker is currently blocked when CompleteAdding is called, that taker thread will incur an OperationCanceledException that's
// eaten internally. If such an exception is only happening rarely, it's not a big deal, but the way TextLoader uses
// BlockingCollections, they can end up being created and destroyed very frequently, tens of thousands of times during the
// course of an algorithm like SDCA (when no caching is employed). That in turn can result in tens of thousands of exceptions
// getting thrown and caught. While in normal processing even that number of exceptions results in an overhead that's not
// particularly impactful, things change when a debugger is attached, as that makes the overhead of exceptions several orders
// of magnitude higher (e.g. 1000x). Until either TextLoader is rearchitected to not create so many BlockingCollections in these
// situations, or until this implementation detail in BlockingCollection is changed, we use a replacement BlockingQueue implementation,
// that's similar in nature to BlockingCollection (albeit without many of its bells and whistles) but that specifically doesn't
// rely on cancellation for its CompleteAdding mechanism, and thus doesn't incur this impactful exception. Other than type name, this
// type is designed to expose the exact surface area required by the existing usage of BlockingCollection<T>, no more, no less,
// making it easy to swap in and out.

/// <summary>Provides a thread-safe queue that supports blocking takes when empty and blocking adds when full.</summary>
/// <typeparam name="T">Specifies the type of data contained.</typeparam>
internal sealed class BlockingQueue<T> : IDisposable
{
/// <summary>The underlying queue storing all elements.</summary>
private readonly ConcurrentQueue<T> _queue;
/// <summary>A semaphore that can be waited on to know when an item is available for taking.</summary>
private readonly CompletableSemaphore _itemsAvailable;
/// <summary>A semaphore that can be waited on to know when space is available for adding.</summary>
private readonly CompletableSemaphore _spaceAvailable;

/// <summary>Initializes the blocking queue.</summary>
/// <param name="boundedCapacity">The maximum number of items the queue may contain.</param>
public BlockingQueue(int boundedCapacity)
{
Contracts.Assert(boundedCapacity > 0);

_queue = new ConcurrentQueue<T>();
_itemsAvailable = new CompletableSemaphore(0);
_spaceAvailable = new CompletableSemaphore(boundedCapacity);
}

/// <summary>Cleans up all resources used by the blocking collection.</summary>
public void Dispose()
{
// This method/IDisposable implementation is here for API compat with BlockingCollection<T>,
// but there's nothing to actually dispose.
}

/// <summary>Adds an item to the blocking collection.</summary>
/// <param name="item">The item to add.</param>
/// <param name="millisecondsTimeout">The time to wait, in milliseconds, or -1 to wait indefinitely.</param>
/// <returns>
/// true if the item was successfully added; false if the timeout expired or if the collection were marked
/// as complete for adding before the item could be added.
/// </returns>
public bool TryAdd(T item, int millisecondsTimeout = 0)
{
Contracts.Assert(!_itemsAvailable.Completed);

// Wait for space to be available, then once it is, enqueue the item,
// and notify anyone waiting that another item is available.
if (_spaceAvailable.Wait(millisecondsTimeout))
{
_queue.Enqueue(item);
_itemsAvailable.Release();
return true;
}

return false;
}

/// <summary>Tries to take an item from the blocking collection.</summary>
/// <param name="item">The item removed, or default if none could be taken.</param>
/// <param name="millisecondsTimeout">The time to wait, in milliseconds, or -1 to wait indefinitely.</param>
/// <returns>
/// true if the item was successfully taken; false if the timeout expired or if the collection is empty
/// and has been marked as complete for adding.
/// </returns>
public bool TryTake(out T item, int millisecondsTimeout = 0)
{
// Wait for an item to be available, and once one is, dequeue it,
// and assuming we got one, notify anyone waiting that space is available.
if (_itemsAvailable.Wait(millisecondsTimeout))
{
bool gotItem = _queue.TryDequeue(out item);
Contracts.Assert(gotItem || _itemsAvailable.Completed);
if (gotItem)
{
_spaceAvailable.Release();
return true;
}
}

item = default;
return false;
}

/// <summary>
/// Gets an enumerable for taking all items out of the collection until
/// the collection has been marked as complete for adding and is empty.
/// </summary>
public IEnumerable<T> GetConsumingEnumerable()
{
// Block waiting for each additional item, yielding each as we take it,
// and exiting only when the collection is and will forever be empty.
while (TryTake(out T item, Timeout.Infinite))
{
yield return item;
}
}

/// <summary>Mark the collection as complete for adding.</summary>
/// <remarks>After this is called, no calls made on this queue will block.</remarks>
public void CompleteAdding()
{
_itemsAvailable.Complete();
_spaceAvailable.Complete();
}

/// <summary>
/// A basic monitor-based semaphore that, in addition to standard Wait/Release semantics,
/// also supports marking the semaphore as completed, in which case all waiters immediately
/// fail if there's no count remaining.
/// </summary>
private sealed class CompletableSemaphore
{
/// <summary>The remaining count in the semaphore.</summary>
private int _count;
/// <summary>The number of threads currently waiting in Wait.</summary>
private int _waiters;

/// <summary>Initializes the semaphore with the specified initial count.</summary>
/// <param name="initialCount">The initial count.</param>
public CompletableSemaphore(int initialCount)
{
Contracts.Assert(initialCount >= 0);
_count = initialCount;
}

/// <summary>Gets whether the semaphore has been marked as completed.</summary>
/// <remarks>
/// If completed, no calls to Wait will block; if no count remains, regardless of timeout, Waits will
/// return immediately with a result of false.
/// </remarks>
public bool Completed { get; private set; }

/// <summary>Releases the semaphore once.</summary>
public void Release()
{
lock (this)
{
// Increment the count, and if anyone is waiting, notify one of them.
_count++;
if (_waiters > 0)
{
Monitor.Pulse(this);
}
}
}

/// <summary>Blocks the current thread until it can enter the semaphore once.</summary>
/// <param name="millisecondsTimeout">The maximum amount of time to wait to enter the semaphore, or -1 to wait indefinitely.</param>
/// <returns>true if the semaphore was entered; otherwise, false.</returns>
public bool Wait(int millisecondsTimeout = Timeout.Infinite)
{
lock (this)
{
while (true)
{
// If the count is greater than 0, take one, and we're done.
Contracts.Assert(_count >= 0);
if (_count > 0)
{
_count--;
return true;
}

// If the count is 0 but we've been marked as completed, fail.
if (Completed)
{
return false;
}

// Wait until either there's a count available or the timeout expires.
// In practice we should never have a case where the timeout occurs
// and we need to wait again, so we don't bother doing any manual
// tracking of the timeout.
_waiters++;
try
{
if (!Monitor.Wait(this, millisecondsTimeout))
{
return false;
}
}
finally
{
_waiters--;
Contracts.Assert(_waiters >= 0);
}
}
}
}

/// <summary>Marks the semaphore as completed, such that no further operations will block.</summary>
public void Complete()
{
lock (this)
{
// Mark the semaphore as completed and wake up all waiters.
Completed = true;
if (_waiters > 0)
{
Monitor.PulseAll(this);
}
}
}
}
}
}
8 changes: 4 additions & 4 deletions src/Microsoft.ML.Data/DataLoadSave/Text/TextLoaderCursor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ private sealed class LineReader

// The line reader can be referenced by multiple workers. This is the reference count.
private int _cref;
private BlockingCollection<LineBatch> _queue;
private BlockingQueue<LineBatch> _queue;
private Task _thdRead;
private volatile bool _abort;

Expand All @@ -415,7 +415,7 @@ public LineReader(IMultiStreamSource files, int batchSize, int bufSize, bool has
_files = files;
_cref = cref;

_queue = new BlockingCollection<LineBatch>(bufSize);
_queue = new BlockingQueue<LineBatch>(bufSize);
_thdRead = Utils.RunOnBackgroundThread(ThreadProc);
}

Expand Down Expand Up @@ -638,7 +638,7 @@ private sealed class ParallelState : IDisposable
private readonly OrderedWaiter _waiterPublish;

// A small capacity blocking collection that the main cursor thread consumes.
private readonly BlockingCollection<RowBatch> _queue;
private readonly BlockingQueue<RowBatch> _queue;

private readonly Task[] _threads;

Expand Down Expand Up @@ -673,7 +673,7 @@ public ParallelState(Cursor curs, out RowSet rows, int cthd)

// The size limit here ensures that worker threads are never writing to
// a range that is being served up by the cursor.
_queue = new BlockingCollection<RowBatch>(2);
_queue = new BlockingQueue<RowBatch>(2);

_threads = new Task[cthd];
_threadsRunning = cthd;
Expand Down