9c03dcdfb64707e5dccba3b87f76e9bdd39088ed
[mono.git] / mcs / class / RabbitMQ.Client / src / client / impl / ConnectionBase.cs
1 // This source code is dual-licensed under the Apache License, version
2 // 2.0, and the Mozilla Public License, version 1.1.
3 //
4 // The APL v2.0:
5 //
6 //---------------------------------------------------------------------------
7 //   Copyright (C) 2007, 2008 LShift Ltd., Cohesive Financial
8 //   Technologies LLC., and Rabbit Technologies Ltd.
9 //
10 //   Licensed under the Apache License, Version 2.0 (the "License");
11 //   you may not use this file except in compliance with the License.
12 //   You may obtain a copy of the License at
13 //
14 //       http://www.apache.org/licenses/LICENSE-2.0
15 //
16 //   Unless required by applicable law or agreed to in writing, software
17 //   distributed under the License is distributed on an "AS IS" BASIS,
18 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 //   See the License for the specific language governing permissions and
20 //   limitations under the License.
21 //---------------------------------------------------------------------------
22 //
23 // The MPL v1.1:
24 //
25 //---------------------------------------------------------------------------
26 //   The contents of this file are subject to the Mozilla Public License
27 //   Version 1.1 (the "License"); you may not use this file except in
28 //   compliance with the License. You may obtain a copy of the License at
29 //   http://www.rabbitmq.com/mpl.html
30 //
31 //   Software distributed under the License is distributed on an "AS IS"
32 //   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33 //   License for the specific language governing rights and limitations
34 //   under the License.
35 //
36 //   The Original Code is The RabbitMQ .NET Client.
37 //
38 //   The Initial Developers of the Original Code are LShift Ltd.,
39 //   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
40 //
41 //   Portions created by LShift Ltd., Cohesive Financial Technologies
42 //   LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007, 2008
43 //   LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
44 //   Technologies Ltd.;
45 //
46 //   All Rights Reserved.
47 //
48 //   Contributor(s): ______________________________________.
49 //
50 //---------------------------------------------------------------------------
51 using System;
52 using System.IO;
53 using System.Text;
54 using System.Threading;
55 using System.Collections;
56
57 using RabbitMQ.Client;
58 using RabbitMQ.Client.Events;
59 using RabbitMQ.Client.Exceptions;
60 using RabbitMQ.Util;
61
62 // We use spec version 0-9 for common constants such as frame types,
63 // error codes, and the frame end byte, since they don't vary *within
64 // the versions we support*. Obviously we may need to revisit this if
65 // that ever changes.
66 using CommonFraming = RabbitMQ.Client.Framing.v0_9;
67
68 namespace RabbitMQ.Client.Impl
69 {
70     public abstract class ConnectionBase : IConnection
71     {
        ///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
        public readonly Frame heartbeatFrame = new Frame(CommonFraming.Constants.FrameHeartbeat,
                                                         0,
                                                         new byte[0]);

        ///<summary>Timeout used while waiting for AMQP handshaking to
        ///complete (milliseconds)</summary>
        public static int HandshakeTimeout = 10000;

        ///<summary>Timeout used while waiting for a
        ///connection.close-ok reply to a connection close request
        ///(milliseconds)</summary>
        public static int ConnectionCloseTimeout = 10000;

        // Constructor arguments, stored as supplied.
        public ConnectionParameters m_parameters;
        public IFrameHandler m_frameHandler;
        // Backing fields for the FrameMax, Heartbeat and KnownHosts properties.
        public uint m_frameMax = 0;
        public ushort m_heartbeat = 0;
        public AmqpTcpEndpoint[] m_knownHosts = null;

        // Channel-zero session and the model built on top of it
        // (both created in the constructor).
        public MainSession m_session0;
        public ModelBase m_model0;

        // Creates and looks up the sessions of the non-zero channels.
        public readonly SessionManager m_sessionManager;

        // Loop condition of MainLoop; cleared by TerminateMainloop.
        public volatile bool m_running = true;

        // Guards the two event-handler fields and the write to m_closeReason.
        public readonly object m_eventLock = new object();
        public ConnectionShutdownEventHandler m_connectionShutdown;
        
        // Null while the connection is open (see IsOpen); assigned at most
        // once, by SetCloseReason.
        public volatile ShutdownEventArgs m_closeReason = null;
        public CallbackExceptionEventHandler m_callbackException;
        
        // Set at the very end of MainLoop; Close waits on it.
        public ManualResetEvent m_appContinuation = new ManualResetEvent(false);
        // Signalled by HandleHeartbeatFrame and FinishClose; awaited by HeartbeatReadLoop.
        public AutoResetEvent m_heartbeatRead = new AutoResetEvent(false);
        // Signalled by WriteFrame and FinishClose; awaited by HeartbeatWriteLoop.
        public AutoResetEvent m_heartbeatWrite = new AutoResetEvent(false);
        // Set by FinishClose and NotifyReceivedClose; exit condition of the
        // heartbeat loops and of ClosingLoop.
        public volatile bool closed = false;

        public Guid id = Guid.NewGuid();

        // Consecutive heartbeat intervals that elapsed without a signal on
        // m_heartbeatRead; maintained by HeartbeatReadLoop.
        public int m_missedHeartbeats = 0;
        
        // Entries are appended by LogCloseError.
        public IList shutdownReport = ArrayList.Synchronized(new ArrayList());
115
116         public ConnectionBase(ConnectionParameters parameters,
117                               bool insist,
118                               IFrameHandler frameHandler)
119         {
120             m_parameters = parameters;
121             m_frameHandler = frameHandler;
122
123             m_sessionManager = new SessionManager(this);
124             m_session0 = new MainSession(this);
125             m_session0.Handler = NotifyReceivedClose;
126             m_model0 = (ModelBase)Protocol.CreateModel(m_session0);
127
128             StartMainLoop();
129             Open(insist);
130             StartHeartbeatLoops();
131         }
132
133         public event ConnectionShutdownEventHandler ConnectionShutdown
134         {
135             add
136             {
137                 bool ok = false;
138                 lock (m_eventLock)
139                 {
140                     if (m_closeReason == null)
141                     {
142                         m_connectionShutdown += value;
143                         ok = true;
144                     }
145                 }
146                 if (!ok)
147                 {
148                     value(this, m_closeReason);
149                 }
150             }
151             remove
152             {
153                 lock (m_eventLock)
154                 {
155                     m_connectionShutdown -= value;
156                 }
157             }
158         }
159
160         public event CallbackExceptionEventHandler CallbackException
161         {
162             add
163             {
164                 lock (m_eventLock)
165                 {
166                     m_callbackException += value;
167                 }
168             }
169             remove
170             {
171                 lock (m_eventLock)
172                 {
173                     m_callbackException -= value;
174                 }
175             }
176         }
177
178         public AmqpTcpEndpoint Endpoint
179         {
180             get
181             {
182                 return m_frameHandler.Endpoint;
183             }
184         }
185
186         ///<summary>Explicit implementation of IConnection.Protocol.</summary>
187         IProtocol IConnection.Protocol
188         {
189             get
190             {
191                 return Endpoint.Protocol;
192             }
193         }
194
195         ///<summary>Another overload of a Protocol property, useful
196         ///for exposing a tighter type.</summary>
197         public AbstractProtocolBase Protocol
198         {
199             get
200             {
201                 return (AbstractProtocolBase)Endpoint.Protocol;
202             }
203         }
204
205         public void WriteFrame(Frame f)
206         {
207             m_frameHandler.WriteFrame(f);
208             m_heartbeatWrite.Set();
209         }
210
211         public ConnectionParameters Parameters
212         {
213             get
214             {
215                 return m_parameters;
216             }
217         }
218
219         public ushort ChannelMax
220         {
221             get
222             {
223                 return m_sessionManager.ChannelMax;
224             }
225             set
226             {
227                 m_sessionManager.ChannelMax = value;
228             }
229         }
230
231         public uint FrameMax
232         {
233             get
234             {
235                 return m_frameMax;
236             }
237             set
238             {
239                 m_frameMax = value;
240             }
241         }
242
243         public ushort Heartbeat
244         {
245             get
246             {
247                 return m_heartbeat;
248             }
249             set
250             {
251                 m_heartbeat = value;
252                 // Socket read timeout is twice the hearbeat
253                 // because when we hit the timeout socket is
254                 // in unusable state
255                 m_frameHandler.Timeout = value * 2 * 1000;
256             }
257         }
258
259         public AmqpTcpEndpoint[] KnownHosts
260         {
261             get { return m_knownHosts; }
262             set { m_knownHosts = value; }
263         }
264
265         public ShutdownEventArgs CloseReason
266         {
267             get
268             {
269                 return m_closeReason;
270             }
271         }
272
273         public bool IsOpen
274         {
275             get
276             {
277                 return CloseReason == null;
278             }
279         }
280
281         public bool AutoClose
282         {
283             get
284             {
285                 return m_sessionManager.AutoClose;
286             }
287             set
288             {
289                 m_sessionManager.AutoClose = value;
290             }
291         }
292
293         public IModel CreateModel()
294         {
295             ISession session = CreateSession();
296             IFullModel model = (IFullModel)Protocol.CreateModel(session);
297             model._Private_ChannelOpen("");
298             return model;
299         }
300
301         public ISession CreateSession()
302         {
303             return m_sessionManager.Create();
304         }
305         
306         public ISession CreateSession(int channelNumber)
307         {
308             return m_sessionManager.Create(channelNumber);
309         }
310
311         public bool SetCloseReason(ShutdownEventArgs reason)
312         {
313             lock (m_eventLock)
314             {
315                 if (m_closeReason == null)
316                 {
317                     m_closeReason = reason;
318                     return true;
319                 }
320                 else
321                 {
322                     return false;
323                 }
324             }
325         }
326         
327         public IList ShutdownReport
328         {
329             get
330             {
331                 return shutdownReport;
332             }
333         }
334
335         void IDisposable.Dispose()
336         {
337             Close();
338         }
339
340         ///<summary>API-side invocation of connection close.</summary>
341         public void Close()
342         {
343             Close(200, "Goodbye", Timeout.Infinite);
344         }
345         
346         ///<summary>API-side invocation of connection close with timeout.</summary>
347         public void Close(int timeout)
348         {
349             Close(200, "Goodbye", timeout);
350         }
351
352         public void Close(ShutdownEventArgs reason)
353         {
354             Close(reason, false, Timeout.Infinite);
355         }
356         
357         public void Close(ushort reasonCode, string reasonText, int timeout)
358         {
359             Close(new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText), false, timeout);
360         }
361         
362         
363         ///<summary>API-side invocation of connection abort.</summary>
364         public void Abort()
365         {
366             Abort(Timeout.Infinite);
367         }
368         
369         ///<summary>API-side invocation of connection abort with timeout.</summary>
370         public void Abort(int timeout)
371         {
372             Abort(200, "Connection close forced", timeout);
373         }
374         
375         public void Abort(ushort reasonCode, string reasonText, int timeout)
376         {
377             Abort(reasonCode, reasonText, ShutdownInitiator.Application, timeout);
378         }
379         
380         public void Abort(ushort reasonCode, string reasonText,
381                           ShutdownInitiator initiator, int timeout)
382         {
383             Close( new ShutdownEventArgs(initiator, reasonCode, reasonText),
384                   true, timeout);
385         }
386         
387         ///<summary>Try to close connection in a graceful way</summary>
388         ///<remarks>
389         ///<para>
390         ///Shutdown reason contains code and text assigned when closing the connection,
391         ///as well as the information about what initiated the close
392         ///</para>
393         ///<para>
394         ///Abort flag, if true, signals to close the ongoing connection immediately 
395         ///and do not report any errors if it was already closed.
396         ///</para>
397         ///<para>
398         ///Timeout determines how much time internal close operations should be given
399         ///to complete. Negative or Timeout.Infinite value mean infinity.
400         ///</para>
401         ///</remarks>
402         public void Close(ShutdownEventArgs reason, bool abort, int timeout)
403         {
404             if (!SetCloseReason(reason))
405                 if (abort)
406                 {
407                     if (!m_appContinuation.WaitOne(BlockingCell.validatedTimeout(timeout), true))
408                         m_frameHandler.Close();
409                     return;
410                 } else {
411                     throw new AlreadyClosedException(m_closeReason);
412                 }
413                                            
414             OnShutdown();
415             m_session0.SetSessionClosing(false);
416
417             try
418             {
419                 // Try to send connection close
420                 // Wait for CloseOk in the MainLoop
421                 m_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode,
422                                                           reason.ReplyText));
423             }
424             catch (IOException ioe) {
425                 if (m_model0.CloseReason == null)
426                 {
427                     if (!abort)
428                         throw ioe;
429                     else
430                         LogCloseError("Couldn't close connection cleanly. " 
431                                       + "Socket closed unexpectedly", ioe);
432                 }
433             }
434             finally
435             {
436                 TerminateMainloop();
437             }
438             if (!m_appContinuation.WaitOne(BlockingCell.validatedTimeout(timeout),true))
439                 m_frameHandler.Close();
440         }
441
        ///<summary>Signature of a connection-close callback taking the
        ///reply code and text plus a class id and method id.</summary>
        // NOTE(review): not referenced in the visible part of this file;
        // presumably the class/method ids identify the offending method - confirm.
        public delegate void ConnectionCloseDelegate(ushort replyCode,
                                                     string replyText,
                                                     ushort classId,
                                                     ushort methodId);
446
447         public void InternalClose(ShutdownEventArgs reason)
448         {
449             if (!SetCloseReason(reason))
450             {
451                 if (closed)
452                     throw new AlreadyClosedException(m_closeReason);
453                 // We are quiescing, but still allow for server-close
454             }
455             
456             OnShutdown();
457             m_session0.SetSessionClosing(true);
458             TerminateMainloop();
459         }
460
461         ///<remarks>
462         /// May be called more than once. Should therefore be idempotent.
463         ///</remarks>
464         public void TerminateMainloop()
465         {
466             m_running = false;
467         }
468
469         public void StartMainLoop()
470         {
471             Thread mainloopThread = new Thread(new ThreadStart(MainLoop));
472             mainloopThread.Name = "AMQP Connection " + Endpoint.ToString();
473             mainloopThread.Start();
474         }
475         
476         public void StartHeartbeatLoops()
477         {
478             if (Heartbeat != 0) {
479                 StartHeartbeatLoop(new ThreadStart(HeartbeatReadLoop), "Inbound");
480                 StartHeartbeatLoop(new ThreadStart(HeartbeatWriteLoop), "Outbound");
481             }
482         }
483         
484         public void StartHeartbeatLoop(ThreadStart loop, string name)
485         {
486             Thread heartbeatLoop = new Thread(loop);
487             heartbeatLoop.Name = "AMQP Heartbeat " + name + " for Connection " + Endpoint.ToString();
488             heartbeatLoop.Start();
489         }
490         
491         public void HeartbeatWriteLoop()
492         {
493             try
494             {
495                 while (!closed)
496                 {
497                     if (!m_heartbeatWrite.WaitOne(Heartbeat * 1000, false))
498                     {
499                         WriteFrame(heartbeatFrame);
500                     }
501                 }
502             } catch (Exception e) {
503                 HandleMainLoopException(new ShutdownEventArgs(
504                                                 ShutdownInitiator.Library,
505                                                 0,
506                                                 "End of stream",
507                                                 e));
508             }
509             
510             TerminateMainloop();
511             FinishClose();
512         }
513         
514         public void HeartbeatReadLoop()
515         {
516             while (!closed)
517             {
518                 if (!m_heartbeatRead.WaitOne(Heartbeat * 1000, false))
519                     m_missedHeartbeats++;
520                 else
521                     m_missedHeartbeats = 0;
522                     
523                 // Has to miss two full heartbeats to force socket close
524                 if (m_missedHeartbeats > 1)
525                 {
526                     EndOfStreamException eose = new EndOfStreamException(
527                                          "Heartbeat missing with heartbeat == " +
528                                          m_heartbeat + " seconds");
529                     HandleMainLoopException(new ShutdownEventArgs(
530                                                           ShutdownInitiator.Library,
531                                                           0,
532                                                           "End of stream",
533                                                           eose));
534                     break;
535                 }
536             }
537             
538             TerminateMainloop();
539             FinishClose();
540         }
541         
542         public void HandleHeartbeatFrame()
543         {
544             if (m_heartbeat == 0) {
545                 // Heartbeating not enabled for this connection.
546                 return;
547             }
548             
549             m_heartbeatRead.Set();
550         }
551
552         public void MainLoop()
553         {
554             bool shutdownCleanly = false;
555             try
556             {
557                 while (m_running)
558                 {
559                     try {
560                         MainLoopIteration();
561                     } catch (SoftProtocolException spe) {
562                         QuiesceChannel(spe);
563                     }
564                 }
565                 shutdownCleanly = true;
566             }
567             catch (EndOfStreamException eose)
568             {
569                 // Possible heartbeat exception
570                 HandleMainLoopException(new ShutdownEventArgs(
571                                                           ShutdownInitiator.Library,
572                                                           0,
573                                                           "End of stream",
574                                                           eose));
575             }
576             catch (HardProtocolException hpe)
577             {
578                 shutdownCleanly = HardProtocolExceptionHandler(hpe);
579             }
580             catch (Exception ex)
581             {
582                 HandleMainLoopException(new ShutdownEventArgs(ShutdownInitiator.Library,
583                                                           CommonFraming.Constants.InternalError,
584                                                           "Unexpected Exception",
585                                                           ex));
586             }
587             
588             // If allowed for clean shutdown
589             // Run limited version of the main loop
590             if (shutdownCleanly)
591             {
592                 ClosingLoop();
593             }
594             
595             FinishClose();
596
597             m_appContinuation.Set();
598         }
599         
600         public void MainLoopIteration()
601         {
602             Frame frame = m_frameHandler.ReadFrame();
603
604             // We have received an actual frame.
605             if (frame.Type == CommonFraming.Constants.FrameHeartbeat) {
606                 // Ignore it: we've already just reset the heartbeat
607                 // counter.
608                 HandleHeartbeatFrame();
609                 return;
610             }
611
612             if (frame.Channel == 0) {
613                 // In theory, we could get non-connection.close-ok
614                 // frames here while we're quiescing (m_closeReason !=
615                 // null). In practice, there's a limited number of
616                 // things the server can ask of us on channel 0 -
617                 // essentially, just connection.close. That, combined
618                 // with the restrictions on pipelining, mean that
619                 // we're OK here to handle channel 0 traffic in a
620                 // quiescing situation, even though technically we
621                 // should be ignoring everything except
622                 // connection.close-ok.
623                 m_session0.HandleFrame(frame);
624             } else {
625                 // If we're still m_running, but have a m_closeReason,
626                 // then we must be quiescing, which means any inbound
627                 // frames for non-zero channels (and any inbound
628                 // commands on channel zero that aren't
629                 // Connection.CloseOk) must be discarded.
630                 if (m_closeReason == null)
631                 {
632                     // No close reason, not quiescing the
633                     // connection. Handle the frame. (Of course, the
634                     // Session itself may be quiescing this particular
635                     // channel, but that's none of our concern.)
636                     ISession session = m_sessionManager.Lookup(frame.Channel);
637                     if (session == null) {
638                         throw new ChannelErrorException(frame.Channel);
639                     } else {
640                         session.HandleFrame(frame);
641                     }
642                 }
643             }
644         }
645         
        // Only call at the end of the Mainloop or HeartbeatLoop
        ///<summary>Marks the connection closed, wakes both heartbeat
        ///loops, closes the frame handler and propagates the close
        ///reason to the channel-zero model.</summary>
        public void FinishClose()
        {
            // Notify heartbeat loops that they can leave
            closed = true;
            m_heartbeatRead.Set();
            m_heartbeatWrite.Set();
        
            m_frameHandler.Close();                
            m_model0.SetCloseReason(m_closeReason);
            m_model0.FinishClose();
        }
658             
659         public bool HardProtocolExceptionHandler(HardProtocolException hpe)
660         {
661             if (SetCloseReason(hpe.ShutdownReason))
662             {
663                 OnShutdown();
664                 m_session0.SetSessionClosing(false);
665                 try
666                 {
667                     m_session0.Transmit(ConnectionCloseWrapper(
668                                            hpe.ShutdownReason.ReplyCode,
669                                            hpe.ShutdownReason.ReplyText));
670                     return true;
671                 } catch (IOException ioe) {
672                     LogCloseError("Broker closed socket unexpectedly", ioe);
673                 }
674
675             } else
676                 LogCloseError("Hard Protocol Exception occured "
677                               + "while closing the connection", hpe);
678                 
679             return false;            
680         }
681         
682         ///<remarks>
683         /// Loop only used while quiescing. Use only to cleanly close connection
684         ///</remarks>
685         public void ClosingLoop()
686         {
687             m_frameHandler.Timeout = ConnectionCloseTimeout;
688             DateTime startTimeout = DateTime.Now;
689             try
690             {
691                 // Wait for response/socket closure or timeout
692                 while (!closed)
693                 {
694                     if ((DateTime.Now - startTimeout).TotalMilliseconds >= ConnectionCloseTimeout)
695                     {
696                         LogCloseError("Timeout, when waiting for server's response on close", null);
697                         break;
698                     }
699                     MainLoopIteration();
700                 }
701             }
702             catch (EndOfStreamException eose)
703             {
704                 if (m_model0.CloseReason == null)
705                     LogCloseError("Connection didn't close cleanly. "
706                                   + "Socket closed unexpectedly", eose);
707             }
708             catch (IOException ioe)
709             {
710                 LogCloseError("Connection didn't close cleanly. "
711                               + "Socket closed unexpectedly", ioe);
712             }
713             catch (Exception e)
714             {
715                 LogCloseError("Unexpected exception while closing: ", e);
716             }
717         }
718         
        ///<summary>Installed as the Handler of the channel-zero
        ///session by the constructor. Marks the connection closed and
        ///closes the frame handler.</summary>
        public void NotifyReceivedClose()
        {
            closed = true;
            m_frameHandler.Close();
        }
724         
725         ///<summary>
726         /// Sets the channel named in the SoftProtocolException into
727         /// "quiescing mode", where we issue a channel.close and
728         /// ignore everything up to the channel.close-ok reply that
729         /// should eventually arrive.
730         ///</summary>
731         ///<remarks>
732         ///<para>
733         /// Since a well-behaved peer will not wait indefinitely before
734         /// issuing the close-ok, we don't bother with a timeout here;
735         /// compare this to the case of a connection.close-ok, where a
736         /// timeout is necessary.
737         ///</para>
738         ///<para>
739         /// We need to send the close method and politely wait for a
740         /// reply before marking the channel as available for reuse.
741         ///</para>
742         ///<para>
743         /// As soon as SoftProtocolException is detected, we should stop
744         /// servicing ordinary application work, and should concentrate
745         /// on bringing down the channel as quickly and gracefully as
746         /// possible. The way this is done, as per the close-protocol,
747         /// is to signal closure up the stack *before* sending the
748         /// channel.close, by invoking ISession.Close. Once the upper
749         /// layers have been signalled, we are free to do what we need
750         /// to do to clean up and shut down the channel.
751         ///</para>
752         ///</remarks>
753         public void QuiesceChannel(SoftProtocolException pe) {
754             // First, construct the close request and QuiescingSession
755             // that we'll use during the quiesce process.
756
757             Command request;
758             int replyClassId;
759             int replyMethodId;
760             Protocol.CreateChannelClose(pe.ReplyCode,
761                                         pe.Message,
762                                         out request,
763                                         out replyClassId,
764                                         out replyMethodId);
765
766             ISession newSession = new QuiescingSession(this,
767                                                        pe.Channel,
768                                                        pe.ShutdownReason,
769                                                        replyClassId,
770                                                        replyMethodId);
771
772             // Here we detach the session from the connection. It's
773             // still alive: it just won't receive any further frames
774             // from the mainloop (once we return to the mainloop, of
775             // course). Instead, those frames will be directed at the
776             // new QuiescingSession.
777             ISession oldSession = m_sessionManager.Swap(pe.Channel, newSession);
778
779             // Now we have all the information we need, and the event
780             // flow of the *lower* layers is set up properly for
781             // shutdown. Signal channel closure *up* the stack, toward
782             // the model and application.
783             oldSession.Close(pe.ShutdownReason);
784
785             // The upper layers have been signalled. Now we can tell
786             // our peer. The peer will respond through the lower
787             // layers - specifically, through the QuiescingSession we
788             // installed above.
789             newSession.Transmit(request);
790         }
791
792         public void HandleMainLoopException(ShutdownEventArgs reason) {
793             if (!SetCloseReason(reason))
794             {
795                 LogCloseError("Unexpected Main Loop Exception while closing: "
796                                + reason.ToString(), null);
797                 return;
798             }
799             
800             OnShutdown();
801             LogCloseError("Unexpected connection closure: " + reason.ToString(), null);
802         }
803         
804         public void LogCloseError(String error, Exception ex)
805         {
806             shutdownReport.Add(new ShutdownReportEntry(error, ex));
807         }
808         
809         public void PrettyPrintShutdownReport()
810         {
811             if (ShutdownReport.Count == 0)
812             {
813                 Console.Error.WriteLine("No errors reported when closing connection {0}", this);
814             } else {
815                 Console.Error.WriteLine("Log of errors while closing connection {0}:", this);
816                 foreach(ShutdownReportEntry entry in ShutdownReport)
817                 {
818                     Console.Error.WriteLine(entry.ToString());
819                 }
820             }
821         }
822
823         ///<summary>Broadcasts notification of the final shutdown of the connection.</summary>
824         public void OnShutdown()
825         {
826             ConnectionShutdownEventHandler handler;
827             ShutdownEventArgs reason;
828             lock (m_eventLock)
829             {
830                 handler = m_connectionShutdown;
831                 reason = m_closeReason;
832                 m_connectionShutdown = null;
833             }
834             if (handler != null)
835             {
836                 foreach (ConnectionShutdownEventHandler h in handler.GetInvocationList()) {
837                     try {
838                         h(this, reason);
839                     } catch (Exception e) {
840                         CallbackExceptionEventArgs args = new CallbackExceptionEventArgs(e);
841                         args.Detail["context"] = "OnShutdown";
842                         OnCallbackException(args);
843                     }
844                 }
845             }
846         }
847
848         public void OnCallbackException(CallbackExceptionEventArgs args)
849         {
850             CallbackExceptionEventHandler handler;
851             lock (m_eventLock) {
852                 handler = m_callbackException;
853             }
854             if (handler != null) {
855                 foreach (CallbackExceptionEventHandler h in handler.GetInvocationList()) {
856                     try {
857                         h(this, args);
858                     } catch {
859                         // Exception in
860                         // Callback-exception-handler. That was the
861                         // app's last chance. Swallow the exception.
862                         // FIXME: proper logging
863                     }
864                 }
865             }
866         }
867
868         public IDictionary BuildClientPropertiesTable()
869         {
870             string version = this.GetType().Assembly.GetName().Version.ToString();
871             //TODO: Get the rest of this data from the Assembly Attributes
872             Hashtable table = new Hashtable();
873             table["product"] = Encoding.UTF8.GetBytes("RabbitMQ");
874             table["version"] = Encoding.UTF8.GetBytes(version);
875             table["platform"] = Encoding.UTF8.GetBytes(".NET");
876             table["copyright"] = Encoding.UTF8.GetBytes("Copyright (C) 2007-2008 LShift Ltd., " +
877                                                         "Cohesive Financial Technologies LLC., " +
878                                                         "and Rabbit Technologies Ltd.");
879             table["information"] = Encoding.UTF8.GetBytes("Licensed under the MPL.  " +
880                                                           "See http://www.rabbitmq.com/");
881             return table;
882         }
883         
884         public Command ConnectionCloseWrapper(ushort reasonCode, string reasonText)
885         {
886             Command request;
887             int replyClassId, replyMethodId;
888             Protocol.CreateConnectionClose(reasonCode,
889                                            reasonText,
890                                            out request,
891                                            out replyClassId,
892                                            out replyMethodId);
893             return request;
894         } 
895
896         private static uint NegotiatedMaxValue(uint clientValue, uint serverValue)
897         {
898             return (clientValue == 0 || serverValue == 0) ?
899                 Math.Max(clientValue, serverValue) :
900                 Math.Min(clientValue, serverValue);
901         }
902
903         public void Open(bool insist)
904         {
905             BlockingCell connectionStartCell = new BlockingCell();
906             m_model0.m_connectionStartCell = connectionStartCell;
907             m_frameHandler.Timeout = HandshakeTimeout;
908             m_frameHandler.SendHeader();
909
910             ConnectionStartDetails connectionStart = (ConnectionStartDetails)
911                 connectionStartCell.Value;
912
913             AmqpVersion serverVersion = new AmqpVersion(connectionStart.versionMajor,
914                                                         connectionStart.versionMinor);
915             if (!serverVersion.Equals(Protocol.Version))
916             {
917                 TerminateMainloop();
918                 FinishClose();
919                 throw new ProtocolVersionMismatchException(Protocol.MajorVersion,
920                                                            Protocol.MinorVersion,
921                                                            serverVersion.Major,
922                                                            serverVersion.Minor);
923             }
924
925             // FIXME: check that PLAIN is supported.
926             // FIXME: parse out locales properly!
927             ConnectionTuneDetails connectionTune =
928                 m_model0.ConnectionStartOk(BuildClientPropertiesTable(),
929                                            "PLAIN",
930                                            Encoding.UTF8.GetBytes("\0" + m_parameters.UserName +
931                                                                   "\0" + m_parameters.Password),
932                                            "en_US");
933
934             ushort channelMax = (ushort) NegotiatedMaxValue(m_parameters.RequestedChannelMax,
935                                                             connectionTune.channelMax);
936             ChannelMax = channelMax;
937
938             uint frameMax = NegotiatedMaxValue(m_parameters.RequestedFrameMax,
939                                                connectionTune.frameMax);
940             FrameMax = frameMax;
941
942             ushort heartbeat = (ushort) NegotiatedMaxValue(m_parameters.RequestedHeartbeat,
943                                                            connectionTune.heartbeat);
944             Heartbeat = heartbeat;
945
946             m_model0.ConnectionTuneOk(channelMax,
947                                       frameMax,
948                                       heartbeat);
949
950             string knownHosts = m_model0.ConnectionOpen(m_parameters.VirtualHost,
951                                                         "", // FIXME: make configurable?
952                                                         insist);
953             KnownHosts = AmqpTcpEndpoint.ParseMultiple(Protocol, knownHosts);
954         }
955
956         public override string ToString()
957         {
958             return string.Format("Connection({0},{1})", id, Endpoint);
959         }
960     }
961 }