2 // Mono.Messaging.RabbitMQ
5 // Michael Barker (mike@middlesoft.co.uk)
7 // (C) 2008 Michael Barker
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 using System.Collections;
38 using RabbitMQ.Client;
39 using RabbitMQ.Client.Content;
40 using RabbitMQ.Client.Events;
41 using RabbitMQ.Client.Exceptions;
42 using RabbitMQ.Client.MessagePatterns;
45 namespace Mono.Messaging.RabbitMQ {
47 public class MessageFactory {
// Keys under which WriteMessage stores message properties in the AMQP header
// table and under which ReadMessage looks them up again.
49 private static readonly string SENDER_VERSION_KEY = "SenderVersion";
50 private static readonly string SOURCE_MACHINE_KEY = "SourceMachine";
51 private static readonly string BODY_TYPE_KEY = "BodyType";
52 private static readonly string ACKNOWLEDGE_TYPE_KEY = "AcknowledgeType";
53 private static readonly string ADMINISTRATION_QUEUE_KEY = "AdministrationQueue";
54 private static readonly string APP_SPECIFIC_KEY = "AppSpecific";
55 private static readonly string AUTHENTICATION_PROVIDER_NAME_KEY = "AuthenticationProviderName";
56 private static readonly string AUTHENTICATION_PROVIDER_TYPE_KEY = "AuthenticationProviderType";
57 private static readonly string CONNECTOR_TYPE_KEY = "ConnectorType";
58 private static readonly string DESTINATION_SYMMETRIC_KEY_KEY = "DestinationSymmetricKey";
59 private static readonly string DIGITAL_SIGNATURE_KEY = "DigitalSignature";
60 private static readonly string ENCRYPTION_ALGORITHM_KEY = "EncryptionAlgorithm";
61 private static readonly string EXTENSION_KEY = "Extension";
62 private static readonly string HASH_ALGORITHM_KEY = "HashAlgorithm";
63 private static readonly string LABEL_KEY = "Label";
64 private static readonly string SENDER_CERTIFICATE_KEY = "SenderCertificate";
65 private static readonly string TIME_TO_BE_RECEIVED_KEY = "TimeToBeReceived";
66 private static readonly string TIME_TO_REACH_QUEUE_KEY = "TimeToReachQueue";
67 private static readonly string USE_AUTHENTICATION_KEY = "UseAuthentication";
68 private static readonly string USE_DEAD_LETTER_QUEUE_KEY = "UseDeadLetterQueue";
69 private static readonly string USE_ENCRYPTION_KEY = "UseEncryption";
// NOTE(review): the literal below is spelled "TrandactionId". The same constant
// is used for writing and for reading, so the spelling is part of the header
// format; confirm no already-queued messages depend on it before correcting it.
70 private static readonly string TRANSACTION_ID_KEY = "TrandactionId";
// Compared with BasicProperties.DeliveryMode in ReadMessage to set Recoverable.
72 private static readonly int PERSISTENT_DELIVERY_MODE = 2;
// Passed to the RabbitMQMessageQueue instances constructed in ReadMessage.
74 private readonly RabbitMQMessagingProvider provider;
/// <summary>
/// Creates a factory bound to a messaging provider. The provider is kept so
/// that ReadMessage can hand it to the RabbitMQMessageQueue objects it builds.
/// </summary>
/// <param name="provider">Provider stored in the factory's read-only field.</param>
public MessageFactory (RabbitMQMessagingProvider provider)
{
	this.provider = provider;
}
/// <summary>
/// Copies an <see cref="IMessage"/> into a new AMQP message builder.
/// Message id, correlation id, timestamp, priority, persistence and reply-to
/// go into the basic properties; the remaining message properties are stored
/// in the header table under the *_KEY constants; the body stream is copied
/// from its start into the builder's body stream.
/// </summary>
/// <param name="ch">Channel the message builder is created on.</param>
/// <param name="msg">Message to serialise. Its BodyStream is rewound with Seek, so it must be seekable.</param>
/// <returns>The populated message builder.</returns>
public IMessageBuilder WriteMessage (IModel ch, IMessage msg)
{
	BasicMessageBuilder mb = new BasicMessageBuilder (ch);
	mb.Properties.MessageId = msg.Id;
	if (msg.CorrelationId != null)
		mb.Properties.CorrelationId = msg.CorrelationId;
	// The send time is stamped in UTC.
	mb.Properties.Timestamp = MessageFactory.DateTimeToAmqpTimestamp (DateTime.UtcNow);
	Hashtable headers = new Hashtable ();

	headers[SENDER_VERSION_KEY] = msg.SenderVersion;
	headers[SOURCE_MACHINE_KEY] = System.Environment.MachineName;
	headers[BODY_TYPE_KEY] = msg.BodyType;
	headers[ACKNOWLEDGE_TYPE_KEY] = (int) msg.AcknowledgeType;
	if (msg.AdministrationQueue != null)
		headers[ADMINISTRATION_QUEUE_KEY] = msg.AdministrationQueue.QRef.ToString ();
	headers[APP_SPECIFIC_KEY] = msg.AppSpecific;
	headers[AUTHENTICATION_PROVIDER_NAME_KEY] = msg.AuthenticationProviderName;
	headers[AUTHENTICATION_PROVIDER_TYPE_KEY] = (int) msg.AuthenticationProviderType;
	headers[CONNECTOR_TYPE_KEY] = msg.ConnectorType.ToString ();
	headers[DESTINATION_SYMMETRIC_KEY_KEY] = msg.DestinationSymmetricKey;
	headers[DIGITAL_SIGNATURE_KEY] = msg.DigitalSignature;
	headers[ENCRYPTION_ALGORITHM_KEY] = (int) msg.EncryptionAlgorithm;
	headers[EXTENSION_KEY] = msg.Extension;
	headers[HASH_ALGORITHM_KEY] = (int) msg.HashAlgorithm;
	SetValue (headers, LABEL_KEY, msg.Label);
	mb.Properties.Priority = (byte) (int) msg.Priority;
	mb.Properties.SetPersistent (msg.Recoverable);
	if (msg.ResponseQueue != null)
		mb.Properties.ReplyTo = msg.ResponseQueue.QRef.ToString ();
	headers[SENDER_CERTIFICATE_KEY] = msg.SenderCertificate;
	// Time spans travel as tick counts and are rebuilt with new TimeSpan (long) on read.
	headers[TIME_TO_BE_RECEIVED_KEY] = msg.TimeToBeReceived.Ticks;
	headers[TIME_TO_REACH_QUEUE_KEY] = msg.TimeToReachQueue.Ticks;
	SetValue (headers, TRANSACTION_ID_KEY, msg.TransactionId);
	headers[USE_AUTHENTICATION_KEY] = msg.UseAuthentication;
	headers[USE_DEAD_LETTER_QUEUE_KEY] = msg.UseDeadLetterQueue;
	headers[USE_ENCRYPTION_KEY] = msg.UseEncryption;

	mb.Properties.Headers = headers;

	Stream s = msg.BodyStream;
	s.Seek (0, SeekOrigin.Begin);
	byte[] buf = new byte[s.Length];
	// Stream.Read may return fewer bytes than requested, so keep reading until
	// the buffer is full or the stream reports end-of-data. A single Read call
	// could otherwise leave the tail of the body as zero bytes.
	int filled = 0;
	while (filled < buf.Length) {
		int count = s.Read (buf, filled, buf.Length - filled);
		if (count <= 0)
			break;
		filled += count;
	}
	// Only the bytes actually obtained from the source stream are forwarded.
	mb.BodyStream.Write (buf, 0, filled);
	return mb;
}
// Helper called by WriteMessage for the Label and TransactionId headers.
// NOTE(review): the body is not visible here; presumably it stores val under
// name only when val is non-null — confirm.
128 private static void SetValue (Hashtable headers, string name, object val)
// Converts a delivered AMQP message into a MessageBase. The body bytes are
// copied into a MemoryStream and the message properties are restored from
// result.BasicProperties and its header table, using the same keys that
// WriteMessage writes.
// NOTE(review): not every statement of this method is visible in the reviewed
// text (for example the test guarding the "result must not be null" throw and
// most SetDeliveryInfo arguments); the notes below cover visible lines only.
134 public IMessage ReadMessage (QueueReference destination, BasicDeliverEventArgs result)
137 if (destination == null)
138 throw new ArgumentException ("destination must not be null");
140 throw new ArgumentException ("result must not be null");
142 MessageBase msg = new MessageBase ();
143 Stream s = new MemoryStream ();
// After this write the stream position is at the end of the copied body.
144 s.Write (result.Body, 0, result.Body.Length);
// Arrival time is the local clock at the moment the message is read here.
145 DateTime arrivedTime = DateTime.Now;
146 IDictionary headers = result.BasicProperties.Headers;
// Unboxing cast: requires the header to be present and boxed as a long.
147 long senderVersion = (long) headers[SENDER_VERSION_KEY];
148 string sourceMachine = GetString (headers, SOURCE_MACHINE_KEY);
149 DateTime sentTime = AmqpTimestampToDateTime (result.BasicProperties.Timestamp);
150 string transactionId = GetString (headers, TRANSACTION_ID_KEY);
151 msg.SetDeliveryInfo (Acknowledgment.None,
153 new RabbitMQMessageQueue (provider,
156 result.BasicProperties.MessageId,
163 msg.CorrelationId = result.BasicProperties.CorrelationId;
// Unboxing cast: requires the header to be present and boxed as an int.
165 msg.BodyType = (int) result.BasicProperties.Headers[BODY_TYPE_KEY];
// Enum-typed properties were written as ints and are mapped back with Enum.ToObject.
166 msg.AcknowledgeType = (AcknowledgeTypes)
167 Enum.ToObject (typeof (AcknowledgeTypes),
168 headers[ACKNOWLEDGE_TYPE_KEY]);
// The administration queue header is only written when the sender set one,
// so the queue is rebuilt only when a path was found.
169 string adminQueuePath = GetString (headers, ADMINISTRATION_QUEUE_KEY);
170 if (adminQueuePath != null) {
171 QueueReference qRef = QueueReference.Parse (adminQueuePath);
172 msg.AdministrationQueue = new RabbitMQMessageQueue (provider,
176 msg.AppSpecific = (int) headers[APP_SPECIFIC_KEY];
177 msg.AuthenticationProviderName = GetString (headers, AUTHENTICATION_PROVIDER_NAME_KEY);
178 msg.AuthenticationProviderType = (CryptographicProviderType) Enum.ToObject (typeof (CryptographicProviderType), headers[AUTHENTICATION_PROVIDER_TYPE_KEY]);
// The connector type was written with Guid.ToString and is parsed back here.
179 string connectorType = GetString (headers, CONNECTOR_TYPE_KEY);
180 msg.ConnectorType = new Guid(connectorType);
181 msg.DestinationSymmetricKey = (byte[]) headers[DESTINATION_SYMMETRIC_KEY_KEY];
182 msg.DigitalSignature = (byte[]) headers[DIGITAL_SIGNATURE_KEY];
183 msg.EncryptionAlgorithm = (EncryptionAlgorithm) Enum.ToObject (typeof (EncryptionAlgorithm), headers[ENCRYPTION_ALGORITHM_KEY]);
184 msg.Extension = (byte[]) headers[EXTENSION_KEY];
185 msg.HashAlgorithm = (HashAlgorithm) Enum.ToObject (typeof (HashAlgorithm), headers[HASH_ALGORITHM_KEY]);
186 msg.Label = GetString (headers, LABEL_KEY);
187 msg.Priority = (MessagePriority) Enum.ToObject (typeof (MessagePriority), result.BasicProperties.Priority);
// Recoverable is true when the delivery mode equals PERSISTENT_DELIVERY_MODE (2).
188 msg.Recoverable = result.BasicProperties.DeliveryMode == PERSISTENT_DELIVERY_MODE;
189 if (result.BasicProperties.ReplyTo != null)
190 msg.ResponseQueue = new RabbitMQMessageQueue (provider, QueueReference.Parse (result.BasicProperties.ReplyTo), true);
191 msg.SenderCertificate = (byte[]) headers[SENDER_CERTIFICATE_KEY];
// Time spans were written as tick counts (TimeSpan.Ticks) by WriteMessage.
192 msg.TimeToBeReceived = new TimeSpan((long) headers[TIME_TO_BE_RECEIVED_KEY]);
193 msg.TimeToReachQueue = new TimeSpan((long) headers[TIME_TO_REACH_QUEUE_KEY]);
194 msg.UseAuthentication = (bool) headers[USE_AUTHENTICATION_KEY];
195 msg.UseDeadLetterQueue = (bool) headers[USE_DEAD_LETTER_QUEUE_KEY];
196 msg.UseEncryption = (bool) headers[USE_ENCRYPTION_KEY];
// Reads the entry stored under key as a byte[] and decodes it as UTF-8 text.
// NOTE(review): ReadMessage tests the result for null (administration queue
// path), so a missing entry presumably yields null — confirm.
201 public static string GetString (IDictionary properties, String key)
// The direct cast throws InvalidCastException when the stored value is not a byte[].
203 byte[] b = (byte[]) properties[key];
207 return Encoding.UTF8.GetString (b);
/// <summary>
/// Converts a <see cref="DateTime"/> into an AMQP timestamp holding the number
/// of whole seconds elapsed since 1970-01-01 00:00:00 UTC.
/// </summary>
/// <param name="t">Time to convert; it is normalised with ToUniversalTime first.</param>
/// <returns>Timestamp built from the elapsed seconds; the fractional part is dropped by the cast to long.</returns>
public static AmqpTimestamp DateTimeToAmqpTimestamp (DateTime t)
{
	DateTime unixEpoch = new DateTime (1970, 1, 1, 0, 0, 0, 0);
	double elapsedSeconds = t.ToUniversalTime ().Subtract (unixEpoch).TotalSeconds;
	long wholeSeconds = (long) elapsedSeconds;
	return new AmqpTimestamp (wholeSeconds);
}
/// <summary>
/// Converts an AMQP timestamp (seconds counted from 1970-01-01 00:00:00) into
/// a <see cref="DateTime"/> expressed in local time.
/// </summary>
/// <param name="ats">Timestamp whose UnixTime value is added to the epoch.</param>
/// <returns>The epoch plus UnixTime seconds, converted with ToLocalTime.</returns>
public static DateTime AmqpTimestampToDateTime (AmqpTimestamp ats)
{
	// The epoch is built without a DateTimeKind; ToLocalTime treats such a
	// value as UTC when converting.
	DateTime unixEpoch = new DateTime (1970, 1, 1, 0, 0, 0, 0);
	DateTime shifted = unixEpoch.AddSeconds (ats.UnixTime);
	return shifted.ToLocalTime ();
}
// Converts a TimeSpan into a number of milliseconds expressed as an int.
// TimeSpan.MaxValue is tested for separately.
// NOTE(review): the value produced for TimeSpan.MaxValue is not visible here — confirm.
223 public static int TimeSpanToInt32 (TimeSpan timespan)
225 if (timespan == TimeSpan.MaxValue)
// The double-to-int cast drops the fractional part; no range check is made here.
228 return (int) timespan.TotalMilliseconds;