// ------------------------------------------------------------------------------
//
// Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ------------------------------------------------------------------------------
//

namespace System.Net
{
    using System.Collections;
    using System.IO;
    using System.Security.Cryptography.X509Certificates;
    using System.Net.Sockets;
    using System.Security.Permissions;
    using System.Text;
    using System.Threading;
    using System.Security.Authentication;

    /// <summary>
    /// Implements basic sending and receiving of network commands.
    /// Handles generic parsing of server responses and provides
    /// a pipeline sequencing mechanism for sending the commands to the
    /// server.
    /// </summary>
    internal class CommandStream : PooledStream
    {
        private static readonly AsyncCallback m_WriteCallbackDelegate = new AsyncCallback(WriteCallback);
        private static readonly AsyncCallback m_ReadCallbackDelegate = new AsyncCallback(ReadCallback);

        private bool m_RecoverableFailure;

        //
        // Active variables used for the command state machine
        //
        protected WebRequest m_Request;
        protected bool m_Async;
        private bool m_Aborted;

        protected PipelineEntry[] m_Commands;
        protected int m_Index;

        private bool m_DoRead;
        private bool m_DoSend;
        private ResponseDescription m_CurrentResponseDescription;
        protected string m_AbortReason;

        const int _WaitingForPipeline = 1;
        const int _CompletedPipeline = 2;

        /// <summary>
        /// Forwards the pooling parameters to the base stream and creates the
        /// decoder for the current command encoding.
        /// </summary>
        internal CommandStream(ConnectionPool connectionPool, TimeSpan lifetime, bool checkLifetime)
            : base(connectionPool, lifetime, checkLifetime)
        {
            m_Decoder = m_Encoding.GetDecoder();
        }

        /// <summary>
        /// Aborts the stream once: marks it as aborted and not poolable, closes the
        /// base stream, and reports <paramref name="e"/> to the request callback.
        /// Subsequent calls return immediately.
        /// </summary>
        /// <param name="e">The exception to report to the request; may be null.</param>
        internal virtual void Abort(Exception e)
        {
            GlobalLog.Print("CommandStream"+ValidationHelper.HashString(this)+"::Abort() - closing control Stream");

            lock (this)
            {
                if (m_Aborted)
                {
                    return;
                }

                m_Aborted = true;
                CanBePooled = false;
            }

            try
            {
                base.Close(0);
            }
            finally
            {
                // The request is always notified, even if closing throws.
                // A null exception is forwarded as a null callback argument.
                InvokeRequestCallback(e);
            }
        }

        /// <summary>
        /// Used to reset the connection.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            GlobalLog.Print("CommandStream"+ValidationHelper.HashString(this)+"::Close()");
            InvokeRequestCallback(null);

            // Do not call base.Dispose(bool), which would close the web request.
            // This stream effectively should be a wrapper around a web
            // request that does not own the web request.
        }

        /// <summary>
        /// A WebRequest can use a different Connection after an Exception is set, or a null is passed
        /// to mark completion.  We shouldn't continue calling the Request.RequestCallback after that point.
        /// </summary>
        protected void InvokeRequestCallback(object obj)
        {
            // Read the field once so the null check and the call use the same reference.
            WebRequest webRequest = m_Request;
            if (webRequest != null)
            {
                webRequest.RequestCallback(obj);
            }
        }

        /// <summary>
        /// Indicates that we caught an error that should allow us to resubmit a request.
        /// </summary>
        internal bool RecoverableFailure
        {
            get
            {
                return m_RecoverableFailure;
            }
        }

        /// <summary>
        /// We only offer recovery if we're at the start of the first command.
        /// </summary>
        protected void MarkAsRecoverableFailure()
        {
            if (m_Index <= 1)
            {
                m_RecoverableFailure = true;
            }
        }

        /// <summary>
        /// Resets the pipeline state, builds the command list for <paramref name="request"/>
        /// and starts running the pipeline.
        /// </summary>
        /// <param name="request">The request whose commands are to be sent.</param>
        /// <param name="async">True to run the pipeline with asynchronous I/O.</param>
        /// <param name="readInitalResponseOnConnect">
        /// When true and the connection was just established, the pipeline starts at
        /// index -1 with sending disabled, so the first step is a read.
        /// </param>
        /// <returns>The value returned by <see cref="ContinueCommandPipeline"/>.</returns>
        internal Stream SubmitRequest(WebRequest request, bool async, bool readInitalResponseOnConnect)
        {
            ClearState();
            UpdateLifetime();
            PipelineEntry[] commands = BuildCommandsList(request);
            InitCommandPipeline(request, commands, async);
            if (readInitalResponseOnConnect && JustConnected)
            {
                m_DoSend = false;
                m_Index = -1;
            }
            return ContinueCommandPipeline();
        }

        /// <summary>
        /// Resets the pipeline to its initial, empty state.
        /// </summary>
        protected virtual void ClearState()
        {
            InitCommandPipeline(null, null, false);
        }

        /// <summary>
        /// Builds the list of commands to send for a request.  The base implementation returns null.
        /// </summary>
        protected virtual PipelineEntry[] BuildCommandsList(WebRequest request)
        {
            return null;
        }

        /// <summary>
        /// Creates a WebException for the given status, with no response attached.
        /// </summary>
        protected Exception GenerateException(WebExceptionStatus status, Exception innerException)
        {
            return new WebException(
                            NetRes.GetWebStatusString("net_connclosed", status),
                            innerException,
                            status,
                            null /* no response */ );
        }

        /// <summary>
        /// Creates a ProtocolError WebException describing a server status code, with no response attached.
        /// </summary>
        protected Exception GenerateException(FtpStatusCode code, string statusDescription, Exception innerException)
        {
            return new WebException(
                            SR.GetString(SR.net_servererror, NetRes.GetWebStatusCodeString(code, statusDescription)),
                            innerException,
                            WebExceptionStatus.ProtocolError,
                            null );
        }

        /// <summary>
        /// Initializes every field of the command state machine for a new pipeline run.
        /// </summary>
        protected void InitCommandPipeline(WebRequest request, PipelineEntry[] commands, bool async)
        {
            m_Commands = commands;
            m_Index = 0;
            m_Request = request;
            m_Aborted = false;
            m_DoRead = true;
            m_DoSend = true;
            m_CurrentResponseDescription = null;
            m_Async = async;
            m_RecoverableFailure = false;
            m_AbortReason = string.Empty;
        }

        /// <summary>
        /// In sync mode, resumes the pipeline and aborts the stream if resuming throws.
        /// Does nothing in async mode.
        /// </summary>
        internal void CheckContinuePipeline()
        {
            if (m_Async)
            {
                return;
            }

            try
            {
                ContinueCommandPipeline();
            }
            catch (Exception e)
            {
                Abort(e);
            }
        }

        /// <summary>
        /// Pipelined command resolution, how this works:
        /// a list of commands that need to be sent to the FTP server are spliced together into an array,
        /// each command such as STOR, PORT, etc, is sent to the server, then the response is parsed into a string,
        /// with the response, the delegate is called, which returns an instruction (either continue, stop, or read additional
        /// responses from server).
        ///
        /// When done, calls Close() to notify ConnectionGroup that we are free.
        /// </summary>
        /// <returns>
        /// The stream produced by post-send processing when it asks to return; otherwise null
        /// (including every async send, which completes in the write callback).
        /// </returns>
        protected Stream ContinueCommandPipeline()
        {
            // In async case, the BeginWrite can actually result in a
            // series of synchronous completions that eventually close
            // the connection. So we need to save the members that
            // we need to access, since they may not be valid after
            // BeginWrite returns
            bool async = m_Async;
            while (m_Index < m_Commands.Length)
            {
                if (m_DoSend)
                {
                    if (m_Index < 0)
                    {
                        throw new InternalException();
                    }

                    byte[] sendBuffer = Encoding.GetBytes(m_Commands[m_Index].Command);
                    if (Logging.On)
                    {
                        // Strip the trailing CRLF for logging. Only strip when it is actually
                        // there, so a short or unterminated command cannot make Substring throw.
                        string sendCommand = m_Commands[m_Index].Command;
                        if (sendCommand.EndsWith("\r\n", StringComparison.Ordinal))
                        {
                            sendCommand = sendCommand.Substring(0, sendCommand.Length - 2);
                        }

                        if (m_Commands[m_Index].HasFlag(PipelineEntryFlags.DontLogParameter))
                        {
                            // Mask everything after the command verb.
                            int index = sendCommand.IndexOf(' ');
                            if (index != -1)
                            {
                                sendCommand = sendCommand.Substring(0, index) + " ********";
                            }
                        }
                        Logging.PrintInfo(Logging.Web, this, SR.GetString(SR.net_log_sending_command, sendCommand));
                    }

                    try
                    {
                        if (async)
                        {
                            BeginWrite(sendBuffer, 0, sendBuffer.Length, m_WriteCallbackDelegate, this);
                        }
                        else
                        {
                            Write(sendBuffer, 0, sendBuffer.Length);
                        }
                    }
                    catch (IOException)
                    {
                        MarkAsRecoverableFailure();
                        throw;
                    }

                    if (async)
                    {
                        // The write callback continues the pipeline.
                        return null;
                    }
                }

                Stream stream = null;
                bool isReturn = PostSendCommandProcessing(ref stream);
                if (isReturn)
                {
                    return stream;
                }
            }

            lock (this)
            {
                Close();
            }

            return null;
        }

        /// <summary>
        /// Runs after a command has been sent: reads the response when one is expected and
        /// hands over to <see cref="PostReadCommandProcessing"/>.
        /// </summary>
        /// <returns>
        /// True when the caller must stop driving the pipeline (an async read was started, or
        /// post-read processing asked to return); otherwise false.
        /// </returns>
        private bool PostSendCommandProcessing(ref Stream stream)
        {
/*
            ** I don't see how this code can be still relevant, remove it if no problems observed **

            //
            // This is a general race condition in sync mode, if the server returns an error
            // after we open the data connection, we will be off reading the data connection,
            // and not the control connection. The best we can do is try to poll, and in the
            // the worst case, we will timeout on establishing the data connection.
            //
            if (!m_DoRead && !m_Async) {
                m_DoRead = Poll(100 * 1000, SelectMode.SelectRead);   // Poll is in Microseconds.
            }
*/
            if (m_DoRead)
            {
                // In async case, the next call can actually result in a
                // series of synchronous completions that eventually close
                // the connection. So we need to save the members that
                // we need to access, since they may not be valid after the
                // next call returns
                bool async = m_Async;
                int index = m_Index;
                PipelineEntry[] commands = m_Commands;

                try
                {
                    ResponseDescription response = ReceiveCommandResponse();
                    if (async)
                    {
                        return true;
                    }
                    m_CurrentResponseDescription = response;
                }
                catch
                {
                    // If we get an exception on the QUIT command (which is
                    // always the last command), ignore the final exception
                    // and continue with the pipeline regardless of sync/async
                    if (index < 0 || index >= commands.Length ||
                        commands[index].Command != "QUIT\r\n")
                    {
                        throw;
                    }
                }
            }
            return PostReadCommandProcessing(ref stream);
        }

        /// <summary>
        /// Hands the current response to <see cref="PipelineCallback"/> and applies the returned
        /// instruction to the state machine.
        /// </summary>
        /// <returns>
        /// True when the caller must stop driving the pipeline (Pause or GiveStream);
        /// false when the pipeline loop should continue.
        /// </returns>
        private bool PostReadCommandProcessing(ref Stream stream)
        {
            if (m_Index >= m_Commands.Length)
            {
                return false;
            }

            // Set up front to prevent a race condition on result == PipelineInstruction.Pause
            m_DoSend = false;
            m_DoRead = false;

            PipelineInstruction result;
            PipelineEntry entry;
            if (m_Index == -1)
            {
                // Index -1 is the initial-response read; there is no command entry for it.
                entry = null;
            }
            else
            {
                entry = m_Commands[m_Index];
            }

            // Final QUIT command may get exceptions since the connection
            // may be already closed by the server. So there is no response
            // to process, just advance the pipeline to continue.
            // entry is null for the initial-response read, so it must be checked before
            // dereferencing; in that case the callback decides what to do.
            if (m_CurrentResponseDescription == null && entry != null && entry.Command == "QUIT\r\n")
            {
                result = PipelineInstruction.Advance;
            }
            else
            {
                result = PipelineCallback(entry, m_CurrentResponseDescription, false, ref stream);
            }

            if (result == PipelineInstruction.Abort)
            {
                Exception exception;
                if (m_AbortReason != string.Empty)
                {
                    exception = new WebException(m_AbortReason);
                }
                else
                {
                    exception = GenerateException(WebExceptionStatus.ServerProtocolViolation, null);
                }
                Abort(exception);
                throw exception;
            }
            else if (result == PipelineInstruction.Advance)
            {
                m_CurrentResponseDescription = null;
                m_DoSend = true;
                m_DoRead = true;
                m_Index++;
            }
            else if (result == PipelineInstruction.Pause)
            {
                //
                // PipelineCallback did an async operation and will have to re-enter again
                // Hold on for now
                //
                return true;
            }
            else if (result == PipelineInstruction.GiveStream)
            {
                //
                // We will have another response coming, don't send
                //
                m_CurrentResponseDescription = null;
                m_DoRead = true;
                if (m_Async)
                {
                    // If they block in the requestcallback we should still continue the pipeline
                    ContinueCommandPipeline();
                    InvokeRequestCallback(stream);
                }
                return true;
            }
            else if (result == PipelineInstruction.Reread)
            {
                // Another response is expected after this one
                m_CurrentResponseDescription = null;
                m_DoRead = true;
            }
            return false;
        }

        internal enum PipelineInstruction
        {
            Abort,          // aborts the pipeline
            Advance,        // advances to the next pipelined command
            Pause,          // Let async callback to continue the pipeline
            Reread,         // rereads from the command socket
            GiveStream,     // returns with open data stream, let stream close to continue
        }

        [Flags]
        internal enum PipelineEntryFlags
        {
            UserCommand           = 0x1,
            GiveDataStream        = 0x2,
            CreateDataConnection  = 0x4,
            DontLogParameter      = 0x8
        }

        /// <summary>
        /// A single command in the pipeline together with its flags.
        /// </summary>
        internal class PipelineEntry
        {
            internal PipelineEntry(string command)
            {
                Command = command;
            }

            internal PipelineEntry(string command, PipelineEntryFlags flags)
            {
                Command = command;
                Flags = flags;
            }

            /// <summary>
            /// Returns true when any of the bits in <paramref name="flags"/> is set on this entry.
            /// </summary>
            internal bool HasFlag(PipelineEntryFlags flags)
            {
                return (Flags & flags) != 0;
            }

            internal string Command;
            internal PipelineEntryFlags Flags;
        }

        /// <summary>
        /// Called with each response to decide how the pipeline proceeds.
        /// The base implementation always aborts.
        /// </summary>
        protected virtual PipelineInstruction PipelineCallback(PipelineEntry entry, ResponseDescription response, bool timeout, ref Stream stream)
        {
            return PipelineInstruction.Abort;
        }

        //
        // I/O callback methods
        //

        /// <summary>
        /// Provides a wrapper for the async operations, so that the code can be shared with sync.
        /// </summary>
        private static void ReadCallback(IAsyncResult asyncResult)
        {
            ReceiveState state = (ReceiveState)asyncResult.AsyncState;
            try
            {
                Stream stream = (Stream)state.Connection;
                int bytesRead = 0;
                try
                {
                    bytesRead = stream.EndRead(asyncResult);
                    if (bytesRead == 0)
                    {
                        state.Connection.CloseSocket();
                    }
                }
                catch (IOException)
                {
                    state.Connection.MarkAsRecoverableFailure();
                    throw;
                }

                state.Connection.ReceiveCommandResponseCallback(state, bytesRead);
            }
            catch (Exception e)
            {
                state.Connection.Abort(e);
            }
        }

        /// <summary>
        /// Provides a wrapper for the async write operations.
        /// </summary>
        private static void WriteCallback(IAsyncResult asyncResult)
        {
            CommandStream connection = (CommandStream)asyncResult.AsyncState;
            try
            {
                try
                {
                    connection.EndWrite(asyncResult);
                }
                catch (IOException)
                {
                    connection.MarkAsRecoverableFailure();
                    throw;
                }

                Stream stream = null;
                if (connection.PostSendCommandProcessing(ref stream))
                {
                    return;
                }
                connection.ContinueCommandPipeline();
            }
            catch (Exception e)
            {
                connection.Abort(e);
            }
        }

        //
        // Read parsing methods and privates
        //

        // Decoded characters received past the end of the last complete response.
        private string m_Buffer = string.Empty;
        private Encoding m_Encoding = Encoding.UTF8;
        private Decoder m_Decoder;

        /// <summary>
        /// The encoding used for commands and responses.  Setting it replaces the decoder.
        /// </summary>
        protected Encoding Encoding
        {
            get
            {
                return m_Encoding;
            }
            set
            {
                m_Encoding = value;
                m_Decoder = m_Encoding.GetDecoder();
            }
        }

        /// <summary>
        /// This function is called by a derived class to determine whether a response is valid, and when it is complete.
        /// The base implementation reports every response as invalid.
        /// </summary>
        protected virtual bool CheckValid(ResponseDescription response, ref int validThrough, ref int completeLength)
        {
            return false;
        }

        /// <summary>
        /// Kicks off an asynchronous or sync request to receive a response from the server.
        /// Uses the Encoding <code>encoding</code> to transform the bytes received into a string to be
        /// returned in the GeneralResponseDescription's StatusDescription field.
        /// </summary>
        /// <returns>
        /// The response description, or null when an asynchronous read was started.
        /// </returns>
        private ResponseDescription ReceiveCommandResponse()
        {
            // These are the things that will be needed to maintain state
            ReceiveState state = new ReceiveState(this);

            try
            {
                // If a string of nonzero length was decoded from the buffered bytes after the last complete response, then we
                // will use this string as our first string to append to the response StatusBuffer, and we will
                // forego a Connection.Receive here.
                if (m_Buffer.Length > 0)
                {
                    ReceiveCommandResponseCallback(state, -1);
                }
                else
                {
                    int bytesRead;

                    try
                    {
                        if (m_Async)
                        {
                            BeginRead(state.Buffer, 0, state.Buffer.Length, m_ReadCallbackDelegate, state);
                            return null;
                        }
                        else
                        {
                            bytesRead = Read(state.Buffer, 0, state.Buffer.Length);
                            if (bytesRead == 0)
                            {
                                CloseSocket();
                            }
                            ReceiveCommandResponseCallback(state, bytesRead);
                        }
                    }
                    catch (IOException)
                    {
                        MarkAsRecoverableFailure();
                        throw;
                    }
                }
            }
            catch (Exception e)
            {
                // WebExceptions pass through unchanged; anything else is wrapped as a receive failure.
                if (e is WebException)
                {
                    throw;
                }
                throw GenerateException(WebExceptionStatus.ReceiveFailure, e);
            }
            return state.Resp;
        }

        /// <summary>
        /// ReceiveCommandResponseCallback is the main "while loop" of the ReceiveCommandResponse function family.
        /// It processes what was received (or what was buffered from a previous receive) by using the
        /// implementing class's CheckValid() function.  If the response is complete, it stores the single complete
        /// response in the ResponseDescription held by <paramref name="state"/>, and buffers the rest for the next call.
        ///
        /// If the response is not complete, it issues another read: asynchronously (continuing in ReadCallback)
        /// or synchronously (continuing the loop).
        /// </summary>
        /// <param name="state">The receive state holding the byte buffer and the response being built.</param>
        /// <param name="bytesRead">Number of bytes in state.Buffer; ignored while buffered characters are pending.</param>
        private void ReceiveCommandResponseCallback(ReceiveState state, int bytesRead)
        {
            // completeLength will be set to a nonnegative number by CheckValid if the response is complete:
            // it will set completeLength to the length of a complete response.
            int completeLength = -1;

            while (true)
            {
                int validThrough = state.ValidThrough; // passed to checkvalid

                // If we have a buffered response (ie data was received with the last response that was past the end of that response)
                // deal with it as if we had just received it now instead of actually doing another receive
                if (m_Buffer.Length > 0)
                {
                    // Append the string we got from the buffer, and flush it out.
                    state.Resp.StatusBuffer.Append(m_Buffer);
                    m_Buffer = string.Empty;
                }
                else // we did a read.  Note that in this case, all bytes received are in the receive buffer
                {
                    // this indicates the connection was closed.
                    if (bytesRead <= 0)
                    {
                        throw GenerateException(WebExceptionStatus.ServerProtocolViolation, null);
                    }

                    // decode the bytes in the receive buffer and append them to the status buffer.
                    // Decoder automatically takes care of caching partial codepoints at the end of a buffer.
                    char[] chars = new char[m_Decoder.GetCharCount(state.Buffer, 0, bytesRead)];
                    int numChars = m_Decoder.GetChars(state.Buffer, 0, bytesRead, chars, 0, false);
                    state.Resp.StatusBuffer.Append(chars, 0, numChars);
                }

                // invoke checkvalid.
                if (!CheckValid(state.Resp, ref validThrough, ref completeLength))
                {
                    throw GenerateException(WebExceptionStatus.ServerProtocolViolation, null);
                }

                // If the response is complete, keep whatever follows it for the next response.
                // This is done for both branches above: the buffered text may itself contain
                // more than one response, and dropping the remainder would lose a response.
                if (completeLength >= 0)
                {
                    int unusedChars = state.Resp.StatusBuffer.Length - completeLength;
                    if (unusedChars > 0)
                    {
                        m_Buffer = state.Resp.StatusBuffer.ToString(completeLength, unusedChars);
                    }
                }

                // Now, in general, if the response is not complete, update the "valid through" length for the efficiency of checkValid,
                // and perform the next receive.
                // Note that there may NOT be bytes in the beginning of the receive buffer (even if there were partial characters left over after the
                // last encoding), because they get tracked in the Decoder.
                if (completeLength < 0)
                {
                    state.ValidThrough = validThrough;
                    try
                    {
                        if (m_Async)
                        {
                            BeginRead(state.Buffer, 0, state.Buffer.Length, m_ReadCallbackDelegate, state);
                            return;
                        }
                        else
                        {
                            bytesRead = Read(state.Buffer, 0, state.Buffer.Length);
                            if (bytesRead == 0)
                            {
                                CloseSocket();
                            }
                            continue;
                        }
                    }
                    catch (IOException)
                    {
                        MarkAsRecoverableFailure();
                        throw;
                    }
                }

                // the response is completed
                break;
            }

            // Otherwise, we have a complete response.
            string responseString = state.Resp.StatusBuffer.ToString();
            // set the StatusDescription to the complete part of the response.  Note that the Buffer has already been taken care of above.
            state.Resp.StatusDescription = responseString.Substring(0, completeLength);

            if (Logging.On)
            {
                // Log without the trailing two characters (the line terminator); clamp so a
                // response shorter than that cannot make Substring throw.
                Logging.PrintInfo(Logging.Web, this, SR.GetString(SR.net_log_received_response, responseString.Substring(0, Math.Max(completeLength - 2, 0))));
            }

            if (m_Async)
            {
                // Tell who is listening what was received.
                if (state.Resp != null)
                {
                    m_CurrentResponseDescription = state.Resp;
                }
                Stream stream = null;
                if (PostReadCommandProcessing(ref stream))
                {
                    return;
                }
                ContinueCommandPipeline();
            }
        }

    } // class CommandStream


    /// <summary>
    /// Contains the parsed status line from the server.
    /// </summary>
    internal class ResponseDescription
    {
        internal const int NoStatus = -1;
        internal bool Multiline = false;

        internal int Status = NoStatus;
        internal string StatusDescription;
        internal StringBuilder StatusBuffer = new StringBuilder();

        internal string StatusCodeString;

        internal bool PositiveIntermediate   { get { return (Status >= 100 && Status <= 199); } }
        internal bool PositiveCompletion     { get { return (Status >= 200 && Status <= 299); } }
        //internal bool PositiveAuthRelated { get { return (Status >= 300 && Status <= 399); } }
        internal bool TransientFailure       { get { return (Status >= 400 && Status <= 499); } }
        internal bool PermanentFailure       { get { return (Status >= 500 && Status <= 599); } }
        internal bool InvalidStatusCode      { get { return (Status < 100 || Status > 599); } }
    }


    /// <summary>
    /// State information that is used during ReceiveCommandResponse()'s async operations.
    /// </summary>
    internal class ReceiveState
    {
        private const int bufferSize = 1024;

        internal ResponseDescription Resp;
        internal int ValidThrough;
        internal byte[] Buffer;
        internal CommandStream Connection;

        internal ReceiveState(CommandStream connection)
        {
            Connection = connection;
            Resp = new ResponseDescription();
            Buffer = new byte[bufferSize];  //1024
            ValidThrough = 0;
        }
    }

} // namespace System.Net