2010-06-23: Michael Barker <mike@middlesoft.co.uk>
[mono.git] / mcs / class / Mono.Messaging.RabbitMQ / Mono.Messaging.RabbitMQ / RabbitMQMessageEnumerator.cs
1 //
2 // Mono.Messaging.RabbitMQ
3 //
4 // Authors:
5 //        Michael Barker (mike@middlesoft.co.uk)
6 //
7 // (C) 2008 Michael Barker
8 //
9
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
18 // 
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
21 // 
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30
31 using System;
32
33 using Mono.Messaging;
34
35 using RabbitMQ.Client;
36 using RabbitMQ.Client.Content;
37 using RabbitMQ.Client.Events;
38 using RabbitMQ.Client.Exceptions;
39 using RabbitMQ.Client.MessagePatterns;
40 using RabbitMQ.Util;
41
42 namespace Mono.Messaging.RabbitMQ {
43
44         public class RabbitMQMessageEnumerator : IMessageEnumerator {
45                 
46                 private readonly MessageFactory helper;
47                 private readonly QueueReference qRef;
48                 private IConnection cn = null;
49                 private BasicDeliverEventArgs current = null;
50                 private IModel model = null;
51                 private Subscription subscription = null;
52                 
53                 public RabbitMQMessageEnumerator (MessageFactory helper,
54                                                   QueueReference qRef) {
55                         this.helper = helper;
56                         this.qRef = qRef;
57                 }
58                 
59                 public IMessage Current { 
60                         get {
61                                 if (current == null)
62                                         throw new InvalidOperationException ();
63                                 
64                                 return CreateMessage (current);
65                         }
66                 }
67                 
68                 public IntPtr CursorHandle {
69                         get { throw new NotImplementedException (); }
70                 }
71                 
72                 public void Close ()
73                 {
74                         if (subscription != null) {
75                                 subscription.Close ();
76                                 subscription = null;
77                         }
78                         
79                         if (model != null) {
80                                 model.Dispose ();
81                                 model = null;
82                         }
83                         
84                         if (cn != null) {
85                                 cn.Dispose ();
86                                 cn = null;
87                         }
88                 }
89
90                 public void Dispose (bool disposing)
91                 {
92                 }
93                 
94                 public void Dispose ()
95                 {
96                         Close ();
97                 }
98                 
99                 public void Reset ()
100                 {
101                         Close ();
102                 }
103                 
104                 private IModel Model {
105                         get {
106                                 if (cn == null) {
107                                         ConnectionFactory cf = new ConnectionFactory ();
108                                         cf.Address = qRef.Host;
109                                         cn = cf.CreateConnection ();
110                                 }
111                                 
112                                 if (model == null) {
113                                         model = cn.CreateModel ();
114                                 }
115                                 
116                                 return model;
117                         }
118                 }
119                 
120                 private Subscription Subscription {
121                         get {
122                                 if (subscription == null) {
123                                         IModel ch = Model;
124                                         
125                                         string finalName = ch.QueueDeclare (qRef.Queue, false);
126                                         
127                                         subscription = new Subscription (ch, finalName);
128                                 }
129                                 
130                                 return subscription;
131                         }
132                 }
133
134                 public bool MoveNext ()
135                 {
136                         Subscription sub = Subscription;
137                         return sub.Next (500, out current);
138                 }
139                 
140                 public bool MoveNext (TimeSpan timeout)
141                 {
142                         int to = MessageFactory.TimeSpanToInt32 (timeout);
143                         return Subscription.Next (to, out current);
144                 }
145
146                 public IMessage RemoveCurrent ()
147                 {
148                         if (current == null)
149                                 throw new InvalidOperationException ();
150                         
151                         IMessage msg = CreateMessage (current);
152                         Subscription.Ack (current);
153                         return msg;
154                 }
155                 
156                 public IMessage RemoveCurrent (IMessageQueueTransaction transaction)
157                 {
158                         throw new NotSupportedException ("Unable to remove messages within a transaction");
159                 }
160                 
161                 public IMessage RemoveCurrent (MessageQueueTransactionType transactionType)
162                 {
163                         throw new NotSupportedException ("Unable to remove messages within a transaction");
164                 }
165                 
166                 public IMessage RemoveCurrent (TimeSpan timeout)
167                 {
168                         // Timeout makes no sense for this implementation, so we just work 
169                         // the same as the non-timeout based one. 
170                         
171                         if (current == null)
172                                 throw new InvalidOperationException ();
173                         
174                         IMessage msg = CreateMessage (current);
175                         Subscription.Ack (current);
176                         return msg;
177                 }
178                 
179                 public IMessage RemoveCurrent (TimeSpan timeout, IMessageQueueTransaction transaction)
180                 {
181                         throw new NotImplementedException ();
182                 }
183                 
184                 public IMessage RemoveCurrent (TimeSpan timeout, MessageQueueTransactionType transactionType)
185                 {
186                         throw new NotImplementedException ();
187                 }
188                 
189                 private IMessage CreateMessage (BasicDeliverEventArgs result)
190                 {
191                         return helper.ReadMessage (qRef, result);
192                 }
193
194         }
195 }