1 // This source code is dual-licensed under the Apache License, version
2 // 2.0, and the Mozilla Public License, version 1.1.
6 //---------------------------------------------------------------------------
7 // Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial
8 // Technologies LLC., and Rabbit Technologies Ltd.
10 // Licensed under the Apache License, Version 2.0 (the "License");
11 // you may not use this file except in compliance with the License.
12 // You may obtain a copy of the License at
14 // http://www.apache.org/licenses/LICENSE-2.0
16 // Unless required by applicable law or agreed to in writing, software
17 // distributed under the License is distributed on an "AS IS" BASIS,
18 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 // See the License for the specific language governing permissions and
20 // limitations under the License.
21 //---------------------------------------------------------------------------
25 //---------------------------------------------------------------------------
26 // The contents of this file are subject to the Mozilla Public License
27 // Version 1.1 (the "License"); you may not use this file except in
28 // compliance with the License. You may obtain a copy of the License at
29 // http://www.rabbitmq.com/mpl.html
31 // Software distributed under the License is distributed on an "AS IS"
32 // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33 // License for the specific language governing rights and limitations
36 // The Original Code is The RabbitMQ .NET Client.
38 // The Initial Developers of the Original Code are LShift Ltd,
39 // Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
41 // Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42 // Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43 // are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44 // Technologies LLC, and Rabbit Technologies Ltd.
46 // Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
47 // Ltd. Portions created by Cohesive Financial Technologies LLC are
48 // Copyright (C) 2007-2010 Cohesive Financial Technologies
49 // LLC. Portions created by Rabbit Technologies Ltd are Copyright
50 // (C) 2007-2010 Rabbit Technologies Ltd.
52 // All Rights Reserved.
54 // Contributor(s): ______________________________________.
56 //---------------------------------------------------------------------------
60 using RabbitMQ.Client;
61 using RabbitMQ.Client.Content;
62 using RabbitMQ.Client.Events;
64 namespace RabbitMQ.Client.MessagePatterns {
65 ///<summary>Implements a simple RPC service, responding to
66 ///requests received via a Subscription.</summary>
69 /// This class interprets requests such as those sent by instances
70 /// of SimpleRpcClient.
73 /// The basic pattern for implementing a service is to subclass
74 /// SimpleRpcServer, overriding HandleCall and HandleCast as
75 /// appropriate, and then to create a Subscription object for
76 /// receiving requests from clients, and start an instance of the
77 /// SimpleRpcServer subclass with the Subscription.
80 /// string queueName = "ServiceRequestQueue"; // See also Subscription ctors
81 /// using (IConnection conn = new ConnectionFactory()
82 /// .CreateConnection(serverAddress)) {
83 /// using (IModel ch = conn.CreateModel()) {
84 /// Subscription sub = new Subscription(ch, queueName);
85 /// new MySimpleRpcServerSubclass(sub).MainLoop();
90 /// Note that this class itself does not declare any resources
91 /// (exchanges, queues or bindings). The Subscription we use for
92 /// receiving RPC requests should have already declared all the
93 /// resources we need. See the Subscription constructors and the
94 /// Subscription.Bind method.
97 /// If you are implementing a service that responds to
98 /// "jms/stream-message"-formatted requests (as implemented by
99 /// RabbitMQ.Client.Content.IStreamMessageReader), override
100 /// HandleStreamMessageCall. Otherwise, override HandleSimpleCall
101 /// or HandleCall as appropriate. Asynchronous, one-way requests
102 /// are dealt with by HandleCast etc.
105 /// Every time a request is successfully received and processed
106 /// within the server's MainLoop, the request message is Ack()ed
107 /// using Subscription.Ack before the next request is
108 /// retrieved. This causes the Subscription object to take care of
109 /// acknowledging receipt and processing of the request message.
112 /// If transactional service is enabled, via SetTransactional(),
113 /// then after every successful ProcessRequest, IModel.TxCommit is
114 /// called. Making use of transactional service has effects on all
115 /// parts of the application that share an IModel instance,
116 /// completely changing the style of interaction with the AMQP
117 /// server. For this reason, it is initially disabled, and must be
118 /// explicitly enabled with a call to SetTransactional(). Please
119 /// see the documentation for SetTransactional() for details.
122 /// To stop a running RPC server, call Close(). This will in turn
123 /// Close() the Subscription, which will cause MainLoop() to
124 /// return to its caller.
127 /// Unless overridden, ProcessRequest examines properties in the
128 /// request content header, and uses them to dispatch to one of
129 /// the Handle[...]() methods. See the documentation for
130 /// ProcessRequest and each Handle[...] method for details.
133 ///<see cref="SimpleRpcClient"/>
134 public class SimpleRpcServer: IDisposable {
135 protected Subscription m_subscription;
136 private bool m_transactional;
138 ///<summary>Returns true if we are in "transactional" mode, or
139 ///false if we are not.</summary>
140 public bool Transactional { get { return m_transactional; } }
142 ///<summary>Create, but do not start, an instance that will
143 ///receive requests via the given Subscription.</summary>
146 /// The instance is initially in non-transactional mode. See
147 /// SetTransactional().
150 /// Call MainLoop() to start the request-processing loop.
153 public SimpleRpcServer(Subscription subscription)
155 m_subscription = subscription;
156 m_transactional = false;
159 ///<summary>Shut down the server, causing MainLoop() to return
160 ///to its caller.</summary>
162 /// Acts by calling Close() on the server's Subscription object.
166 m_subscription.Close();
169 ///<summary>Enables transactional mode.</summary>
172 /// Once enabled, transactional mode is not only enabled for
173 /// all users of the underlying IModel instance, but cannot be
174 /// disabled without shutting down the entire IModel (which
175 /// involves shutting down all the services depending on it,
176 /// and should not be undertaken lightly).
179 /// This method calls IModel.TxSelect, every time it is
180 /// called. (TxSelect is idempotent, so this is harmless.)
183 public void SetTransactional() {
184 m_subscription.Model.TxSelect();
185 m_transactional = true;
188 ///<summary>Enters the main loop of the RPC service.</summary>
191 /// Retrieves requests repeatedly from the service's
192 /// subscription. Each request is passed to
193 /// ProcessRequest. Once ProcessRequest returns, the request
194 /// is acknowledged via Subscription.Ack(). If transactional
195 /// mode is enabled, TxCommit is then called. Finally, the
196 /// loop begins again.
199 /// Runs until the subscription ends, which happens either as
200 /// a result of disconnection, or of a call to Close().
203 public void MainLoop()
205 foreach (BasicDeliverEventArgs evt in m_subscription) {
207 m_subscription.Ack();
208 if (m_transactional) {
209 m_subscription.Model.TxCommit();
214 ///<summary>Process a single request received from our
215 ///subscription.</summary>
218 /// If the request's properties contain a non-null, non-empty
219 /// CorrelationId string (see IBasicProperties), it is assumed
220 /// to be a two-way call, requiring a response. The ReplyTo
221 /// header property is used as the reply address (via
222 /// PublicationAddress.Parse, unless that fails, in which case it
223 /// is treated as a simple queue name), and the request is
224 /// passed to HandleCall().
227 /// If the CorrelationId is absent or empty, the request is
228 /// treated as one-way asynchronous event, and is passed to
232 /// Usually, overriding HandleCall(), HandleCast(), or one of
233 /// their delegates is sufficient to implement a service, but
234 /// in some cases overriding ProcessRequest() is
235 /// required. Overriding ProcessRequest() gives the
236 /// opportunity to implement schemes for detecting interaction
237 /// patterns other than simple request/response or one-way
241 public virtual void ProcessRequest(BasicDeliverEventArgs evt)
243 IBasicProperties properties = evt.BasicProperties;
244 if (properties.ReplyTo != null && properties.ReplyTo != "") {
247 PublicationAddress replyAddress = PublicationAddress.Parse(properties.ReplyTo);
248 if (replyAddress == null) {
249 replyAddress = new PublicationAddress(ExchangeType.Direct,
254 IBasicProperties replyProperties;
255 byte[] reply = HandleCall(evt.Redelivered,
258 out replyProperties);
259 if (replyProperties == null) {
260 replyProperties = m_subscription.Model.CreateBasicProperties();
263 replyProperties.CorrelationId = properties.CorrelationId;
264 m_subscription.Model.BasicPublish(replyAddress,
268 // It's an asynchronous message.
269 HandleCast(evt.Redelivered, properties, evt.Body);
273 ///<summary>Called by HandleCall and HandleCast when a
274 ///"jms/stream-message" request is received.</summary>
277 /// The args array contains the values decoded by HandleCall
281 /// The replyWriter parameter will be null if we were called
282 /// from HandleCast, in which case a reply is not expected or
283 /// possible, or non-null if we were called from
284 /// HandleCall. Use the methods of replyWriter in this case to
285 /// assemble your reply, which will be sent back to the remote
289 /// This default implementation does nothing, which
290 /// effectively sends back an empty reply to any and all
294 public virtual void HandleStreamMessageCall(IStreamMessageBuilder replyWriter,
296 IBasicProperties requestProperties,
299 // Override to do something with the request.
302 ///<summary>Called by ProcessRequest(), this is the most
303 ///general method that handles RPC-style requests.</summary>
306 /// This method should map requestProperties and body to
307 /// replyProperties and the returned byte array.
310 /// The default implementation checks
311 /// requestProperties.ContentType, and if it is
312 /// "jms/stream-message" (i.e. the current value of
313 /// StreamMessageBuilder.MimeType), parses it using
314 /// StreamMessageReader and delegates to
315 /// HandleStreamMessageCall before encoding and returning the
316 /// reply. If the ContentType is any other value, the request
317 /// is passed to HandleSimpleCall instead.
320 /// The isRedelivered flag is true when the server knows for
321 /// sure that it has tried to send this request previously
322 /// (although not necessarily to this application). It is not
323 /// a reliable indicator of previous receipt, however - the
324 /// only claim it makes is that a delivery attempt was made,
325 /// not that the attempt succeeded. Be careful if you choose
326 /// to use the isRedelivered flag.
329 public virtual byte[] HandleCall(bool isRedelivered,
330 IBasicProperties requestProperties,
332 out IBasicProperties replyProperties)
334 if (requestProperties.ContentType == StreamMessageBuilder.MimeType) {
335 IStreamMessageReader r = new StreamMessageReader(requestProperties, body);
336 IStreamMessageBuilder w = new StreamMessageBuilder(m_subscription.Model);
337 HandleStreamMessageCall(w,
341 replyProperties = (IBasicProperties) w.GetContentHeader();
342 return w.GetContentBody();
344 return HandleSimpleCall(isRedelivered,
347 out replyProperties);
351 ///<summary>Called by the default HandleCall() implementation
352 ///as a fallback.</summary>
354 /// If the MIME ContentType of the request did not match any
355 /// of the types specially recognised
356 /// (e.g. "jms/stream-message"), this method is called instead
357 /// with the raw bytes of the request. It should fill in
358 /// replyProperties (or set it to null) and return a byte
359 /// array to send back to the remote caller as a reply
362 public virtual byte[] HandleSimpleCall(bool isRedelivered,
363 IBasicProperties requestProperties,
365 out IBasicProperties replyProperties)
367 // Override to do something with the request.
368 replyProperties = null;
372 ///<summary>Called by ProcessRequest(), this is the most
373 ///general method that handles asynchronous, one-way
374 ///requests.</summary>
377 /// The default implementation checks
378 /// requestProperties.ContentType, and if it is
379 /// "jms/stream-message" (i.e. the current value of
380 /// StreamMessageBuilder.MimeType), parses it using
381 /// StreamMessageReader and delegates to
382 /// HandleStreamMessageCall, passing in null as the
383 /// replyWriter parameter to indicate that no reply is desired
384 /// or possible. If the ContentType is any other value, the
385 /// request is passed to HandleSimpleCast instead.
388 /// The isRedelivered flag is true when the server knows for
389 /// sure that it has tried to send this request previously
390 /// (although not necessarily to this application). It is not
391 /// a reliable indicator of previous receipt, however - the
392 /// only claim it makes is that a delivery attempt was made,
393 /// not that the attempt succeeded. Be careful if you choose
394 /// to use the isRedelivered flag.
397 public virtual void HandleCast(bool isRedelivered,
398 IBasicProperties requestProperties,
401 if (requestProperties.ContentType == StreamMessageBuilder.MimeType) {
402 IStreamMessageReader r = new StreamMessageReader(requestProperties, body);
403 HandleStreamMessageCall(null,
408 HandleSimpleCast(isRedelivered,
414 ///<summary>Called by the default HandleCast() implementation
415 ///as a fallback.</summary>
417 /// If the MIME ContentType of the request did not match any
418 /// of the types specially recognised
419 /// (e.g. "jms/stream-message"), this method is called instead
420 /// with the raw bytes of the request.
422 public virtual void HandleSimpleCast(bool isRedelivered,
423 IBasicProperties requestProperties,
426 // Override to do something with the request.
429 ///<summary>Implement the IDisposable interface, permitting
430 ///SimpleRpcServer instances to be used in using
431 ///statements.</summary>
432 void IDisposable.Dispose()