评论

收藏

[C++] C#SocketAsyncEventArgs实现高效能多并发TCPSocket通信 (服务器实现)

编程语言 编程语言 发布于:2021-07-18 10:42 | 阅读数:532 | 评论:0

http://freshflower.iteye.com/blog/2285272
想着当初到处找不到相关资料来实现.net的Socket通信的痛苦与心酸, 于是将自己写的代码公布给大家, 让大家少走点弯路, 以供参考. 若是觉得文中的思路有哪里不正确的地方, 欢迎大家指正, 共同进步.
说到Socket通信, 必须要有个服务端, 打开一个端口进行监听(废话!) 可能大家都会把socket.Accept方法放在一个while(true)的循环里, 当然也没有错, 但个人认为这个不科学, 极大可能地占用服务资源. 赞成的请举手. 所以我想从另外一个方面解决这个问题. 之后是在MSDN找到SocketAsyncEventArgs的一个实例, 然后拿来改改, 有需要的同学可以看看MSDN的官方实例.https://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs(v=vs.110).aspx
需要了解客户端写法的, 请参考: 客户端实现http://freshflower.iteye.com/blog/2285286
不多说, 接下来贴代码, 这个实例中需要用到几个类:
1. BufferManager类, 管理传输流的大小  原封不动地拷贝过来,
2. SocketEventPool类: 管理SocketAsyncEventArgs的一个应用池. 有效地重复使用.
3. AsyncUserToken类: 这个可以根据自己的实际情况来定义.主要作用就是存储客户端的信息.
4. SocketManager类: 核心,实现Socket监听,收发信息等操作.
BufferManager类
C#代码
using System;  
using System.Collections.Generic;  
using System.Linq;  
using System.Net.Sockets;  
using System.Text;  
  
namespace Plates.Service  
{  
  class BufferManager  
  {  
    int m_numBytes;         // the total number of bytes controlled by the buffer pool  
    byte[] m_buffer;        // the underlying byte array maintained by the Buffer Manager  
    Stack<int> m_freeIndexPool;   //   
    int m_currentIndex;  
    int m_bufferSize;  
  
    public BufferManager(int totalBytes, int bufferSize)  
    {  
      m_numBytes = totalBytes;  
      m_currentIndex = 0;  
      m_bufferSize = bufferSize;  
      m_freeIndexPool = new Stack<int>();  
    }  
  
    // Allocates buffer space used by the buffer pool  
    public void InitBuffer()  
    {  
      // create one big large buffer and divide that   
      // out to each SocketAsyncEventArg object  
      m_buffer = new byte[m_numBytes];  
    }  
  
    // Assigns a buffer from the buffer pool to the   
    // specified SocketAsyncEventArgs object  
    //  
    // <returns>true if the buffer was successfully set, else false</returns>  
    public bool SetBuffer(SocketAsyncEventArgs args)  
    {  
  
      if (m_freeIndexPool.Count > 0)  
      {  
        args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);  
      }  
      else  
      {  
        if ((m_numBytes - m_bufferSize) < m_currentIndex)  
        {  
          return false;  
        }  
        args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);  
        m_currentIndex += m_bufferSize;  
      }  
      return true;  
    }  
  
    // Removes the buffer from a SocketAsyncEventArg object.  
    // This frees the buffer back to the buffer pool  
    public void FreeBuffer(SocketAsyncEventArgs args)  
    {  
      m_freeIndexPool.Push(args.Offset);  
      args.SetBuffer(null, 0, 0);  
    }  
  }  
}
SocketEventPool类:
C#代码   DSC0000.png
using System;  
using System.Collections.Generic;  
using System.Linq;  
using System.Net.Sockets;  
using System.Text;  
  
namespace Plates.Service  
{  
  class SocketEventPool  
  {  
    Stack<SocketAsyncEventArgs> m_pool;  
  
  
    public SocketEventPool(int capacity)  
    {  
      m_pool = new Stack<SocketAsyncEventArgs>(capacity);  
    }  
  
    public void Push(SocketAsyncEventArgs item)  
    {  
      if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }  
      lock (m_pool)  
      {  
        m_pool.Push(item);  
      }  
    }  
  
    // Removes a SocketAsyncEventArgs instance from the pool  
    // and returns the object removed from the pool  
    public SocketAsyncEventArgs Pop()  
    {  
      lock (m_pool)  
      {  
        return m_pool.Pop();  
      }  
    }  
  
    // The number of SocketAsyncEventArgs instances in the pool  
    public int Count  
    {  
      get { return m_pool.Count; }  
    }  
  
    public void Clear()  
    {  
      m_pool.Clear();  
    }  
  }  
}
AsyncUserToken类
C#代码  
using System;  
using System.Collections;  
using System.Collections.Generic;  
using System.Linq;  
using System.Net;  
using System.Net.Sockets;  
using System.Text;  
  
namespace Plates.Service  
{  
  class AsyncUserToken  
  {  
    /// <summary>  
    /// 客户端IP地址  
    /// </summary>  
    public IPAddress IPAddress { get; set; }  
  
    /// <summary>  
    /// 远程地址  
    /// </summary>  
    public EndPoint Remote { get; set; }  
  
    /// <summary>  
    /// 通信SOKET  
    /// </summary>  
    public Socket Socket { get; set; }  
  
    /// <summary>  
    /// 连接时间  
    /// </summary>  
    public DateTime ConnectTime { get; set; }  
  
    /// <summary>  
    /// 所属用户信息  
    /// </summary>  
    public UserInfoModel UserInfo { get; set; }  
  
  
    /// <summary>  
    /// 数据缓存区  
    /// </summary>  
    public List<byte> Buffer { get; set; }  
  
  
    public AsyncUserToken()  
    {  
      this.Buffer = new List<byte>();  
    }  
  }  
}

SocketManager类
C#代码  
using Plates.Common;  
using System;  
using System.Collections;  
using System.Collections.Generic;  
using System.Linq;  
using System.Net;  
using System.Net.Sockets;  
using System.Text;  
using System.Threading;  
  
namespace Plates.Service  
{  
  class SocketManager  
  {  
  
    private int m_maxConnectNum;  //最大连接数  
    private int m_revBufferSize;  //最大接收字节数  
    BufferManager m_bufferManager;  
    const int opsToAlloc = 2;  
    Socket listenSocket;      //监听Socket  
    SocketEventPool m_pool;  
    int m_clientCount;        //连接的客户端数量  
    Semaphore m_maxNumberAcceptedClients;  
  
    List<AsyncUserToken> m_clients; //客户端列表  
 
    #region 定义委托  
  
    /// <summary>  
    /// 客户端连接数量变化时触发  
    /// </summary>  
    /// <param name="num">当前增加客户的个数(用户退出时为负数,增加时为正数,一般为1)</param>  
    /// <param name="token">增加用户的信息</param>  
    public delegate void OnClientNumberChange(int num, AsyncUserToken token);  
  
    /// <summary>  
    /// 接收到客户端的数据  
    /// </summary>  
    /// <param name="token">客户端</param>  
    /// <param name="buff">客户端数据</param>  
    public delegate void OnReceiveData(AsyncUserToken token, byte[] buff);  
 
    #endregion  
 
    #region 定义事件  
    /// <summary>  
    /// 客户端连接数量变化事件  
    /// </summary>  
    public event OnClientNumberChange ClientNumberChange;  
  
    /// <summary>  
    /// 接收到客户端的数据事件  
    /// </summary>  
    public event OnReceiveData ReceiveClientData;  
 
 
    #endregion  
 
    #region 定义属性  
  
    /// <summary>  
    /// 获取客户端列表  
    /// </summary>  
    public List<AsyncUserToken> ClientList { get { return m_clients; } }  
 
    #endregion  
  
    /// <summary>  
    /// 构造函数  
    /// </summary>  
    /// <param name="numConnections">最大连接数</param>  
    /// <param name="receiveBufferSize">缓存区大小</param>  
    public SocketManager(int numConnections, int receiveBufferSize)  
    {  
      m_clientCount = 0;  
      m_maxConnectNum = numConnections;  
      m_revBufferSize = receiveBufferSize;  
      // allocate buffers such that the maximum number of sockets can have one outstanding read and   
      //write posted to the socket simultaneously  
      m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToAlloc, receiveBufferSize);  
  
      m_pool = new SocketEventPool(numConnections);  
      m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);  
    }  
  
    /// <summary>  
    /// 初始化  
    /// </summary>  
    public void Init()  
    {  
      // Allocates one large byte buffer which all I/O operations use a piece of.  This gaurds   
      // against memory fragmentation  
      m_bufferManager.InitBuffer();  
      m_clients = new List<AsyncUserToken>();  
      // preallocate pool of SocketAsyncEventArgs objects  
      SocketAsyncEventArgs readWriteEventArg;  
  
      for (int i = 0; i < m_maxConnectNum; i++)  
      {
readWriteEventArg = new SocketAsyncEventArgs();  
        readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);  
        readWriteEventArg.UserToken = new AsyncUserToken();  
  
        // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object  
        m_bufferManager.SetBuffer(readWriteEventArg);  
        // add SocketAsyncEventArg to the pool  
        m_pool.Push(readWriteEventArg);  
      }  
    }  
  
  
    /// <summary>  
    /// 启动服务  
    /// </summary>  
    /// <param name="localEndPoint"></param>  
    public bool Start(IPEndPoint localEndPoint)  
    {  
      try  
      {  
        m_clients.Clear();  
        listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);  
        listenSocket.Bind(localEndPoint);  
        // start the server with a listen backlog of 100 connections  
        listenSocket.Listen(m_maxConnectNum);  
        // post accepts on the listening socket  
        StartAccept(null);  
        return true;  
      }  
      catch (Exception)  
      {  
        return false;  
      }  
    }  
  
    /// <summary>  
    /// 停止服务  
    /// </summary>  
    public void Stop()  
    {  
      foreach (AsyncUserToken token in m_clients)  
      {  
        try  
        {  
          token.Socket.Shutdown(SocketShutdown.Both);  
        }  
        catch (Exception) { }  
      }  
      try  
      {  
        listenSocket.Shutdown(SocketShutdown.Both);  
      }  
      catch (Exception) { }  
  
      listenSocket.Close();  
      int c_count = m_clients.Count;  
      lock (m_clients) { m_clients.Clear(); }  
  
      if (ClientNumberChange != null)  
        ClientNumberChange(-c_count, null);  
    }  
  
  
    public void CloseClient(AsyncUserToken token)  
    {  
      try  
      {  
        token.Socket.Shutdown(SocketShutdown.Both);  
      }  
      catch (Exception) { }  
    }  
  
  
    // Begins an operation to accept a connection request from the client   
    //  
    // <param name="acceptEventArg">The context object to use when issuing   
    // the accept operation on the server's listening socket</param>  
    public void StartAccept(SocketAsyncEventArgs acceptEventArg)  
    {  
      if (acceptEventArg == null)  
      {  
        acceptEventArg = new SocketAsyncEventArgs();  
        acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);  
      }  
      else  
      {  
        // socket must be cleared since the context object is being reused  
        acceptEventArg.AcceptSocket = null;  
      }  
  
      m_maxNumberAcceptedClients.WaitOne();  
      if (!listenSocket.AcceptAsync(acceptEventArg))  
      {  
        ProcessAccept(acceptEventArg);  
      }  
    }  
  
    // This method is the callback method associated with Socket.AcceptAsync   
    // operations and is invoked when an accept operation is complete  
    //  
    void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)  
    {  
      ProcessAccept(e);  
    }  
  
    private void ProcessAccept(SocketAsyncEventArgs e)  
    {  
      try  
      {  
        Interlocked.Increment(ref m_clientCount);  
        // Get the socket for the accepted client connection and put it into the   
        //ReadEventArg object user token  
        SocketAsyncEventArgs readEventArgs = m_pool.Pop();  
        AsyncUserToken userToken = (AsyncUserToken)readEventArgs.UserToken;  
        userToken.Socket = e.AcceptSocket;  
        userToken.ConnectTime = DateTime.Now;  
        userToken.Remote = e.AcceptSocket.RemoteEndPoint;  
        userToken.IPAddress = ((IPEndPoint)(e.AcceptSocket.RemoteEndPoint)).Address;  
  
        lock (m_clients) { m_clients.Add(userToken); }  
  
        if (ClientNumberChange != null)  
          ClientNumberChange(1, userToken);  
        if (!e.AcceptSocket.ReceiveAsync(readEventArgs))  
        {  
          Proce***eceive(readEventArgs);  
        }  
      }  
      catch (Exception me)  
      {  
        RuncomLib.Log.LogUtils.Info(me.Message + "\r\n" + me.StackTrace);  
      }  
  
      // Accept the next connection request  
      if (e.SocketError == SocketError.OperationAborted) return;  
      StartAccept(e);  
    }  
  
  
    void IO_Completed(object sender, SocketAsyncEventArgs e)  
    {  
      // determine which type of operation just completed and call the associated handler  
      switch (e.LastOperation)  
      {  
        case SocketAsyncOperation.Receive:  
          Proce***eceive(e);  
          break;  
        case SocketAsyncOperation.Send:  
          ProcessSend(e);  
          break;  
        default:  
          throw new ArgumentException("The last operation completed on the socket was not a receive or send");  
      }  
  
    }  
  
  
    // This method is invoked when an asynchronous receive operation completes.   
    // If the remote host closed the connection, then the socket is closed.  
    // If data was received then the data is echoed back to the client.  
    //  
    private void Proce***eceive(SocketAsyncEventArgs e)  
    {  
      try  
      {  
        // check if the remote host closed the connection  
        AsyncUserToken token = (AsyncUserToken)e.UserToken;  
        if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)  
        {  
          //读取数据  
          byte[] data = new byte[e.BytesTransferred];  
          Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);  
          lock (token.Buffer)  
          {  
            token.Buffer.AddRange(data);  
          }  
          //注意:你一定会问,这里为什么要用do-while循环?   
          //如果当客户发送大数据流的时候,e.BytesTransferred的大小就会比客户端发送过来的要小,  
          //需要分多次接收.所以收到包的时候,先判断包头的大小.够一个完整的包再处理.  
          //如果客户短时间内发送多个小数据包时, 服务器可能会一次性把他们全收了.  
          //这样如果没有一个循环来控制,那么只会处理第一个包,  
          //剩下的包全部留在token.Buffer中了,只有等下一个数据包过来后,才会放出一个来.  
          do  
          {  
            //判断包的长度  
            byte[] lenBytes = token.Buffer.GetRange(0, 4).ToArray();  
            int packageLen = BitConverter.ToInt32(lenBytes, 0);  
            if (packageLen > token.Buffer.Count - 4)  
            {   //长度不够时,退出循环,让程序继续接收  
              break;  
            }  
  
            //包够长时,则提取出来,交给后面的程序去处理  
            byte[] rev = token.Buffer.GetRange(4, packageLen).ToArray();  
            //从数据池中移除这组数据  
            lock (token.Buffer)  
            {  
              token.Buffer.RemoveRange(0, packageLen + 4);  
            }  
            //将数据包交给后台处理,这里你也可以新开个线程来处理.加快速度.  
            if(ReceiveClientData != null)  
              ReceiveClientData(token, rev);  
            //这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收.  
            //若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了.  
          } while (token.Buffer.Count > 4);  
  
          //继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明  
          if (!token.Socket.ReceiveAsync(e))  
            this.Proce***eceive(e);  
        }  
        else  
        {  
          CloseClientSocket(e);  
        }  
      }  
      catch (Exception xe)  
      {  
        RuncomLib.Log.LogUtils.Info(xe.Message + "\r\n" + xe.StackTrace);  
      }  
    }  
  
    // This method is invoked when an asynchronous send operation completes.  
    // The method issues another receive on the socket to read any additional   
    // data sent from the client  
    //  
    // <param name="e"></param>  
    private void ProcessSend(SocketAsyncEventArgs e)  
    {  
      if (e.SocketError == SocketError.Success)  
      {  
        // done echoing data back to the client  
        AsyncUserToken token = (AsyncUserToken)e.UserToken;  
        // read the next block of data send from the client  
        bool willRaiseEvent = token.Socket.ReceiveAsync(e);  
        if (!willRaiseEvent)  
        {  
          Proce***eceive(e);  
        }  
      }  
      else  
      {  
        CloseClientSocket(e);  
      }  
    }  
  
    //关闭客户端  
    private void CloseClientSocket(SocketAsyncEventArgs e)  
    {  
      AsyncUserToken token = e.UserToken as AsyncUserToken;  
  
      lock (m_clients) { m_clients.Remove(token); }  
      //如果有事件,则调用事件,发送客户端数量变化通知  
      if (ClientNumberChange != null)  
        ClientNumberChange(-1, token);  
      // close the socket associated with the client  
      try  
      {  
        token.Socket.Shutdown(SocketShutdown.Send);  
      }  
      catch (Exception) { }  
      token.Socket.Close();  
      // decrement the counter keeping track of the total number of clients connected to the server  
      Interlocked.Decrement(ref m_clientCount);  
      m_maxNumberAcceptedClients.Release();  
      // Free the SocketAsyncEventArg so they can be reused by another client  
      e.UserToken = new AsyncUserToken();  
      m_pool.Push(e);  
    }  
  
  
  
    /// <summary>  
    /// 对数据进行打包,然后再发送  
    /// </summary>  
    /// <param name="token"></param>  
    /// <param name="message"></param>  
    /// <returns></returns>  
    public void SendMessage(AsyncUserToken token, byte[] message)  
    {  
      if (token == null || token.Socket == null || !token.Socket.Connected)  
        return;  
      try  
      {  
        //对要发送的消息,制定简单协议,头4字节指定包的大小,方便客户端接收(协议可以自己定)  
        byte[] buff = new byte[message.Length + 4];  
        byte[] len = BitConverter.GetBytes(message.Length);  
        Array.Copy(len, buff, 4);  
        Array.Copy(message, 0, buff, 4, message.Length);  
        //token.Socket.Send(buff);  //这句也可以发送, 可根据自己的需要来选择  
        //新建异步发送对象, 发送消息  
        SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs();  
        sendArg.UserToken = token;  
        sendArg.SetBuffer(buff, 0, buff.Length);  //将数据放置进去.  
        token.Socket.SendAsync(sendArg);  
      }  
      catch (Exception e){  
        RuncomLib.Log.LogUtils.Info("SendMessage - Error:" + e.Message);  
      }  
    }  
  }  
}
调用方法:
C#代码  
SocketManager m_socket = new SocketManager(200, 1024);  
m_socket.Init();  
m_socket.Start(new IPEndPoint(IPAddress.Any, 13909));
好了,大功告成, 当初自己在写这些代码的时候, 一个地方就卡上很久, 烧香拜菩萨都没有用, 只能凭网上零星的一点代码给点提示. 现在算是做个总结吧. 让大家一看就明白, Socket通信就是这样, 可简单可复杂.
上面说的是服务器,那客户端的请参考
C#如何利用SocketAsyncEventArgs实现高效能TCPSocket通信 (客户端实现)

关注下面的标签,发现更多相似文章