中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

C# 高性能(neng) TCP 服務(wu)的(de)多種(zhong)實現方(fang)式

哎~~ 想想大部分園友應該對 "高性能" 字樣更感興趣,為了吸引眼球所以標題中一定要突出,其實我更喜歡的標題是《猴賽雷,C# 編寫 TCP 服務的花樣姿勢!》

本篇文章的(de)主旨是使用 .NET/C# 實現 TCP 高(gao)性(xing)能服務的(de)不同(tong)方式,包括但不限于如下內容(rong):

在(zai) .NET/C# 中對于 Socket 的(de)(de)支持均是基于 完(wan)成端口技術的(de)(de)封裝(zhuang),通過(guo)不同(tong)的(de)(de) Non-Blocking 封裝(zhuang)結構來滿足不同(tong)的(de)(de)編程需求。以上方式均已在(zai) 中有完(wan)整實現,并(bing)且(qie) APM 和(he) TAP 方式已經在(zai)實際項目中應用。 還在(zai)不斷(duan)的(de)(de)進(jin)化和(he)完(wan)善中,如有任何問題請及時指(zhi)正(zheng)。

雖然有這么多種實現方式,但抽象的看,它們是一樣一樣的,用兩個 Loop 即可描述:Accept LoopRead Loop,如下圖所示。(這里提及的 "Loop" 指的是一種循環方式,而非特(te)指 while/for 等關鍵字。)

  • 在任何 TCP Server 的實現中,一定存在一個 Accept Socket Loop,用于接收 Client 端的 Connect 請求以建立 TCP Connection。
  • 在任何 TCP Server 的實現中,一定存在一個 Read Socket Loop,用于接收 Client 端 Write 過來的數據。

如果 Accept 循環(huan)阻塞(sai)(sai),則(ze)會導(dao)致(zhi)無法(fa)快速(su)的建(jian)立連接(jie),服務端(duan) Pending Backlog 滿(man),進而導(dao)致(zhi) Client 端(duan)收(shou)到 Connect Timeout 的異常。如果 Read 循環(huan)阻塞(sai)(sai),則(ze)顯然會導(dao)致(zhi)無法(fa)及時(shi)收(shou)到 Client 端(duan)發過(guo)來(lai)的數(shu)據,進而導(dao)致(zhi) Client 端(duan) Send Buffer 滿(man),無法(fa)再發送(song)數(shu)據。

從實(shi)現(xian)細節(jie)的角(jiao)度看,能夠導致服務阻塞的位置可能在:

  1. Accept 到新的 Socket,構建新的 Connection 需要分配各種資源,分配資源慢;
  2. Accept 到新的 Socket,沒有及時觸發下一次 Accept;
  3. Read 到新的 Buffer,判定 Payload 消息長度,判定過程長;
  4. Read 到新的 Buffer,發現 Payload 還沒有收全,繼續 Read,則 "可能" 會導致一次 Buffer Copy;
  5. Payload 接收完畢,進行 De-Serialization 轉成可識別的 Protocol Message,反序列化慢;
  6. 由 Business Module 來處理相應的 Protocol Message,處理過程慢;

1-2 涉及到 Accept 過程和 Connection 的建立(li)過程,3-4 涉及到 ReceiveBuffer 的處(chu)理(li)過程,5-6 涉及到應用邏(luo)輯側的實現。

Java 中著(zhu)名的(de) Netty 網絡庫從 4.0 版本開始對(dui)于 Buffer 部分做了全新(xin)的(de)嘗(chang)試(shi)(shi),采用了名叫(jiao) 的(de)設計(ji),實現(xian) Buffer Zero Copy 以減少高并發(fa)條(tiao)件下(xia) Buffer 拷貝帶(dai)來(lai)的(de)性能損失和 GC 壓力(li)。, , 等項(xiang)目正(zheng)在嘗(chang)試(shi)(shi)在 C# 中進行類似的(de) ByteBuf 的(de)實現(xian)。

APM 方式:TcpSocketServer

 的(de)(de)實現是基于 .NET Framework 自帶的(de)(de)  和  的(de)(de)更進一步的(de)(de)封裝,采(cai)用基于 APM 的(de)(de) BeginXXX 和 EndXXX 接口實現。

TcpSocketServer 中的 Accept Loop 指的就(jiu)是,

  • BeginAccept -> EndAccept-> BeginAccept -> EndAccept -> BeginAccept -> ...

每(mei)一個建立成功(gong)的(de) Connection 由 來(lai)處理,所以 TcpSocketSession 中會包含 Read Loop,

  • BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> ...

TcpSocketServer 通過暴露(lu) Event 來實現 Connection 的(de)建(jian)立與斷開和數據(ju)接收的(de)通知。

  event EventHandler<TcpClientConnectedEventArgs> ClientConnected;
  event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;
  event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;

使用也是簡單(dan)直(zhi)接,直(zhi)接訂閱事件(jian)通(tong)知。

  private static void StartServer()
  {
      _server = new TcpSocketServer(22222);
      _server.ClientConnected += server_ClientConnected;
      _server.ClientDisconnected += server_ClientDisconnected;
      _server.ClientDataReceived += server_ClientDataReceived;
      _server.Listen();
  }
  
  static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));
  }
  
  static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));
  }
  
  static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e)
  {
      var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
      Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
      Console.WriteLine(string.Format("{0}", text));
      _server.Broadcast(Encoding.UTF8.GetBytes(text));
  }

TAP 方式:AsyncTcpSocketServer

 的(de)(de)實現(xian)是基于 .NET Framework 自(zi)帶的(de)(de)  和  的(de)(de)更進一步的(de)(de)封裝,采用基于 TAP 的(de)(de) async/await 的(de)(de) XXXAsync 接口實現(xian)。

然(ran)而(er),實(shi)際(ji)上(shang) 并沒有創(chuang)建(jian)什么神奇(qi)的效(xiao)果,其(qi)內(nei)部實(shi)現只(zhi)是將 APM 的方法轉換成(cheng)了 TAP 的調用(yong)方式(shi)。

  //************* Task-based async public methods *************************
  [HostProtection(ExternalThreading = true)]
  public Task<Socket> AcceptSocketAsync()
  {
      return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);
  }
  
  [HostProtection(ExternalThreading = true)]
  public Task<TcpClient> AcceptTcpClientAsync()
  {
      return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
  }

 中的(de)(de) Accept Loop 指(zhi)的(de)(de)就(jiu)是,

  while (IsListening)
  {
      var tcpClient = await _listener.AcceptTcpClientAsync();
  }

每一個(ge)建立(li)成功的 Connection 由(you)  來處理(li),所以 AsyncTcpSocketSession 中(zhong)會包(bao)含 Read Loop,

  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
  }

為了將 async/await 異步到底, 所暴露的接口也(ye)同樣是 Awaitable 的。

  public interface IAsyncTcpSocketServerMessageDispatcher
  {
      Task OnSessionStarted(AsyncTcpSocketSession session);
      Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(AsyncTcpSocketSession session);
  }

使用時僅需將一個實現了(le)該(gai)接(jie)口的對象注入到 AsyncTcpSocketServer 的構造(zao)函數(shu)中(zhong)即(ji)可。

  public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
  {
      public async Task OnSessionStarted(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }

當然,對(dui)于接口的實現(xian)也(ye)不是強制了,也(ye)可以在構造函數中直(zhi)接注入(ru)方法的實現(xian)。

  public AsyncTcpSocketServer(
      IPEndPoint listenedEndPoint,
      Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
      Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
      Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
      AsyncTcpSocketServerConfiguration configuration = null)
  {}

SAEA 方式:TcpSocketSaeaServer

SAEA 是 的(de)簡寫。SocketAsyncEventArgs 是 開始(shi)支(zhi)持的(de)一種(zhong)支(zhi)持高性(xing)能 Socket 通(tong)信的(de)實現。SocketAsyncEventArgs 相比于 APM 方式的(de)主要(yao)優點(dian)可以描(miao)述如下:

The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.

也就是說(shuo),優點就是無(wu)需為每次調(diao)用都生成&nbsp;IAsyncResult 等對象,向原(yuan)生 Socket 更靠近一些。

使用 SocketAsyncEventArgs 的如下(xia):

  1. Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.
  2. Set properties on the context object to the operation about to be performed (the callback delegate method and data buffer, for example).
  3. Call the appropriate socket method (xxxAsync) to initiate the asynchronous operation.
  4. If the asynchronous socket method (xxxAsync) returns true in the callback, query the context properties for completion status.
  5. If the asynchronous socket method (xxxAsync) returns false in the callback, the operation completed synchronously. The context properties may be queried for the operation result.
  6. Reuse the context for another operation, put it back in the pool, or discard it.

重(zhong)點在于池化(Pooling),池化的目的就是為了重(zhong)用和減少(shao)運行(xing)時(shi)分配和垃圾(ji)回收的壓(ya)力。

即是對(dui) SocketAsyncEventArgs 的應(ying)用和封裝,并實現了 Pooling 技術(shu)。 中(zhong)的重點是 SaeaAwaitable 類,SaeaAwaitable 中(zhong)內置了一個 SocketAsyncEventArgs,并通過 GetAwaiter 返(fan)回 SaeaAwaiter 來支持(chi) async/await 操作(zuo)。同時,通過 SaeaExtensions 擴(kuo)展方法(fa)對(dui)來擴(kuo)展 SocketAsyncEventArgs 的 Awaitable 實現。

  public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)

SaeaPool 則(ze)是一(yi)個(ge) QueuedObjectPool<SaeaAwaitable> 的(de)衍生實現(xian),用于池化 SaeaAwaitable 實例。同時(shi),為了(le)減少(shao) TcpSocketSaeaSession 的(de)構(gou)建(jian)過程,也實現(xian)了(le) SessionPool 即 QueuedObjectPool<TcpSocketSaeaSession>。

 中的 Accept Loop 指(zhi)的就是,

  while (IsListening)
  {
      var saea = _acceptSaeaPool.Take();
  
      var socketError = await _listener.AcceptAsync(saea);
      if (socketError == SocketError.Success)
      {
          var acceptedSocket = saea.Saea.AcceptSocket;
      }
  
      _acceptSaeaPool.Return(saea);
  }

每一(yi)個建(jian)立成(cheng)功(gong)的 Connection 由  來處理(li),所以 TcpSocketSaeaSession 中會包含 Read Loop,

  var saea = _saeaPool.Take();
  saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);
  
  while (State == TcpSocketConnectionState.Connected)
  {
      saea.Saea.SetBuffer(0, _receiveBuffer.Length);
  
      var socketError = await _socket.ReceiveAsync(saea);
      if (socketError != SocketError.Success)
          break;
  
      var receiveCount = saea.Saea.BytesTransferred;
      if (receiveCount == 0)
          break;
  }

同樣(yang), 對(dui)外(wai)所暴露的(de)接(jie)口也同樣(yang)是 Awaitable 的(de)。

  public interface ITcpSocketSaeaServerMessageDispatcher
  {
      Task OnSessionStarted(TcpSocketSaeaSession session);
      Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(TcpSocketSaeaSession session);
  }

使用起(qi)來也(ye)是簡單直接:

  public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher
  {
      public async Task OnSessionStarted(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }

RIO 方式:TcpSocketRioServer

從 Windows 8.1 / Windows Server 2012 R2 開始,微軟(ruan)推出(chu)了(le) 來(lai)支(zhi)持高(gao)性能 Socket 服務的(de)實(shi)現(xian),簡稱 RIO。

The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.

  • RIOCloseCompletionQueue
  • RIOCreateCompletionQueue
  • RIOCreateRequestQueue
  • RIODequeueCompletion
  • RIODeregisterBuffer
  • RIONotify
  • RIOReceive
  • RIOReceiveEx
  • RIORegisterBuffer
  • RIOResizeCompletionQueue
  • RIOResizeRequestQueue
  • RIOSend
  • RIOSendEx

到(dao)目(mu)前(qian)為止,.NET Framework 還沒有推(tui)出對 RIO 的支持,所以若想在(zai) C# 中實現 RIO 則只能通過 P/Invoke 方式, 是(shi)開源項目(mu)中的一個比較完整的實現。

直接引用了 的源代碼,放置在 Cowboy.Sockets.Experimental 名空間下,以供實驗和測試使用(yong)。

同樣,通過(guo) 來(lai)實現(xian) Accept Loop,

_listener.OnAccepted = (acceptedSocket) =>
{
    Task.Run(async () =>
    {
        await Process(acceptedSocket);
    })
    .Forget();
};

通過 來處理 Read Loop,

  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
      if (receiveCount == 0)
          break;
  }

測(ce)試(shi)代碼一如既往的類似:

  public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher
  {
      public async Task OnSessionStarted(TcpSocketRioSession session)
      {
          //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          Console.WriteLine(string.Format("TCP session has connected {0}.", session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.Write(string.Format("Client : --> "));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(TcpSocketRioSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }

參考資料

本篇文章《C#高性能TCP服務的多種實現方式》由 Dennis Gao 發表自博客園個人博客,未經作者本人同意禁止以任何的形式轉載,任何自動的或人為的爬蟲轉載行為均為耍流氓。

posted @ 2016-02-05 14:42  sangmado  閱讀(96283)  評論(112)    收藏  舉報