From 64bdc81e0f4b1da162fa671f3905b5ed546d9681 Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Wed, 24 Dec 2008 09:55:54 +0000 Subject: [PATCH] 2008-12-21 Michael Barker * MessageQueueBase.cs: Added optional base class that provide async messaging support. * CompletedEventArgs.cs: Added for events on async methods. * CompletedEventHandler.cs: Added for events on async methods. * IMessageQueue.cs: Added method and event signatures for async messaging. * MessagingProviderLocator.cs: Added constant for InfiniteTimeout. * QueueReference.cs: Remove unecessary logging. svn path=/trunk/mcs/; revision=122084 --- .../Mono.Messaging/Mono.Messaging/ChangeLog | 10 + .../Mono.Messaging/CompletedEventArgs.cs | 50 ++++ .../Mono.Messaging/CompletedEventHandler.cs | 13 + .../Mono.Messaging/IMessageQueue.cs | 32 ++- .../Mono.Messaging/MessageQueueBase.cs | 263 ++++++++++++++++++ .../MessagingProviderLocator.cs | 3 +- .../Mono.Messaging/QueueReference.cs | 1 - 7 files changed, 368 insertions(+), 4 deletions(-) create mode 100644 mcs/class/Mono.Messaging/Mono.Messaging/CompletedEventArgs.cs create mode 100644 mcs/class/Mono.Messaging/Mono.Messaging/CompletedEventHandler.cs create mode 100644 mcs/class/Mono.Messaging/Mono.Messaging/MessageQueueBase.cs diff --git a/mcs/class/Mono.Messaging/Mono.Messaging/ChangeLog b/mcs/class/Mono.Messaging/Mono.Messaging/ChangeLog index 70948f474f9..222a74f0088 100644 --- a/mcs/class/Mono.Messaging/Mono.Messaging/ChangeLog +++ b/mcs/class/Mono.Messaging/Mono.Messaging/ChangeLog @@ -1,3 +1,13 @@ +2008-12-21 Michael Barker + + * MessageQueueBase.cs: Added optional base class that provide async + messaging support. + * CompletedEventArgs.cs: Added for events on async methods. + * CompletedEventHandler.cs: Added for events on async methods. + * IMessageQueue.cs: Added method and event signatures for async messaging. + * MessagingProviderLocator.cs: Added constant for InfiniteTimeout. + * QueueReference.cs: Remove unecessary logging. + 2008-12-07 Michael Barker * MessageUnavailableException.cs: Specific exception for messages not being diff --git a/mcs/class/Mono.Messaging/Mono.Messaging/CompletedEventArgs.cs b/mcs/class/Mono.Messaging/Mono.Messaging/CompletedEventArgs.cs new file mode 100644 index 00000000000..bd8eba4ebc0 --- /dev/null +++ b/mcs/class/Mono.Messaging/Mono.Messaging/CompletedEventArgs.cs @@ -0,0 +1,50 @@ +// +// Mono.Messaging +// +// Authors: +// Michael Barker (mike@middlesoft.co.uk) +// +// (C) 2008 Michael Barker +// + +// +// Permission is hereby granted, free of charge, to any person obtaining +// a copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to +// permit persons to whom the Software is furnished to do so, subject to +// the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// + +using System; + +namespace Mono.Messaging +{ + public class CompletedEventArgs : EventArgs + { + private IAsyncResult _result; + + public CompletedEventArgs(IAsyncResult result) + { + _result = result; + } + + public IAsyncResult AsyncResult + { + get { return _result; } + set { _result = value; } + } + } +} diff --git a/mcs/class/Mono.Messaging/Mono.Messaging/CompletedEventHandler.cs b/mcs/class/Mono.Messaging/Mono.Messaging/CompletedEventHandler.cs new file mode 100644 index 00000000000..a6ea59d51e4 --- /dev/null +++ b/mcs/class/Mono.Messaging/Mono.Messaging/CompletedEventHandler.cs @@ -0,0 +1,13 @@ +// CompletedEventHandler.cs created with MonoDevelop +// User: mike at 8:16 p 21/12/2008 +// +// To change standard headers go to Edit->Preferences->Coding->Standard Headers +// + +using System; + +namespace Mono.Messaging +{ + [Serializable] + public delegate void CompletedEventHandler(object sender, CompletedEventArgs e); +} diff --git a/mcs/class/Mono.Messaging/Mono.Messaging/IMessageQueue.cs b/mcs/class/Mono.Messaging/Mono.Messaging/IMessageQueue.cs index 8146c503618..1781369e7c6 100644 --- a/mcs/class/Mono.Messaging/Mono.Messaging/IMessageQueue.cs +++ b/mcs/class/Mono.Messaging/Mono.Messaging/IMessageQueue.cs @@ -34,7 +34,7 @@ using System.ComponentModel; namespace Mono.Messaging { public interface IMessageQueue { - + bool Authenticate { get; set; } @@ -158,14 +158,42 @@ namespace Mono.Messaging { IMessage ReceiveByCorrelationId (string correlationId, TimeSpan timeout); IMessage ReceiveByCorrelationId (string correlationId, IMessageQueueTransaction transaction); - + IMessage ReceiveByCorrelationId (string correlationId, MessageQueueTransactionType transactionType); IMessage ReceiveByCorrelationId (string correlationId, TimeSpan timeout, IMessageQueueTransaction transaction); IMessage ReceiveByCorrelationId (string correlationId, TimeSpan timeout, MessageQueueTransactionType transactionType); + IAsyncResult BeginPeek (); + + IAsyncResult BeginPeek (TimeSpan timeout); + + IAsyncResult BeginPeek (TimeSpan timeout, object stateObject); + + IAsyncResult BeginPeek (TimeSpan timeout, object stateObject, AsyncCallback callback); + + IMessage EndPeek (IAsyncResult asyncResult); + + IAsyncResult BeginReceive (); + + IAsyncResult BeginReceive (TimeSpan timeout); + + IAsyncResult BeginReceive (TimeSpan timeout, object stateObject); + + IAsyncResult BeginReceive (TimeSpan timeout, object stateObject, AsyncCallback callback); + + IMessage EndReceive (IAsyncResult asyncResult); + IMessageEnumerator GetMessageEnumerator (); + + event CompletedEventHandler PeekCompleted; + + event CompletedEventHandler ReceiveCompleted; + + void SendReceiveCompleted (IAsyncResult result); + + void SendPeekCompleted (IAsyncResult result); } } diff --git a/mcs/class/Mono.Messaging/Mono.Messaging/MessageQueueBase.cs b/mcs/class/Mono.Messaging/Mono.Messaging/MessageQueueBase.cs new file mode 100644 index 00000000000..e15d4471e63 --- /dev/null +++ b/mcs/class/Mono.Messaging/Mono.Messaging/MessageQueueBase.cs @@ -0,0 +1,263 @@ +// +// Mono.Messaging +// +// Authors: +// Michael Barker (mike@middlesoft.co.uk) +// +// (C) 2008 Michael Barker +// + +// +// Permission is hereby granted, free of charge, to any person obtaining +// a copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to +// permit persons to whom the Software is furnished to do so, subject to +// the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// + +using System; +using System.Threading; +using System.Collections; + +namespace Mono.Messaging +{ + + public abstract class MessageQueueBase + { + protected abstract IMessageQueue Queue { + get; + } + + public event CompletedEventHandler PeekCompleted; + + public event CompletedEventHandler ReceiveCompleted; + + public IAsyncResult BeginPeek () + { + return new PeekAsyncResult (null, Queue, MessagingProviderLocator.InfiniteTimeout, + NullAsyncCallback); + } + + public IAsyncResult BeginPeek (TimeSpan timeout) + { + return new PeekAsyncResult (null, Queue, timeout, NullAsyncCallback); + } + + public IAsyncResult BeginPeek (TimeSpan timeout, object stateObject) + { + return new PeekAsyncResult (stateObject, Queue, + timeout, NullAsyncCallback); + } + + public IAsyncResult BeginPeek (TimeSpan timeout, + object stateObject, + AsyncCallback callback) + { + return new PeekAsyncResult (stateObject, Queue, timeout, callback); + } + + public IMessage EndPeek (IAsyncResult asyncResult) + { + PeekAsyncResult result = (PeekAsyncResult) asyncResult; + return result.Message; + } + + public IAsyncResult BeginReceive () + { + return new ReceiveAsyncResult (null, Queue, MessagingProviderLocator.InfiniteTimeout, + NullAsyncCallback); + } + + public IAsyncResult BeginReceive (TimeSpan timeout) + { + return new ReceiveAsyncResult (null, Queue, timeout, NullAsyncCallback); + } + + public IAsyncResult BeginReceive (TimeSpan timeout, object stateObject) + { + return new ReceiveAsyncResult (stateObject, Queue, timeout, NullAsyncCallback); + } + + public IAsyncResult BeginReceive (TimeSpan timeout, + object stateObject, + AsyncCallback callback) + { + return new ReceiveAsyncResult (stateObject, Queue, timeout, callback); + } + + public IMessage EndReceive (IAsyncResult asyncResult) + { + ReceiveAsyncResult result = (ReceiveAsyncResult) asyncResult; + return result.Message; + } + + public void SendReceiveCompleted (IAsyncResult result) + { + if (ReceiveCompleted == null) + return; + + ReceiveCompleted (this, new CompletedEventArgs (result)); + } + + public void SendPeekCompleted (IAsyncResult result) + { + if (PeekCompleted == null) + return; + + PeekCompleted (this, new CompletedEventArgs (result)); + } + + internal void NullAsyncCallback (IAsyncResult result) + { + } + + internal class ThreadWaitHandle : WaitHandle { + + private readonly Thread t; + + public ThreadWaitHandle (Thread t) + { + this.t = t; + } + + public override bool WaitOne () + { + t.Join (); + return true; + } + + public override bool WaitOne (Int32 timeout, bool exitContext) + { + t.Join (timeout); + return true; + } + + public override bool WaitOne (TimeSpan timeout, bool exitContext) + { + t.Join (timeout); + return true; + } + } + + internal abstract class AsyncResultBase : IAsyncResult { + + private readonly object asyncState; + protected readonly WaitHandle asyncWaitHandle; + protected volatile bool isCompleted; + protected readonly IMessageQueue q; + private readonly Thread t; + protected IMessage message; + protected readonly TimeSpan timeout; + protected readonly AsyncCallback callback; + + public AsyncResultBase (object asyncState, + IMessageQueue q, + TimeSpan timeout, + AsyncCallback callback) + { + this.asyncState = asyncState; + this.asyncWaitHandle = new Mutex (false); + this.q = q; + this.timeout = timeout; + this.callback = callback; + this.t = new Thread(run); + t.Start (); + asyncWaitHandle = new ThreadWaitHandle(t); + } + + public object AsyncState { + get { return asyncState; } + } + + public WaitHandle AsyncWaitHandle { + get { return asyncWaitHandle; } + } + + public bool CompletedSynchronously { + get { return false; } + } + + public bool IsCompleted { + get { return isCompleted; } + } + + internal IMessage Message { + get { return message; } + } + + protected abstract IMessage GetMessage (); + + protected abstract void SendCompletedEvent (IAsyncResult result); + + private void run () + { + message = GetMessage (); + isCompleted = true; + callback (this); + SendCompletedEvent (this); + } + } + + internal class ReceiveAsyncResult : AsyncResultBase { + + public ReceiveAsyncResult (object asyncState, + IMessageQueue q, + TimeSpan timeout, + AsyncCallback callback) + : base (asyncState, q, timeout, callback) + { + } + + protected override IMessage GetMessage () + { + if (timeout == MessagingProviderLocator.InfiniteTimeout) + return q.Receive (); + else + return q.Receive (timeout); + } + + protected override void SendCompletedEvent (IAsyncResult result) + { + q.SendReceiveCompleted (result); + } + } + + internal class PeekAsyncResult : AsyncResultBase { + + public PeekAsyncResult (object asyncState, + IMessageQueue q, + TimeSpan timeout, + AsyncCallback callback) + : base (asyncState, q, timeout, callback) + { + } + + protected override void SendCompletedEvent (IAsyncResult result) + { + Console.WriteLine ("Send Peek Completed"); + q.SendPeekCompleted (result); + } + + protected override IMessage GetMessage () + { + if (timeout == MessagingProviderLocator.InfiniteTimeout) + return q.Peek (); + else + return q.Peek (timeout); + } + } + } +} diff --git a/mcs/class/Mono.Messaging/Mono.Messaging/MessagingProviderLocator.cs b/mcs/class/Mono.Messaging/Mono.Messaging/MessagingProviderLocator.cs index 2a647aa2a99..f267e7e19f6 100644 --- a/mcs/class/Mono.Messaging/Mono.Messaging/MessagingProviderLocator.cs +++ b/mcs/class/Mono.Messaging/Mono.Messaging/MessagingProviderLocator.cs @@ -36,7 +36,8 @@ namespace Mono.Messaging { private static IMessagingProvider provider = null; private static readonly object syncObj = new object(); - + public static readonly TimeSpan InfiniteTimeout = TimeSpan.MaxValue; + public static IMessagingProvider GetProvider () { //Assembly a = Assembly.Load("Mono.Messaging.RabbitMQ.dll"); diff --git a/mcs/class/Mono.Messaging/Mono.Messaging/QueueReference.cs b/mcs/class/Mono.Messaging/Mono.Messaging/QueueReference.cs index e9770b65c37..f85bc3a7820 100644 --- a/mcs/class/Mono.Messaging/Mono.Messaging/QueueReference.cs +++ b/mcs/class/Mono.Messaging/Mono.Messaging/QueueReference.cs @@ -84,7 +84,6 @@ namespace Mono.Messaging public override bool Equals (object other) { - Console.Write("Equals Called\n"); if (other == null) return false; else if (typeof (QueueReference) != other.GetType ()) -- 2.25.1