Merge pull request #5714 from alexischr/update_bockbuild
[mono.git] / mcs / class / Mono.Messaging.RabbitMQ / Mono.Messaging.RabbitMQ / RabbitMQMessagingProvider.cs
1 //
2 // Mono.Messaging.RabbitMQ
3 //
4 // Authors:
5 //        Michael Barker (mike@middlesoft.co.uk)
6 //
7 // (C) 2008 Michael Barker
8 //
9
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
18 // 
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
21 // 
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30
31 using System;
32 using System.Collections;
33 using System.Net;
34 using System.Net.Sockets;
35 using System.Threading;
36
37 using Mono.Messaging;
38
39 using RabbitMQ.Client;
40
41 namespace Mono.Messaging.RabbitMQ {
42
43         public class RabbitMQMessagingProvider : IMessagingProvider {
44                 
45                 private int txCounter = 0;
46                 private readonly uint localIp;
47                 private readonly MessagingContextPool contextPool;
48                 
49                 public RabbitMQMessagingProvider ()
50                 {
51                         localIp = GetLocalIP ();
52                         contextPool = new MessagingContextPool (new MessageFactory (this),
53                                                                                                         CreateConnection);
54                 }
55                 
56                 private static uint GetLocalIP ()
57                 {
58                         String strHostName = Dns.GetHostName ();
59                         IPHostEntry ipEntry = Dns.GetHostByName (strHostName);
60                         foreach (IPAddress ip in ipEntry.AddressList) {
61                                 if (AddressFamily.InterNetwork == ip.AddressFamily) {
62                                         byte[] addr = ip.GetAddressBytes ();
63                                         uint localIP = 0;
64                                         for (int i = 0; i < 4 && i < addr.Length; i++) {
65                                                 localIP += (uint) (addr[i] << 8 * (3 - i));
66                                         }
67                                         return localIP;
68                                 }
69                         }
70                         return 0;
71                 }
72                 
73                 public IMessage CreateMessage ()
74                 {
75                         return new MessageBase ();
76                 }
77                 
78                 public IMessageQueueTransaction CreateMessageQueueTransaction ()
79                 {
80                         Interlocked.Increment (ref txCounter);
81                         string txId = localIp.ToString () + txCounter.ToString ();
82                         
83                         return new RabbitMQMessageQueueTransaction (txId, contextPool);
84                 }
85                 
86                 public IMessagingContext CreateContext (string host)
87                 {
88                         return contextPool.GetContext (host);
89                 }
90                 
91                 private IConnection CreateConnection (string host)
92                 {
93                         ConnectionFactory cf = new ConnectionFactory ();
94                         cf.Address = host;
95                         return cf.CreateConnection ();
96                 }
97                 
98                 public void DeleteQueue (QueueReference qRef)
99                 {
100                         RabbitMQMessageQueue.Delete (qRef);
101                 }
102                 
103                 private readonly IDictionary queues = new Hashtable ();
104                 private readonly ReaderWriterLock qLock = new ReaderWriterLock ();
105                 private const int TIMEOUT = 15000;
106                 
107                 public IMessageQueue[] GetPublicQueues ()
108                 {
109                         IMessageQueue[] qs;
110                         qLock.AcquireReaderLock (TIMEOUT);
111                         try {
112                                 ICollection qCollection = queues.Values;
113                                 qs = new IMessageQueue[qCollection.Count];
114                                 qCollection.CopyTo (qs, 0);
115                                 return qs;
116                         } finally {
117                                 qLock.ReleaseReaderLock ();
118                         }
119                 }
120                 
121                 public bool Exists (QueueReference qRef)
122                 {
123                         qLock.AcquireReaderLock (TIMEOUT);
124                         try {
125                                 return queues.Contains (qRef);
126                         } finally {
127                                 qLock.ReleaseReaderLock ();
128                         }
129                 }
130                 
131                 public IMessageQueue CreateMessageQueue (QueueReference qRef,
132                                                          bool transactional)
133                 {
134                         qLock.AcquireWriterLock (TIMEOUT);
135                         try {
136                                 IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, transactional);
137                                 queues[qRef] = mq;
138                                 return mq;
139                         } finally {
140                                 qLock.ReleaseWriterLock ();
141                         }
142                 }
143
144                 public IMessageQueue GetMessageQueue (QueueReference qRef)
145                 {
146                         qLock.AcquireReaderLock (TIMEOUT);
147                         try {
148                                 if (queues.Contains (qRef))
149                                         return (IMessageQueue) queues[qRef];
150                                 else {
151                                         LockCookie lc = qLock.UpgradeToWriterLock (TIMEOUT);
152                                         try {
153                                                 IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, false);
154                                                 queues[qRef] = mq;
155                                                 return mq;
156                                         } finally {
157                                                 qLock.DowngradeFromWriterLock (ref lc);
158                                         }
159                                 }
160                         } finally {
161                                 qLock.ReleaseReaderLock ();
162                         }
163                 }
164                 
165
166         }
167 }