2009-07-11 Michael Barker <mike@middlesoft.co.uk>
[mono.git] / mcs / class / Mono.Messaging.RabbitMQ / Mono.Messaging.RabbitMQ / MessageFactory.cs
1 //
2 // Mono.Messaging.RabbitMQ
3 //
4 // Authors:
5 //        Michael Barker (mike@middlesoft.co.uk)
6 //
7 // (C) 2008 Michael Barker
8 //
9
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
18 // 
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
21 // 
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30
31 using System;
32 using System.Collections;
33 using System.IO;
34 using System.Text;
35
36 using Mono.Messaging;
37
38 using RabbitMQ.Client;
39 using RabbitMQ.Client.Content;
40 using RabbitMQ.Client.Events;
41 using RabbitMQ.Client.Exceptions;
42 using RabbitMQ.Client.MessagePatterns;
43 using RabbitMQ.Util;
44
45 namespace Mono.Messaging.RabbitMQ {
46
47         public class MessageFactory {
48                 
49                 private static readonly string SENDER_VERSION_KEY = "SenderVersion";
50                 private static readonly string SOURCE_MACHINE_KEY = "SourceMachine";
51                 private static readonly string BODY_TYPE_KEY = "BodyType";
52                 private static readonly string ACKNOWLEDGE_TYPE_KEY = "AcknowledgeType";
53                 private static readonly string ADMINISTRATION_QUEUE_KEY = "AdministrationQueue";
54                 private static readonly string APP_SPECIFIC_KEY = "AppSpecific";
55                 private static readonly string AUTHENTICATION_PROVIDER_NAME_KEY = "AuthenticationProviderName";
56                 private static readonly string AUTHENTICATION_PROVIDER_TYPE_KEY = "AuthenticationProviderType";
57                 private static readonly string CONNECTOR_TYPE_KEY = "ConnectorType";
58                 private static readonly string DESTINATION_SYMMETRIC_KEY_KEY = "DestinationSymmetricKey";
59                 private static readonly string DIGITAL_SIGNATURE_KEY = "DigitalSignature";
60                 private static readonly string ENCRYPTION_ALGORITHM_KEY = "EncryptionAlgorithm";
61                 private static readonly string EXTENSION_KEY = "Extension";
62                 private static readonly string HASH_ALGORITHM_KEY = "HashAlgorithm";
63                 private static readonly string LABEL_KEY = "Label";
64                 private static readonly string SENDER_CERTIFICATE_KEY = "SenderCertificate";
65                 private static readonly string TIME_TO_BE_RECEIVED_KEY = "TimeToBeReceived";
66                 private static readonly string TIME_TO_REACH_QUEUE_KEY = "TimeToReachQueue";
67                 private static readonly string USE_AUTHENTICATION_KEY = "UseAuthentication";
68                 private static readonly string USE_DEAD_LETTER_QUEUE_KEY = "UseDeadLetterQueue";
69                 private static readonly string USE_ENCRYPTION_KEY = "UseEncryption";
70                 private static readonly string TRANSACTION_ID_KEY = "TrandactionId";
71                 
72                 private static readonly int PERSISTENT_DELIVERY_MODE = 2;
73                 
74                 private readonly RabbitMQMessagingProvider provider;
75                 
76                 public MessageFactory (RabbitMQMessagingProvider provider)
77                 {
78                         this.provider = provider;
79                 }
80                 
81                 public IMessageBuilder WriteMessage (IModel ch, IMessage msg)
82                 {
83                         BasicMessageBuilder mb = new BasicMessageBuilder (ch);
84                         mb.Properties.MessageId = msg.Id;
85                         if (msg.CorrelationId != null)
86                                 mb.Properties.CorrelationId = msg.CorrelationId;
87                         // TODO: Change to DateTime.UtcNow??
88                         mb.Properties.Timestamp = MessageFactory.DateTimeToAmqpTimestamp (DateTime.UtcNow);
89                         Hashtable headers = new Hashtable ();
90                         
91                         headers[SENDER_VERSION_KEY] = msg.SenderVersion;
92                         headers[SOURCE_MACHINE_KEY] = (string) System.Environment.MachineName;
93                         headers[BODY_TYPE_KEY] = msg.BodyType;
94                         headers[ACKNOWLEDGE_TYPE_KEY] = (int) msg.AcknowledgeType;
95                         if (msg.AdministrationQueue != null)
96                                 headers[ADMINISTRATION_QUEUE_KEY] = msg.AdministrationQueue.QRef.ToString ();
97                         headers[APP_SPECIFIC_KEY] = msg.AppSpecific;
98                         headers[AUTHENTICATION_PROVIDER_NAME_KEY] = msg.AuthenticationProviderName;
99                         headers[AUTHENTICATION_PROVIDER_TYPE_KEY] = (int) msg.AuthenticationProviderType;
100                         headers[CONNECTOR_TYPE_KEY] = msg.ConnectorType.ToString ();
101                         headers[DESTINATION_SYMMETRIC_KEY_KEY] = msg.DestinationSymmetricKey;
102                         headers[DIGITAL_SIGNATURE_KEY] = msg.DigitalSignature;
103                         headers[ENCRYPTION_ALGORITHM_KEY] = (int) msg.EncryptionAlgorithm;
104                         headers[EXTENSION_KEY] = msg.Extension;
105                         headers[HASH_ALGORITHM_KEY] = (int) msg.HashAlgorithm;
106                         SetValue (headers, LABEL_KEY, msg.Label);
107                         mb.Properties.Priority = (byte) (int) msg.Priority;
108                         mb.Properties.SetPersistent (msg.Recoverable);
109                         if (msg.ResponseQueue != null)
110                                 mb.Properties.ReplyTo = msg.ResponseQueue.QRef.ToString ();
111                         headers[SENDER_CERTIFICATE_KEY] = msg.SenderCertificate;
112                         headers[TIME_TO_BE_RECEIVED_KEY] = msg.TimeToBeReceived.Ticks;
113                         headers[TIME_TO_REACH_QUEUE_KEY] = msg.TimeToReachQueue.Ticks;
114                         SetValue (headers, TRANSACTION_ID_KEY, msg.TransactionId);
115                         headers[USE_AUTHENTICATION_KEY] = msg.UseAuthentication;
116                         headers[USE_DEAD_LETTER_QUEUE_KEY] = msg.UseDeadLetterQueue;
117                         headers[USE_ENCRYPTION_KEY] = msg.UseEncryption;
118                         
119                         mb.Properties.Headers = headers;
120                         Stream s = msg.BodyStream;
121                         s.Seek (0, SeekOrigin.Begin);
122                         byte[] buf = new byte[s.Length];                        
123                         msg.BodyStream.Read (buf, 0, buf.Length);
124                         mb.BodyStream.Write (buf, 0, buf.Length);
125                         return mb;
126                 }
127                 
128                 private static void SetValue (Hashtable headers, string name, object val)
129                 {
130                         if (val != null)
131                                 headers[name] = val;
132                 }
133                 
134                 public IMessage ReadMessage (QueueReference destination, BasicDeliverEventArgs result)
135                 {
136                         /*
137                         if (destination == null)
138                                 throw new ArgumentException ("destination must not be null");
139                         if (result == null)
140                                 throw new ArgumentException ("result must not be null");
141                         */
142                         MessageBase msg = new MessageBase ();
143                         Stream s = new MemoryStream ();
144                         s.Write (result.Body, 0, result.Body.Length);
145                         DateTime arrivedTime = DateTime.Now;
146                         IDictionary headers = result.BasicProperties.Headers;
147                         long senderVersion = (long) headers[SENDER_VERSION_KEY];
148                         string sourceMachine = GetString (headers, SOURCE_MACHINE_KEY);
149                         DateTime sentTime = AmqpTimestampToDateTime (result.BasicProperties.Timestamp);
150                         string transactionId = GetString (headers, TRANSACTION_ID_KEY);
151                         msg.SetDeliveryInfo (Acknowledgment.None,
152                                              arrivedTime,
153                                              new RabbitMQMessageQueue (provider,
154                                                                        destination,
155                                                                        true),
156                                              result.BasicProperties.MessageId,
157                                              MessageType.Normal,
158                                              new byte[0],
159                                              senderVersion,
160                                              sentTime,
161                                              sourceMachine,
162                                              transactionId);
163                         msg.CorrelationId = result.BasicProperties.CorrelationId;
164                         msg.BodyStream = s;
165                         msg.BodyType = (int) result.BasicProperties.Headers[BODY_TYPE_KEY];
166                         msg.AcknowledgeType = (AcknowledgeTypes) 
167                                 Enum.ToObject (typeof (AcknowledgeTypes), 
168                                                headers[ACKNOWLEDGE_TYPE_KEY]);
169                         string adminQueuePath = GetString (headers, ADMINISTRATION_QUEUE_KEY);
170                         if (adminQueuePath != null) {
171                                 QueueReference qRef = QueueReference.Parse (adminQueuePath);
172                                 msg.AdministrationQueue = new RabbitMQMessageQueue (provider,
173                                                                                     qRef,
174                                                                                     true);
175                         }
176                         msg.AppSpecific = (int) headers[APP_SPECIFIC_KEY];
177                         msg.AuthenticationProviderName = GetString (headers, AUTHENTICATION_PROVIDER_NAME_KEY);
178                         msg.AuthenticationProviderType = (CryptographicProviderType) Enum.ToObject (typeof (CryptographicProviderType), headers[AUTHENTICATION_PROVIDER_TYPE_KEY]);
179                         string connectorType = GetString (headers, CONNECTOR_TYPE_KEY);
180                         msg.ConnectorType = new Guid(connectorType);
181                         msg.DestinationSymmetricKey = (byte[]) headers[DESTINATION_SYMMETRIC_KEY_KEY];
182                         msg.DigitalSignature = (byte[]) headers[DIGITAL_SIGNATURE_KEY];
183                         msg.EncryptionAlgorithm = (EncryptionAlgorithm) Enum.ToObject (typeof (EncryptionAlgorithm), headers[ENCRYPTION_ALGORITHM_KEY]);
184                         msg.Extension = (byte[]) headers[EXTENSION_KEY];
185                         msg.HashAlgorithm = (HashAlgorithm) Enum.ToObject (typeof (HashAlgorithm), headers[HASH_ALGORITHM_KEY]);
186                         msg.Label = GetString (headers, LABEL_KEY);
187                         msg.Priority = (MessagePriority) Enum.ToObject (typeof (MessagePriority), result.BasicProperties.Priority);
188                         msg.Recoverable = result.BasicProperties.DeliveryMode == PERSISTENT_DELIVERY_MODE;
189                         if (result.BasicProperties.ReplyTo != null)
190                                 msg.ResponseQueue = new RabbitMQMessageQueue (provider, QueueReference.Parse (result.BasicProperties.ReplyTo), true);
191                         msg.SenderCertificate = (byte[]) headers[SENDER_CERTIFICATE_KEY];
192                         msg.TimeToBeReceived = new TimeSpan((long) headers[TIME_TO_BE_RECEIVED_KEY]);
193                         msg.TimeToReachQueue = new TimeSpan((long) headers[TIME_TO_REACH_QUEUE_KEY]);
194                         msg.UseAuthentication = (bool) headers[USE_AUTHENTICATION_KEY];
195                         msg.UseDeadLetterQueue = (bool) headers[USE_DEAD_LETTER_QUEUE_KEY];
196                         msg.UseEncryption = (bool) headers[USE_ENCRYPTION_KEY];
197                         
198                         return msg;
199                 }
200                 
201                 public static string GetString (IDictionary properties, String key)
202                 {
203                         byte[] b = (byte[]) properties[key];
204                         if (b == null)
205                                 return null;
206                         
207                         return Encoding.UTF8.GetString (b);
208                 }
209                 
210                 public static AmqpTimestamp DateTimeToAmqpTimestamp (DateTime t)
211                 {
212                         DateTime epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0);
213                         TimeSpan ts = t.ToUniversalTime () - epoch;
214                         return new AmqpTimestamp((long) ts.TotalSeconds);
215                 }
216                 
217                 public static DateTime AmqpTimestampToDateTime (AmqpTimestamp ats)
218                 {
219                         DateTime epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0);
220                         return epoch.AddSeconds (ats.UnixTime).ToLocalTime ();
221                 }
222
223                 public static int TimeSpanToInt32 (TimeSpan timespan)
224                 {
225                         if (timespan == TimeSpan.MaxValue)
226                                 return -1;
227                         else
228                                 return (int) timespan.TotalMilliseconds;
229                 }
230         }
231 }