2 // Mono.Messaging.RabbitMQ
5 // Michael Barker (mike@middlesoft.co.uk)
7 // (C) 2008 Michael Barker
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
35 using RabbitMQ.Client;
36 using RabbitMQ.Client.Content;
37 using RabbitMQ.Client.Events;
38 using RabbitMQ.Client.Exceptions;
39 using RabbitMQ.Client.MessagePatterns;
42 namespace Mono.Messaging.RabbitMQ {
44 public class RabbitMQMessageEnumerator : IMessageEnumerator {
46 private readonly MessageFactory helper;
47 private readonly QueueReference qRef;
48 private IConnection cn = null;
49 private BasicDeliverEventArgs current = null;
50 private IModel model = null;
51 private Subscription subscription = null;
53 public RabbitMQMessageEnumerator (MessageFactory helper,
54 QueueReference qRef) {
59 public IMessage Current {
62 throw new InvalidOperationException ();
64 return CreateMessage (current);
68 public IntPtr CursorHandle {
69 get { throw new NotImplementedException (); }
74 if (subscription != null) {
75 subscription.Close ();
90 public void Dispose (bool disposing)
94 public void Dispose ()
104 private IModel Model {
107 ConnectionFactory cf = new ConnectionFactory ();
108 cf.Address = qRef.Host;
109 cn = cf.CreateConnection ();
113 model = cn.CreateModel ();
120 private Subscription Subscription {
122 if (subscription == null) {
125 string finalName = ch.QueueDeclare (qRef.Queue, false);
127 subscription = new Subscription (ch, finalName);
134 public bool MoveNext ()
136 Subscription sub = Subscription;
137 return sub.Next (500, out current);
140 public bool MoveNext (TimeSpan timeout)
142 int to = MessageFactory.TimeSpanToInt32 (timeout);
143 return Subscription.Next (to, out current);
146 public IMessage RemoveCurrent ()
149 throw new InvalidOperationException ();
151 IMessage msg = CreateMessage (current);
152 Subscription.Ack (current);
156 public IMessage RemoveCurrent (IMessageQueueTransaction transaction)
158 throw new NotSupportedException ("Unable to remove messages within a transaction");
161 public IMessage RemoveCurrent (MessageQueueTransactionType transactionType)
163 throw new NotSupportedException ("Unable to remove messages within a transaction");
166 public IMessage RemoveCurrent (TimeSpan timeout)
168 // Timeout makes no sense for this implementation, so we just work
169 // the same as the non-timeout based one.
172 throw new InvalidOperationException ();
174 IMessage msg = CreateMessage (current);
175 Subscription.Ack (current);
179 public IMessage RemoveCurrent (TimeSpan timeout, IMessageQueueTransaction transaction)
181 throw new NotImplementedException ();
184 public IMessage RemoveCurrent (TimeSpan timeout, MessageQueueTransactionType transactionType)
186 throw new NotImplementedException ();
189 private IMessage CreateMessage (BasicDeliverEventArgs result)
191 return helper.ReadMessage (qRef, result);