How to improve performance of Queue accessing between 2 threads?

Thana N. - 24 Aug 2006 08:41 GMT
Dear Guys,

I wrote a program that shares a Queue between 2 threads: one thread adds data
to the Queue, and the other thread gets data from the Queue to process.  My
problem is that when there is a lot of data to add (e.g. adding in a loop),
the 2nd thread, which tries to get data from the Queue, can rarely or never
access it, which gives the program poor performance.

What can I do to improve this situation?  I want the 2nd thread to access the
Queue as quickly as possible once it is not empty.

Here is a code snippet:

Thread#1

For i as Integer = 1 To 1000
  SyncLock _queue.SyncRoot
     _queue.Enqueue(i)
     _newItemEvent.Set()
  End SyncLock
Next

Thread#2

Dim item As Integer
While _queue.Count <= 0
    Thread.SpinWait(0)
End While

While _queue.Count > 0
  SyncLock _queue.SyncRoot
     item = DirectCast(_queue.Dequeue(), Integer)

     ....
  End SyncLock
End While

Thanks,
Thana N.
Greg Young - 28 Aug 2006 19:42 GMT
Don't use locking ....

There is a great implementation of a lock-free queue here:
http://www.boyet.com/Articles/LockfreeQueue.html. The author also has a
library you can download that includes the implementation.

Cheers,

Greg Young
MVP - C#
http://codebetter.com/blogs/gregyoung

Chris Mullins - 28 Aug 2006 20:16 GMT
I had access to a 16-processor Itanium box, a 4-processor Itanium box, a
2-processor AMD machine, and my single-processor laptop.

I took some time to run some locking / no-locking tests and came up with
surprising results. I keep meaning to write this up as a blog entry, but
unfortunately I didn't do a thorough enough job of data capture.

On the 1- and 2-processor boxes, locking was faster. On the 4-processor box,
lock-free was a tiny bit faster. On the 16-processor box it wasn't even
close - lock-free won by more than an order of magnitude.

What this ended up telling me is that I need a "smart queue" and "smart
list" that can look at the number of processors and make the decision which
algorithm to use.
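
A minimal sketch of what such a "smart queue" factory could look like,
assuming .NET 4 or later (so not usable on 1.1). The names IWorkQueue,
LockedQueue, ConcurrentWorkQueue and SmartQueue are hypothetical, and the
threshold of 4 processors is only an assumption that mirrors the rough
measurements above. ConcurrentQueue<T> stands in for whichever lock-free
queue was actually benchmarked.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical common interface so callers do not care which queue they get.
public interface IWorkQueue<T>
{
    void Enqueue(T item);
    bool TryDequeue(out T item);
}

// Plain Queue<T> guarded by a single lock.
public sealed class LockedQueue<T> : IWorkQueue<T>
{
    private readonly Queue<T> q = new Queue<T>();
    private readonly object gate = new object();

    public void Enqueue(T item)
    {
        lock (gate) { q.Enqueue(item); }
    }

    public bool TryDequeue(out T item)
    {
        lock (gate)
        {
            if (q.Count == 0) { item = default(T); return false; }
            item = q.Dequeue();
            return true;
        }
    }
}

// Thin adapter over the framework's ConcurrentQueue<T>.
public sealed class ConcurrentWorkQueue<T> : IWorkQueue<T>
{
    private readonly ConcurrentQueue<T> q = new ConcurrentQueue<T>();

    public void Enqueue(T item) { q.Enqueue(item); }
    public bool TryDequeue(out T item) { return q.TryDequeue(out item); }
}

public static class SmartQueue
{
    // Assumption: the crossover point has to be measured on real hardware.
    public const int LockFreeThreshold = 4;

    public static IWorkQueue<T> Create<T>()
    {
        return Create<T>(Environment.ProcessorCount);
    }

    // Overload that takes the processor count explicitly, which eases testing.
    public static IWorkQueue<T> Create<T>(int processorCount)
    {
        if (processorCount >= LockFreeThreshold)
            return new ConcurrentWorkQueue<T>();
        return new LockedQueue<T>();
    }
}
```

Calling SmartQueue.Create<int>() picks an implementation once, at
construction time; the decision is not revisited while the queue is in use.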

--
Chris Mullins

Greg Young - 31 Aug 2006 19:36 GMT
It depends on usage as well .. with only a single reader/writer, lock-free
will fly.
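
For the single reader / single writer case, one common lock-free shape is a
fixed-size ring buffer. The sketch below is illustrative only and is not the
implementation from the linked article: SpscRingBuffer is a hypothetical
name, it relies on System.Threading.Volatile (.NET 4.5 or later), and it is
correct only when exactly one thread enqueues and exactly one thread
dequeues.

```csharp
using System;
using System.Threading;

// Hypothetical single-producer/single-consumer ring buffer (non-blocking).
public sealed class SpscRingBuffer<T>
{
    private readonly T[] buffer;
    private int head; // next slot to read; written only by the consumer
    private int tail; // next slot to write; written only by the producer

    public SpscRingBuffer(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException("capacity");
        // One slot is always left empty so "full" and "empty" differ.
        buffer = new T[capacity + 1];
    }

    // Producer thread only. Returns false when the buffer is full.
    public bool TryEnqueue(T item)
    {
        int t = tail;
        int next = (t + 1) % buffer.Length;
        if (next == Volatile.Read(ref head))
            return false;
        buffer[t] = item;
        // Publish the new tail only after the slot has been written.
        Volatile.Write(ref tail, next);
        return true;
    }

    // Consumer thread only. Returns false when the buffer is empty.
    public bool TryDequeue(out T item)
    {
        int h = head;
        if (h == Volatile.Read(ref tail))
        {
            item = default(T);
            return false;
        }
        item = buffer[h];
        buffer[h] = default(T); // drop the reference so it can be collected
        // Release the slot to the producer only after it has been read.
        Volatile.Write(ref head, (h + 1) % buffer.Length);
        return true;
    }
}
```

Both methods return immediately instead of waiting, so a consumer that wants
to sleep while the buffer is empty still needs a separate signalling
mechanism.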

Thana N. - 29 Aug 2006 04:25 GMT
Thanks for your reply, but I forgot to tell you all that I am still using
.NET 1.1, not 2.0.  Any suggestions for 1.1?

Thanks again,

Greg Young - 31 Aug 2006 19:35 GMT
So port the code to not use generics .. shouldn't take much more than 1/2 an
hour to only do the queue :) (just use object instead of T)

Cheers,

Greg
Brian Gideon - 31 Aug 2006 19:54 GMT
> Don't use locking ....
>
[quoted text clipped - 7 lines]
> MVP - C#
> http://codebetter.com/blogs/gregyoung

Unfortunately, that implementation is not a blocking queue.  It is an
interesting article though.
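
To illustrate the distinction: a non-blocking queue only offers a TryDequeue
that returns immediately, so the consumer still needs something to wait on.
One way to add that, sketched here with .NET 4 types (ConcurrentQueue<T> and
SemaphoreSlim, so again not available on 1.1), is to pair the queue with a
semaphore that counts queued items. BlockingWrapper is a hypothetical name.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical wrapper that adds blocking dequeue to a non-blocking queue.
public sealed class BlockingWrapper<T>
{
    private readonly ConcurrentQueue<T> items = new ConcurrentQueue<T>();
    // Counts items that have been enqueued but not yet claimed by a consumer.
    private readonly SemaphoreSlim available = new SemaphoreSlim(0);

    public void Enqueue(T item)
    {
        items.Enqueue(item);
        available.Release(); // one permit per queued item
    }

    // Blocks until an item is available.
    public T Dequeue()
    {
        available.Wait();
        T item;
        // Holding a permit means an item was enqueued before the Release.
        items.TryDequeue(out item);
        return item;
    }

    // Waits up to the timeout; returns false if nothing arrived in time.
    public bool TryDequeue(int millisecondsTimeout, out T item)
    {
        if (!available.Wait(millisecondsTimeout))
        {
            item = default(T);
            return false;
        }
        return items.TryDequeue(out item);
    }
}
```

On .NET 4 and later the framework's own
System.Collections.Concurrent.BlockingCollection<T> covers the same ground
and is usually the simpler choice.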
Greg Young - 28 Aug 2006 19:45 GMT
Also just for your example ...

> For i as Integer = 1 To 1000
>   SyncLock _queue.SyncRoot
>      _queue.Enqueue(i)
>      _newItemEvent.Set()
>   End SyncLock
> Next

The other thread cannot access the data because you hold the lock on the
data for the duration of all of the inserts.

Cheers,

Greg

Thana N. - 29 Aug 2006 05:40 GMT
I understand your point, Greg; that code is just my simulation code.  My real
situation is a TCP server that receives multiple messages at the same time.
These messages are added to a queue, and another thread takes them from the
queue and sends them out to another server over a single connection.  It
cannot utilize my network bandwidth; it uses just 20% of it.  I think queue
access is one cause of my delay.  Any suggestions for my situation?

And again, my code is in .NET 1.1.

Thanks again for your reply.
Thana N.

Markus Stoeger - 28 Aug 2006 20:20 GMT
> How can I do to improve this situation?  I want 2nd thread can access as
> fast as possible once Queue is not empty.

Two things... #1 I assume that your _newItemEvent is either an
AutoReset- or ManualResetEvent? That's slow. Try replacing it with the
Monitor.Pulse/Monitor.Wait methods.
#2 Do you really have to use Thread.SpinWait? I think Monitor.Wait would
be better here.

hth,
Max
Brian Gideon - 28 Aug 2006 21:20 GMT
Thana,

It looks like what you want is a blocking queue.  It's easier if you
wrap that logic in a separate class.  The following implementation is a
port from this article:
http://www.yoda.arachsys.com/csharp/threads/deadlocks.shtml

Imports System.Collections
Imports System.Threading

Public Class BlockingQueue

 Private ReadOnly lockObject As Object = New Object
 Private queue As Queue = New Queue

 Public Sub Enqueue(ByVal o As Object)
   SyncLock lockObject
     queue.Enqueue(o)
     ' Wake one thread waiting in Dequeue.
     Monitor.Pulse(lockObject)
   End SyncLock
 End Sub

 Public Function Dequeue() As Object
    SyncLock lockObject
       ' Wait releases the lock while blocked and reacquires it before returning.
       While queue.Count = 0
         Monitor.Wait(lockObject)
       End While
       Return queue.Dequeue()
    End SyncLock
 End Function

End Class

Brian

William Stacey [MVP] - 29 Aug 2006 00:57 GMT
A couple of issues here.  First, don't use spin locks that way - big waste of
CPU.  Moreover, the sync wrapper takes a lock for all methods and
properties.  So the 2nd thread is taking and releasing the lock 4 times for a
single dequeue operation.  As Brian said, use a blocking queue for this kind
of producer/consumer pattern.  Here is my one-lock blocking queue.

       private void button5_Click(object sender, EventArgs e)
       {
           BlockingQueue<int> bq = new BlockingQueue<int>();

           new Thread(delegate()
           {
               int i;
               while( bq.TryDequeue(200, out i) )
                   Console.WriteLine(i);
               Console.WriteLine("Consumer thread completed.");
           }).Start();

           for (int i = 0; i < 100; i++)
           {
               bq.Enqueue(i);
           }

       }

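   // Namespaces the class below relies on.
   using System;
   using System.Collections;
   using System.Collections.Generic;
   using System.Runtime.InteropServices;
   using System.Threading;

   /// <summary>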
   /// Represents a first-in, first-out collection of objects.
   /// </summary>
   /// <typeparam name="T">Type of element queue will contain.</typeparam>
   public class BlockingQueue<T> : IEnumerable<T>, ICollection
   {
       private bool isOpened = true;
       private readonly Queue<T> q;
       private readonly object syncRoot = new object();

       /// <summary>
       /// Initializes a new instance of the BlockingQueue class.
       /// </summary>
       public BlockingQueue()
       {
           q = new Queue<T>();
       }

       /// <summary>
       /// Initializes a new instance of the BlockingQueue class.
       /// </summary>
       /// <param name="capacity">The initial number of elements the queue can contain.</param>
       public BlockingQueue(int capacity)
       {
           q = new Queue<T>(capacity);
       }

       /// <summary>
       /// Initializes a new instance of the BlockingQueue class.
       /// </summary>
       /// <param name="collection">A collection whose elements are copied to the new queue.</param>
       public BlockingQueue(IEnumerable<T> collection)
       {
           q = new Queue<T>(collection);
       }

       /// <summary>
       /// Gets the number of elements in the queue.
       /// </summary>
       public int Count
       {
           get
           {
               lock ( syncRoot )
               {
                   return q.Count;
               }
           }
       }

       /// <summary>
       /// Remove all objects from the BlockingQueue<T>.
       /// </summary>
       public void Clear()
       {
           lock ( syncRoot )
           {
               q.Clear();
           }
       }

       /// <summary>
       /// Closes the queue.
       /// </summary>
       public void Close()
       {
           lock ( syncRoot )
           {
               if ( ! this.isOpened )
                   return; // Already closed.

               isOpened = false;
               q.Clear();
               // Resume any waiting threads so they see the queue is closed.
               Monitor.PulseAll(syncRoot);
           }
       }

       /// <summary>
       /// Gets a value indicating if queue is opened.
       /// </summary>
       public bool Opened
       {
           get
           {
               lock ( syncRoot )
               {
                   return this.isOpened;
               }
           }
       }

       /// <summary>
       /// Determines whether an element is in the
       /// System.Collections.Generic.Queue<T>.
       /// </summary>
       /// <param name="item">The object to locate in the
       /// System.Collections.Generic.Queue<T>. The value can be null for reference
       /// types.</param>
       /// <returns>true if item is found in the
       /// System.Collections.Generic.Queue<T>; otherwise, false.</returns>
       public bool Contains(T item)
       {
           lock ( syncRoot )
           {
               return q.Contains(item);
           }
       }

       /// <summary>
       /// Copies the System.Collections.Generic.Queue<T> elements to an
       /// existing one-dimensional System.Array, starting at the specified array
       /// index.
       /// </summary>
       /// <param name="array">The one-dimensional System.Array that is the
       /// destination of the elements
       /// copied from System.Collections.Generic.Queue<T>. The
       /// System.Array must have zero-based indexing.
       /// </param>
       /// <param name="arrayIndex">The zero-based index in array at which
       /// copying begins.</param>
       public void CopyTo(T[] array, int arrayIndex)
       {
           lock ( syncRoot )
           {
               q.CopyTo(array, arrayIndex);
           }
       }

       public T[] ToArray()
       {
           lock ( syncRoot )
           {
               return q.ToArray();
           }
       }

       public IEnumerator<T> GetEnumerator()
       {
           return new BlockingQueue<T>.Enumerator(this, -1);
       }
       public IEnumerator<T> GetEnumerator(int millisecondsTimeout)
       {
           return new BlockingQueue<T>.Enumerator(this,
millisecondsTimeout);
       }
       IEnumerator IEnumerable.GetEnumerator()
       {
           return new BlockingQueue<T>.Enumerator(this, -1);
       }

       /// <summary>
       /// Sets the capacity to the actual number of elements in the
       /// System.Collections.Generic.Queue<T>,
       /// if that number is less than 90 percent of current capacity.
       /// </summary>
       public void TrimExcess()
       {
           lock ( syncRoot )
           {
               q.TrimExcess();
           }
       }

       /// <summary>
       /// Removes and returns the object at the beginning of the Queue.
       /// </summary>
       /// <returns>Object in queue.</returns>
       public T Dequeue()
       {
           return Dequeue(Timeout.Infinite);
       }

       /// <summary>
       /// Removes and returns the object at the beginning of the Queue.
       /// </summary>
       /// <param name="millisecondsTimeout">Time to wait before returning (in milliseconds).</param>
       /// <returns>Object in queue.</returns>
       public T Dequeue(int millisecondsTimeout)
       {
           lock ( syncRoot )
           {
               while ( isOpened && (q.Count == 0) )
               {
                   if ( ! Monitor.Wait(syncRoot, millisecondsTimeout) )
                       throw new TimeoutException("Operation timeout");
               }

               if ( ! isOpened )
                   throw new InvalidOperationException("Queue closed");
               return q.Dequeue();
           }
       }

       public bool TryDequeue(int millisecondsTimeout, out T value)
       {
           lock (syncRoot)
           {
               while (isOpened && (q.Count == 0))
               {
                   if ( ! Monitor.Wait(syncRoot, millisecondsTimeout) )
                   {
                       value = default(T);
                       return false;
                   }
               }

               if (! isOpened)
                   throw new InvalidOperationException("Queue closed");
               value = q.Dequeue();
               return true;
           }
       }

       /// <summary>
       /// Returns the object at the beginning of the BlockingQueue<T>
       /// without removing it.
       /// </summary>
       /// <returns>The object at the beginning of the
       /// BlockingQueue<T>.</returns>
       public T Peek()
       {
           return Peek(-1);
       }

       /// <summary>
       /// Returns the object at the beginning of the BlockingQueue<T>
       /// without removing it.
       /// </summary>
       /// <returns>The object at the beginning of the
       /// BlockingQueue<T>.</returns>
       /// <param name="millisecondsTimeout">Time to wait before returning
       /// (in milliseconds).</param>
       public T Peek(int millisecondsTimeout)
       {
           lock ( syncRoot )
           {
               while ( isOpened && (q.Count == 0) )
               {
                   if ( ! Monitor.Wait(syncRoot, millisecondsTimeout) )
                       throw new TimeoutException("Operation timeout");
               }

               if ( ! isOpened )
                   throw new InvalidOperationException("Queue closed");
               return q.Peek();
           }
       }

       /// <summary>
       /// Adds an object to the end of the Queue.
       /// </summary>
       /// <param name="item">Object to put in queue.</param>
       public void Enqueue(T item)
       {
           lock ( syncRoot )
           {
               if ( ! isOpened )
                   throw new InvalidOperationException("Queue closed");
               q.Enqueue(item);
               // Move 1 waiting thread to the "ready" queue in this monitor object.
               Monitor.Pulse(syncRoot);
           } // Exiting lock will free thread(s) in the "ready" queue for this monitor object.
       }

       [Serializable, StructLayout(LayoutKind.Sequential)]
       public struct Enumerator : IEnumerator<T>, IDisposable, IEnumerator
       {
           private BlockingQueue<T> q;
           private IEnumerator<T> e;

           internal Enumerator(BlockingQueue<T> q, int timeout)
           {
               this.q = q;
               if (!Monitor.TryEnter(this.q.SyncRoot, timeout))
                   throw new TimeoutException("Timeout waiting for enumerator lock on BlockingQueue<T>.");
               // Get the contained Queue<T> enumerator.
               this.e = this.q.q.GetEnumerator();
           }

           public void Dispose()
           {
               this.e.Dispose();
               Monitor.Exit(q.SyncRoot);
           }

           public bool MoveNext()
           {
               return e.MoveNext();
           }

           public T Current
           {
               get
               {
                   return e.Current;
               }
           }

           object IEnumerator.Current
           {
               get
               {
                   return ((IEnumerator)e).Current;
               }
           }

           void IEnumerator.Reset()
           {
               e.Reset();
           }
       }

       #region ICollection Members

       /// <summary>
       /// Copies the BlockingQueue<T> elements to an existing
       /// one-dimensional System.Array, starting at the specified array index.
       /// </summary>
       /// <param name="array"></param>
       /// <param name="index"></param>
       public void CopyTo(Array array, int index)
       {
           lock (syncRoot)
           {
               ((ICollection)q).CopyTo(array, index);
           }
       }

       /// <summary>
       /// Get a value that indicates if the queue is synchronized.
       /// </summary>
       public bool IsSynchronized
       {
           get
           {
               return true;
           }
       }

       public object SyncRoot
       {
           get
           {
               return this.syncRoot;
           }
       }

       #endregion
   }

William Stacey [MVP]

Thana N. - 29 Aug 2006 04:26 GMT
Thanks for your reply, but I forgot to tell you all that I am still using
.NET 1.1, not 2.0.  Any suggestions for 1.1?

Rgrds,
Thana N.

Brian Gideon - 29 Aug 2006 14:49 GMT
Thana,

The implementation I posted, although not as complete as William's, is
compatible with 1.1 and should work fine.

Brian

William Stacey [MVP] - 29 Aug 2006 16:18 GMT
Here is a non-generic version. hth

using System;
using System.Collections;
using System.Threading;

namespace WJS.Threading
{
   /// <summary>
   /// A blocking queue derived from Queue. The dequeue method will block
   /// on an empty queue.
   /// Enqueue operations will not block as queue is only bounded by array
   /// size or MaxSize parameter.
   /// </summary>
   /// <remarks>This class is thread safe for multiple consumer and
   /// producer threads.</remarks>
   public class BlockingQueue : Queue
   {
       private bool opened = true;
       private int maxSize = int.MaxValue;
       private readonly object syncRoot = new object();

       /// <summary>
       /// Create new BlockingQueue.
       /// </summary>
       /// <param name="col">The System.Collections.ICollection to copy elements from.</param>
       public BlockingQueue(ICollection col) : base(col)
       {
       }

       /// <summary>
       /// Create new BlockingQueue.
       /// </summary>
       /// <param name="capacity">The initial number of elements that the queue can contain.</param>
       /// <param name="growFactor">The factor by which the capacity of the queue is expanded.</param>
       public BlockingQueue(int capacity, float growFactor) : base(capacity, growFactor)
       {
       }

       /// <summary>
       /// Create new BlockingQueue.
       /// </summary>
       /// <param name="capacity">The initial number of elements that the queue can contain.</param>
       public BlockingQueue(int capacity) : base(capacity)
       {
       }

       /// <summary>
       /// Create new BlockingQueue.
       /// </summary>
       public BlockingQueue() : base()
       {
       }

       /// <summary>
       /// BlockingQueue Destructor (Close queue, resume any waiting thread).
       /// </summary>
       ~BlockingQueue()
       {
           Close();
       }

       /// <summary>
       /// Gets or sets the maximum size of the queue. After MaxSize is reached,
       /// an additional Enqueue operation will throw an exception.
       /// </summary>
       public int MaxSize
       {
           get
           {
               lock ( syncRoot )
               {
                   return this.maxSize;
               }
           }
           set
           {
               lock ( syncRoot )
               {
                   if ( value < 0 )
                       throw new ArgumentOutOfRangeException("value", "MaxSize must be >= 0.");
                   this.maxSize = value;   // the original setter never stored the value
               }
           }
       }

       /// <summary>
       /// Remove all objects from the Queue.
       /// </summary>
       public override void Clear()
       {
           lock ( syncRoot )
           {
               base.Clear();
           }
       }

       /// <summary>
       /// Remove all objects from the Queue, resume all dequeue threads.
       /// </summary>
       public void Close()
       {
           lock ( syncRoot )
           {
               if ( ! Opened )
                   return; // Already closed.

               opened = false;
               base.Clear();
               Monitor.PulseAll(syncRoot);    // resume any waiting threads
           }
       }

       /// <summary>
       /// Gets flag indicating if queue has been closed.
       /// </summary>
       public bool Opened
       {
           get
           {
               lock ( syncRoot )
               {
                   return this.opened;
               }
           }
       }

       /// <summary>
       /// Removes and returns the object at the beginning of the Queue.
       /// </summary>
       /// <returns>Object in queue.</returns>
       public override object Dequeue()
       {
           return Dequeue(Timeout.Infinite);
       }

       /// <summary>
       /// Removes and returns the object at the beginning of the Queue.
       /// </summary>
       /// <param name="timeout">time to wait before returning</param>
       /// <returns>Object in queue.</returns>
       public object Dequeue(TimeSpan timeout)
       {
           // TotalMilliseconds is the whole span; Milliseconds is only the 0-999 component.
           return Dequeue((int)timeout.TotalMilliseconds);
       }

       /// <summary>
       /// Removes and returns the object at the beginning of the Queue.
       /// </summary>
       /// <param name="timeout">Time to wait before returning (in milliseconds)</param>
       /// <returns>Object in queue.</returns>
       public object Dequeue(int timeout)
       {
           lock ( syncRoot )
           {
               while ( Opened && (base.Count == 0) )
               {
                   if ( ! Monitor.Wait(syncRoot, timeout) )
                       throw new InvalidOperationException("Timeout");
               }

               if ( Opened )
                   return base.Dequeue();
               else
                   throw new InvalidOperationException("Queue closed");
           }
       }

       /// <summary>
       /// Adds an object to the end of the Queue.
       /// </summary>
       /// <param name="obj">Object to put in queue</param>
       public override void Enqueue(object obj)
       {
           lock ( syncRoot )
           {
               if ( ! Opened )
                   throw new InvalidOperationException("Queue closed");
               if ( base.Count >= this.maxSize )
                   throw new InvalidOperationException("Queue full. MaxSize reached.");
               base.Enqueue(obj);
               // Move 1 waiting thread to ready queue in this monitor object.
               Monitor.Pulse(syncRoot);
           } // Exiting lock lets threads in the ready queue for this monitor object run.
       }
   }
}

William Stacey [MVP]

Thana N. - 31 Aug 2006 13:51 GMT
Thank you for your reply, Will.
Thana

