2 // Mono.Messaging.RabbitMQ
5 // Michael Barker (mike@middlesoft.co.uk)
7 // (C) 2008 Michael Barker
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 using System.Collections;
34 using System.Net.Sockets;
35 using System.Threading;
39 using RabbitMQ.Client;
41 namespace Mono.Messaging.RabbitMQ {
// IMessagingProvider implementation for RabbitMQ: creates messages, transactions,
// contexts and queues, and keeps a table of the queues it has created.
43 public class RabbitMQMessagingProvider : IMessagingProvider {
// Incremented once per transaction in CreateMessageQueueTransaction; its text forms
// the second half of each transaction id.
45 private int txCounter = 0;
// Result of GetLocalIP (), assigned in the constructor; its text forms the first
// half of each transaction id.
46 private readonly uint localIp;
// Built in the constructor; serves CreateContext and is handed to every new transaction.
47 private readonly MessagingContextPool contextPool;
// Initializes the provider: stores the value returned by GetLocalIP () and builds
// the context pool used by the rest of the class.
49 public RabbitMQMessagingProvider ()
51 localIp = GetLocalIP ();
// The first pool argument is a MessageFactory bound to this provider.
// NOTE(review): the argument list continues past this line and is not visible in
// this excerpt - confirm the remaining argument(s) against the full file.
52 contextPool = new MessagingContextPool (new MessageFactory (this),
// Resolves this host's addresses through DNS and packs the bytes of an IPv4
// address into one number, first byte in the most significant position.
// NOTE(review): the declaration of localIP and the return statements are not
// visible in this excerpt; whether the first IPv4 address wins, and what is
// returned when none is found, must be confirmed against the full file.
56 private static uint GetLocalIP ()
58 String strHostName = Dns.GetHostName ();
// NOTE(review): Dns.GetHostByName is marked obsolete in .NET; Dns.GetHostEntry is
// the documented replacement.
59 IPHostEntry ipEntry = Dns.GetHostByName (strHostName);
60 foreach (IPAddress ip in ipEntry.AddressList) {
// Only IPv4 (InterNetwork) addresses are considered.
61 if (AddressFamily.InterNetwork == ip.AddressFamily) {
62 byte[] addr = ip.GetAddressBytes ();
// At most four bytes are folded in, even if the address array is longer.
64 for (int i = 0; i < 4 && i < addr.Length; i++) {
// Byte i is shifted left by 8 * (3 - i) bits, so addr[0] lands in bits 24-31
// and addr[3] in bits 0-7. The shift happens on an int; the result is then
// cast to uint before being added.
65 localIP += (uint) (addr[i] << 8 * (3 - i));
/// <summary>
/// Creates a new message using the default <see cref="MessageBase"/> constructor.
/// </summary>
/// <returns>The newly constructed message.</returns>
public IMessage CreateMessage ()
{
	IMessage message = new MessageBase ();
	return message;
}
/// <summary>
/// Creates a new transaction whose id is the text of the local address value
/// followed by the text of a per-provider sequence number.
/// </summary>
/// <returns>A transaction bound to this provider's context pool.</returns>
public IMessageQueueTransaction CreateMessageQueueTransaction ()
{
	// Use the value returned by Interlocked.Increment. Re-reading txCounter
	// after the increment can observe another thread's increment, which would
	// hand the same id to two transactions created concurrently.
	int sequence = Interlocked.Increment (ref txCounter);
	string txId = localIp.ToString () + sequence.ToString ();

	return new RabbitMQMessageQueueTransaction (txId, contextPool);
}
/// <summary>
/// Obtains a messaging context for the given host from the provider's context pool.
/// </summary>
/// <param name="host">Host passed through unchanged to the pool's GetContext.</param>
/// <returns>The context returned by the pool.</returns>
public IMessagingContext CreateContext (string host)
{
	IMessagingContext context = contextPool.GetContext (host);
	return context;
}
// Opens a RabbitMQ connection through a freshly created ConnectionFactory.
// NOTE(review): `host` is not referenced by the lines visible in this excerpt, and
// one original line between the two statements is missing - confirm that the
// factory is pointed at `host` before CreateConnection () is called.
91 private IConnection CreateConnection (string host)
93 ConnectionFactory cf = new ConnectionFactory ();
95 return cf.CreateConnection ();
/// <summary>
/// Deletes the queue identified by <paramref name="qRef"/> by delegating to
/// <c>RabbitMQMessageQueue.Delete</c>.
/// </summary>
/// <param name="qRef">Reference of the queue to delete.</param>
public void DeleteQueue (QueueReference qRef)
{
	// NOTE(review): the provider's own queue table is not touched here, so an
	// entry stored for qRef stays visible to Exists and GetMessageQueue after
	// the delete - confirm this is intended.
	RabbitMQMessageQueue.Delete (qRef);
}
// Table of queues known to this provider, looked up by QueueReference.
103 private readonly IDictionary queues = new Hashtable ();
// Reader/writer lock held around the accesses to `queues` in the methods below.
104 private readonly ReaderWriterLock qLock = new ReaderWriterLock ();
// Timeout, in milliseconds, passed to every acquire/upgrade call on qLock.
105 private const int TIMEOUT = 15000;
// Copies every queue currently in the provider's table into a new array while
// holding the reader lock.
// NOTE(review): the declaration of `qs` and the return statement are not visible
// in this excerpt; presumably `qs` is what the method returns - confirm.
107 public IMessageQueue[] GetPublicQueues ()
110 qLock.AcquireReaderLock (TIMEOUT);
112 ICollection qCollection = queues.Values;
// The array is sized from the table's value collection, then filled from index 0.
113 qs = new IMessageQueue[qCollection.Count];
114 qCollection.CopyTo (qs, 0);
117 qLock.ReleaseReaderLock ();
/// <summary>
/// Reports whether the provider's queue table holds an entry for
/// <paramref name="qRef"/>. The lookup runs under the reader lock.
/// </summary>
/// <param name="qRef">Reference of the queue to look up.</param>
/// <returns><c>true</c> when the table contains the reference; otherwise <c>false</c>.</returns>
public bool Exists (QueueReference qRef)
{
	bool known;

	qLock.AcquireReaderLock (TIMEOUT);
	try {
		known = queues.Contains (qRef);
	} finally {
		// Released on every path, including when the lookup throws.
		qLock.ReleaseReaderLock ();
	}

	return known;
}
// Constructs a RabbitMQMessageQueue for qRef while holding the writer lock.
// NOTE(review): the rest of the parameter list is not visible in this excerpt;
// `transactional` is presumably declared on the missing continuation line - confirm.
131 public IMessageQueue CreateMessageQueue (QueueReference qRef,
134 qLock.AcquireWriterLock (TIMEOUT);
136 IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, transactional);
// NOTE(review): the statements between construction and lock release (presumably
// storing `mq` in the queue table and returning it) are not visible here - confirm.
140 qLock.ReleaseWriterLock ();
// Returns the queue stored for qRef when the table contains one; otherwise
// upgrades to the writer lock and constructs a new RabbitMQMessageQueue.
144 public IMessageQueue GetMessageQueue (QueueReference qRef)
146 qLock.AcquireReaderLock (TIMEOUT);
// Hit: hand back the stored queue without taking the writer lock.
148 if (queues.Contains (qRef))
149 return (IMessageQueue) queues[qRef];
// NOTE(review): ReaderWriterLock.UpgradeToWriterLock may let other writers run
// before the upgrade is granted, and no second Contains check is visible after
// it - confirm two threads cannot both create a queue for the same qRef.
151 LockCookie lc = qLock.UpgradeToWriterLock (TIMEOUT);
// The new queue is built with `false` as the third constructor argument, the
// position CreateMessageQueue uses for its transactional flag.
153 IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, false);
// NOTE(review): the statements between construction and the downgrade
// (presumably storing and returning `mq`) are not visible in this excerpt - confirm.
157 qLock.DowngradeFromWriterLock (ref lc);
161 qLock.ReleaseReaderLock ();