1 // This source code is dual-licensed under the Apache License, version
2 // 2.0, and the Mozilla Public License, version 1.1.
6 //---------------------------------------------------------------------------
7 // Copyright (C) 2007, 2008 LShift Ltd., Cohesive Financial
8 // Technologies LLC., and Rabbit Technologies Ltd.
10 // Licensed under the Apache License, Version 2.0 (the "License");
11 // you may not use this file except in compliance with the License.
12 // You may obtain a copy of the License at
14 // http://www.apache.org/licenses/LICENSE-2.0
16 // Unless required by applicable law or agreed to in writing, software
17 // distributed under the License is distributed on an "AS IS" BASIS,
18 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 // See the License for the specific language governing permissions and
20 // limitations under the License.
21 //---------------------------------------------------------------------------
25 //---------------------------------------------------------------------------
26 // The contents of this file are subject to the Mozilla Public License
27 // Version 1.1 (the "License"); you may not use this file except in
28 // compliance with the License. You may obtain a copy of the License at
29 // http://www.rabbitmq.com/mpl.html
31 // Software distributed under the License is distributed on an "AS IS"
32 // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33 // License for the specific language governing rights and limitations
36 // The Original Code is The RabbitMQ .NET Client.
38 // The Initial Developers of the Original Code are LShift Ltd.,
39 // Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
41 // Portions created by LShift Ltd., Cohesive Financial Technologies
42 // LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007, 2008
43 // LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
46 // All Rights Reserved.
48 // Contributor(s): ______________________________________.
50 //---------------------------------------------------------------------------
54 using System.Threading;
55 using System.Collections;
57 using RabbitMQ.Client;
58 using RabbitMQ.Client.Events;
59 using RabbitMQ.Client.Exceptions;
62 // We use spec version 0-9 for common constants such as frame types,
63 // error codes, and the frame end byte, since they don't vary *within
64 // the versions we support*. Obviously we may need to revisit this if
66 using CommonFraming = RabbitMQ.Client.Framing.v0_9;
68 namespace RabbitMQ.Client.Impl
70 public abstract class ConnectionBase : IConnection
72 ///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
73 public readonly Frame heartbeatFrame = new Frame(CommonFraming.Constants.FrameHeartbeat,
77 ///<summary>Timeout used while waiting for AMQP handshaking to
78 ///complete (milliseconds)</summary>
79 public static int HandshakeTimeout = 10000;
81 ///<summary>Timeout used while waiting for a
82 ///connection.close-ok reply to a connection close request
83 ///(milliseconds)</summary>
84 public static int ConnectionCloseTimeout = 10000;
// Parameters and frame handler handed to the constructor.
86 public ConnectionParameters m_parameters;
87 public IFrameHandler m_frameHandler;
88 public uint m_frameMax = 0;
// Heartbeat setting; zero is treated as "heartbeating not enabled" by HandleHeartbeatFrame.
89 public ushort m_heartbeat = 0;
// Backing store for the KnownHosts property.
90 public AmqpTcpEndpoint[] m_knownHosts = null;
// Main session and the model wrapped around it; MainLoopIteration routes channel-0 frames to m_session0.
92 public MainSession m_session0;
93 public ModelBase m_model0;
// Creates and looks up the sessions for non-zero channels.
95 public readonly SessionManager m_sessionManager;
97 public volatile bool m_running = true;
99 public readonly object m_eventLock = new object();
// Backing delegate for the ConnectionShutdown event.
100 public ConnectionShutdownEventHandler m_connectionShutdown;
// Null until SetCloseReason stores a reason; a non-null value means the connection is closing or closed.
102 public volatile ShutdownEventArgs m_closeReason = null;
// Backing delegate for the CallbackException event.
103 public CallbackExceptionEventHandler m_callbackException;
// Signalled by MainLoop when it is done; Close waits on it.
105 public ManualResetEvent m_appContinuation = new ManualResetEvent(false);
// Signalled by HandleHeartbeatFrame and FinishClose; awaited by HeartbeatReadLoop.
106 public AutoResetEvent m_heartbeatRead = new AutoResetEvent(false);
// Signalled by WriteFrame and FinishClose; awaited by HeartbeatWriteLoop.
107 public AutoResetEvent m_heartbeatWrite = new AutoResetEvent(false);
108 public volatile bool closed = false;
// Identifier included in the ToString() output.
110 public Guid id = Guid.NewGuid();
// Consecutive heartbeat waits that timed out; maintained by HeartbeatReadLoop.
112 public int m_missedHeartbeats = 0;
// Synchronized list that LogCloseError appends ShutdownReportEntry items to.
114 public IList shutdownReport = ArrayList.Synchronized(new ArrayList());
// Stores the supplied parameters and frame handler, then builds the session
// manager, the main session and the model that wraps that session.
116 public ConnectionBase(ConnectionParameters parameters,
118 IFrameHandler frameHandler)
120 m_parameters = parameters;
121 m_frameHandler = frameHandler;
123 m_sessionManager = new SessionManager(this);
124 m_session0 = new MainSession(this);
// NotifyReceivedClose is installed as the main session's Handler callback.
125 m_session0.Handler = NotifyReceivedClose;
126 m_model0 = (ModelBase)Protocol.CreateModel(m_session0);
// Starts the heartbeat threads; StartHeartbeatLoops does nothing when Heartbeat is zero.
130 StartHeartbeatLoops();
// Shutdown notification event backed by m_connectionShutdown.
// NOTE(review): the branch structure between these statements is not visible here;
// it looks like a handler added after a close reason exists is invoked
// immediately with that reason instead of being stored — confirm.
133 public event ConnectionShutdownEventHandler ConnectionShutdown
140 if (m_closeReason == null)
142 m_connectionShutdown += value;
148 value(this, m_closeReason);
155 m_connectionShutdown -= value;
// Event backed by m_callbackException; OnCallbackException invokes its handlers.
160 public event CallbackExceptionEventHandler CallbackException
166 m_callbackException += value;
173 m_callbackException -= value;
///<summary>Endpoint of this connection, as reported by the
///underlying frame handler.</summary>
public AmqpTcpEndpoint Endpoint
{
    get { return m_frameHandler.Endpoint; }
}
///<summary>Explicit implementation of IConnection.Protocol;
///returns the protocol of the connection's endpoint.</summary>
IProtocol IConnection.Protocol
{
    get { return Endpoint.Protocol; }
}
///<summary>Same value as IConnection.Protocol, exposed with the
///tighter AbstractProtocolBase type (the endpoint's protocol is
///cast to it).</summary>
public AbstractProtocolBase Protocol
{
    get { return (AbstractProtocolBase)Endpoint.Protocol; }
}
///<summary>Writes a frame through the frame handler and records
///the outbound activity for the heartbeat writer.</summary>
public void WriteFrame(Frame f)
{
    m_frameHandler.WriteFrame(f);

    // HeartbeatWriteLoop only emits a heartbeat when its wait on this
    // event times out, so signalling here marks the link as recently used.
    m_heartbeatWrite.Set();
}
211 public ConnectionParameters Parameters
///<summary>Channel limit for this connection; reads and writes
///are forwarded to the session manager.</summary>
public ushort ChannelMax
{
    get { return m_sessionManager.ChannelMax; }
    set { m_sessionManager.ChannelMax = value; }
}
// Heartbeat setting of the connection. The visible setter statement derives the
// frame handler's timeout from it: value * 2 * 1000 (presumably a seconds to
// milliseconds conversion — confirm against IFrameHandler.Timeout).
243 public ushort Heartbeat
252 // Socket read timeout is twice the heartbeat
253 // because when we hit the timeout socket is
255 m_frameHandler.Timeout = value * 2 * 1000;
///<summary>Endpoints stored in m_knownHosts; Open fills this from
///the connection.open reply.</summary>
public AmqpTcpEndpoint[] KnownHosts
{
    get
    {
        return m_knownHosts;
    }
    set
    {
        m_knownHosts = value;
    }
}
///<summary>Reason recorded for closing the connection, or null
///while no close reason has been set.</summary>
public ShutdownEventArgs CloseReason
{
    get { return m_closeReason; }
}
277 return CloseReason == null;
///<summary>AutoClose flag; reads and writes are forwarded to the
///session manager.</summary>
public bool AutoClose
{
    get { return m_sessionManager.AutoClose; }
    set { m_sessionManager.AutoClose = value; }
}
// Creates a new session, wraps it in a protocol-specific model and opens the
// channel on it (the argument passed to _Private_ChannelOpen is an empty string).
293 public IModel CreateModel()
295 ISession session = CreateSession();
296 IFullModel model = (IFullModel)Protocol.CreateModel(session);
297 model._Private_ChannelOpen("");
///<summary>Asks the session manager for a new session, letting
///the manager choose the channel.</summary>
public ISession CreateSession()
{
    ISession created = m_sessionManager.Create();
    return created;
}
///<summary>Asks the session manager for a new session on the
///given channel number.</summary>
public ISession CreateSession(int channelNumber)
{
    ISession created = m_sessionManager.Create(channelNumber);
    return created;
}
// Records the close reason, but only if none has been recorded yet: the
// assignment is guarded by the null check, so the first caller wins.
// Callers test the bool result with "if (!SetCloseReason(...))" to detect
// that a close was already in progress.
311 public bool SetCloseReason(ShutdownEventArgs reason)
315 if (m_closeReason == null)
317 m_closeReason = reason;
///<summary>List of entries logged through LogCloseError while
///the connection was being closed.</summary>
public IList ShutdownReport
{
    get { return shutdownReport; }
}
335 void IDisposable.Dispose()
340 ///<summary>API-side invocation of connection close.</summary>
343 Close(200, "Goodbye", Timeout.Infinite);
///<summary>API-side invocation of connection close with timeout.
///Uses reply code 200 and the text "Goodbye".</summary>
public void Close(int timeout)
{
    const ushort replyCode = 200;
    Close(replyCode, "Goodbye", timeout);
}
///<summary>Closes the connection with the given reason, without
///the abort flag and without a time limit.</summary>
public void Close(ShutdownEventArgs reason)
{
    const bool abort = false;
    Close(reason, abort, Timeout.Infinite);
}
///<summary>Closes the connection with an application-initiated
///shutdown reason built from the given code and text.</summary>
public void Close(ushort reasonCode, string reasonText, int timeout)
{
    ShutdownEventArgs reason =
        new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText);
    Close(reason, false, timeout);
}
363 ///<summary>API-side invocation of connection abort.</summary>
366 Abort(Timeout.Infinite);
///<summary>API-side invocation of connection abort with timeout.
///Uses reply code 200 and the text "Connection close forced".</summary>
public void Abort(int timeout)
{
    const ushort replyCode = 200;
    Abort(replyCode, "Connection close forced", timeout);
}
///<summary>Aborts the connection with the given code and text,
///recording the application as the initiator.</summary>
public void Abort(ushort reasonCode, string reasonText, int timeout)
{
    ShutdownInitiator initiator = ShutdownInitiator.Application;
    Abort(reasonCode, reasonText, initiator, timeout);
}
// Builds a ShutdownEventArgs from the initiator, code and text and passes it
// on to the three-argument Close overload.
380 public void Abort(ushort reasonCode, string reasonText,
381 ShutdownInitiator initiator, int timeout)
383 Close( new ShutdownEventArgs(initiator, reasonCode, reasonText),
387 ///<summary>Try to close connection in a graceful way</summary>
390 ///Shutdown reason contains the code and text assigned when closing the connection,
391 ///as well as information about what initiated the close.
394 ///Abort flag, if true, signals to close the ongoing connection immediately
395 ///and not to report any errors if it was already closed.
398 ///Timeout determines how much time internal close operations should be given
399 ///to complete. A negative value or Timeout.Infinite means no time limit.
402 public void Close(ShutdownEventArgs reason, bool abort, int timeout)
// SetCloseReason returns false when a close reason was already recorded,
// i.e. somebody else started closing this connection first.
404 if (!SetCloseReason(reason))
// Wait for the main loop to signal completion; force the socket shut if
// that does not happen within the timeout.
407 if (!m_appContinuation.WaitOne(BlockingCell.validatedTimeout(timeout), true))
408 m_frameHandler.Close();
411 throw new AlreadyClosedException(m_closeReason);
415 m_session0.SetSessionClosing(false);
419 // Try to send connection close
420 // Wait for CloseOk in the MainLoop
421 m_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode,
424 catch (IOException ioe) {
425 if (m_model0.CloseReason == null)
430 LogCloseError("Couldn't close connection cleanly. "
431 + "Socket closed unexpectedly", ioe);
// Same wait as above: give the main loop up to the timeout to finish, then
// close the frame handler ourselves.
438 if (!m_appContinuation.WaitOne(BlockingCell.validatedTimeout(timeout),true))
439 m_frameHandler.Close();
442 public delegate void ConnectionCloseDelegate(ushort replyCode,
// Records the close reason and puts the main session into its closing state
// (SetSessionClosing(true)). Throws AlreadyClosedException, carrying the
// previously stored reason, when a close reason had already been recorded.
447 public void InternalClose(ShutdownEventArgs reason)
449 if (!SetCloseReason(reason))
452 throw new AlreadyClosedException(m_closeReason);
453 // We are quiescing, but still allow for server-close
457 m_session0.SetSessionClosing(true);
462 /// May be called more than once. Should therefore be idempotent.
464 public void TerminateMainloop()
///<summary>Starts MainLoop on a new thread named after the
///connection's endpoint.</summary>
public void StartMainLoop()
{
    ThreadStart entryPoint = new ThreadStart(MainLoop);
    Thread worker = new Thread(entryPoint);
    worker.Name = "AMQP Connection " + Endpoint.ToString();
    worker.Start();
}
///<summary>Starts the inbound and outbound heartbeat threads.
///Does nothing when Heartbeat is zero.</summary>
public void StartHeartbeatLoops()
{
    if (Heartbeat == 0)
    {
        return;
    }

    StartHeartbeatLoop(new ThreadStart(HeartbeatReadLoop), "Inbound");
    StartHeartbeatLoop(new ThreadStart(HeartbeatWriteLoop), "Outbound");
}
///<summary>Creates, names and starts one heartbeat thread.</summary>
///<param name="loop">Loop body the thread runs (HeartbeatReadLoop or
///HeartbeatWriteLoop when called from StartHeartbeatLoops).</param>
///<param name="name">Direction label embedded in the thread name,
///e.g. "Inbound" or "Outbound".</param>
public void StartHeartbeatLoop(ThreadStart loop, string name)
{
    Thread heartbeatLoop = new Thread(loop);
    heartbeatLoop.Name = "AMQP Heartbeat " + name + " for Connection " + Endpoint.ToString();

    // Heartbeat threads are helpers of the connection. As foreground
    // threads they would keep the whole process alive whenever the loops
    // are never told to leave (e.g. the connection is abandoned without
    // being closed), so run them as background threads.
    heartbeatLoop.IsBackground = true;
    heartbeatLoop.Start();
}
// Body of the outbound heartbeat thread. Each wait lasts Heartbeat * 1000;
// WriteFrame signals m_heartbeatWrite, so the wait only times out when nothing
// was written for that long, and in that case a heartbeat frame is sent.
// Any exception is turned into a library-initiated shutdown via
// HandleMainLoopException.
491 public void HeartbeatWriteLoop()
497 if (!m_heartbeatWrite.WaitOne(Heartbeat * 1000, false))
499 WriteFrame(heartbeatFrame);
502 } catch (Exception e) {
503 HandleMainLoopException(new ShutdownEventArgs(
504 ShutdownInitiator.Library,
// Body of the inbound heartbeat thread. Counts waits on m_heartbeatRead that
// time out (each wait lasts Heartbeat * 1000) and, once more than one has been
// missed, reports a library-initiated shutdown through HandleMainLoopException.
514 public void HeartbeatReadLoop()
518 if (!m_heartbeatRead.WaitOne(Heartbeat * 1000, false))
519 m_missedHeartbeats++;
521 m_missedHeartbeats = 0;
523 // Has to miss two full heartbeats to force socket close
524 if (m_missedHeartbeats > 1)
526 EndOfStreamException eose = new EndOfStreamException(
527 "Heartbeat missing with heartbeat == " +
528 m_heartbeat + " seconds");
529 HandleMainLoopException(new ShutdownEventArgs(
530 ShutdownInitiator.Library,
// Called by MainLoopIteration when a heartbeat frame arrives; signals
// m_heartbeatRead, which HeartbeatReadLoop waits on.
542 public void HandleHeartbeatFrame()
544 if (m_heartbeat == 0) {
545 // Heartbeating not enabled for this connection.
549 m_heartbeatRead.Set();
// Body of the connection's main thread (started by StartMainLoop).
// The visible handlers map failures as follows:
//  - EndOfStreamException and the generic case are reported through
//    HandleMainLoopException as library-initiated shutdowns;
//  - HardProtocolException is delegated to HardProtocolExceptionHandler, whose
//    result decides whether a clean shutdown is still attempted.
// m_appContinuation is signalled at the end so that threads blocked in Close
// can continue.
552 public void MainLoop()
554 bool shutdownCleanly = false;
561 } catch (SoftProtocolException spe) {
565 shutdownCleanly = true;
567 catch (EndOfStreamException eose)
569 // Possible heartbeat exception
570 HandleMainLoopException(new ShutdownEventArgs(
571 ShutdownInitiator.Library,
576 catch (HardProtocolException hpe)
578 shutdownCleanly = HardProtocolExceptionHandler(hpe);
582 HandleMainLoopException(new ShutdownEventArgs(ShutdownInitiator.Library,
583 CommonFraming.Constants.InternalError,
584 "Unexpected Exception",
588 // If allowed for clean shutdown
589 // Run limited version of the main loop
597 m_appContinuation.Set();
// Reads one frame from the frame handler and dispatches it: heartbeat frames
// go to HandleHeartbeatFrame, channel-0 frames to the main session, and all
// other frames to the session registered for frame.Channel. Throws
// ChannelErrorException when no session is registered for that channel.
600 public void MainLoopIteration()
602 Frame frame = m_frameHandler.ReadFrame();
604 // We have received an actual frame.
605 if (frame.Type == CommonFraming.Constants.FrameHeartbeat) {
606 // Ignore it: we've already just reset the heartbeat
608 HandleHeartbeatFrame();
612 if (frame.Channel == 0) {
613 // In theory, we could get non-connection.close-ok
614 // frames here while we're quiescing (m_closeReason !=
615 // null). In practice, there's a limited number of
616 // things the server can ask of us on channel 0 -
617 // essentially, just connection.close. That, combined
618 // with the restrictions on pipelining, mean that
619 // we're OK here to handle channel 0 traffic in a
620 // quiescing situation, even though technically we
621 // should be ignoring everything except
622 // connection.close-ok.
623 m_session0.HandleFrame(frame);
625 // If we're still m_running, but have a m_closeReason,
626 // then we must be quiescing, which means any inbound
627 // frames for non-zero channels (and any inbound
628 // commands on channel zero that aren't
629 // Connection.CloseOk) must be discarded.
630 if (m_closeReason == null)
632 // No close reason, not quiescing the
633 // connection. Handle the frame. (Of course, the
634 // Session itself may be quiescing this particular
635 // channel, but that's none of our concern.)
636 ISession session = m_sessionManager.Lookup(frame.Channel);
637 if (session == null) {
638 throw new ChannelErrorException(frame.Channel);
640 session.HandleFrame(frame);
646 // Only call at the end of the Mainloop or HeartbeatLoop
// Final teardown: wakes both heartbeat loops, closes the frame handler and
// hands the connection's close reason to the main model before finishing it.
647 public void FinishClose()
649 // Notify heartbeat loops that they can leave
651 m_heartbeatRead.Set();
652 m_heartbeatWrite.Set();
654 m_frameHandler.Close();
655 m_model0.SetCloseReason(m_closeReason);
656 m_model0.FinishClose();
// Reacts to a hard protocol error raised in the main loop. If the exception's
// shutdown reason is the first close reason recorded, the main session is
// switched to closing and a connection.close with the exception's reply code
// and text is transmitted; an IOException while doing so is logged to the
// shutdown report. Otherwise the exception is only logged. MainLoop stores the
// bool result in its shutdownCleanly flag.
659 public bool HardProtocolExceptionHandler(HardProtocolException hpe)
661 if (SetCloseReason(hpe.ShutdownReason))
664 m_session0.SetSessionClosing(false);
667 m_session0.Transmit(ConnectionCloseWrapper(
668 hpe.ShutdownReason.ReplyCode,
669 hpe.ShutdownReason.ReplyText));
671 } catch (IOException ioe) {
672 LogCloseError("Broker closed socket unexpectedly", ioe);
676 LogCloseError("Hard Protocol Exception occured "
677 + "while closing the connection", hpe);
683 /// Loop only used while quiescing. Use only to cleanly close connection
// The frame handler timeout is set to ConnectionCloseTimeout, and the elapsed
// time since entry is compared against the same limit; exceeding it is logged
// as a timeout. Socket-level failures are recorded in the shutdown report
// rather than propagated.
// NOTE(review): elapsed time is measured with DateTime.Now, which follows
// wall-clock adjustments — consider a monotonic source.
685 public void ClosingLoop()
687 m_frameHandler.Timeout = ConnectionCloseTimeout;
688 DateTime startTimeout = DateTime.Now;
691 // Wait for response/socket closure or timeout
694 if ((DateTime.Now - startTimeout).TotalMilliseconds >= ConnectionCloseTimeout)
696 LogCloseError("Timeout, when waiting for server's response on close", null);
702 catch (EndOfStreamException eose)
// End-of-stream is only reported when the main model has no close reason yet.
704 if (m_model0.CloseReason == null)
705 LogCloseError("Connection didn't close cleanly. "
706 + "Socket closed unexpectedly", eose);
708 catch (IOException ioe)
710 LogCloseError("Connection didn't close cleanly. "
711 + "Socket closed unexpectedly", ioe);
715 LogCloseError("Unexpected exception while closing: ", e);
// Installed by the constructor as the main session's Handler callback;
// closes the frame handler.
719 public void NotifyReceivedClose()
722 m_frameHandler.Close();
726 /// Sets the channel named in the SoftProtocolException into
727 /// "quiescing mode", where we issue a channel.close and
728 /// ignore everything up to the channel.close-ok reply that
729 /// should eventually arrive.
733 /// Since a well-behaved peer will not wait indefinitely before
734 /// issuing the close-ok, we don't bother with a timeout here;
735 /// compare this to the case of a connection.close-ok, where a
736 /// timeout is necessary.
739 /// We need to send the close method and politely wait for a
740 /// reply before marking the channel as available for reuse.
743 /// As soon as SoftProtocolException is detected, we should stop
744 /// servicing ordinary application work, and should concentrate
745 /// on bringing down the channel as quickly and gracefully as
746 /// possible. The way this is done, as per the close-protocol,
747 /// is to signal closure up the stack *before* sending the
748 /// channel.close, by invoking ISession.Close. Once the upper
749 /// layers have been signalled, we are free to do what we need
750 /// to do to clean up and shut down the channel.
// Order of operations: build the channel.close request and a
// QuiescingSession, swap that session in for pe.Channel, close the old
// session with pe.ShutdownReason, and only then transmit the request.
753 public void QuiesceChannel(SoftProtocolException pe) {
754 // First, construct the close request and QuiescingSession
755 // that we'll use during the quiesce process.
760 Protocol.CreateChannelClose(pe.ReplyCode,
766 ISession newSession = new QuiescingSession(this,
772 // Here we detach the session from the connection. It's
773 // still alive: it just won't receive any further frames
774 // from the mainloop (once we return to the mainloop, of
775 // course). Instead, those frames will be directed at the
776 // new QuiescingSession.
777 ISession oldSession = m_sessionManager.Swap(pe.Channel, newSession);
779 // Now we have all the information we need, and the event
780 // flow of the *lower* layers is set up properly for
781 // shutdown. Signal channel closure *up* the stack, toward
782 // the model and application.
783 oldSession.Close(pe.ShutdownReason);
785 // The upper layers have been signalled. Now we can tell
786 // our peer. The peer will respond through the lower
787 // layers - specifically, through the QuiescingSession we
789 newSession.Transmit(request);
// Records `reason` as the close reason and logs it to the shutdown report.
// If a close reason was already recorded (SetCloseReason returns false), the
// failure is logged as having happened while closing.
792 public void HandleMainLoopException(ShutdownEventArgs reason) {
793 if (!SetCloseReason(reason))
795 LogCloseError("Unexpected Main Loop Exception while closing: "
796 + reason.ToString(), null);
801 LogCloseError("Unexpected connection closure: " + reason.ToString(), null);
///<summary>Appends an entry to the shutdown report.</summary>
///<param name="error">Description of what went wrong.</param>
///<param name="ex">Associated exception; callers pass null when
///there is none.</param>
public void LogCloseError(String error, Exception ex)
{
    ShutdownReportEntry entry = new ShutdownReportEntry(error, ex);
    shutdownReport.Add(entry);
}
// Writes the shutdown report to standard error: a "no errors" line when the
// report is empty, a header followed by one line per entry when it is not.
809 public void PrettyPrintShutdownReport()
811 if (ShutdownReport.Count == 0)
813 Console.Error.WriteLine("No errors reported when closing connection {0}", this);
815 Console.Error.WriteLine("Log of errors while closing connection {0}:", this);
816 foreach(ShutdownReportEntry entry in ShutdownReport)
818 Console.Error.WriteLine(entry.ToString());
823 ///<summary>Broadcasts notification of the final shutdown of the connection.</summary>
824 public void OnShutdown()
826 ConnectionShutdownEventHandler handler;
827 ShutdownEventArgs reason;
// Take a private copy of the handler list and the reason, then clear the
// field so the same handlers are not notified again by a later call.
830 handler = m_connectionShutdown;
831 reason = m_closeReason;
832 m_connectionShutdown = null;
// Handlers are walked one by one so that an exception thrown by one of them
// can be reported through OnCallbackException, tagged with the "OnShutdown" context.
836 foreach (ConnectionShutdownEventHandler h in handler.GetInvocationList()) {
839 } catch (Exception e) {
840 CallbackExceptionEventArgs args = new CallbackExceptionEventArgs(e);
841 args.Detail["context"] = "OnShutdown";
842 OnCallbackException(args);
// Notifies the CallbackException subscribers, if any, by iterating over the
// invocation list of a local copy of m_callbackException.
848 public void OnCallbackException(CallbackExceptionEventArgs args)
850 CallbackExceptionEventHandler handler;
852 handler = m_callbackException;
854 if (handler != null) {
855 foreach (CallbackExceptionEventHandler h in handler.GetInvocationList()) {
860 // Callback-exception-handler. That was the
861 // app's last chance. Swallow the exception.
862 // FIXME: proper logging
// Builds the client properties table passed to ConnectionStartOk in Open.
// Every value is stored as a UTF-8 encoded byte array.
868 public IDictionary BuildClientPropertiesTable()
// Version string of the assembly that defines this object's runtime type.
870 string version = this.GetType().Assembly.GetName().Version.ToString();
871 //TODO: Get the rest of this data from the Assembly Attributes
872 Hashtable table = new Hashtable();
873 table["product"] = Encoding.UTF8.GetBytes("RabbitMQ");
874 table["version"] = Encoding.UTF8.GetBytes(version);
875 table["platform"] = Encoding.UTF8.GetBytes(".NET");
876 table["copyright"] = Encoding.UTF8.GetBytes("Copyright (C) 2007-2008 LShift Ltd., " +
877 "Cohesive Financial Technologies LLC., " +
878 "and Rabbit Technologies Ltd.");
879 table["information"] = Encoding.UTF8.GetBytes("Licensed under the MPL. " +
880 "See http://www.rabbitmq.com/");
// Builds the Command that Close and HardProtocolExceptionHandler transmit on
// the main session, by asking the protocol to create a connection.close
// carrying the given code (and, per the callers, reply text).
884 public Command ConnectionCloseWrapper(ushort reasonCode, string reasonText)
887 int replyClassId, replyMethodId;
888 Protocol.CreateConnectionClose(reasonCode,
///<summary>Combines a client-side and a server-side limit into
///the value used for the connection.</summary>
///<remarks>A zero on either side defers to the other side's value;
///when both are non-zero the smaller one is chosen.</remarks>
private static uint NegotiatedMaxValue(uint clientValue, uint serverValue)
{
    if (clientValue == 0)
    {
        return serverValue;
    }

    if (serverValue == 0)
    {
        return clientValue;
    }

    return clientValue < serverValue ? clientValue : serverValue;
}
// Performs the connection handshake: sends the protocol header, waits for the
// server's connection.start details, checks the protocol version, answers with
// start-ok, applies the negotiated tuning values, and finally issues
// connection.open, whose reply is parsed into KnownHosts.
// Throws ProtocolVersionMismatchException when the server's version differs
// from Protocol.Version.
903 public void Open(bool insist)
// The cell is handed to the main model, which is expected to fill it in when
// connection.start arrives; reading .Value below yields those details.
905 BlockingCell connectionStartCell = new BlockingCell();
906 m_model0.m_connectionStartCell = connectionStartCell;
907 m_frameHandler.Timeout = HandshakeTimeout;
908 m_frameHandler.SendHeader();
910 ConnectionStartDetails connectionStart = (ConnectionStartDetails)
911 connectionStartCell.Value;
913 AmqpVersion serverVersion = new AmqpVersion(connectionStart.versionMajor,
914 connectionStart.versionMinor);
915 if (!serverVersion.Equals(Protocol.Version))
919 throw new ProtocolVersionMismatchException(Protocol.MajorVersion,
920 Protocol.MinorVersion,
922 serverVersion.Minor);
925 // FIXME: check that PLAIN is supported.
926 // FIXME: parse out locales properly!
927 ConnectionTuneDetails connectionTune =
928 m_model0.ConnectionStartOk(BuildClientPropertiesTable(),
// Credential blob: NUL, user name, NUL, password, encoded as UTF-8.
930 Encoding.UTF8.GetBytes("\0" + m_parameters.UserName +
931 "\0" + m_parameters.Password),
// Each tuning value is negotiated between the client's requested value and the
// server's proposal (see NegotiatedMaxValue).
934 ushort channelMax = (ushort) NegotiatedMaxValue(m_parameters.RequestedChannelMax,
935 connectionTune.channelMax);
936 ChannelMax = channelMax;
938 uint frameMax = NegotiatedMaxValue(m_parameters.RequestedFrameMax,
939 connectionTune.frameMax);
942 ushort heartbeat = (ushort) NegotiatedMaxValue(m_parameters.RequestedHeartbeat,
943 connectionTune.heartbeat);
944 Heartbeat = heartbeat;
946 m_model0.ConnectionTuneOk(channelMax,
950 string knownHosts = m_model0.ConnectionOpen(m_parameters.VirtualHost,
951 "", // FIXME: make configurable?
953 KnownHosts = AmqpTcpEndpoint.ParseMultiple(Protocol, knownHosts);
///<summary>Textual form of the connection: its id and endpoint,
///formatted as "Connection(id,endpoint)".</summary>
public override string ToString()
{
    object[] parts = new object[] { id, Endpoint };
    return string.Format("Connection({0},{1})", parts);
}