c1cb54fa9a18197e4c8f86e0aa28f06ece082e71
[mono.git] / mcs / class / RabbitMQ.Client / src / client / messagepatterns / SimpleRpcClient.cs
1 // This source code is dual-licensed under the Apache License, version
2 // 2.0, and the Mozilla Public License, version 1.1.
3 //
4 // The APL v2.0:
5 //
6 //---------------------------------------------------------------------------
7 //   Copyright (C) 2007-2009 LShift Ltd., Cohesive Financial
8 //   Technologies LLC., and Rabbit Technologies Ltd.
9 //
10 //   Licensed under the Apache License, Version 2.0 (the "License");
11 //   you may not use this file except in compliance with the License.
12 //   You may obtain a copy of the License at
13 //
14 //       http://www.apache.org/licenses/LICENSE-2.0
15 //
16 //   Unless required by applicable law or agreed to in writing, software
17 //   distributed under the License is distributed on an "AS IS" BASIS,
18 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 //   See the License for the specific language governing permissions and
20 //   limitations under the License.
21 //---------------------------------------------------------------------------
22 //
23 // The MPL v1.1:
24 //
25 //---------------------------------------------------------------------------
26 //   The contents of this file are subject to the Mozilla Public License
27 //   Version 1.1 (the "License"); you may not use this file except in
28 //   compliance with the License. You may obtain a copy of the License at
29 //   http://www.rabbitmq.com/mpl.html
30 //
31 //   Software distributed under the License is distributed on an "AS IS"
32 //   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33 //   License for the specific language governing rights and limitations
34 //   under the License.
35 //
36 //   The Original Code is The RabbitMQ .NET Client.
37 //
38 //   The Initial Developers of the Original Code are LShift Ltd,
39 //   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
40 //
41 //   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42 //   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43 //   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44 //   Technologies LLC, and Rabbit Technologies Ltd.
45 //
46 //   Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
47 //   Ltd. Portions created by Cohesive Financial Technologies LLC are
48 //   Copyright (C) 2007-2009 Cohesive Financial Technologies
49 //   LLC. Portions created by Rabbit Technologies Ltd are Copyright
50 //   (C) 2007-2009 Rabbit Technologies Ltd.
51 //
52 //   All Rights Reserved.
53 //
54 //   Contributor(s): ______________________________________.
55 //
56 //---------------------------------------------------------------------------
57 using System;
58 using System.IO;
59 using System.Net;
60 using System.Collections;
61 using System.Threading;
62
63 using RabbitMQ.Client;
64 using RabbitMQ.Client.Content;
65 using RabbitMQ.Client.Events;
66
67 namespace RabbitMQ.Client.MessagePatterns {
68     ///<summary>Implements a simple RPC client.</summary>
69     ///<remarks>
70     ///<para>
71     /// This class sends requests that can be processed by remote
72     /// SimpleRpcServer instances.
73     ///</para>
74     ///<para>
75     /// The basic pattern for accessing a remote service is to
76     /// determine the exchange name and routing key needed for
77     /// submissions of service requests, and to construct a
78     /// SimpleRpcClient instance using that address. Once constructed,
79     /// the various Call() and Cast() overloads can be used to send
80     /// requests and receive the corresponding replies.
81     ///</para>
82     ///<example><code>
83     /// string queueName = "ServiceRequestQueue"; // See also Subscription ctors
84     /// using (IConnection conn = new ConnectionFactory()
85     ///                                 .CreateConnection(serverAddress)) {
86     ///     using (IModel ch = conn.CreateModel()) {
87     ///         SimpleRpcClient client =
88     ///             new SimpleRpcClient(ch, queueName);
89     ///         client.TimeoutMilliseconds = 5000; // optional
90     /// 
91     ///         /// ... make use of the various Call() overloads
92     ///     }
93     /// }
94     ///</code></example>
95     ///<para>
96     /// Instances of this class do not themselves declare any
97     /// resources (exchanges, queues or bindings). The Subscription we
98     /// use for receiving RPC replies declares its own resources
99     /// (usually a single queue), but if we are sending to an exchange
100     /// other than one of the AMQP-standard mandated predefined
101     /// exchanges, it is the user's responsibility to ensure that the
102     /// exchange concerned exists (using IModel.ExchangeDeclare)
103     /// before invoking Call() or Cast().
104     ///</para>
105     ///<para>
106     /// This class implements only a few basic RPC message formats -
107     /// to extend it with support for more formats, either subclass,
108     /// or transcode the messages before transmission using the
109     /// built-in byte[] format.
110     ///</para>
111     ///</remarks>
112     ///<see cref="SimpleRpcServer"/>
113     public class SimpleRpcClient: IDisposable {
114         ///<summary>This event is fired whenever Call() decides that a
115         ///timeout has occurred while waiting for a reply from the
116         ///service.</summary>
117         ///<remarks>
118         /// See also OnTimedOut().
119         ///</remarks>
120         public event EventHandler TimedOut;
121
122         ///<summary>This event is fired whenever Call() detects the
123         ///disconnection of the underlying Subscription while waiting
124         ///for a reply from the service.</summary>
125         ///<remarks>
126         /// See also OnDisconnected(). Note that the sending of a
127         /// request may result in OperationInterruptedException before
128         /// the request is even sent.
129         ///</remarks>
130         public event EventHandler Disconnected;
131
132         protected IModel m_model;
133         protected Subscription m_subscription;
134         private PublicationAddress m_address;
135         private int m_timeout;
136
137         ///<summary>Retrieve the IModel this instance uses to communicate.</summary>
138         public IModel Model { get { return m_model; } }
139
140         ///<summary>Retrieve the Subscription that is used to receive
141         ///RPC replies corresponding to Call() RPC requests. May be
142         ///null.</summary>
143         ///<remarks>
144         ///<para>
145         /// Upon construction, this property will be null. It is
146         /// initialised by the protected virtual method
147         /// EnsureSubscription upon the first call to Call(). Calls to
148         /// Cast() do not initialise the subscription, since no
149         /// replies are expected or possible when using Cast().
150         ///</para>
151         ///</remarks>
152         public Subscription Subscription { get { return m_subscription; } }
153
154         ///<summary>Retrieve or modify the address that will be used
155         ///for the next Call() or Cast().</summary>
156         ///<remarks>
157         /// This address represents the service, i.e. the destination
158         /// service requests should be published to. It can be changed
159         /// at any time before a Call() or Cast() request is sent -
160         /// the value at the time of the call is used by Call() and
161         /// Cast().
162         ///</remarks>
163         public PublicationAddress Address {
164             get { return m_address; }
165             set { m_address = value; }
166         }
167
168         ///<summary>Retrieve or modify the timeout (in milliseconds)
169         ///that will be used for the next Call().</summary>
170         ///<remarks>
171         ///<para>
172         /// This property defaults to
173         /// System.Threading.Timeout.Infinite (i.e. -1). If it is set
174         /// to any other value, Call() will only wait for the
175         /// specified amount of time before returning indicating a
176         /// timeout.
177         ///</para>
178         ///<para>
179         /// See also TimedOut event and OnTimedOut().
180         ///</para>
181         ///</remarks>
182         public int TimeoutMilliseconds {
183             get { return m_timeout; }
184             set { m_timeout = value; }
185         }
186
187         ///<summary>Construct an instance with no configured
188         ///Address. The Address property must be set before Call() or
189         ///Cast() are called.</summary>
190         public SimpleRpcClient(IModel model)
191             : this(model, (PublicationAddress) null) {}
192
193         ///<summary>Construct an instance that will deliver to the
194         ///default exchange (""), with routing key equal to the passed
195         ///in queueName, thereby delivering directly to a named queue
196         ///on the AMQP server.</summary>
197         public SimpleRpcClient(IModel model, string queueName)
198             : this(model, new PublicationAddress(ExchangeType.Direct, "", queueName)) {}
199
200         ///<summary>Construct an instance that will deliver to the
201         ///named and typed exchange, with the given routing
202         ///key.</summary>
203         public SimpleRpcClient(IModel model, string exchange,
204                                string exchangeType, string routingKey)
205             : this(model, new PublicationAddress(exchangeType, exchange, routingKey)) {}
206
207         ///<summary>Construct an instance that will deliver to the
208         ///given address.</summary>
209         public SimpleRpcClient(IModel model, PublicationAddress address)
210         {
211             m_model = model;
212             m_address = address;
213             m_subscription = null;
214             m_timeout = Timeout.Infinite;
215         }
216
217         ///<summary>Close the reply subscription associated with this instance, if any.</summary>
218         ///<remarks>
219         /// Simply delegates to calling Subscription.Close(). Clears
220         /// the Subscription property, so that subsequent Call()s, if
221         /// any, will re-initialize it to a fresh Subscription
222         /// instance.
223         ///</remarks>
224         public void Close()
225         {
226             if (m_subscription != null) {
227                 m_subscription.Close();
228                 m_subscription = null;
229             }
230         }
231
232         ///<summary>Should initialise m_subscription to be non-null
233         ///and usable for fetching RPC replies from the service
234         ///through the AMQP server.</summary>
235         protected virtual void EnsureSubscription()
236         {
237             if (m_subscription == null) {
238                 m_subscription = new Subscription(m_model);
239             }
240         }
241
242         ///<summary>Sends a "jms/stream-message"-encoded RPC request,
243         ///and expects an RPC reply in the same format.</summary>
244         ///<remarks>
245         ///<para>
246         /// The arguments passed in must be of types that are
247         /// representable as JMS StreamMessage values, and so must the
248         /// results returned from the service in its reply message.
249         ///</para>
250         ///<para>
251         /// Calls OnTimedOut() and OnDisconnected() when a timeout or
252         /// disconnection, respectively, is detected when waiting for
253         /// our reply.
254         ///</para>
255         ///<para>
256         /// Returns null if the request timed out or if we were
257         /// disconnected before a reply arrived.
258         ///</para>
259         ///<para>
260         /// The reply message, if any, is acknowledged to the AMQP
261         /// server via Subscription.Ack().
262         ///</para>
263         ///</remarks>
264         ///<see cref="IStreamMessageBuilder"/>
265         ///<see cref="IStreamMessageReader"/>
266         public virtual object[] Call(params object[] args)
267         {
268             IStreamMessageBuilder builder = new StreamMessageBuilder(m_model);
269             builder.WriteObjects(args);
270             IBasicProperties replyProperties;
271             byte[] replyBody = Call((IBasicProperties) builder.GetContentHeader(),
272                                     builder.GetContentBody(),
273                                     out replyProperties);
274             if (replyProperties == null) {
275                 return null;
276             }
277             if (replyProperties.ContentType != StreamMessageBuilder.MimeType) {
278                 throw new ProtocolViolationException
279                     (string.Format("Expected reply of MIME type {0}; got {1}",
280                                    StreamMessageBuilder.MimeType,
281                                    replyProperties.ContentType));
282             }
283             IStreamMessageReader reader = new StreamMessageReader(replyProperties, replyBody);
284             return reader.ReadObjects();
285         }
286
287         ///<summary>Sends a simple byte[] message, without any custom
288         ///headers or properties.</summary>
289         ///<remarks>
290         ///<para>
291         /// Delegates directly to Call(IBasicProperties, byte[]), and
292         /// discards the properties of the received reply, returning
293         /// only the body of the reply.
294         ///</para>
295         ///<para>
296         /// Calls OnTimedOut() and OnDisconnected() when a timeout or
297         /// disconnection, respectively, is detected when waiting for
298         /// our reply.
299         ///</para>
300         ///<para>
301         /// Returns null if the request timed out or if we were
302         /// disconnected before a reply arrived.
303         ///</para>
304         ///<para>
305         /// The reply message, if any, is acknowledged to the AMQP
306         /// server via Subscription.Ack().
307         ///</para>
308         ///</remarks>
309         public virtual byte[] Call(byte[] body)
310         {
311             BasicDeliverEventArgs reply = Call(null, body);
312             return reply == null ? null : reply.Body;
313         }
314
315         ///<summary>Sends a byte[] message and IBasicProperties
316         ///header, returning both the body and headers of the received
317         ///reply.</summary>
318         ///<remarks>
319         ///<para>
320         /// Sets the "replyProperties" outbound parameter to the
321         /// properties of the received reply, and returns the byte[]
322         /// body of the reply.
323         ///</para>
324         ///<para>
325         /// Calls OnTimedOut() and OnDisconnected() when a timeout or
326         /// disconnection, respectively, is detected when waiting for
327         /// our reply.
328         ///</para>
329         ///<para>
330         /// Both sets "replyProperties" to null and returns null when
331         /// either the request timed out or we were disconnected
332         /// before a reply arrived.
333         ///</para>
334         ///<para>
335         /// The reply message, if any, is acknowledged to the AMQP
336         /// server via Subscription.Ack().
337         ///</para>
338         ///</remarks>
339         public virtual byte[] Call(IBasicProperties requestProperties,
340                                    byte[] body,
341                                    out IBasicProperties replyProperties)
342         {
343             BasicDeliverEventArgs reply = Call(requestProperties, body);
344             if (reply == null) {
345                 replyProperties = null;
346                 return null;
347             } else {
348                 replyProperties = reply.BasicProperties;
349                 return reply.Body;
350             }
351         }
352
353         ///<summary>Sends a byte[]/IBasicProperties RPC request,
354         ///returning full information about the delivered reply as a
355         ///BasicDeliverEventArgs.</summary>
356         ///<remarks>
357         ///<para>
358         /// This is the most general/lowest-level Call()-style method
359         /// on SimpleRpcClient. It sets CorrelationId and ReplyTo on
360         /// the request message's headers before transmitting the
361         /// request to the service via the AMQP server. If the reply's
362         /// CorrelationId does not match the request's CorrelationId,
363         /// ProtocolViolationException will be thrown.
364         ///</para>
365         ///<para>
366         /// Calls OnTimedOut() and OnDisconnected() when a timeout or
367         /// disconnection, respectively, is detected when waiting for
368         /// our reply.
369         ///</para>
370         ///<para>
371         /// Returns null if the request timed out or if we were
372         /// disconnected before a reply arrived.
373         ///</para>
374         ///<para>
375         /// The reply message, if any, is acknowledged to the AMQP
376         /// server via Subscription.Ack().
377         ///</para>
378         ///</remarks>
379         ///<see cref="ProtocolViolationException"/>
380         public virtual BasicDeliverEventArgs Call(IBasicProperties requestProperties, byte[] body)
381         {
382             EnsureSubscription();
383
384             if (requestProperties == null) {
385                 requestProperties = m_model.CreateBasicProperties();
386             }
387             requestProperties.CorrelationId = Guid.NewGuid().ToString();
388             requestProperties.ReplyTo = m_subscription.QueueName;
389
390             Cast(requestProperties, body);
391             return RetrieveReply(requestProperties.CorrelationId);
392         }
393
394         ///<summary>Retrieves the reply for the request with the given
395         ///correlation ID from our internal Subscription.</summary>
396         ///<remarks>
397         /// Currently requires replies to arrive in the same order as
398         /// the requests were sent out. Subclasses may override this
399         /// to provide more sophisticated behaviour.
400         ///</remarks>
401         protected virtual BasicDeliverEventArgs RetrieveReply(string correlationId)
402         {
403             BasicDeliverEventArgs reply;
404             if (!m_subscription.Next(m_timeout, out reply)) {
405                 OnTimedOut();
406                 return null;
407             }
408
409             if (reply == null) {
410                 OnDisconnected();
411                 return null;
412             }
413
414             if (reply.BasicProperties.CorrelationId != correlationId) {
415                 throw new ProtocolViolationException
416                     (string.Format("Wrong CorrelationId in reply; expected {0}, got {1}",
417                                    correlationId,
418                                    reply.BasicProperties.CorrelationId));
419             }
420
421             m_subscription.Ack(reply);
422             return reply;
423         }
424
425         ///<summary>Sends an asynchronous/one-way message to the
426         ///service.</summary>
427         public virtual void Cast(IBasicProperties requestProperties,
428                                  byte[] body)
429         {
430             m_model.BasicPublish(Address,
431                                  requestProperties,
432                                  body);
433         }
434
435         ///<summary>Signals that the configured timeout fired while
436         ///waiting for an RPC reply.</summary>
437         ///<remarks>
438         /// Fires the TimedOut event.
439         ///</remarks>
440         public virtual void OnTimedOut()
441         {
442             if (TimedOut != null)
443                 TimedOut(this, null);
444         }
445
446         ///<summary>Signals that the Subscription we use for receiving
447         ///our RPC replies was disconnected while we were
448         ///waiting.</summary>
449         ///<remarks>
450         /// Fires the Disconnected event.
451         ///</remarks>
452         public virtual void OnDisconnected()
453         {
454             if (Disconnected != null)
455                 Disconnected(this, null);
456         }
457
458         ///<summary>Implement the IDisposable interface, permitting
459         ///SimpleRpcClient instances to be used in using
460         ///statements.</summary>
461         void IDisposable.Dispose()
462         {
463             Close();
464         }
465     }
466 }