.NET Forum / .NET Framework / New Users / August 2006
How to improve performance of Queue accessing between 2 threads?
|
|
Thread rating:  |
Thana N. - 24 Aug 2006 08:41 GMT Dear Guys,
I write program that sharing Queue between 2 threads which 1 thread add data to Queue. And another thread get data from Queue to process. My situation is if there are alot of data to add (like loop to add). The 2nd thread which try to get data from Queue cannot access or rarely to access that Queue which make the program has low performance.
How can I do to improve this situation? I want 2nd thread can access as fast as possible once Queue is not empty.
Here is some snippet code:-
Thread#1
For i as Integer = 1 To 1000 SyncLock _queue.SyncRoot _queue.Enqueue(i) _newItemEvent.Set() End SyncRoot Next
Thread#2
Dim item As Integer While _queue.Count <= 0 Thread.SpinWait(0) End While
While _queue.Count > 0 SyncLock _queue.SyncRoot item = DirectCast(_queue.Dequeue(), Integer)
.... End SyncLock End While
Thanks, Thana N.
Greg Young - 28 Aug 2006 19:42 GMT Don't use locking ....
There is a great implementation of a lock free queue here http://www.boyet.com/Articles/LockfreeQueue.html. He also has a library you can download which includes the implementation.
Cheers,
Greg Young MVP - C# http://codebetter.com/blogs/gregyoung
> Dear Guys, > [quoted text clipped - 39 lines] > Thanks, > Thana N. Chris Mullins - 28 Aug 2006 20:16 GMT I had access to a 16-process or Itanium Box, a 4 Processor Itanium box, a 2 Processor AMD machine, and my single processor laptop.
I took some time to run some locking / no-locking tests and came up with surprising results. I keep meaning to write this up as a blog entry, but unfortuantly I didn't to a thorough enough job of data capture.
On the 1 and 2 processor box, locking was faster. On the 4 processor box, Lock-Free was a tiny bit faster. On the 16 processor box it wasn't even close - lock free won by more than an an order of magnitude.
What this ended up telling me is that I need a "smart queue" and "smart list" that can look at the number of processors and make the decision which algorithm to use.
-- Chris Mullins
> Don't use locking .... > [quoted text clipped - 51 lines] >> Thanks, >> Thana N. Greg Young - 31 Aug 2006 19:36 GMT I depends on usage as well .. with only a single reader/writer lock free will fly.
> I had access to a 16-process or Itanium Box, a 4 Processor Itanium box, a > 2 Processor AMD machine, and my single processor laptop. [quoted text clipped - 69 lines] >>> Thanks, >>> Thana N. Thana N. - 29 Aug 2006 04:25 GMT Thanks for your reply, but I've forgot to tell you all that I still using .NET 1.1 not 2.0. Any suggestion for 1.1.
Thanks again,
> Don't use locking .... > [quoted text clipped - 51 lines] > > Thanks, > > Thana N. Greg Young - 31 Aug 2006 19:35 GMT So port the code to not use generics .. shouldn't take much more than 1/2 an hour to only do the queue :) (just use object instead of T)
Cheers,
Greg
> Thanks for your reply, but I've forgot to tell you all that I still using > .NET 1.1 not 2.0. Any suggestion for 1.1. [quoted text clipped - 58 lines] >> > Thanks, >> > Thana N. Brian Gideon - 31 Aug 2006 19:54 GMT > Don't use locking .... > [quoted text clipped - 7 lines] > MVP - C# > http://codebetter.com/blogs/gregyoung Unfortunately, that implementation is not a blocking queue. It is an interesting article though.
Greg Young - 28 Aug 2006 19:45 GMT Also just for your example ...
> For i as Integer = 1 To 1000 > SyncLock _queue.SyncRoot > _queue.Enqueue(i) > _newItemEvent.Set() > End SyncRoot > Next The other thread cannot access the data because you hold the lock on the data for the duration of all of the inserts.
Cheers,
Greg
> Dear Guys, > [quoted text clipped - 39 lines] > Thanks, > Thana N. Thana N. - 29 Aug 2006 05:40 GMT I understand your point Greg, that code just my simulation code. My real situation is it is a TCP Server that receives multiple messages at the same time and these messages will be added to queue for another thread to get these messages from the queue to send out to another server on 1-connection and it cannot utilize my netwidth bandwidth, it just use 20% of it. I think queue accessing will be the one of my delay. Any suggestion for my situation.
And again, my code is in .NET 1.1.
Thanks again for your reply. Thana N.
> Also just for your example ... > [quoted text clipped - 55 lines] > > Thanks, > > Thana N. Markus Stoeger - 28 Aug 2006 20:20 GMT > How can I do to improve this situation? I want 2nd thread can access as > fast as possible once Queue is not empty. Two things... #1 I assume that your _newItemEvent is either an AutoReset- or ManualResetEvent? Thats slow. Try to replace that with the Monitor.Pulse/Monitor.Wait methods. #2 Do you really have to use Thread.SpinWait? I think Monitor.Wait would be better here.
hth, Max
Brian Gideon - 28 Aug 2006 21:20 GMT Thana,
It looks like what you want is a blocking queue. It's easier if you wrap that logic in separate class. The following implementation is a port from the this article http://www.yoda.arachsys.com/csharp/threads/deadlocks.shtml.
Public Class BlockingQueue
ReadOnly lockObject As Object = New Object Private queue As Queue = New Queue
Public Sub Enqueue(ByVal o As Object) SyncLock lockObject queue.Enqueue(o) Monitor.Pulse(lockObject) End SyncLock End Sub
Public Function Dequeue() As Object SyncLock lockObject While queue.Count = 0 Monitor.Wait(lockObject) End While Return queue.Dequeue End SyncLock End Function
End Class
Brian
> Dear Guys, > [quoted text clipped - 35 lines] > Thanks, > Thana N. William Stacey [MVP] - 29 Aug 2006 00:57 GMT Couple issues here. First, don't use spin locks that way - big waste of cpu. Moreover, the sync wrapper takes a lock for all methods and properties. So the 2nd thread is taking an releasing the lock 4 times for a single dequeue operation. As Brian said, use a blocking queue for this kind of producer/consumer pattern. Here is my 1 lock blocking queue below.
private void button5_Click(object sender, EventArgs e) { BlockingQueue<int> bq = new BlockingQueue<int>();
new Thread(delegate() { int i; while( bq.TryDequeue(200, out i) ) Console.WriteLine(i); Console.WriteLine("Consumer thread completed."); }).Start();
for (int i = 0; i < 100; i++) { bq.Enqueue(i); }
}
/// <summary> /// Represents a first-in, first-out collection of objects. /// </summary> /// <typeparam name="T">Type of element queue will contain.</typeparam> public class BlockingQueue<T> : IEnumerable<T>, ICollection { private bool isOpened = true; private readonly Queue<T> q; private readonly object syncRoot = new object();
/// <summary> /// Initializes a new instance of the BlockingQueue class. /// </summary> public BlockingQueue() { q = new Queue<T>(); }
/// <summary> /// Initializes a new instance of the BlockingQueue class. /// </summary> /// <param name="capacity">The initial number of elements the queue can contain.</param> public BlockingQueue(int capacity) { q = new Queue<T>(capacity); }
/// <summary> /// Initializes a new instance of the BlockingQueue class. /// </summary> /// <param name="collection">A collection whose elements are copied to the new queue.</param> public BlockingQueue(IEnumerable<T> collection) { q = new Queue<T>(collection); }
/// <summary> /// Gets the number of elements in the queue. /// </summary> public int Count { get { lock ( syncRoot ) { return q.Count; } } }
/// <summary> /// Remove all objects from the BlockingQueue<T>. /// </summary> public void Clear() { lock ( syncRoot ) { q.Clear(); } }
/// <summary> /// Closes the queue. /// </summary> public void Close() { lock ( syncRoot ) { if ( ! this.isOpened ) return; // Already closed.
isOpened = false; q.Clear(); Monitor.PulseAll(syncRoot); // resume any waiting threads so they see the queue is closed. } }
/// <summary> /// Gets a value indicating if queue is opened. /// </summary> public bool Opened { get { lock ( syncRoot ) { return this.isOpened; } } }
/// <summary> /// Determines whether an element is in the System.Collections.Generic.Queue<T>. /// </summary> /// <param name="item">The object to locate in the System.Collections.Generic.Queue<T>. The value can be null for reference types.</param> /// <returns>true if item is found in the System.Collections.Generic.Queue<T>; otherwise, false.</returns> public bool Contains(T item) { lock ( syncRoot ) { return q.Contains(item); } }
/// <summary> /// Copies the System.Collections.Generic.Queue<T> elements to an existing one-dimensional System.Array, starting at the specified array index. /// </summary> /// <param name="array">The one-dimensional System.Array that is the destination of the elements /// copied from System.Collections.Generic.Queue<T>. The System.Array must have zero-based indexing. /// </param> /// <param name="arrayIndex">The zero-based index in array at which copying begins.</param> public void CopyTo(T[] array, int arrayIndex) { lock ( syncRoot ) { q.CopyTo(array, arrayIndex); } }
public T[] ToArray() { lock ( syncRoot ) { return q.ToArray(); } }
public IEnumerator<T> GetEnumerator() { return new BlockingQueue<T>.Enumerator(this, -1); } public IEnumerator<T> GetEnumerator(int millisecondsTimeout) { return new BlockingQueue<T>.Enumerator(this, millisecondsTimeout); } IEnumerator IEnumerable.GetEnumerator() { return new BlockingQueue<T>.Enumerator(this, -1); }
/// <summary> /// Sets the capacity to the actual number of elements in the System.Collections.Generic.Queue<T>, /// if that number is less than 90 percent of current capacity. /// </summary> public void TrimExcess() { lock ( syncRoot ) { q.TrimExcess(); } }
/// <summary> /// Removes and returns the object at the beginning of the Queue. /// </summary> /// <returns>Object in queue.</returns> public T Dequeue() { return Dequeue(Timeout.Infinite); }
/// <summary> /// Removes and returns the object at the beginning of the Queue. /// </summary> /// <param name="timeout">Time to wait before returning (in milliseconds).</param> /// <returns>Object in queue.</returns> public T Dequeue(int millisecondsTimeout) { lock ( syncRoot ) { while ( isOpened && (q.Count == 0) ) { if ( ! Monitor.Wait(syncRoot, millisecondsTimeout) ) throw new TimeoutException("Operation timeout"); }
if ( ! isOpened ) throw new InvalidOperationException("Queue closed"); return q.Dequeue(); } }
public bool TryDequeue(int millisecondsTimeout, out T value) { lock (syncRoot) { while (isOpened && (q.Count == 0)) { if ( ! Monitor.Wait(syncRoot, millisecondsTimeout) ) { value = default(T); return false; } }
if (! isOpened) throw new InvalidOperationException("Queue closed"); value = q.Dequeue(); return true; } }
/// <summary> /// Returns the object at the beginning of the BlockingQueue<T> /// without removing it. /// </summary> /// <returns>The object at the beginning of the BlockingQueue<T>.</returns> public T Peek() { return Peek(-1); }
/// <summary> /// Returns the object at the beginning of the BlockingQueue<T> /// without removing it. /// </summary> /// <returns>The object at the beginning of the BlockingQueue<T>.</returns> /// <param name="millisecondsTimeout">Time to wait before returning (in milliseconds).</param> public T Peek(int millisecondsTimeout) { lock ( syncRoot ) { while ( isOpened && (q.Count == 0) ) { if ( ! Monitor.Wait(syncRoot, millisecondsTimeout) ) throw new TimeoutException("Operation timeout"); }
if ( ! isOpened ) throw new InvalidOperationException("Queue closed"); return q.Peek(); } }
/// <summary> /// Adds an object to the end of the Queue. /// </summary> /// <param name="obj">Object to put in queue.</param> public void Enqueue(T item) { lock ( syncRoot ) { if ( ! isOpened ) throw new InvalidOperationException("Queue closed"); q.Enqueue(item); Monitor.Pulse(syncRoot); // Move 1 waiting thread to the "ready" queue in this monitor object. } // Exiting lock will free thread(s) in the "ready" queue for this monitor object. }
[Serializable, StructLayout(LayoutKind.Sequential)] public struct Enumerator : IEnumerator<T>, IDisposable, IEnumerator { private BlockingQueue<T> q; private IEnumerator<T> e;
internal Enumerator(BlockingQueue<T> q, int timeout) { this.q = q; if (!Monitor.TryEnter(this.q.SyncRoot, timeout)) throw new TimeoutException("Timeout waiting for enumerator lock on BlockingQueue<T>."); this.e = this.q.q.GetEnumerator(); // Get the contained Queue<T> enumerator. }
public void Dispose() { this.e.Dispose(); Monitor.Exit(q.SyncRoot); }
public bool MoveNext() { return e.MoveNext(); }
public T Current { get { return e.Current; } }
object IEnumerator.Current { get { return ((IEnumerator)e).Current; } }
void IEnumerator.Reset() { e.Reset(); } }
#region ICollection Members
/// <summary> /// Copies the BlockingQueue<T> elements to an existing one-dimensional System.Array, starting at the specified array index. /// </summary> /// <param name="array"></param> /// <param name="index"></param> public void CopyTo(Array array, int index) { lock (syncRoot) { ((ICollection)q).CopyTo(array, index); } }
/// <summary> /// Get a value that indicates if the queue is synchronized. /// </summary> public bool IsSynchronized { get { return true; } }
public object SyncRoot { get { return this.syncRoot; } }
#endregion }
 Signature William Stacey [MVP]
| Dear Guys, | [quoted text clipped - 35 lines] | Thanks, | Thana N. Thana N. - 29 Aug 2006 04:26 GMT Thanks for your reply, but I've forgot to tell you all that I still using .NET 1.1 not 2.0. Any suggestion for 1.1.
Rgrds, Thana N.
> Couple issues here. First, don't use spin locks that way - big waste of > cpu. Moreover, the sync wrapper takes a lock for all methods and [quoted text clipped - 295 lines] > private BlockingQueue<T> q; > private IEnumerator<T> e; Brian Gideon - 29 Aug 2006 14:49 GMT Thana,
The implementation I posted, although not as complete as William's, is compatible with 1.1 and should work fine.
Brian
> Thanks for your reply, but I've forgot to tell you all that I still using > .NET 1.1 not 2.0. Any suggestion for 1.1. > > Rgrds, > Thana N. William Stacey [MVP] - 29 Aug 2006 16:18 GMT Here is a non-generic version. hth
using System; using System.Collections; using System.Threading;
namespace WJS.Threading { /// <summary> /// A blocking queue derived from Queue. The dequeue method will block on an empty queue. /// Enqueue operations will not block as queue is only bounded by array size or MaxSize parameter. /// <remarks>This class is thread safe for multiple consumer and producer threads.</remarks> /// </summary> public class BlockingQueue : Queue { private bool opened = true; private int maxSize = int.MaxValue; private readonly object syncRoot = new object();
/// <summary> /// Create new BlockingQueue. /// </summary> /// <param name="col">The System.Collections.ICollection to copy elements from.</param> public BlockingQueue(ICollection col) : base(col) { }
/// <summary> /// Create new BlockingQueue. /// </summary> /// <param name="capacity">The initial number of elements that the queue can contain.</param> /// <param name="growFactor">The factor by which the capacity of the queue is expanded.</param> public BlockingQueue(int capacity, float growFactor) : base(capacity, growFactor) { }
/// <summary> /// Create new BlockingQueue. /// </summary> /// <param name="capacity">The initial number of elements that the queue can contain.</param> public BlockingQueue(int capacity) : base(capacity) { }
/// <summary> /// Create new BlockingQueue. /// </summary> public BlockingQueue() : base() { }
/// <summary> /// BlockingQueue Destructor (Close queue, resume any waiting thread). /// </summary> ~BlockingQueue() { Close(); }
/// <summary> /// Gets or sets the maximum size of the queue. After MaxSize is reached, an additional Enqueue operation will /// throw an exception. /// </summary> public int MaxSize { get { lock ( syncRoot ) { return this.maxSize; } } set { lock ( syncRoot ) { if ( value < 0 ) throw new ArgumentOutOfRangeException("MaxSize must be >= 0."); } } }
/// <summary> /// Remove all objects from the Queue. /// </summary> public override void Clear() { lock ( syncRoot ) { base.Clear(); } }
/// <summary> /// Remove all objects from the Queue, resume all dequeue threads. /// </summary> public void Close() { lock ( syncRoot ) { if ( ! Opened ) return; // Already closed.
opened = false; base.Clear(); Monitor.PulseAll(syncRoot); // resume any waiting threads } }
/// <summary> /// Gets flag indicating if queue has been closed. /// </summary> public bool Opened { get { lock ( syncRoot ) { return this.opened; } } }
/// <summary> /// Removes and returns the object at the beginning of the Queue. /// </summary> /// <returns>Object in queue.</returns> public override object Dequeue() { return Dequeue(Timeout.Infinite); }
/// <summary> /// Removes and returns the object at the beginning of the Queue. /// </summary> /// <param name="timeout">time to wait before returning</param> /// <returns>Object in queue.</returns> public object Dequeue(TimeSpan timeout) { return Dequeue(timeout.Milliseconds); }
/// <summary> /// Removes and returns the object at the beginning of the Queue. /// </summary> /// <param name="timeout">Time to wait before returning (in milliseconds)</param> /// <returns>Object in queue.</returns> public object Dequeue(int timeout) { lock ( syncRoot ) { while ( Opened && (base.Count == 0) ) { if ( ! Monitor.Wait(syncRoot, timeout) ) throw new InvalidOperationException("Timeout"); }
if ( Opened ) return base.Dequeue(); else throw new InvalidOperationException("Queue closed"); } }
/// <summary> /// Adds an object to the end of the Queue. /// </summary> /// <param name="obj">Object to put in queue</param> public override void Enqueue(object obj) { lock ( syncRoot ) { if ( Opened ) throw new InvalidOperationException("Queue closed"); if ( base.Count >= this.maxSize ) throw new InvalidOperationException("Queue full. MaxSize reached."); base.Enqueue(obj); Monitor.Pulse(syncRoot); // Move 1 waiting thread to ready queue in this monitor object. } // Exiting lock will start all threads in the ready queue for this monitor object. } } }
 Signature William Stacey [MVP]
| Thanks for your reply, but I've forgot to tell you all that I still using | .NET 1.1 not 2.0. Any suggestion for 1.1. [quoted text clipped - 301 lines] | > private BlockingQueue<T> q; | > private IEnumerator<T> e; Thana N. - 31 Aug 2006 13:51 GMT Thank you for your reply, Will. Thana
> Here is a non-generic version. hth > [quoted text clipped - 288 lines] > | > { > | > lock ( syncRoot )
Free MagazinesGet these publications absolutely FREE for up to 12 months. There are no hidden fees and no obligation. Simply choose a title, complete the application form and submit it. Read more ...
|
|
|