{
readonly IProducerConsumerCollection<T> underlyingColl;
readonly int upperBound;
- readonly Func<bool> isFull;
readonly SpinWait sw = new SpinWait ();
AtomicBoolean isComplete;
+ long completeId;
+
+ long addId = long.MinValue;
+ long removeId = long.MinValue;
#region ctors
public BlockingCollection ()
this.underlyingColl = underlyingColl;
this.upperBound = upperBound;
this.isComplete = new AtomicBoolean ();
-
- if (upperBound == -1)
- isFull = FalseIsFull;
- else
- isFull = CountBasedIsFull;
}
+ #endregion
- static bool FalseIsFull ()
+ #region Add & Remove (+ Try)
+ public void Add (T item)
{
- return false;
+ Add (item, null);
}
- bool CountBasedIsFull ()
+ public void Add (T item, CancellationToken token)
{
- return underlyingColl.Count >= upperBound;
+ Add (item, () => token.IsCancellationRequested);
}
- #endregion
- #region Add & Remove (+ Try)
- public void Add (T item)
+ void Add (T item, Func<bool> cancellationFunc)
{
while (true) {
- while (isFull ()) {
- if (isComplete.Value)
- throw new InvalidOperationException ("The BlockingCollection<T>"
- + " has been marked as complete with regards to additions.");
- Block ();
+ long cachedAddId = addId;
+ long cachedRemoveId = removeId;
+
+ if (upperBound != -1) {
+ if (cachedAddId - cachedRemoveId > upperBound) {
+ Block ();
+ continue;
+ }
}
- // Extra check. The status might have changed after Block() or if isFull() is always false
- if (isComplete.Value)
+
+ // Check our transaction id against completed stored one
+ if (isComplete.Value && cachedAddId >= completeId)
throw new InvalidOperationException ("The BlockingCollection<T> has"
+ " been marked as complete with regards to additions.");
- // Go back in main waiting loop
- if (isFull ())
- continue;
- if (underlyingColl.TryAdd (item))
+ if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
break;
+
+ if (cancellationFunc != null && cancellationFunc ())
+ throw new OperationCanceledException ("CancellationToken triggered");
}
+
+
+ if (!underlyingColl.TryAdd (item))
+ throw new InvalidOperationException ("The underlying collection didn't accept the item.");
}
- public T Remove ()
+ public T Take ()
{
- T item;
-
- while (underlyingColl.Count == 0 || !underlyingColl.TryTake (out item)) {
- if (isComplete.Value)
- throw new OperationCanceledException ("The BlockingCollection<T> is empty and has been marked as complete with regards to additions.");
- Block ();
+ return Take (null);
+ }
+
+ public T Take (CancellationToken token)
+ {
+ return Take (() => token.IsCancellationRequested);
+ }
+
+ T Take (Func<bool> cancellationFunc)
+ {
+ while (true) {
+ long cachedRemoveId = removeId;
+ long cachedAddId = addId;
+
+ // Empty case
+ if (cachedRemoveId == cachedAddId) {
+ if (isComplete.Value && cachedRemoveId >= completeId)
+ throw new OperationCanceledException ("The BlockingCollection<T> has"
+ + " been marked as complete with regards to additions.");
+
+ Block ();
+ continue;
+ }
+
+ if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
+ break;
+
+ if (cancellationFunc != null && cancellationFunc ())
+ throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
}
+ T item;
+ while (!underlyingColl.TryTake (out item));
+
return item;
}
public bool TryAdd (T item)
{
- if (isComplete.Value || isFull ()) {
- return false;
- }
+ return TryAdd (item, null, null);
+ }
+
+ bool TryAdd (T item, Func<bool> contFunc, CancellationToken? token)
+ {
+ do {
+ if (token.HasValue && token.Value.IsCancellationRequested)
+ throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
+
+ long cachedAddId = addId;
+ long cachedRemoveId = removeId;
+
+ if (upperBound != -1) {
+ if (cachedAddId - cachedRemoveId > upperBound) {
+ continue;
+ }
+ }
+
+ // Check our transaction id against completed stored one
+ if (isComplete.Value && cachedAddId >= completeId)
+ throw new InvalidOperationException ("The BlockingCollection<T> has"
+ + " been marked as complete with regards to additions.");
+
+ if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
+ continue;
+
+ if (!underlyingColl.TryAdd (item))
+ throw new InvalidOperationException ("The underlying collection didn't accept the item.");
+
+ return true;
+ } while (contFunc != null && contFunc ());
- return underlyingColl.TryAdd (item);
+ return false;
}
public bool TryAdd (T item, TimeSpan ts)
public bool TryAdd (T item, int millisecondsTimeout)
{
Stopwatch sw = Stopwatch.StartNew ();
- while (isFull ()) {
- if (isComplete.Value || sw.ElapsedMilliseconds > millisecondsTimeout) {
- sw.Stop ();
- return false;
+ return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
+ }
+
+ public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
+ {
+ Stopwatch sw = Stopwatch.StartNew ();
+ return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
+ }
+
+ public bool TryTake (out T item)
+ {
+ return TryTake (out item, null, null);
+ }
+
+ bool TryTake (out T item, Func<bool> contFunc, CancellationToken? token)
+ {
+ item = default (T);
+
+ do {
+ if (token.HasValue && token.Value.IsCancellationRequested)
+ throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
+
+ long cachedRemoveId = removeId;
+ long cachedAddId = addId;
+
+ // Empty case
+ if (cachedRemoveId == cachedAddId) {
+ if (isComplete.Value && cachedRemoveId >= completeId)
+ continue;
+
+ continue;
}
- Block ();
- }
- return TryAdd (item);
+
+ if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
+ continue;
+
+ return underlyingColl.TryTake (out item);
+ } while (contFunc != null && contFunc ());
+
+ return false;
}
- public bool TryRemove (out T item)
+ public bool TryTake (out T item, TimeSpan ts)
{
- return underlyingColl.TryTake (out item);
+ return TryTake (out item, (int)ts.TotalMilliseconds);
}
- public bool TryRemove (out T item, TimeSpan ts)
+ public bool TryTake (out T item, int millisecondsTimeout)
{
- return TryRemove (out item, (int)ts.TotalMilliseconds);
+ item = default (T);
+ Stopwatch sw = Stopwatch.StartNew ();
+
+ return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
}
- public bool TryRemove (out T item, int millisecondsTimeout)
+ public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
{
+ item = default (T);
Stopwatch sw = Stopwatch.StartNew ();
- while (underlyingColl.Count == 0) {
- if (isComplete.Value || sw.ElapsedMilliseconds > millisecondsTimeout) {
- item = default (T);
- return false;
- }
-
- Block ();
- }
- return TryRemove (out item);
+
+ return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
}
#endregion
return false;
}
- public static int AddAny (BlockingCollection<T>[] collections, T item)
+ public static int AddToAny (BlockingCollection<T>[] collections, T item)
{
CheckArray (collections);
int index = 0;
return -1;
}
- public static int TryAddAny (BlockingCollection<T>[] collections, T item)
+ public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
+ {
+ CheckArray (collections);
+ int index = 0;
+ foreach (var coll in collections) {
+ try {
+ coll.Add (item, token);
+ return index;
+ } catch {}
+ index++;
+ }
+ return -1;
+ }
+
+ public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
{
CheckArray (collections);
int index = 0;
return -1;
}
- public static int TryAddAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
+ public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
{
CheckArray (collections);
int index = 0;
return -1;
}
- public static int TryAddAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
+ public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
{
CheckArray (collections);
int index = 0;
return -1;
}
- public static int RemoveAny (BlockingCollection<T>[] collections, out T item)
+ public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
+ CancellationToken token)
+ {
+ CheckArray (collections);
+ int index = 0;
+ foreach (var coll in collections) {
+ if (coll.TryAdd (item, millisecondsTimeout, token))
+ return index;
+ index++;
+ }
+ return -1;
+ }
+
+ public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
+ {
+ item = default (T);
+ CheckArray (collections);
+ int index = 0;
+ foreach (var coll in collections) {
+ try {
+ item = coll.Take ();
+ return index;
+ } catch {}
+ index++;
+ }
+ return -1;
+ }
+
+ public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
{
item = default (T);
CheckArray (collections);
int index = 0;
foreach (var coll in collections) {
try {
- item = coll.Remove ();
+ item = coll.Take (token);
return index;
} catch {}
index++;
return -1;
}
- public static int TryRemoveAny (BlockingCollection<T>[] collections, out T item)
+ public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
+ {
+ item = default (T);
+
+ CheckArray (collections);
+ int index = 0;
+ foreach (var coll in collections) {
+ if (coll.TryTake (out item))
+ return index;
+ index++;
+ }
+ return -1;
+ }
+
+ public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
{
item = default (T);
CheckArray (collections);
int index = 0;
foreach (var coll in collections) {
- if (coll.TryRemove (out item))
+ if (coll.TryTake (out item, ts))
return index;
index++;
}
return -1;
}
- public static int TryRemoveAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
+ public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
{
item = default (T);
CheckArray (collections);
int index = 0;
foreach (var coll in collections) {
- if (coll.TryRemove (out item, ts))
+ if (coll.TryTake (out item, millisecondsTimeout))
return index;
index++;
}
return -1;
}
- public static int TryRemoveAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
+ public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
+ CancellationToken token)
{
item = default (T);
CheckArray (collections);
int index = 0;
foreach (var coll in collections) {
- if (coll.TryRemove (out item, millisecondsTimeout))
+ if (coll.TryTake (out item, millisecondsTimeout, token))
return index;
index++;
}
public void CompleteAdding ()
{
- isComplete.Value = true;
+ // No further add beside that point
+ completeId = addId;
+ isComplete.Value = true;
}
void ICollection.CopyTo (Array array, int index)
public IEnumerable<T> GetConsumingEnumerable ()
{
- T item;
- while (underlyingColl.TryTake (out item)) {
+ return GetConsumingEnumerable (Take);
+ }
+
+ public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
+ {
+ return GetConsumingEnumerable (() => Take (token));
+ }
+
+ IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
+ {
+ while (true) {
+ T item = default (T);
+
+ try {
+ item = getFunc ();
+ } catch {
+ break;
+ }
+
yield return item;
}
}
return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
}
- public IEnumerator<T> GetEnumerator ()
+ public void Dispose ()
{
- return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
+
}
- public void Dispose ()
+ protected virtual void Dispose (bool managedRes)
{
+
}
public T[] ToArray ()
public bool IsCompleted {
get {
- return isComplete.Value && underlyingColl.Count == 0;
+ return isComplete.Value && addId == removeId;
}
}
+2009-08-19 Jérémie Laval <jeremie.laval@gmail.com>
+
+ * BlockingCollection.cs: Rewrite to use a transaction id
+ based approach. Ported to 4.0 API. Fix GetConsumingEnumerable.
+
2009-08-11 Jérémie Laval <jeremie.laval@gmail.com>
* BlockingCollection.cs: New addition.
int participants;
CountdownEvent cntd;
+ AtomicBoolean cleaned = new AtomicBoolean ();
int phase;
-
+
public Barrier (int participants) : this (participants, null)
{
}
void InitCountdownEvent ()
{
+ cleaned = new AtomicBoolean ();
cntd = new CountdownEvent (participants);
}
public void SignalAndWait ()
{
- SignalAndWait (() => { cntd.Wait (); return true; });
+ SignalAndWait ((c) => { c.Wait (); return true; });
}
public bool SignalAndWait (int millisecondTimeout)
{
- return SignalAndWait (() => cntd.Wait (millisecondTimeout));
+ return SignalAndWait ((c) => c.Wait (millisecondTimeout));
}
public bool SignalAndWait (TimeSpan ts)
{
- return SignalAndWait (() => cntd.Wait (ts));
+ return SignalAndWait ((c) => c.Wait (ts));
}
public bool SignalAndWait (int millisecondTimeout, CancellationToken token)
{
- return SignalAndWait (() => cntd.Wait (millisecondTimeout, token));
+ return SignalAndWait ((c) => c.Wait (millisecondTimeout, token));
}
public bool SignalAndWait (TimeSpan ts, CancellationToken token)
{
- return SignalAndWait (() => cntd.Wait (ts, token));
+ return SignalAndWait ((c) => c.Wait (ts, token));
}
- bool SignalAndWait (Func<bool> associate)
+ bool SignalAndWait (Func<CountdownEvent, bool> associate)
{
bool result;
+ AtomicBoolean cl = cleaned;
+ CountdownEvent temp = cntd;
- if (!cntd.Signal ()) {
- result = associate ();
+ if (!temp.Signal ()) {
+ result = Wait (associate, temp, cl);
} else {
result = true;
- postPhaseAction (this);
+ PostPhaseAction (cl);
phase++;
}
return result;
}
+ bool Wait (Func<CountdownEvent, bool> associate, CountdownEvent temp, AtomicBoolean cl)
+ {
+ if (!associate (temp))
+ return false;
+
+ SpinWait sw = new SpinWait ();
+ while (!cl.Value) {
+ //Console.WriteLine (cleaned);
+ sw.SpinOnce ();
+ }
+
+ return true;
+ }
+
+ void PostPhaseAction (AtomicBoolean cl)
+ {
+ postPhaseAction (this);
+
+ InitCountdownEvent ();
+
+ cl.Value = true;
+ }
+
public int CurrentPhaseNumber {
get {
return phase;
+2009-08-19 Jérémie Laval <jeremie.laval@gmail.com>
+
+ * Barrier.cs: Fix Barrier to be really thread-safe.
+ Remove deadlocking.
+
2009-08-11 Jérémie Laval <jeremie.laval@gmail.com>
* Barrier.cs: added.
}
[TestAttribute]
- public void RemoveTestCase()
+ public void TakeTestCase()
{
defaultCollection.Add(1);
defaultCollection.Add(2);
boundedCollection.Add(1);
boundedCollection.Add(2);
- int value = defaultCollection.Remove();
+ int value = defaultCollection.Take();
Assert.AreEqual(1, value, "#1");
- value = boundedCollection.Remove();
+ value = boundedCollection.Take();
Assert.AreEqual(1, value, "#2");
}
defaultCollection.CompleteAdding();
Assert.IsFalse(defaultCollection.IsCompleted, "#3");
- defaultCollection.Remove();
- defaultCollection.Remove();
+ defaultCollection.Take();
+ defaultCollection.Take();
Assert.IsTrue(defaultCollection.IsAddingCompleted, "#1");
Assert.AreEqual(0, defaultCollection.Count, "#2");
defaultCollection.Add(4);
defaultCollection.Add(5);
defaultCollection.Add(6);
+ defaultCollection.CompleteAdding ();
IEnumerable<int> enumerable = defaultCollection.GetConsumingEnumerable();
Assert.IsNotNull(enumerable, "#1");
--- /dev/null
+2009-08-19 Jérémie Laval <jeremie.laval@gmail.com>
+
+ * BlockingCollectionTests.cs: track API changes
+
+2009-08-19 Jérémie Laval <jeremie.laval@gmail.com>
+
+ * Partitioner.cs: Fix infinite recursion when calling Create
+ with a IList.
+ * Partitionners/ListPartitioner.cs: Fix bad splitting for the
+ last partition.
+
2009-08-11 Jérémie Laval <jeremie.laval@gmail.com>
* ConcurrentQueue.cs:
{
IList<TSource> tempIList = source as IList<TSource>;
if (tempIList != null)
- return Create (tempIList);
+ return Create (tempIList, true);
return new EnumerablePartitioner<TSource> (source);
}
public static OrderablePartitioner<TSource> Create<TSource> (TSource[] source, bool loadBalance)
{
- return Create ((IList<TSource>)source);
+ return Create ((IList<TSource>)source, loadBalance);
}
public static OrderablePartitioner<TSource> Create<TSource> (IList<TSource> source, bool loadBalance)
if (i != enumerators.Length - 1)
enumerators[i] = GetEnumeratorForRange (i * count, i * count + count);
else
- enumerators[i] = GetEnumeratorForRange (i * count, source.Count - i * count);
+ enumerators[i] = GetEnumeratorForRange (i * count, source.Count);
}
return enumerators;
}
- IEnumerator<KeyValuePair<long, T>> GetEnumeratorForRange (int startIndex, int count)
+ IEnumerator<KeyValuePair<long, T>> GetEnumeratorForRange (int startIndex, int lastIndex)
{
if (startIndex >= source.Count)
return GetEmpty ();
- return GetEnumeratorForRangeInternal (startIndex, count);
+ return GetEnumeratorForRangeInternal (startIndex, lastIndex);
}
IEnumerator<KeyValuePair<long, T>> GetEmpty ()
yield break;
}
- IEnumerator<KeyValuePair<long, T>> GetEnumeratorForRangeInternal (int startIndex, int count)
+ IEnumerator<KeyValuePair<long, T>> GetEnumeratorForRangeInternal (int startIndex, int lastIndex)
{
- for (int i = startIndex; i < count; i++) {
+ for (int i = startIndex; i < lastIndex; i++) {
yield return new KeyValuePair<long, T> (i, source[i]);
}
}
+2009-08-19 Jérémie Laval <jeremie.laval@gmail.com>
+
+ * Task.cs: Refactor Wait methods.
+
2009-08-11 Jérémie Laval <jeremie.laval@gmail.com>
* Future.cs: Add static to Factory property
}
internal void ContinueWithCore (Task continuation, TaskContinuationOptions kind,
- TaskScheduler scheduler, Func<bool> predicate)
+ TaskScheduler scheduler, Func<bool> predicate)
{
// Already set the scheduler so that user can call Wait and that sort of stuff
continuation.taskScheduler = scheduler;
if (exception != null && !(exception is TaskCanceledException))
throw exception;
}
-
- [MonoTODO ("Refactor")]
+
public void Wait (CancellationToken token)
{
- if (scheduler == null)
- throw new InvalidOperationException ("The Task hasn't been Started and thus can't be waited on");
-
- Watch sw = Watch.StartNew ();
- scheduler.ParticipateUntil (this, delegate {
- return token.IsCancellationRequested;
- });
- sw.Stop ();
-
- if (exception != null && !(exception is TaskCanceledException))
- throw exception;
-
+ Wait (null, token);
}
public bool Wait (TimeSpan ts)
public bool Wait (int millisecondsTimeout)
{
- if (scheduler == null)
- throw new InvalidOperationException ("The Task hasn't been Started and thus can't be waited on");
-
Watch sw = Watch.StartNew ();
- bool result = scheduler.ParticipateUntil (this, delegate {
- return sw.ElapsedMilliseconds >= millisecondsTimeout;
- });
- sw.Stop ();
-
- if (exception != null && !(exception is TaskCanceledException))
- throw exception;
-
- return !result;
+ return Wait (() => sw.ElapsedMilliseconds >= millisecondsTimeout, null);
}
- [MonoTODO ("Refactor")]
public bool Wait (int millisecondsTimeout, CancellationToken token)
+ {
+ Watch sw = Watch.StartNew ();
+ return Wait (() => sw.ElapsedMilliseconds >= millisecondsTimeout, token);
+ }
+
+ bool Wait (Func<bool> stopFunc, CancellationToken? token)
{
if (scheduler == null)
throw new InvalidOperationException ("The Task hasn't been Started and thus can't be waited on");
- Watch sw = Watch.StartNew ();
bool result = scheduler.ParticipateUntil (this, delegate {
- return sw.ElapsedMilliseconds >= millisecondsTimeout || token.IsCancellationRequested;
+ if (token.HasValue && token.Value.IsCancellationRequested)
+ throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
+
+
+ return (stopFunc != null) ? stopFunc () : false;
});
- sw.Stop ();
-
+
if (exception != null && !(exception is TaskCanceledException))
throw exception;
}
public static int WaitAny (params Task[] tasks)
+ {
+ return WaitAny (tasks, null, null);
+ }
+
+ static int WaitAny (Task[] tasks, Func<bool> stopFunc, CancellationToken? token)
{
if (tasks == null)
throw new ArgumentNullException ("tasks");
// All tasks are supposed to use the same TaskManager
tasks[0].scheduler.ParticipateUntil (delegate {
+ if (stopFunc != null && stopFunc ())
+ return true;
+
+ if (token.HasValue && token.Value.IsCancellationRequested)
+ throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
+
return numFinished >= 1;
});
{
if (millisecondsTimeout < -1)
throw new ArgumentOutOfRangeException ("millisecondsTimeout");
- if (tasks == null)
- throw new ArgumentNullException ("tasks");
if (millisecondsTimeout == -1)
return WaitAny (tasks);
- int numFinished = 0;
- int indexFirstFinished = -1;
-
- foreach (Task t in tasks) {
- t.completed += delegate (object sender, EventArgs e) {
- int result = Interlocked.Increment (ref numFinished);
- if (result == 1) {
- Task target = (Task)sender;
- indexFirstFinished = Array.FindIndex (tasks, (elem) => elem == target);
- }
- };
- }
-
Watch sw = Watch.StartNew ();
- tasks[0].scheduler.ParticipateUntil (delegate {
- if (sw.ElapsedMilliseconds > millisecondsTimeout)
- return true;
- return numFinished >= 1;
- });
- sw.Stop ();
-
- return indexFirstFinished;
+ return WaitAny (tasks, () => sw.ElapsedMilliseconds > millisecondsTimeout, null);
}
-
- [MonoTODO ("Refactor")]
+
public static int WaitAny (Task[] tasks, int millisecondsTimeout, CancellationToken token)
- {
+ {
if (millisecondsTimeout < -1)
throw new ArgumentOutOfRangeException ("millisecondsTimeout");
- if (tasks == null)
- throw new ArgumentNullException ("tasks");
if (millisecondsTimeout == -1)
return WaitAny (tasks);
- int numFinished = 0;
- int indexFirstFinished = -1;
-
- foreach (Task t in tasks) {
- t.completed += delegate (object sender, EventArgs e) {
- int result = Interlocked.Increment (ref numFinished);
- if (result == 1) {
- Task target = (Task)sender;
- indexFirstFinished = Array.FindIndex (tasks, (elem) => elem == target);
- }
- };
- }
-
Watch sw = Watch.StartNew ();
- tasks[0].scheduler.ParticipateUntil (delegate {
- if (sw.ElapsedMilliseconds > millisecondsTimeout || token.IsCancellationRequested)
- return true;
- return numFinished >= 1;
- });
- sw.Stop ();
-
- return indexFirstFinished;
+ return WaitAny (tasks, () => sw.ElapsedMilliseconds > millisecondsTimeout, token);
}
- [MonoTODO ("Refactor")]
public static int WaitAny (Task[] tasks, CancellationToken token)
- {
- if (tasks == null)
- throw new ArgumentNullException ("tasks");
- if (tasks.Length == 0)
- throw new ArgumentException ("tasks is empty", "tasks");
-
- int numFinished = 0;
- int indexFirstFinished = -1;
- int index = 0;
-
- foreach (Task t in tasks) {
- if (t.IsCompleted) {
- return index;
- }
- t.completed += delegate (object sender, EventArgs e) {
- int result = Interlocked.Increment (ref numFinished);
- // Check if we are the first to have finished
- if (result == 1) {
- Task target = (Task)sender;
- indexFirstFinished = Array.FindIndex (tasks, (elem) => elem == target);
- }
- };
- index++;
- }
-
- // All tasks are supposed to use the same TaskManager
- tasks[0].scheduler.ParticipateUntil (delegate {
- if (token.IsCancellationRequested)
- return true;
-
- return numFinished >= 1;
- });
-
- return indexFirstFinished;
+ {
+ return WaitAny (tasks, null, token);
}
#endregion
namespace System.Threading
{
- internal struct AtomicBoolean
+ internal class AtomicBoolean
{
int flag;
const int UnSet = 0;
return temp;
}
+ public bool TrySet ()
+ {
+ return !Exchange (true);
+ }
+
public bool Exchange (bool newVal)
{
int newTemp = newVal ? Set : UnSet;
+2009-08-19 Jérémie Laval <jeremie.laval@gmail.com>
+
+ * ParallelLoopState.cs: Take in account that
+ AtomicBoolean is a class.
+
+2009-08-19 Jérémie Laval <jeremie.laval@gmail.com>
+
+ * AtomicBoolean.cs: Turn it into a class
+ * CountdownEvent.cs: Work on cached variable. Make sure
+ count doesn't go under 0.
+
2009-08-11 Jérémie Laval <jeremie.laval@gmail.com>
* Watch.cs:
public bool Signal (int num)
{
- if (num < 0)
+ if (num <= 0)
throw new ArgumentOutOfRangeException ("num");
Action<int> check = delegate (int value) {
if (!ApplyOperation (-num, check, out newValue))
throw new InvalidOperationException ("The event is already set");
- if (newValue <= 0) {
+ if (newValue == 0) {
evt.Set ();
return true;
}
if (num < 0)
throw new ArgumentOutOfRangeException ("num");
- Action<int> check = delegate (int value) { };
-
- return ApplyOperation (num, check);
+ return ApplyOperation (num, null);
}
bool ApplyOperation (int num, Action<int> doCheck)
newValue = 0;
do {
- if (IsSet)
+ oldCount = count;
+ if (oldCount == 0)
return false;
- oldCount = count;
newValue = oldCount + num;
- doCheck (newValue);
-
+ if (doCheck != null)
+ doCheck (newValue);
} while (Interlocked.CompareExchange (ref count, newValue, oldCount) != oldCount);
return true;
public bool IsSet {
get {
- return count <= 0;
+ return count == 0;
}
}
{
internal class ExternalInfos
{
- public AtomicBoolean IsStopped;
- public AtomicBoolean IsBroken;
+ public AtomicBoolean IsStopped = new AtomicBoolean ();
+ public AtomicBoolean IsBroken = new AtomicBoolean ();
public volatile bool IsExceptional;
public long? LowestBreakIteration;
}