Cowboy Open-Source WebSocket Network Library
Cowboy.WebSockets is an open-source WebSocket network library implemented in .NET/C#. It fully implements the core WebSocket protocol standard and partially implements a further, related protocol standard.
WebSocket can be understood as an additional handshake layered on top of a TCP connection, together with a defined format for framing messages.
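To make the handshake step concrete, here is a minimal sketch of the one computation every WebSocket server performs during the opening handshake: deriving the Sec-WebSocket-Accept response value from the client's Sec-WebSocket-Key header, as specified in RFC 6455. The class and method names (HandshakeSketch, ComputeAcceptKey) are illustrative assumptions and are not taken from Cowboy.WebSockets.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Illustrative helper only; not part of Cowboy.WebSockets.
public static class HandshakeSketch
{
    // Fixed GUID that RFC 6455 requires the server to append to the client's key.
    private const string WebSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Returns Base64(SHA-1(key + GUID)), the value of the Sec-WebSocket-Accept header.
    public static string ComputeAcceptKey(string secWebSocketKey)
    {
        using (var sha1 = SHA1.Create())
        {
            byte[] hash = sha1.ComputeHash(
                Encoding.ASCII.GetBytes(secWebSocketKey + WebSocketGuid));
            return Convert.ToBase64String(hash);
        }
    }
}
```

With the sample key from RFC 6455, ComputeAcceptKey("dGhlIHNhbXBsZSBub25jZQ==") yields "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=". The client checks this value to confirm that the server actually understood the WebSocket upgrade request.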

Communication and data transfer within the channel are governed by two kinds of frames: control frames and data frames. The figure below describes the format of the frame header in ABNF.
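As a companion to the frame header description, the following sketch decodes the leading bytes of a frame according to the layout in RFC 6455, section 5.2: the FIN bit, the opcode, the mask bit, and the 7-bit, 16-bit, or 64-bit payload length. The names FrameHeaderSketch, Header, and Parse are assumptions made for illustration; this is not the parser used inside Cowboy.WebSockets.

```csharp
using System;

// Illustrative decoder only; not the parser used by Cowboy.WebSockets.
public static class FrameHeaderSketch
{
    public struct Header
    {
        public bool Fin;            // true when this is the final fragment of a message
        public int Opcode;          // 0x1 text, 0x2 binary, 0x8 close, 0x9 ping, 0xA pong
        public bool Masked;         // true for frames sent from client to server
        public ulong PayloadLength; // decoded payload length in bytes
        public int HeaderLength;    // header bytes, including the masking key if present
    }

    public static Header Parse(byte[] buffer)
    {
        if (buffer == null || buffer.Length < 2)
            throw new ArgumentException("At least 2 bytes are required.", nameof(buffer));

        var header = new Header
        {
            Fin = (buffer[0] & 0x80) != 0,
            Opcode = buffer[0] & 0x0F,
            Masked = (buffer[1] & 0x80) != 0
        };

        ulong length = (ulong)(buffer[1] & 0x7F);
        int offset = 2;
        if (length == 126)
        {
            // The next 2 bytes hold the length as a big-endian 16-bit integer.
            Require(buffer, 4);
            length = ((ulong)buffer[2] << 8) | buffer[3];
            offset = 4;
        }
        else if (length == 127)
        {
            // The next 8 bytes hold the length as a big-endian 64-bit integer.
            Require(buffer, 10);
            length = 0;
            for (int i = 2; i < 10; i++)
                length = (length << 8) | buffer[i];
            offset = 10;
        }

        // A masked frame carries a 4-byte masking key right after the length.
        if (header.Masked)
            offset += 4;
        Require(buffer, offset);

        header.PayloadLength = length;
        header.HeaderLength = offset;
        return header;
    }

    private static void Require(byte[] buffer, int count)
    {
        if (buffer.Length < count)
            throw new ArgumentException("Buffer is too short for this frame header.");
    }
}
```

For example, the bytes 0x81 0x05 start an unmasked, final text frame with a 5-byte payload and a 2-byte header, while 0x82 0x7E 0x01 0x00 start an unmasked binary frame with a 256-byte payload and a 4-byte header.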

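Cowboy.WebSockets implements the WebSocket client and server separately; in the code these are AsyncWebSocketClient and AsyncWebSocketServer. Internally, both are built on the TAP-style AsyncTcpSocketServer and AsyncTcpSocketClient from Cowboy.Sockets. For more on Cowboy.Sockets, see the article "C#高性能TCP服務的多種實現方式" (Multiple Ways to Implement a High-Performance TCP Service in C#).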
The package can be obtained by searching for Cowboy on NuGet.

WebSocket Server Application
Implement the AsyncWebSocketServerModule abstract class, where ModulePath corresponds to the path portion of "ws://host:port/path". You can implement several modules and register them with an AsyncWebSocketServerModuleCatalog, or discover modules automatically with a mechanism such as reflection.
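The reflection-based discovery mentioned above could look roughly like the sketch below. To keep it self-contained it uses a hypothetical stand-in base class, ModuleBase, in place of AsyncWebSocketServerModule, and a plain dictionary in place of AsyncWebSocketServerModuleCatalog; every type and method name in it is an assumption for illustration, not the library's API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical stand-in for the library's abstract module base class.
public abstract class ModuleBase
{
    protected ModuleBase(string modulePath) { ModulePath = modulePath; }
    public string ModulePath { get; }
}

public sealed class TestModule : ModuleBase { public TestModule() : base("/test") { } }
public sealed class ChatModule : ModuleBase { public ChatModule() : base("/chat") { } }

public static class ModuleDiscoverySketch
{
    // Instantiates every concrete ModuleBase subclass that has a public
    // parameterless constructor and indexes the instances by ModulePath.
    public static Dictionary<string, ModuleBase> Discover(Assembly assembly)
    {
        return assembly.GetTypes()
            .Where(t => typeof(ModuleBase).IsAssignableFrom(t)
                        && !t.IsAbstract
                        && t.GetConstructor(Type.EmptyTypes) != null)
            .Select(t => (ModuleBase)Activator.CreateInstance(t))
            .ToDictionary(m => m.ModulePath);
    }

    // Maps the path portion of a ws:// URI to a module, or null when none matches.
    public static ModuleBase Resolve(Dictionary<string, ModuleBase> modules, Uri uri)
    {
        ModuleBase module;
        return modules.TryGetValue(uri.AbsolutePath, out module) ? module : null;
    }
}
```

Calling Discover(typeof(TestModule).Assembly) and then Resolve with new Uri("ws://127.0.0.1:22222/test") returns the TestModule instance, because Uri.AbsolutePath for that address is "/test". With the real library, the discovered instances would be passed to the catalog's RegisterModule method instead of being stored in a dictionary.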
    using System;
    using System.Text;
    using System.Threading.Tasks;
    // The Cowboy.WebSockets types used below also need the library's own using directive(s).

    public class TestWebSocketModule : AsyncWebSocketServerModule
    {
        // "/test" is the ModulePath, i.e. the path in ws://host:port/test.
        public TestWebSocketModule()
            : base(@"/test")
        {
        }

        public override async Task OnSessionStarted(AsyncWebSocketSession session)
        {
            Console.WriteLine(string.Format("WebSocket session [{0}] has connected.", session.RemoteEndPoint));
            await Task.CompletedTask;
        }

        // Echoes every text message back to the sender.
        public override async Task OnSessionTextReceived(AsyncWebSocketSession session, string text)
        {
            Console.Write(string.Format("WebSocket session [{0}] received Text --> ", session.RemoteEndPoint));
            Console.WriteLine(string.Format("{0}", text));

            await session.SendTextAsync(text);
        }

        // Decodes the binary payload as UTF-8, logs it, and echoes it back as binary.
        public override async Task OnSessionBinaryReceived(AsyncWebSocketSession session, byte[] data, int offset, int count)
        {
            var text = Encoding.UTF8.GetString(data, offset, count);
            Console.Write(string.Format("WebSocket session [{0}] received Binary --> ", session.RemoteEndPoint));
            Console.WriteLine(string.Format("{0}", text));

            await session.SendBinaryAsync(Encoding.UTF8.GetBytes(text));
        }

        public override async Task OnSessionClosed(AsyncWebSocketSession session)
        {
            Console.WriteLine(string.Format("WebSocket session [{0}] has disconnected.", session.RemoteEndPoint));
            await Task.CompletedTask;
        }
    }
Instantiate AsyncWebSocketServer and pass in the AsyncWebSocketServerModuleCatalog instance to start the WebSocket server listening.
    class Program
    {
        static AsyncWebSocketServer _server;

        static void Main(string[] args)
        {
            NLogLogger.Use();

            try
            {
                // Register the modules that will handle incoming sessions.
                var catalog = new AsyncWebSocketServerModuleCatalog();
                catalog.RegisterModule(new TestWebSocketModule());

                var config = new AsyncWebSocketServerConfiguration();
                // Uncomment to enable SSL/TLS (wss://).
                //config.SslEnabled = true;
                //config.SslServerCertificate = new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\Cowboy.pfx", "Cowboy");
                //config.SslPolicyErrorsBypassed = true;

                _server = new AsyncWebSocketServer(22222, catalog, config);
                _server.Listen();

                Console.WriteLine("WebSocket server has been started on [{0}].", _server.ListenedEndPoint);
                Console.WriteLine("Type something to send to clients...");
                while (true)
                {
                    try
                    {
                        string text = Console.ReadLine();
                        if (text == "quit")
                            break;

                        // Broadcast on a background task so the console loop stays responsive.
                        Task.Run(async () =>
                        {
                            //await _server.BroadcastText(text);
                            //Console.WriteLine("WebSocket server [{0}] broadcasts text -> [{1}].", _server.ListenedEndPoint, text);
                            await _server.BroadcastBinaryAsync(Encoding.UTF8.GetBytes(text));
                            Console.WriteLine("WebSocket server [{0}] broadcasts binary -> [{1}].", _server.ListenedEndPoint, text);
                        });
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }

                _server.Shutdown();
                Console.WriteLine("WebSocket server has been stopped on [{0}].", _server.ListenedEndPoint);
            }
            catch (Exception ex)
            {
                Logger.Get<Program>().Error(ex.Message, ex);
            }

            Console.ReadKey();
        }
    }
WebSocket Client Application
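On the client side, there are two ways to instantiate AsyncWebSocketClient:
- implement the IAsyncWebSocketClientMessageDispatcher interface;
- pass Func<> delegates that handle the various events directly into the constructor.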
    public interface IAsyncWebSocketClientMessageDispatcher
    {
        Task OnServerConnected(AsyncWebSocketClient client);
        Task OnServerTextReceived(AsyncWebSocketClient client, string text);
        Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count);
        Task OnServerDisconnected(AsyncWebSocketClient client);

        Task OnServerFragmentationStreamOpened(AsyncWebSocketClient client, byte[] data, int offset, int count);
        Task OnServerFragmentationStreamContinued(AsyncWebSocketClient client, byte[] data, int offset, int count);
        Task OnServerFragmentationStreamClosed(AsyncWebSocketClient client, byte[] data, int offset, int count);
    }
The demo below uses the second approach.
    class Program
    {
        static AsyncWebSocketClient _client;

        static void Main(string[] args)
        {
            NLogLogger.Use();

            Task.Run(async () =>
            {
                try
                {
                    var config = new AsyncWebSocketClientConfiguration();
                    // Uncomment to connect over SSL/TLS (wss://).
                    //config.SslTargetHost = "Cowboy";
                    //config.SslClientCertificates.Add(new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\Cowboy.cer"));
                    //config.SslPolicyErrorsBypassed = true;

                    //var uri = new Uri("ws://echo.websocket.org/");
                    //var uri = new Uri("wss://127.0.0.1:22222/test");
                    var uri = new Uri("ws://127.0.0.1:22222/test");

                    // Approach two: the event handlers are passed in as delegates.
                    _client = new AsyncWebSocketClient(uri,
                        OnServerTextReceived,
                        OnServerBinaryReceived,
                        OnServerConnected,
                        OnServerDisconnected,
                        config);
                    await _client.Connect();

                    Console.WriteLine("WebSocket client has connected to server [{0}].", uri);
                    Console.WriteLine("Type something to send to server...");
                    while (_client.State == WebSocketState.Open)
                    {
                        try
                        {
                            string text = Console.ReadLine();
                            if (text == "quit")
                                break;

                            // Send on a background task so the console loop stays responsive.
                            Task.Run(async () =>
                            {
                                //await _client.SendText(text);
                                //Console.WriteLine("Client [{0}] send text -> [{1}].", _client.LocalEndPoint, text);
                                await _client.SendBinaryAsync(Encoding.UTF8.GetBytes(text));
                                Console.WriteLine("Client [{0}] send binary -> [{1}].", _client.LocalEndPoint, text);
                            }).Forget();
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                        }
                    }

                    await _client.Close(WebSocketCloseCode.NormalClosure);
                    Console.WriteLine("WebSocket client has disconnected from server [{0}].", uri);
                }
                catch (Exception ex)
                {
                    Logger.Get<Program>().Error(ex.Message, ex);
                }
            }).Wait();

            Console.ReadKey();
        }

        private static async Task OnServerConnected(AsyncWebSocketClient client)
        {
            Console.WriteLine(string.Format("WebSocket server [{0}] has connected.", client.RemoteEndPoint));
            await Task.CompletedTask;
        }

        private static async Task OnServerTextReceived(AsyncWebSocketClient client, string text)
        {
            Console.Write(string.Format("WebSocket server [{0}] received Text --> ", client.RemoteEndPoint));
            Console.WriteLine(string.Format("{0}", text));

            await Task.CompletedTask;
        }

        // Decodes the binary payload as UTF-8 and logs it.
        private static async Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count)
        {
            var text = Encoding.UTF8.GetString(data, offset, count);
            Console.Write(string.Format("WebSocket server [{0}] received Binary --> ", client.RemoteEndPoint));
            Console.WriteLine(string.Format("{0}", text));

            await Task.CompletedTask;
        }

        private static async Task OnServerDisconnected(AsyncWebSocketClient client)
        {
            Console.WriteLine(string.Format("WebSocket server [{0}] has disconnected.", client.RemoteEndPoint));
            await Task.CompletedTask;
        }
    }
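This article, "Cowboy Open-Source WebSocket Network Library", was published by Dennis Gao on his personal blog at 博客園 (cnblogs). Reproduction in any form without the author's consent is prohibited, and any reposting by automated or manual crawlers is considered bad behavior.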
