
Commit f9d5ec2

stephentoub authored and TomFinley committed
Use custom BlockingQueue to significantly improve F5 perf with SDCA without caching (#2595)
1 parent: 369a9b6 · commit: f9d5ec2

File tree

2 files changed: +233, -4 lines changed
Lines changed: 229 additions & 0 deletions
@@ -0,0 +1,229 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

namespace Microsoft.ML.Data
{
    // NOTE:
    // This is a temporary workaround for https://github.com/dotnet/corefx/issues/34602 or until TextLoader is rearchitected.
    // BlockingCollection is fairly efficient for blocking producer/consumer scenarios, but it was optimized for scenarios where
    // they're not created/destroyed quickly. Its CompleteAdding mechanism is implemented internally in such a way that if a
    // taker is currently blocked when CompleteAdding is called, that taker thread will incur an OperationCanceledException that's
    // eaten internally. If such an exception only happens rarely, it's not a big deal, but the way TextLoader uses
    // BlockingCollections, they can end up being created and destroyed very frequently, tens of thousands of times during the
    // course of an algorithm like SDCA (when no caching is employed). That in turn can result in tens of thousands of exceptions
    // getting thrown and caught. While in normal processing even that number of exceptions results in an overhead that's not
    // particularly impactful, things change when a debugger is attached, as that makes the overhead of exceptions several orders
    // of magnitude higher (e.g. 1000x). Until either TextLoader is rearchitected to not create so many BlockingCollections in these
    // situations, or until this implementation detail in BlockingCollection is changed, we use a replacement BlockingQueue
    // implementation that's similar in nature to BlockingCollection (albeit without many of its bells and whistles) but that
    // specifically doesn't rely on cancellation for its CompleteAdding mechanism, and thus doesn't incur this impactful exception.
    // Other than the type name, this type is designed to expose the exact surface area required by the existing usage of
    // BlockingCollection<T>, no more, no less, making it easy to swap in and out.

    /// <summary>Provides a thread-safe queue that supports blocking takes when empty and blocking adds when full.</summary>
    /// <typeparam name="T">Specifies the type of data contained.</typeparam>
    internal sealed class BlockingQueue<T> : IDisposable
    {
        /// <summary>The underlying queue storing all elements.</summary>
        private readonly ConcurrentQueue<T> _queue;
        /// <summary>A semaphore that can be waited on to know when an item is available for taking.</summary>
        private readonly CompletableSemaphore _itemsAvailable;
        /// <summary>A semaphore that can be waited on to know when space is available for adding.</summary>
        private readonly CompletableSemaphore _spaceAvailable;

        /// <summary>Initializes the blocking queue.</summary>
        /// <param name="boundedCapacity">The maximum number of items the queue may contain.</param>
        public BlockingQueue(int boundedCapacity)
        {
            Contracts.Assert(boundedCapacity > 0);

            _queue = new ConcurrentQueue<T>();
            _itemsAvailable = new CompletableSemaphore(0);
            _spaceAvailable = new CompletableSemaphore(boundedCapacity);
        }

        /// <summary>Cleans up all resources used by the blocking collection.</summary>
        public void Dispose()
        {
            // This method/IDisposable implementation is here for API compat with BlockingCollection<T>,
            // but there's nothing to actually dispose.
        }

        /// <summary>Adds an item to the blocking collection.</summary>
        /// <param name="item">The item to add.</param>
        /// <param name="millisecondsTimeout">The time to wait, in milliseconds, or -1 to wait indefinitely.</param>
        /// <returns>
        /// true if the item was successfully added; false if the timeout expired or if the collection was marked
        /// as complete for adding before the item could be added.
        /// </returns>
        public bool TryAdd(T item, int millisecondsTimeout = 0)
        {
            Contracts.Assert(!_itemsAvailable.Completed);

            // Wait for space to be available, then once it is, enqueue the item,
            // and notify anyone waiting that another item is available.
            if (_spaceAvailable.Wait(millisecondsTimeout))
            {
                _queue.Enqueue(item);
                _itemsAvailable.Release();
                return true;
            }

            return false;
        }

        /// <summary>Tries to take an item from the blocking collection.</summary>
        /// <param name="item">The item removed, or default if none could be taken.</param>
        /// <param name="millisecondsTimeout">The time to wait, in milliseconds, or -1 to wait indefinitely.</param>
        /// <returns>
        /// true if the item was successfully taken; false if the timeout expired or if the collection is empty
        /// and has been marked as complete for adding.
        /// </returns>
        public bool TryTake(out T item, int millisecondsTimeout = 0)
        {
            // Wait for an item to be available, and once one is, dequeue it,
            // and assuming we got one, notify anyone waiting that space is available.
            if (_itemsAvailable.Wait(millisecondsTimeout))
            {
                bool gotItem = _queue.TryDequeue(out item);
                Contracts.Assert(gotItem || _itemsAvailable.Completed);
                if (gotItem)
                {
                    _spaceAvailable.Release();
                    return true;
                }
            }

            item = default;
            return false;
        }

        /// <summary>
        /// Gets an enumerable for taking all items out of the collection until
        /// the collection has been marked as complete for adding and is empty.
        /// </summary>
        public IEnumerable<T> GetConsumingEnumerable()
        {
            // Block waiting for each additional item, yielding each as we take it,
            // and exiting only when the collection is and will forever be empty.
            while (TryTake(out T item, Timeout.Infinite))
            {
                yield return item;
            }
        }

        /// <summary>Marks the collection as complete for adding.</summary>
        /// <remarks>After this is called, no calls made on this queue will block.</remarks>
        public void CompleteAdding()
        {
            _itemsAvailable.Complete();
            _spaceAvailable.Complete();
        }

        /// <summary>
        /// A basic monitor-based semaphore that, in addition to standard Wait/Release semantics,
        /// also supports marking the semaphore as completed, in which case all waiters immediately
        /// fail if there's no count remaining.
        /// </summary>
        private sealed class CompletableSemaphore
        {
            /// <summary>The remaining count in the semaphore.</summary>
            private int _count;
            /// <summary>The number of threads currently waiting in Wait.</summary>
            private int _waiters;

            /// <summary>Initializes the semaphore with the specified initial count.</summary>
            /// <param name="initialCount">The initial count.</param>
            public CompletableSemaphore(int initialCount)
            {
                Contracts.Assert(initialCount >= 0);
                _count = initialCount;
            }

            /// <summary>Gets whether the semaphore has been marked as completed.</summary>
            /// <remarks>
            /// If completed, no calls to Wait will block; if no count remains, regardless of timeout, Waits will
            /// return immediately with a result of false.
            /// </remarks>
            public bool Completed { get; private set; }

            /// <summary>Releases the semaphore once.</summary>
            public void Release()
            {
                lock (this)
                {
                    // Increment the count, and if anyone is waiting, notify one of them.
                    _count++;
                    if (_waiters > 0)
                    {
                        Monitor.Pulse(this);
                    }
                }
            }

            /// <summary>Blocks the current thread until it can enter the semaphore once.</summary>
            /// <param name="millisecondsTimeout">The maximum amount of time to wait to enter the semaphore, or -1 to wait indefinitely.</param>
            /// <returns>true if the semaphore was entered; otherwise, false.</returns>
            public bool Wait(int millisecondsTimeout = Timeout.Infinite)
            {
                lock (this)
                {
                    while (true)
                    {
                        // If the count is greater than 0, take one, and we're done.
                        Contracts.Assert(_count >= 0);
                        if (_count > 0)
                        {
                            _count--;
                            return true;
                        }

                        // If the count is 0 but we've been marked as completed, fail.
                        if (Completed)
                        {
                            return false;
                        }

                        // Wait until either there's a count available or the timeout expires.
                        // In practice we should never have a case where the timeout occurs
                        // and we need to wait again, so we don't bother doing any manual
                        // tracking of the timeout.
                        _waiters++;
                        try
                        {
                            if (!Monitor.Wait(this, millisecondsTimeout))
                            {
                                return false;
                            }
                        }
                        finally
                        {
                            _waiters--;
                            Contracts.Assert(_waiters >= 0);
                        }
                    }
                }
            }

            /// <summary>Marks the semaphore as completed, such that no further operations will block.</summary>
            public void Complete()
            {
                lock (this)
                {
                    // Mark the semaphore as completed and wake up all waiters.
                    Completed = true;
                    if (_waiters > 0)
                    {
                        Monitor.PulseAll(this);
                    }
                }
            }
        }
    }
}
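
For orientation, here is a minimal, hypothetical sketch of the bounded producer/consumer pattern this type serves. Because BlockingQueue<T> deliberately mirrors the subset of the BlockingCollection<T> surface that TextLoader uses (TryAdd, TryTake, GetConsumingEnumerable, CompleteAdding, Dispose), the sketch is written against the public BCL BlockingCollection<T> so it compiles standalone; the same calls would compile unchanged against the internal BlockingQueue<T> above. The class and variable names are invented for illustration.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

internal static class BoundedQueueSketch
{
    private static void Main()
    {
        // A bounded capacity keeps the producer from racing far ahead of the consumer,
        // the same reason ParallelState bounds its queue to 2 in the diff below.
        using (var queue = new BlockingCollection<int>(boundedCapacity: 2))
        {
            Task producer = Task.Run(() =>
            {
                for (int i = 0; i < 10; i++)
                {
                    // With an infinite timeout and adding not yet completed, this always
                    // succeeds; it blocks whenever the consumer is 2 items behind.
                    queue.TryAdd(i, Timeout.Infinite);
                }

                // No more items are coming; any blocked taker is woken up.
                queue.CompleteAdding();
            });

            // Drains items until the queue is marked complete for adding and is empty.
            foreach (int item in queue.GetConsumingEnumerable())
            {
                Console.WriteLine(item);
            }

            producer.Wait();
        }
    }
}

Swapping new BlockingCollection<int>(2) for new BlockingQueue<int>(2) is the entire migration, which is what the TextLoaderCursor.cs changes below do.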

src/Microsoft.ML.Data/DataLoadSave/Text/TextLoaderCursor.cs

Lines changed: 4 additions & 4 deletions
@@ -395,7 +395,7 @@ private sealed class LineReader

     // The line reader can be referenced by multiple workers. This is the reference count.
     private int _cref;
-    private BlockingCollection<LineBatch> _queue;
+    private BlockingQueue<LineBatch> _queue;
     private Task _thdRead;
     private volatile bool _abort;

@@ -415,7 +415,7 @@ public LineReader(IMultiStreamSource files, int batchSize, int bufSize, bool has
     _files = files;
     _cref = cref;

-    _queue = new BlockingCollection<LineBatch>(bufSize);
+    _queue = new BlockingQueue<LineBatch>(bufSize);
     _thdRead = Utils.RunOnBackgroundThread(ThreadProc);
 }

@@ -638,7 +638,7 @@ private sealed class ParallelState : IDisposable
     private readonly OrderedWaiter _waiterPublish;

     // A small capacity blocking collection that the main cursor thread consumes.
-    private readonly BlockingCollection<RowBatch> _queue;
+    private readonly BlockingQueue<RowBatch> _queue;

     private readonly Task[] _threads;

@@ -673,7 +673,7 @@ public ParallelState(Cursor curs, out RowSet rows, int cthd)

     // The size limit here ensures that worker threads are never writing to
     // a range that is being served up by the cursor.
-    _queue = new BlockingCollection<RowBatch>(2);
+    _queue = new BlockingQueue<RowBatch>(2);

     _threads = new Task[cthd];
     _threadsRunning = cthd;
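The scenario the new type optimizes is sketched below: a taker is blocked on an empty queue when the producer calls CompleteAdding. The observable behavior is the same for BlockingCollection<T> and for the BlockingQueue<T> added above, namely the blocked TryTake returns false, but BlockingCollection wakes the taker by throwing and swallowing an OperationCanceledException internally, whereas BlockingQueue just pulses a monitor. The sketch uses the BCL type so it runs standalone; it illustrates the shared contract, not ML.NET internals, and the names in it are invented.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

internal static class CompleteAddingSketch
{
    private static void Main()
    {
        using (var queue = new BlockingCollection<string>(boundedCapacity: 4))
        {
            // Taker: blocks immediately because the queue is empty.
            Task<bool> taker = Task.Run(() => queue.TryTake(out string _, Timeout.Infinite));

            // Give the taker time to block (illustrative only; not how the loader synchronizes).
            Thread.Sleep(100);

            // Marking the collection complete wakes the blocked taker, which then returns
            // false: the queue is empty and nothing more will ever be added.
            queue.CompleteAdding();

            Console.WriteLine(taker.Result); // False
        }
    }
}

When this create/block/complete cycle repeats tens of thousands of times during an SDCA run without caching, the swallowed exceptions are what make debugger-attached (F5) runs slow; the Monitor-based CompletableSemaphore avoids them entirely.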