C#高性能Socket服务器SocketAsyncEventArgs的实现(IOCP)
引言
我一直在探寻一个高性能的Socket客户端代码。以前,我使用Socket类写了一些基于传统异步编程模型的代码(BeginSend、BeginReceive,等等)也看过很多博客的知识,在linux中有poll和epoll来实现,在windows下面
微软MSDN中也提供了SocketAsyncEventArgs这个类来实现IOCP 地址:https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx
NET Framework中的APM也称为Begin/End模式。这是因为会调用Begin方法来启动异步操作,然后返回一个IAsyncResult 对象。可以选择将一个代理作为参数提供给Begin方法,异步操作完成时会调用该方法。或者,一个线程可以等待 IAsyncResult.AsyncWaitHandle。当回调被调用或发出等待信号时,就会调用End方法来获取异步操作的结果。这种模式很灵活,使用相对简单,在 .NET Framework 中非常常见。
但是,您必须注意,如果进行大量异步套接字操作,是要付出代价的。针对每次操作,都必须创建一个IAsyncResult对象,而且该对象不能被重复使用。由于大量使用对象分配和垃圾收集,这会影响性能。为了解决这个问题,新版本提供了另一个使用套接字上执行异步I/O的方法模式。这种新模式并不要求为每个套接字操作分配操作上下文对象。
目标
在上面微软提供的例子我觉得不是很完整,没有具体一个流程,只是受到客户端消息后发送相同内容给客户端,初学者不容易看懂流程,因为我花了一天的时间来实现一个功能齐全的IOCP服务器,
效果如下
代码
首先是ICOPServer.cs 这个类是IOCP服务器的核心类,目前这个类是网络上比较全的代码,MSDN上面的例子都没有我的全
view plain copy
[*]using System;
[*]using System.Collections.Generic;
[*]using System.Linq;
[*]using System.Text;
[*]using System.Net.Sockets;
[*]using System.Net;
[*]using System.Threading;
[*]
[*]namespace ServerTest
[*]{
[*] /// <summary>
[*] /// IOCP SOCKET服务器
[*] /// </summary>
[*] public class IOCPServer : IDisposable
[*] {
[*] const int opsToPreAlloc = 2;
[*] #region Fields
[*] /// <summary>
[*] /// 服务器程序允许的最大客户端连接数
[*] /// </summary>
[*] private int _maxClient;
[*]
[*] /// <summary>
[*] /// 监听Socket,用于接受客户端的连接请求
[*] /// </summary>
[*] private Socket _serverSock;
[*]
[*] /// <summary>
[*] /// 当前的连接的客户端数
[*] /// </summary>
[*] private int _clientCount;
[*]
[*] /// <summary>
[*] /// 用于每个I/O Socket操作的缓冲区大小
[*] /// </summary>
[*] private int _bufferSize = 1024;
[*]
[*] /// <summary>
[*] /// 信号量
[*] /// </summary>
[*] Semaphore _maxAcceptedClients;
[*]
[*] /// <summary>
[*] /// 缓冲区管理
[*] /// </summary>
[*] BufferManager _bufferManager;
[*]
[*] /// <summary>
[*] /// 对象池
[*] /// </summary>
[*] SocketAsyncEventArgsPool _objectPool;
[*]
[*] private bool disposed = false;
[*]
[*] #endregion
[*]
[*] #region Properties
[*]
[*] /// <summary>
[*] /// 服务器是否正在运行
[*] /// </summary>
[*] public bool IsRunning { get; private set; }
[*] /// <summary>
[*] /// 监听的IP地址
[*] /// </summary>
[*] public IPAddress Address { get; private set; }
[*] /// <summary>
[*] /// 监听的端口
[*] /// </summary>
[*] public int Port { get; private set; }
[*] /// <summary>
[*] /// 通信使用的编码
[*] /// </summary>
[*] public Encoding Encoding { get; set; }
[*]
[*] #endregion
[*]
[*] #region Ctors
[*]
[*] /// <summary>
[*] /// 异步IOCP SOCKET服务器
[*] /// </summary>
[*] /// <param name="listenPort">监听的端口</param>
[*] /// <param name="maxClient">最大的客户端数量</param>
[*] public IOCPServer(int listenPort,int maxClient)
[*] : this(IPAddress.Any, listenPort, maxClient)
[*] {
[*] }
[*]
[*] /// <summary>
[*] /// 异步Socket TCP服务器
[*] /// </summary>
[*] /// <param name="localEP">监听的终结点</param>
[*] /// <param name="maxClient">最大客户端数量</param>
[*] public IOCPServer(IPEndPoint localEP, int maxClient)
[*] : this(localEP.Address, localEP.Port,maxClient)
[*] {
[*] }
[*]
[*] /// <summary>
[*] /// 异步Socket TCP服务器
[*] /// </summary>
[*] /// <param name="localIPAddress">监听的IP地址</param>
[*] /// <param name="listenPort">监听的端口</param>
[*] /// <param name="maxClient">最大客户端数量</param>
[*] public IOCPServer(IPAddress localIPAddress, int listenPort, int maxClient)
[*] {
[*] this.Address = localIPAddress;
[*] this.Port = listenPort;
[*] this.Encoding = Encoding.Default;
[*]
[*] _maxClient = maxClient;
[*]
[*] _serverSock = new Socket(localIPAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
[*]
[*] _bufferManager = new BufferManager(_bufferSize * _maxClient * opsToPreAlloc,_bufferSize);
[*]
[*] _objectPool = new SocketAsyncEventArgsPool(_maxClient);
[*]
[*] _maxAcceptedClients = new Semaphore(_maxClient, _maxClient);
[*] }
[*]
[*] #endregion
[*]
[*]
[*] #region 初始化
[*]
[*] /// <summary>
[*] /// 初始化函数
[*] /// </summary>
[*] public void Init()
[*] {
[*] // Allocates one large byte buffer which all I/O operations use a piece of. This gaurds
[*] // against memory fragmentation
[*] _bufferManager.InitBuffer();
[*]
[*] // preallocate pool of SocketAsyncEventArgs objects
[*] SocketAsyncEventArgs readWriteEventArg;
[*]
[*] for (int i = 0; i < _maxClient; i++)
[*] {
[*] //Pre-allocate a set of reusable SocketAsyncEventArgs
[*] readWriteEventArg = new SocketAsyncEventArgs();
[*] readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);
[*] readWriteEventArg.UserToken = null;
[*]
[*] // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object
[*] _bufferManager.SetBuffer(readWriteEventArg);
[*]
[*] // add SocketAsyncEventArg to the pool
[*] _objectPool.Push(readWriteEventArg);
[*] }
[*]
[*] }
[*]
[*] #endregion
[*]
[*] #region Start
[*] /// <summary>
[*] /// 启动
[*] /// </summary>
[*] public void Start()
[*] {
[*] if (!IsRunning)
[*] {
[*] Init();
[*] IsRunning = true;
[*] IPEndPoint localEndPoint = new IPEndPoint(Address, Port);
[*] // 创建监听socket
[*] _serverSock = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
[*] //_serverSock.ReceiveBufferSize = _bufferSize;
[*] //_serverSock.SendBufferSize = _bufferSize;
[*] if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6)
[*] {
[*] // 配置监听socket为 dual-mode (IPv4 & IPv6)
[*] // 27 is equivalent to IPV6_V6ONLY socket option in the winsock snippet below,
[*] _serverSock.SetSocketOption(SocketOptionLevel.IPv6, (SocketOptionName)27, false);
[*] _serverSock.Bind(new IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port));
[*] }
[*] else
[*] {
[*] _serverSock.Bind(localEndPoint);
[*] }
[*] // 开始监听
[*] _serverSock.Listen(this._maxClient);
[*] // 在监听Socket上投递一个接受请求。
[*] StartAccept(null);
[*] }
[*] }
[*] #endregion
[*]
[*] #region Stop
[*]
[*] /// <summary>
[*] /// 停止服务
[*] /// </summary>
[*] public void Stop()
[*] {
[*] if (IsRunning)
[*] {
[*] IsRunning = false;
[*] _serverSock.Close();
[*] //TODO 关闭对所有客户端的连接
[*]
[*] }
[*] }
[*]
[*] #endregion
[*]
[*]
[*] #region Accept
[*]
[*] /// <summary>
[*] /// 从客户端开始接受一个连接操作
[*] /// </summary>
[*] private void StartAccept(SocketAsyncEventArgs asyniar)
[*] {
[*] if (asyniar == null)
[*] {
[*] asyniar = new SocketAsyncEventArgs();
[*] asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);
[*] }
[*] else
[*] {
[*] //socket must be cleared since the context object is being reused
[*] asyniar.AcceptSocket = null;
[*] }
[*] _maxAcceptedClients.WaitOne();
[*] if (!_serverSock.AcceptAsync(asyniar))
[*] {
[*] ProcessAccept(asyniar);
[*] //如果I/O挂起等待异步则触发AcceptAsyn_Asyn_Completed事件
[*] //此时I/O操作同步完成,不会触发Asyn_Completed事件,所以指定BeginAccept()方法
[*] }
[*] }
[*]
[*] /// <summary>
[*] /// accept 操作完成时回调函数
[*] /// </summary>
[*] /// <param name="sender">Object who raised the event.</param>
[*] /// <param name="e">SocketAsyncEventArg associated with the completed accept operation.</param>
[*] private void OnAcceptCompleted(object sender, SocketAsyncEventArgs e)
[*] {
[*] ProcessAccept(e);
[*] }
[*]
[*] /// <summary>
[*] /// 监听Socket接受处理
[*] /// </summary>
[*] /// <param name="e">SocketAsyncEventArg associated with the completed accept operation.</param>
[*] private void ProcessAccept(SocketAsyncEventArgs e)
[*] {
[*] if (e.SocketError == SocketError.Success)
[*] {
[*] Socket s = e.AcceptSocket;//和客户端关联的socket
[*] if (s.Connected)
[*] {
[*] try
[*] {
[*]
[*] Interlocked.Increment(ref _clientCount);//原子操作加1
[*] SocketAsyncEventArgs asyniar = _objectPool.Pop();
[*] asyniar.UserToken = s;
[*]
[*] Log4Debug(String.Format("客户 {0} 连入, 共有 {1} 个连接。", s.RemoteEndPoint.ToString(), _clientCount));
[*]
[*] if (!s.ReceiveAsync(asyniar))//投递接收请求
[*] {
[*] Proce***eceive(asyniar);
[*] }
[*] }
[*] catch (SocketException ex)
[*] {
[*] Log4Debug(String.Format("接收客户 {0} 数据出错, 异常信息: {1} 。", s.RemoteEndPoint, ex.ToString()));
[*] //TODO 异常处理
[*] }
[*] //投递下一个接受请求
[*] StartAccept(e);
[*] }
[*] }
[*] }
[*]
[*] #endregion
[*]
[*] #region 发送数据
[*]
[*] /// <summary>
[*] /// 异步的发送数据
[*] /// </summary>
[*] /// <param name="e"></param>
[*] /// <param name="data"></param>
[*] public void Send(SocketAsyncEventArgs e, byte[] data)
[*] {
[*] if (e.SocketError == SocketError.Success)
[*] {
[*] Socket s = e.AcceptSocket;//和客户端关联的socket
[*] if (s.Connected)
[*] {
[*] Array.Copy(data, 0, e.Buffer, 0, data.Length);//设置发送数据
[*]
[*] //e.SetBuffer(data, 0, data.Length); //设置发送数据
[*] if (!s.SendAsync(e))//投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件
[*] {
[*] // 同步发送时处理发送完成事件
[*] ProcessSend(e);
[*] }
[*] else
[*] {
[*] CloseClientSocket(e);
[*] }
[*] }
[*] }
[*] }
[*]
[*] /// <summary>
[*] /// 同步的使用socket发送数据
[*] /// </summary>
[*] /// <param name="socket"></param>
[*] /// <param name="buffer"></param>
[*] /// <param name="offset"></param>
[*] /// <param name="size"></param>
[*] /// <param name="timeout"></param>
[*] public void Send(Socket socket, byte[] buffer, int offset, int size, int timeout)
[*] {
[*] socket.SendTimeout = 0;
[*] int startTickCount = Environment.TickCount;
[*] int sent = 0; // how many bytes is already sent
[*] do
[*] {
[*] if (Environment.TickCount > startTickCount + timeout)
[*] {
[*] //throw new Exception("Timeout.");
[*] }
[*] try
[*] {
[*] sent += socket.Send(buffer, offset + sent, size - sent, SocketFlags.None);
[*] }
[*] catch (SocketException ex)
[*] {
[*] if (ex.SocketErrorCode == SocketError.WouldBlock ||
[*] ex.SocketErrorCode == SocketError.IOPending ||
[*] ex.SocketErrorCode == SocketError.NoBufferSpaceAvailable)
[*] {
[*] // socket buffer is probably full, wait and try again
[*] Thread.Sleep(30);
[*] }
[*] else
[*] {
[*] throw ex; // any serious error occurr
[*] }
[*] }
[*] } while (sent < size);
[*] }
[*]
[*]
[*] /// <summary>
[*] /// 发送完成时处理函数
[*] /// </summary>
[*] /// <param name="e">与发送完成操作相关联的SocketAsyncEventArg对象</param>
[*] private void ProcessSend(SocketAsyncEventArgs e)
[*] {
[*] if (e.SocketError == SocketError.Success)
[*] {
[*] Socket s = (Socket)e.UserToken;
[*]
[*] //TODO
[*] }
[*] else
[*] {
[*] CloseClientSocket(e);
[*] }
[*] }
[*]
[*] #endregion
[*]
[*] #region 接收数据
[*]
[*]
[*] /// <summary>
[*] ///接收完成时处理函数
[*] /// </summary>
[*] /// <param name="e">与接收完成操作相关联的SocketAsyncEventArg对象</param>
[*] private void Proce***eceive(SocketAsyncEventArgs e)
[*] {
[*] if (e.SocketError == SocketError.Success)//if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
[*] {
[*] // 检查远程主机是否关闭连接
[*] if (e.BytesTransferred > 0)
[*] {
[*] Socket s = (Socket)e.UserToken;
[*] //判断所有需接收的数据是否已经完成
[*] if (s.Available == 0)
[*] {
[*] //从侦听者获取接收到的消息。
[*] //String received = Encoding.ASCII.GetString(e.Buffer, e.Offset, e.BytesTransferred);
[*] //echo the data received back to the client
[*] //e.SetBuffer(e.Offset, e.BytesTransferred);
[*]
[*] byte[] data = new byte;
[*] Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//从e.Buffer块中复制数据出来,保证它可重用
[*]
[*] string info=Encoding.Default.GetString(data);
[*] Log4Debug(String.Format("收到 {0} 数据为 {1}",s.RemoteEndPoint.ToString(),info));
[*] //TODO 处理数据
[*]
[*] //增加服务器接收的总字节数。
[*] }
[*]
[*] if (!s.ReceiveAsync(e))//为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件
[*] {
[*] //同步接收时处理接收完成事件
[*] Proce***eceive(e);
[*] }
[*] }
[*] }
[*] else
[*] {
[*] CloseClientSocket(e);
[*] }
[*] }
[*]
[*] #endregion
[*]
[*] #region 回调函数
[*]
[*] /// <summary>
[*] /// 当Socket上的发送或接收请求被完成时,调用此函数
[*] /// </summary>
[*] /// <param name="sender">激发事件的对象</param>
[*] /// <param name="e">与发送或接收完成操作相关联的SocketAsyncEventArg对象</param>
[*] private void OnIOCompleted(object sender, SocketAsyncEventArgs e)
[*] {
[*] // Determine which type of operation just completed and call the associated handler.
[*] switch (e.LastOperation)
[*] {
[*] case SocketAsyncOperation.Accept:
[*] ProcessAccept(e);
[*] break;
[*] case SocketAsyncOperation.Receive:
[*] Proce***eceive(e);
[*] break;
[*] default:
[*] throw new ArgumentException("The last operation completed on the socket was not a receive or send");
[*] }
[*] }
[*]
[*] #endregion
[*]
[*] #region Close
[*] /// <summary>
[*] /// 关闭socket连接
[*] /// </summary>
[*] /// <param name="e">SocketAsyncEventArg associated with the completed send/receive operation.</param>
[*] private void CloseClientSocket(SocketAsyncEventArgs e)
[*] {
[*] Log4Debug(String.Format("客户 {0} 断开连接!",((Socket)e.UserToken).RemoteEndPoint.ToString()));
[*] Socket s = e.UserToken as Socket;
[*] CloseClientSocket(s, e);
[*] }
[*]
[*] /// <summary>
[*] /// 关闭socket连接
[*] /// </summary>
[*] /// <param name="s"></param>
[*] /// <param name="e"></param>
[*] private void CloseClientSocket(Socket s, SocketAsyncEventArgs e)
[*] {
[*] try
[*] {
[*] s.Shutdown(SocketShutdown.Send);
[*] }
[*] catch (Exception)
[*] {
[*] // Throw if client has closed, so it is not necessary to catch.
[*] }
[*] finally
[*] {
[*] s.Close();
[*] }
[*] Interlocked.Decrement(ref _clientCount);
[*] _maxAcceptedClients.Release();
[*] _objectPool.Push(e);//SocketAsyncEventArg 对象被释放,压入可重用队列。
[*] }
[*] #endregion
[*]
[*] #region Dispose
[*] /// <summary>
[*] /// Performs application-defined tasks associated with freeing,
[*] /// releasing, or resetting unmanaged resources.
[*] /// </summary>
[*] public void Dispose()
[*] {
[*] Dispose(true);
[*] GC.SuppressFinalize(this);
[*] }
[*]
[*] /// <summary>
[*] /// Releases unmanaged and - optionally - managed resources
[*] /// </summary>
[*] /// <param name="disposing"><c>true</c> to release
[*] /// both managed and unmanaged resources; <c>false</c>
[*] /// to release only unmanaged resources.</param>
[*] protected virtual void Dispose(bool disposing)
[*] {
[*] if (!this.disposed)
[*] {
[*] if (disposing)
[*] {
[*] try
[*] {
[*] Stop();
[*] if (_serverSock != null)
[*] {
[*] _serverSock = null;
[*] }
[*] }
[*] catch (SocketException ex)
[*] {
[*] //TODO 事件
[*] }
[*] }
[*] disposed = true;
[*] }
[*] }
[*] #endregion
[*]
[*] public void Log4Debug(string msg)
[*] {
[*] Console.WriteLine("notice:"+msg);
[*] }
[*]
[*] }
[*]}
BufferManager.cs 这个类是缓存管理类,是采用MSDN上面的例子一样的 地址: https://msdn.microsoft.com/zh-cn/library/bb517542.aspx
SocketAsyncEventArgsPool.cs 这个类也是来自MSDN的 地址:https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx
需要的话自己到MSDN网站上去取,我就不贴出来了
服务器端
view plain copy
[*]static void Main(string[] args)
[*] {
[*]
[*] IOCPServer server = new IOCPServer(8088, 1024);
[*] server.Start();
[*] Console.WriteLine("服务器已启动....");
[*] System.Console.ReadLine();
[*] }
客户端
客户端代码也是很简单
view plain copy
[*]static void Main(string[] args)
[*] {
[*] IPAddress remote=IPAddress.Parse("192.168.3.4");
[*] client c = new client(8088,remote);
[*]
[*] c.connect();
[*] Console.WriteLine("服务器连接成功!");
[*] while (true)
[*] {
[*] Console.Write("send>");
[*] string msg=Console.ReadLine();
[*] if (msg == "exit")
[*] break;
[*] c.send(msg);
[*] }
[*] c.disconnect();
[*] Console.ReadLine();
[*] }
client.cs
view plain copy
[*]public class client
[*] {
[*]
[*] public TcpClient _client;
[*]
[*] public int port;
[*]
[*] public IPAddress remote;
[*]
[*] public client(int port,IPAddress remote)
[*] {
[*]
[*] this.port = port;
[*] this.remote = remote;
[*] }
[*]
[*] public void connect()
[*] {
[*] this._client=new TcpClient();
[*] _client.Connect(remote, port);
[*] }
[*] public void disconnect()
[*] {
[*] _client.Close();
[*] }
[*] public void send(string msg)
[*] {
[*] byte[] data=Encoding.Default.GetBytes(msg);
[*] _client.GetStream().Write(data, 0, data.Length);
[*] }
[*] }
IOCPClient类,使用SocketAsyncEventArgs类建立一个Socket客户端。虽然MSDN说这个类特别设计给网络服务器应用,但也没有限制在客户端代码中使用APM。下面给出了IOCPClient类的样例代码:
view plain copy
[*]public class IOCPClient
[*] {
[*] /// <summary>
[*] /// 连接服务器的socket
[*] /// </summary>
[*] private Socket _clientSock;
[*]
[*] /// <summary>
[*] /// 用于服务器执行的互斥同步对象
[*] /// </summary>
[*] private static Mutex mutex = new Mutex();
[*] /// <summary>
[*] /// Socket连接标志
[*] /// </summary>
[*] private Boolean _connected = false;
[*]
[*] private const int ReceiveOperation = 1, SendOperation = 0;
[*]
[*] private static AutoResetEvent[]
[*] autoSendReceiveEvents = new AutoResetEvent[]
[*] {
[*] new AutoResetEvent(false),
[*] new AutoResetEvent(false)
[*] };
[*]
[*] /// <summary>
[*] /// 服务器监听端点
[*] /// </summary>
[*] private IPEndPoint _remoteEndPoint;
[*]
[*] public IOCPClient(IPEndPoint local,IPEndPoint remote)
[*] {
[*] _clientSock = new Socket(local.AddressFamily,SocketType.Stream, ProtocolType.Tcp);
[*] _remoteEndPoint = remote;
[*] }
[*]
[*] #region 连接服务器
[*]
[*] /// <summary>
[*] /// 连接远程服务器
[*] /// </summary>
[*] public void Connect()
[*] {
[*] SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs();
[*]
[*] connectArgs.UserToken = _clientSock;
[*] connectArgs.RemoteEndPoint = _remoteEndPoint;
[*] connectArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnConnected);
[*] mutex.WaitOne();
[*] if (!_clientSock.ConnectAsync(connectArgs))//异步连接
[*] {
[*] ProcessConnected(connectArgs);
[*] }
[*]
[*] }
[*] /// <summary>
[*] /// 连接上的事件
[*] /// </summary>
[*] /// <param name="sender"></param>
[*] /// <param name="e"></param>
[*] void OnConnected(object sender, SocketAsyncEventArgs e)
[*] {
[*] mutex.ReleaseMutex();
[*] //设置Socket已连接标志。
[*] _connected = (e.SocketError == SocketError.Success);
[*] }
[*] /// <summary>
[*] /// 处理连接服务器
[*] /// </summary>
[*] /// <param name="e"></param>
[*] private void ProcessConnected(SocketAsyncEventArgs e)
[*] {
[*] //TODO
[*] }
[*]
[*] #endregion
[*]
[*] #region 发送消息
[*] /// <summary>
[*] /// 向服务器发送消息
[*] /// </summary>
[*] /// <param name="data"></param>
[*] public void Send(byte[] data)
[*] {
[*] SocketAsyncEventArgs asyniar = new SocketAsyncEventArgs();
[*] asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendComplete);
[*] asyniar.SetBuffer(data, 0, data.Length);
[*] asyniar.UserToken = _clientSock;
[*] asyniar.RemoteEndPoint = _remoteEndPoint;
[*] autoSendReceiveEvents.WaitOne();
[*] if (!_clientSock.SendAsync(asyniar))//投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件
[*] {
[*] // 同步发送时处理发送完成事件
[*] ProcessSend(asyniar);
[*] }
[*] }
[*]
[*] /// <summary>
[*] /// 发送操作的回调方法
[*] /// </summary>
[*] /// <param name="sender"></param>
[*] /// <param name="e"></param>
[*] private void OnSendComplete(object sender, SocketAsyncEventArgs e)
[*] {
[*] //发出发送完成信号。
[*] autoSendReceiveEvents.Set();
[*] ProcessSend(e);
[*] }
[*]
[*] /// <summary>
[*] /// 发送完成时处理函数
[*] /// </summary>
[*] /// <param name="e">与发送完成操作相关联的SocketAsyncEventArg对象</param>
[*] private void ProcessSend(SocketAsyncEventArgs e)
[*] {
[*] //TODO
[*] }
[*] #endregion
[*]
[*] #region 接收消息
[*] /// <summary>
[*] /// 开始监听服务端数据
[*] /// </summary>
[*] /// <param name="e"></param>
[*] public void StartRecive(SocketAsyncEventArgs e)
[*] {
[*] //准备接收。
[*] Socket s = e.UserToken as Socket;
[*] byte[] receiveBuffer = new byte;
[*] e.SetBuffer(receiveBuffer, 0, receiveBuffer.Length);
[*] e.Completed += new EventHandler<SocketAsyncEventArgs>(OnReceiveComplete);
[*] autoSendReceiveEvents.WaitOne();
[*] if (!s.ReceiveAsync(e))
[*] {
[*] Proce***eceive(e);
[*] }
[*] }
[*]
[*] /// <summary>
[*] /// 接收操作的回调方法
[*] /// </summary>
[*] /// <param name="sender"></param>
[*] /// <param name="e"></param>
[*] private void OnReceiveComplete(object sender, SocketAsyncEventArgs e)
[*] {
[*] //发出接收完成信号。
[*] autoSendReceiveEvents.Set();
[*] Proce***eceive(e);
[*] }
[*]
[*] /// <summary>
[*] ///接收完成时处理函数
[*] /// </summary>
[*] /// <param name="e">与接收完成操作相关联的SocketAsyncEventArg对象</param>
[*] private void Proce***eceive(SocketAsyncEventArgs e)
[*] {
[*] if (e.SocketError == SocketError.Success)
[*] {
[*] // 检查远程主机是否关闭连接
[*] if (e.BytesTransferred > 0)
[*] {
[*] Socket s = (Socket)e.UserToken;
[*] //判断所有需接收的数据是否已经完成
[*] if (s.Available == 0)
[*] {
[*] byte[] data = new byte;
[*] Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//从e.Buffer块中复制数据出来,保证它可重用
[*]
[*] //TODO 处理数据
[*] }
[*]
[*] if (!s.ReceiveAsync(e))//为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件
[*] {
[*] //同步接收时处理接收完成事件
[*] Proce***eceive(e);
[*] }
[*] }
[*] }
[*] }
[*]
[*] #endregion
[*]
[*]
[*] public void Close()
[*] {
[*] _clientSock.Disconnect(false);
[*] }
[*]
[*] /// <summary>
[*] /// 失败时关闭Socket,根据SocketError抛出异常。
[*] /// </summary>
[*] /// <param name="e"></param>
[*]
[*] private void ProcessError(SocketAsyncEventArgs e)
[*] {
[*] Socket s = e.UserToken as Socket;
[*] if (s.Connected)
[*] {
[*] //关闭与客户端关联的Socket
[*] try
[*] {
[*] s.Shutdown(SocketShutdown.Both);
[*] }
[*] catch (Exception)
[*] {
[*] //如果客户端处理已经关闭,抛出异常
[*] }
[*] finally
[*] {
[*] if (s.Connected)
[*] {
[*] s.Close();
[*] }
[*] }
[*] }
[*] //抛出SocketException
[*] throw new SocketException((Int32)e.SocketError);
[*] }
[*]
[*]
[*] /// <summary>
[*] /// 释放SocketClient实例
[*] /// </summary>
[*] public void Dispose()
[*] {
[*] mutex.Close();
[*] autoSendReceiveEvents.Close();
[*] autoSendReceiveEvents.Close();
[*] if (_clientSock.Connected)
[*] {
[*] _clientSock.Close();
[*] }
[*] }
[*]
[*] }
这个类我没有测试,但是理论上是没问题的。
文档来源:51CTO技术博客https://blog.51cto.com/u_11990719/3123598
页:
[1]