2010-06-23: Michael Barker <mike@middlesoft.co.uk>
[mono.git] / mcs / class / RabbitMQ.Client / src / client / messagepatterns / SimpleRpcServer.cs
1 // This source code is dual-licensed under the Apache License, version
2 // 2.0, and the Mozilla Public License, version 1.1.
3 //
4 // The APL v2.0:
5 //
6 //---------------------------------------------------------------------------
7 //   Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial
8 //   Technologies LLC., and Rabbit Technologies Ltd.
9 //
10 //   Licensed under the Apache License, Version 2.0 (the "License");
11 //   you may not use this file except in compliance with the License.
12 //   You may obtain a copy of the License at
13 //
14 //       http://www.apache.org/licenses/LICENSE-2.0
15 //
16 //   Unless required by applicable law or agreed to in writing, software
17 //   distributed under the License is distributed on an "AS IS" BASIS,
18 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 //   See the License for the specific language governing permissions and
20 //   limitations under the License.
21 //---------------------------------------------------------------------------
22 //
23 // The MPL v1.1:
24 //
25 //---------------------------------------------------------------------------
26 //   The contents of this file are subject to the Mozilla Public License
27 //   Version 1.1 (the "License"); you may not use this file except in
28 //   compliance with the License. You may obtain a copy of the License at
29 //   http://www.rabbitmq.com/mpl.html
30 //
31 //   Software distributed under the License is distributed on an "AS IS"
32 //   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33 //   License for the specific language governing rights and limitations
34 //   under the License.
35 //
36 //   The Original Code is The RabbitMQ .NET Client.
37 //
38 //   The Initial Developers of the Original Code are LShift Ltd,
39 //   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
40 //
41 //   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42 //   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43 //   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44 //   Technologies LLC, and Rabbit Technologies Ltd.
45 //
46 //   Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
47 //   Ltd. Portions created by Cohesive Financial Technologies LLC are
48 //   Copyright (C) 2007-2010 Cohesive Financial Technologies
49 //   LLC. Portions created by Rabbit Technologies Ltd are Copyright
50 //   (C) 2007-2010 Rabbit Technologies Ltd.
51 //
52 //   All Rights Reserved.
53 //
54 //   Contributor(s): ______________________________________.
55 //
56 //---------------------------------------------------------------------------
57 using System;
58 using System.IO;
59
60 using RabbitMQ.Client;
61 using RabbitMQ.Client.Content;
62 using RabbitMQ.Client.Events;
63
64 namespace RabbitMQ.Client.MessagePatterns {
65     ///<summary>Implements a simple RPC service, responding to
66     ///requests received via a Subscription.</summary>
67     ///<remarks>
68     ///<para>
69     /// This class interprets requests such as those sent by instances
70     /// of SimpleRpcClient.
71     ///</para>
72     ///<para>
73     /// The basic pattern for implementing a service is to subclass
74     /// SimpleRpcServer, overriding HandleCall and HandleCast as
75     /// appropriate, and then to create a Subscription object for
76     /// receiving requests from clients, and start an instance of the
77     /// SimpleRpcServer subclass with the Subscription.
78     ///</para>
79     ///<example><code>
80     /// string queueName = "ServiceRequestQueue"; // See also Subscription ctors
81     /// using (IConnection conn = new ConnectionFactory()
82     ///                                 .CreateConnection(serverAddress)) {
83     ///     using (IModel ch = conn.CreateModel()) {
84     ///         Subscription sub = new Subscription(ch, queueName);
85     ///         new MySimpleRpcServerSubclass(sub).MainLoop();
86     ///     }
87     /// }
88     ///</code></example>
89     ///<para>
90     /// Note that this class itself does not declare any resources
91     /// (exchanges, queues or bindings). The Subscription we use for
92     /// receiving RPC requests should have already declared all the
93     /// resources we need. See the Subscription constructors and the
94     /// Subscription.Bind method.
95     ///</para>
96     ///<para>
97     /// If you are implementing a service that responds to
98     /// "jms/stream-message"-formatted requests (as implemented by
99     /// RabbitMQ.Client.Content.IStreamMessageReader), override
100     /// HandleStreamMessageCall. Otherwise, override HandleSimpleCall
101     /// or HandleCall as appropriate. Asynchronous, one-way requests
102     /// are dealt with by HandleCast etc.
103     ///</para>
104     ///<para>
105     /// Every time a request is successfully received and processed
106     /// within the server's MainLoop, the request message is Ack()ed
107     /// using Subscription.Ack before the next request is
108     /// retrieved. This causes the Subscription object to take care of
109     /// acknowledging receipt and processing of the request message.
110     ///</para>
111     ///<para>
112     /// If transactional service is enabled, via SetTransactional(),
113     /// then after every successful ProcessRequest, IModel.TxCommit is
114     /// called. Making use of transactional service has effects on all
115     /// parts of the application that share an IModel instance,
116     /// completely changing the style of interaction with the AMQP
117     /// server. For this reason, it is initially disabled, and must be
118     /// explicitly enabled with a call to SetTransactional(). Please
119     /// see the documentation for SetTransactional() for details.
120     ///</para>
121     ///<para>
122     /// To stop a running RPC server, call Close(). This will in turn
123     /// Close() the Subscription, which will cause MainLoop() to
124     /// return to its caller.
125     ///</para>
126     ///<para>
127     /// Unless overridden, ProcessRequest examines properties in the
128     /// request content header, and uses them to dispatch to one of
129     /// the Handle[...]() methods. See the documentation for
130     /// ProcessRequest and each Handle[...] method for details.
131     ///</para>
132     ///</remarks>
133     ///<see cref="SimpleRpcClient"/>
134     public class SimpleRpcServer: IDisposable {
135         protected Subscription m_subscription;
136         private bool m_transactional;
137
138         ///<summary>Returns true if we are in "transactional" mode, or
139         ///false if we are not.</summary>
140         public bool Transactional { get { return m_transactional; } }
141
142         ///<summary>Create, but do not start, an instance that will
143         ///receive requests via the given Subscription.</summary>
144         ///<remarks>
145         ///<para>
146         /// The instance is initially in non-transactional mode. See
147         /// SetTransactional().
148         ///</para>
149         ///<para>
150         /// Call MainLoop() to start the request-processing loop.
151         ///</para>
152         ///</remarks>
153         public SimpleRpcServer(Subscription subscription)
154         {
155             m_subscription = subscription;
156             m_transactional = false;
157         }
158
159         ///<summary>Shut down the server, causing MainLoop() to return
160         ///to its caller.</summary>
161         ///<remarks>
162         /// Acts by calling Close() on the server's Subscription object.
163         ///</remarks>
164         public void Close()
165         {
166             m_subscription.Close();
167         }
168
169         ///<summary>Enables transactional mode.</summary>
170         ///<remarks>
171         ///<para>
172         /// Once enabled, transactional mode is not only enabled for
173         /// all users of the underlying IModel instance, but cannot be
174         /// disabled without shutting down the entire IModel (which
175         /// involves shutting down all the services depending on it,
176         /// and should not be undertaken lightly).
177         ///</para>
178         ///<para>
179         /// This method calls IModel.TxSelect, every time it is
180         /// called. (TxSelect is idempotent, so this is harmless.)
181         ///</para>
182         ///</remarks>
183         public void SetTransactional() {
184             m_subscription.Model.TxSelect();
185             m_transactional = true;
186         }
187
188         ///<summary>Enters the main loop of the RPC service.</summary>
189         ///<remarks>
190         ///<para>
191         /// Retrieves requests repeatedly from the service's
192         /// subscription. Each request is passed to
193         /// ProcessRequest. Once ProcessRequest returns, the request
194         /// is acknowledged via Subscription.Ack(). If transactional
195         /// mode is enabled, TxCommit is then called. Finally, the
196         /// loop begins again.
197         ///</para>
198         ///<para>
199         /// Runs until the subscription ends, which happens either as
200         /// a result of disconnection, or of a call to Close().
201         ///</para>
202         ///</remarks>
203         public void MainLoop()
204         {
205             foreach (BasicDeliverEventArgs evt in m_subscription) {
206                 ProcessRequest(evt);
207                 m_subscription.Ack();
208                 if (m_transactional) {
209                     m_subscription.Model.TxCommit();
210                 }
211             }
212         }
213
214         ///<summary>Process a single request received from our
215         ///subscription.</summary>
216         ///<remarks>
217         ///<para>
218         /// If the request's properties contain a non-null, non-empty
219         /// CorrelationId string (see IBasicProperties), it is assumed
220         /// to be a two-way call, requiring a response. The ReplyTo
221         /// header property is used as the reply address (via
222         /// PublicationAddress.Parse, unless that fails, in which case it
223         /// is treated as a simple queue name), and the request is
224         /// passed to HandleCall().
225         ///</para>
226         ///<para>
227         /// If the CorrelationId is absent or empty, the request is
228         /// treated as one-way asynchronous event, and is passed to
229         /// HandleCast().
230         ///</para>
231         ///<para>
232         /// Usually, overriding HandleCall(), HandleCast(), or one of
233         /// their delegates is sufficient to implement a service, but
234         /// in some cases overriding ProcessRequest() is
235         /// required. Overriding ProcessRequest() gives the
236         /// opportunity to implement schemes for detecting interaction
237         /// patterns other than simple request/response or one-way
238         /// communication.
239         ///</para>
240         ///</remarks>
241         public virtual void ProcessRequest(BasicDeliverEventArgs evt)
242         {
243             IBasicProperties properties = evt.BasicProperties;
244             if (properties.ReplyTo != null && properties.ReplyTo != "") {
245                 // It's a request.
246
247                 PublicationAddress replyAddress = PublicationAddress.Parse(properties.ReplyTo);
248                 if (replyAddress == null) {
249                     replyAddress = new PublicationAddress(ExchangeType.Direct,
250                                                           "",
251                                                           properties.ReplyTo);
252                 }
253
254                 IBasicProperties replyProperties;
255                 byte[] reply = HandleCall(evt.Redelivered,
256                                           properties,
257                                           evt.Body,
258                                           out replyProperties);
259                 if (replyProperties == null) {
260                     replyProperties = m_subscription.Model.CreateBasicProperties();
261                 }
262
263                 replyProperties.CorrelationId = properties.CorrelationId;
264                 m_subscription.Model.BasicPublish(replyAddress,
265                                                   replyProperties,
266                                                   reply);
267             } else {
268                 // It's an asynchronous message.
269                 HandleCast(evt.Redelivered, properties, evt.Body);
270             }
271         }
272
273         ///<summary>Called by HandleCall and HandleCast when a
274         ///"jms/stream-message" request is received.</summary>
275         ///<remarks>
276         ///<para>
277         /// The args array contains the values decoded by HandleCall
278         /// or HandleCast.
279         ///</para>
280         ///<para>
281         /// The replyWriter parameter will be null if we were called
282         /// from HandleCast, in which case a reply is not expected or
283         /// possible, or non-null if we were called from
284         /// HandleCall. Use the methods of replyWriter in this case to
285         /// assemble your reply, which will be sent back to the remote
286         /// caller.
287         ///</para>
288         ///<para>
289         /// This default implementation does nothing, which
290         /// effectively sends back an empty reply to any and all
291         /// remote callers.
292         ///</para>
293         ///</remarks>
294         public virtual void HandleStreamMessageCall(IStreamMessageBuilder replyWriter,
295                                                     bool isRedelivered,
296                                                     IBasicProperties requestProperties,
297                                                     object[] args)
298         {
299             // Override to do something with the request.
300         }
301
302         ///<summary>Called by ProcessRequest(), this is the most
303         ///general method that handles RPC-style requests.</summary>
304         ///<remarks>
305         ///<para>
306         /// This method should map requestProperties and body to
307         /// replyProperties and the returned byte array.
308         ///</para>
309         ///<para>
310         /// The default implementation checks
311         /// requestProperties.ContentType, and if it is
312         /// "jms/stream-message" (i.e. the current value of
313         /// StreamMessageBuilder.MimeType), parses it using
314         /// StreamMessageReader and delegates to
315         /// HandleStreamMessageCall before encoding and returning the
316         /// reply. If the ContentType is any other value, the request
317         /// is passed to HandleSimpleCall instead.
318         ///</para>
319         ///<para>
320         /// The isRedelivered flag is true when the server knows for
321         /// sure that it has tried to send this request previously
322         /// (although not necessarily to this application). It is not
323         /// a reliable indicator of previous receipt, however - the
324         /// only claim it makes is that a delivery attempt was made,
325         /// not that the attempt succeeded. Be careful if you choose
326         /// to use the isRedelivered flag.
327         ///</para>
328         ///</remarks>
329         public virtual byte[] HandleCall(bool isRedelivered,
330                                          IBasicProperties requestProperties,
331                                          byte[] body,
332                                          out IBasicProperties replyProperties)
333         {
334             if (requestProperties.ContentType == StreamMessageBuilder.MimeType) {
335                 IStreamMessageReader r = new StreamMessageReader(requestProperties, body);
336                 IStreamMessageBuilder w = new StreamMessageBuilder(m_subscription.Model);
337                 HandleStreamMessageCall(w,
338                                         isRedelivered,
339                                         requestProperties,
340                                         r.ReadObjects());
341                 replyProperties = (IBasicProperties) w.GetContentHeader();
342                 return w.GetContentBody();
343             } else {
344                 return HandleSimpleCall(isRedelivered,
345                                         requestProperties,
346                                         body,
347                                         out replyProperties);
348             }
349         }
350
351         ///<summary>Called by the default HandleCall() implementation
352         ///as a fallback.</summary>
353         ///<remarks>
354         /// If the MIME ContentType of the request did not match any
355         /// of the types specially recognised
356         /// (e.g. "jms/stream-message"), this method is called instead
357         /// with the raw bytes of the request. It should fill in
358         /// replyProperties (or set it to null) and return a byte
359         /// array to send back to the remote caller as a reply
360         /// message.
361         ///</remarks>
362         public virtual byte[] HandleSimpleCall(bool isRedelivered,
363                                                IBasicProperties requestProperties,
364                                                byte[] body,
365                                                out IBasicProperties replyProperties)
366         {
367             // Override to do something with the request.
368             replyProperties = null;
369             return null;
370         }
371
372         ///<summary>Called by ProcessRequest(), this is the most
373         ///general method that handles asynchronous, one-way
374         ///requests.</summary>
375         ///<remarks>
376         ///<para>
377         /// The default implementation checks
378         /// requestProperties.ContentType, and if it is
379         /// "jms/stream-message" (i.e. the current value of
380         /// StreamMessageBuilder.MimeType), parses it using
381         /// StreamMessageReader and delegates to
382         /// HandleStreamMessageCall, passing in null as the
383         /// replyWriter parameter to indicate that no reply is desired
384         /// or possible. If the ContentType is any other value, the
385         /// request is passed to HandleSimpleCast instead.
386         ///</para>
387         ///<para>
388         /// The isRedelivered flag is true when the server knows for
389         /// sure that it has tried to send this request previously
390         /// (although not necessarily to this application). It is not
391         /// a reliable indicator of previous receipt, however - the
392         /// only claim it makes is that a delivery attempt was made,
393         /// not that the attempt succeeded. Be careful if you choose
394         /// to use the isRedelivered flag.
395         ///</para>
396         ///</remarks>
397         public virtual void HandleCast(bool isRedelivered,
398                                        IBasicProperties requestProperties,
399                                        byte[] body)
400         {
401             if (requestProperties.ContentType == StreamMessageBuilder.MimeType) {
402                 IStreamMessageReader r = new StreamMessageReader(requestProperties, body);
403                 HandleStreamMessageCall(null,
404                                         isRedelivered,
405                                         requestProperties,
406                                         r.ReadObjects());
407             } else {
408                 HandleSimpleCast(isRedelivered,
409                                  requestProperties,
410                                  body);
411             }
412         }
413
414         ///<summary>Called by the default HandleCast() implementation
415         ///as a fallback.</summary>
416         ///<remarks>
417         /// If the MIME ContentType of the request did not match any
418         /// of the types specially recognised
419         /// (e.g. "jms/stream-message"), this method is called instead
420         /// with the raw bytes of the request.
421         ///</remarks>
422         public virtual void HandleSimpleCast(bool isRedelivered,
423                                              IBasicProperties requestProperties,
424                                              byte[] body)
425         {
426             // Override to do something with the request.
427         }
428
429         ///<summary>Implement the IDisposable interface, permitting
430         ///SimpleRpcServer instances to be used in using
431         ///statements.</summary>
432         void IDisposable.Dispose()
433         {
434             Close();
435         }
436     }
437 }