1 // This source code is dual-licensed under the Apache License, version
2 // 2.0, and the Mozilla Public License, version 1.1.
6 //---------------------------------------------------------------------------
7 // Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial
8 // Technologies LLC., and Rabbit Technologies Ltd.
10 // Licensed under the Apache License, Version 2.0 (the "License");
11 // you may not use this file except in compliance with the License.
12 // You may obtain a copy of the License at
14 // http://www.apache.org/licenses/LICENSE-2.0
16 // Unless required by applicable law or agreed to in writing, software
17 // distributed under the License is distributed on an "AS IS" BASIS,
18 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 // See the License for the specific language governing permissions and
20 // limitations under the License.
21 //---------------------------------------------------------------------------
25 //---------------------------------------------------------------------------
26 // The contents of this file are subject to the Mozilla Public License
27 // Version 1.1 (the "License"); you may not use this file except in
28 // compliance with the License. You may obtain a copy of the License at
29 // http://www.rabbitmq.com/mpl.html
31 // Software distributed under the License is distributed on an "AS IS"
32 // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33 // License for the specific language governing rights and limitations
36 // The Original Code is The RabbitMQ .NET Client.
38 // The Initial Developers of the Original Code are LShift Ltd,
39 // Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
41 // Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42 // Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43 // are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44 // Technologies LLC, and Rabbit Technologies Ltd.
46 // Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
47 // Ltd. Portions created by Cohesive Financial Technologies LLC are
48 // Copyright (C) 2007-2010 Cohesive Financial Technologies
49 // LLC. Portions created by Rabbit Technologies Ltd are Copyright
50 // (C) 2007-2010 Rabbit Technologies Ltd.
52 // All Rights Reserved.
54 // Contributor(s): ______________________________________.
56 //---------------------------------------------------------------------------
using System;
using System.Collections;
using System.Threading;

using RabbitMQ.Client;
using RabbitMQ.Client.Content;
using RabbitMQ.Client.Events;
67 namespace RabbitMQ.Client.MessagePatterns {
///<summary>Implements a simple RPC client.</summary>
///<remarks>
///<para>
/// This class sends requests that can be processed by remote
/// SimpleRpcServer instances.
///</para>
///<para>
/// The basic pattern for accessing a remote service is to
/// determine the exchange name and routing key needed for
/// submissions of service requests, and to construct a
/// SimpleRpcClient instance using that address. Once constructed,
/// the various Call() and Cast() overloads can be used to send
/// requests and receive the corresponding replies.
///</para>
///<example><code>
/// string queueName = "ServiceRequestQueue"; // See also Subscription ctors
/// using (IConnection conn = new ConnectionFactory()
///                                 .CreateConnection(serverAddress)) {
///     using (IModel ch = conn.CreateModel()) {
///         SimpleRpcClient client =
///             new SimpleRpcClient(ch, queueName);
///         client.TimeoutMilliseconds = 5000; // optional
///
///         /// ... make use of the various Call() overloads
///     }
/// }
///</code></example>
///<para>
/// Instances of this class do not themselves declare any
/// resources (exchanges, queues or bindings). The Subscription we
/// use for receiving RPC replies declares its own resources
/// (usually a single queue), but if we are sending to an exchange
/// other than one of the AMQP-standard mandated predefined
/// exchanges, it is the user's responsibility to ensure that the
/// exchange concerned exists (using IModel.ExchangeDeclare)
/// before invoking Call() or Cast().
///</para>
///<para>
/// This class implements only a few basic RPC message formats -
/// to extend it with support for more formats, either subclass,
/// or transcode the messages before transmission using the
/// built-in byte[] format.
///</para>
///</remarks>
///<see cref="SimpleRpcServer"/>
113 public class SimpleRpcClient: IDisposable {
///<summary>Raised when Call() gives up waiting for a reply from
///the service because the configured timeout has elapsed.</summary>
///<remarks>
/// See also OnTimedOut(), which is the method that fires this event.
///</remarks>
public event EventHandler TimedOut;
///<summary>Raised when Call() notices, while waiting for a reply
///from the service, that the underlying Subscription has been
///disconnected.</summary>
///<remarks>
/// See also OnDisconnected(). Note that the sending of a
/// request may result in OperationInterruptedException before
/// the request is even sent.
///</remarks>
public event EventHandler Disconnected;
// Channel used to publish requests; exposed through the Model property.
protected IModel m_model;

// Reply subscription; stays null until EnsureSubscription() creates it.
protected Subscription m_subscription;

// Destination of outgoing requests; exposed through the Address property.
private PublicationAddress m_address;

// Reply wait time in milliseconds; exposed through TimeoutMilliseconds.
private int m_timeout;
///<summary>Gets the IModel over which this instance
///communicates.</summary>
public IModel Model
{
    get { return m_model; }
}
///<summary>Gets the Subscription on which the RPC replies
///corresponding to Call() requests are received. May be
///null.</summary>
///<remarks>
/// Upon construction, this property is null. It is
/// initialised by the protected virtual method
/// EnsureSubscription upon the first call to Call(). Calls to
/// Cast() do not initialise the subscription, since no
/// replies are expected or possible when using Cast().
///</remarks>
public Subscription Subscription
{
    get { return m_subscription; }
}
///<summary>Gets or sets the address that will be used for the
///next Call() or Cast().</summary>
///<remarks>
/// This address represents the service, i.e. the destination
/// service requests should be published to. It can be changed
/// at any time before a Call() or Cast() request is sent -
/// the value at the time of the call is used by Call() and
/// Cast().
///</remarks>
public PublicationAddress Address
{
    get { return m_address; }
    set { m_address = value; }
}
///<summary>Gets or sets the timeout (in milliseconds) that
///will be used for the next Call().</summary>
///<remarks>
///<para>
/// This property defaults to
/// System.Threading.Timeout.Infinite (i.e. -1). If it is set
/// to any other value, Call() will only wait for the
/// specified amount of time before returning indicating a
/// timeout.
///</para>
///<para>
/// See also TimedOut event and OnTimedOut().
///</para>
///</remarks>
public int TimeoutMilliseconds
{
    get { return m_timeout; }
    set { m_timeout = value; }
}
///<summary>Creates an instance that has no Address configured
///yet. The Address property must be assigned before Call() or
///Cast() is invoked.</summary>
///<param name="model">Channel used to communicate.</param>
public SimpleRpcClient(IModel model)
    : this(model, (PublicationAddress) null)
{
}
///<summary>Creates an instance that publishes to the default
///exchange (""), using the supplied queueName as the routing
///key, so that requests are delivered directly to the named
///queue on the AMQP server.</summary>
///<param name="model">Channel used to communicate.</param>
///<param name="queueName">Name of the queue that receives the
///requests.</param>
public SimpleRpcClient(IModel model, string queueName)
    : this(model,
           new PublicationAddress(ExchangeType.Direct, "", queueName))
{
}
///<summary>Creates an instance that publishes to the named and
///typed exchange, using the given routing key.</summary>
///<param name="model">Channel used to communicate.</param>
///<param name="exchange">Name of the target exchange.</param>
///<param name="exchangeType">Type of the target exchange.</param>
///<param name="routingKey">Routing key attached to each
///request.</param>
public SimpleRpcClient(IModel model,
                       string exchange,
                       string exchangeType,
                       string routingKey)
    : this(model,
           new PublicationAddress(exchangeType, exchange, routingKey))
{
}
///<summary>Construct an instance that will deliver to the
///given address.</summary>
///<param name="model">Channel used to communicate; returned by
///the Model property.</param>
///<param name="address">Destination for requests; may be null,
///in which case the Address property must be set before Call()
///or Cast() are called.</param>
public SimpleRpcClient(IModel model, PublicationAddress address)
{
    m_model = model;
    m_address = address;
    // The reply subscription is created lazily by EnsureSubscription().
    m_subscription = null;
    m_timeout = Timeout.Infinite;
}
///<summary>Close the reply subscription associated with this
///instance, if any.</summary>
///<remarks>
/// Simply delegates to calling Subscription.Close(). Clears
/// the Subscription property, so that subsequent Call()s, if
/// any, will re-initialize it to a fresh Subscription
/// instance.
///</remarks>
public void Close()
{
    if (m_subscription != null) {
        m_subscription.Close();
        // Forget the closed subscription so EnsureSubscription()
        // builds a new one on the next Call().
        m_subscription = null;
    }
}
///<summary>Should initialise m_subscription to be non-null
///and usable for fetching RPC replies from the service
///through the AMQP server.</summary>
///<remarks>
/// This implementation creates a new Subscription on m_model
/// the first time it is needed and leaves an existing one
/// untouched.
///</remarks>
protected virtual void EnsureSubscription()
{
    if (m_subscription == null) {
        m_subscription = new Subscription(m_model);
    }
}
///<summary>Sends a "jms/stream-message"-encoded RPC request,
///and expects an RPC reply in the same format.</summary>
///<remarks>
///<para>
/// The arguments passed in must be of types that are
/// representable as JMS StreamMessage values, and so must the
/// results returned from the service in its reply message.
///</para>
///<para>
/// Calls OnTimedOut() and OnDisconnected() when a timeout or
/// disconnection, respectively, is detected when waiting for
/// a reply.
///</para>
///<para>
/// Returns null if the request timed out or if we were
/// disconnected before a reply arrived.
///</para>
///<para>
/// The reply message, if any, is acknowledged to the AMQP
/// server via Subscription.Ack().
///</para>
///</remarks>
///<param name="args">Values to encode into the request.</param>
///<returns>The decoded values of the reply, or null when no
///reply was received.</returns>
///<exception cref="ProtocolViolationException">The reply's
///ContentType is not StreamMessageBuilder.MimeType.</exception>
///<see cref="IStreamMessageBuilder"/>
///<see cref="IStreamMessageReader"/>
public virtual object[] Call(params object[] args)
{
    IStreamMessageBuilder builder = new StreamMessageBuilder(m_model);
    builder.WriteObjects(args);
    IBasicProperties replyProperties;
    byte[] replyBody = Call((IBasicProperties) builder.GetContentHeader(),
                            builder.GetContentBody(),
                            out replyProperties);
    if (replyProperties == null) {
        // No reply arrived (timeout or disconnection): nothing to decode.
        return null;
    }
    if (replyProperties.ContentType != StreamMessageBuilder.MimeType) {
        throw new ProtocolViolationException
            (string.Format("Expected reply of MIME type {0}; got {1}",
                           StreamMessageBuilder.MimeType,
                           replyProperties.ContentType));
    }
    IStreamMessageReader reader = new StreamMessageReader(replyProperties, replyBody);
    return reader.ReadObjects();
}
///<summary>Sends a simple byte[] message, without any custom
///headers or properties.</summary>
///<remarks>
///<para>
/// Delegates directly to Call(IBasicProperties, byte[]), and
/// discards the properties of the received reply, returning
/// only the body of the reply.
///</para>
///<para>
/// Calls OnTimedOut() and OnDisconnected() when a timeout or
/// disconnection, respectively, is detected when waiting for
/// a reply.
///</para>
///<para>
/// Returns null if the request timed out or if we were
/// disconnected before a reply arrived.
///</para>
///<para>
/// The reply message, if any, is acknowledged to the AMQP
/// server via Subscription.Ack().
///</para>
///</remarks>
///<param name="body">Payload of the request.</param>
///<returns>The body of the reply, or null when no reply was
///received.</returns>
public virtual byte[] Call(byte[] body)
{
    BasicDeliverEventArgs reply = Call(null, body);
    return reply == null ? null : reply.Body;
}
///<summary>Sends a byte[] message and IBasicProperties
///header, returning both the body and headers of the received
///reply.</summary>
///<remarks>
///<para>
/// Sets the "replyProperties" outbound parameter to the
/// properties of the received reply, and returns the byte[]
/// body of the reply.
///</para>
///<para>
/// Calls OnTimedOut() and OnDisconnected() when a timeout or
/// disconnection, respectively, is detected when waiting for
/// a reply.
///</para>
///<para>
/// Both sets "replyProperties" to null and returns null when
/// either the request timed out or we were disconnected
/// before a reply arrived.
///</para>
///<para>
/// The reply message, if any, is acknowledged to the AMQP
/// server via Subscription.Ack().
///</para>
///</remarks>
///<param name="requestProperties">Properties to send with the
///request; passed through to Call(IBasicProperties, byte[]).</param>
///<param name="body">Payload of the request.</param>
///<param name="replyProperties">Receives the properties of the
///reply, or null when no reply was received.</param>
///<returns>The body of the reply, or null when no reply was
///received.</returns>
public virtual byte[] Call(IBasicProperties requestProperties,
                           byte[] body,
                           out IBasicProperties replyProperties)
{
    BasicDeliverEventArgs reply = Call(requestProperties, body);
    if (reply == null) {
        replyProperties = null;
        return null;
    }

    replyProperties = reply.BasicProperties;
    return reply.Body;
}
///<summary>Sends a byte[]/IBasicProperties RPC request,
///returning full information about the delivered reply as a
///BasicDeliverEventArgs.</summary>
///<remarks>
///<para>
/// This is the most general/lowest-level Call()-style method
/// on SimpleRpcClient. It sets CorrelationId and ReplyTo on
/// the request message's headers before transmitting the
/// request to the service via the AMQP server. If the reply's
/// CorrelationId does not match the request's CorrelationId,
/// ProtocolViolationException will be thrown.
///</para>
///<para>
/// Calls OnTimedOut() and OnDisconnected() when a timeout or
/// disconnection, respectively, is detected when waiting for
/// a reply.
///</para>
///<para>
/// Returns null if the request timed out or if we were
/// disconnected before a reply arrived.
///</para>
///<para>
/// The reply message, if any, is acknowledged to the AMQP
/// server via Subscription.Ack().
///</para>
///</remarks>
///<param name="requestProperties">Properties to send with the
///request, or null to have a fresh set created from the model.
///A non-null instance is modified in place: its CorrelationId
///and ReplyTo are overwritten.</param>
///<param name="body">Payload of the request.</param>
///<returns>The delivered reply, or null when no reply was
///received.</returns>
///<see cref="ProtocolViolationException"/>
public virtual BasicDeliverEventArgs Call(IBasicProperties requestProperties, byte[] body)
{
    // The reply queue must exist before its name can go into ReplyTo.
    EnsureSubscription();

    if (requestProperties == null) {
        requestProperties = m_model.CreateBasicProperties();
    }
    requestProperties.CorrelationId = Guid.NewGuid().ToString();
    requestProperties.ReplyTo = m_subscription.QueueName;

    Cast(requestProperties, body);
    return RetrieveReply(requestProperties.CorrelationId);
}
///<summary>Retrieves the reply for the request with the given
///correlation ID from our internal Subscription.</summary>
///<remarks>
/// Currently requires replies to arrive in the same order as
/// the requests were sent out. Subclasses may override this
/// to provide more sophisticated behaviour.
///</remarks>
///<param name="correlationId">CorrelationId that was set on the
///request whose reply is awaited.</param>
///<returns>The acknowledged reply, or null after a timeout or
///disconnection.</returns>
///<exception cref="ProtocolViolationException">The next reply
///carries a different CorrelationId.</exception>
protected virtual BasicDeliverEventArgs RetrieveReply(string correlationId)
{
    BasicDeliverEventArgs reply;
    // NOTE(review): assumes Subscription.Next returns false when the
    // timeout elapses, and true with a null result when the
    // subscription has ended - confirm against Subscription.
    if (!m_subscription.Next(m_timeout, out reply)) {
        OnTimedOut();
        return null;
    }

    if (reply == null) {
        OnDisconnected();
        return null;
    }

    if (reply.BasicProperties.CorrelationId != correlationId) {
        throw new ProtocolViolationException
            (string.Format("Wrong CorrelationId in reply; expected {0}, got {1}",
                           correlationId,
                           reply.BasicProperties.CorrelationId));
    }

    m_subscription.Ack(reply);
    return reply;
}
///<summary>Sends an asynchronous/one-way message to the
///service.</summary>
///<remarks>
/// Publishes to the current value of the Address property. No
/// reply is awaited.
///</remarks>
///<param name="requestProperties">Properties to publish with
///the message.</param>
///<param name="body">Payload of the message.</param>
///<exception cref="InvalidOperationException">The Address
///property has not been set.</exception>
public virtual void Cast(IBasicProperties requestProperties,
                         byte[] body)
{
    // An instance may be constructed without an address; report that
    // explicitly instead of failing inside the publish call.
    if (Address == null) {
        throw new InvalidOperationException
            ("The Address property must be set before Call() or Cast() are called");
    }
    m_model.BasicPublish(Address,
                         requestProperties,
                         body);
}
///<summary>Signals that the configured timeout fired while
///waiting for an RPC reply.</summary>
///<remarks>
/// Fires the TimedOut event.
///</remarks>
public virtual void OnTimedOut()
{
    // Read the delegate once so that a handler unsubscribing on
    // another thread cannot null it between the test and the call.
    EventHandler handler = TimedOut;
    if (handler != null) {
        handler(this, EventArgs.Empty);
    }
}
///<summary>Signals that the Subscription we use for receiving
///our RPC replies was disconnected while we were
///waiting.</summary>
///<remarks>
/// Fires the Disconnected event.
///</remarks>
public virtual void OnDisconnected()
{
    // Read the delegate once so that a handler unsubscribing on
    // another thread cannot null it between the test and the call.
    EventHandler handler = Disconnected;
    if (handler != null) {
        handler(this, EventArgs.Empty);
    }
}
458 ///<summary>Implement the IDisposable interface, permitting
459 ///SimpleRpcClient instances to be used in using
460 ///statements.</summary>
461 void IDisposable.Dispose()