• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

C#高性能Socket服务器SocketAsyncEventArgs的实现(IOCP)

c# 搞代码 4年前 (2022-01-09) 43次浏览 已收录 0个评论

原创性申明

本文作者:小竹zz 博客地址:http://www.gaodaima.com/转载请注明出处

引言

我一直在探寻一个高性能的Socket客户端代码。以前,我使用Socket类写了一些基于传统异步编程模型的代码(BeginSend、BeginReceive,等等)也看过很多博客的知识,在linux中有poll和epoll来实现,在windows下面
微软MSDN中也提供了SocketAsyncEventArgs这个类来实现IOCP 地址:http://www.gaodaima.com/
NET Framework中的APM也称为Begin/End模式。这是因为会调用Begin方法来启动异步操作,然后返回一个IAsyncResult 对象。可以选择将一个代理作为参数提供给Begin方法,异步操作完成时会调用该方法。或者,一个线程可以等待 IAsyncResult.AsyncWaitHandle。当回调被调用或发出等待信号时,就会调用End方法来获取异步操作的结果。这种模式很灵活,使用相对简单,在 .NET Framework 中非常常见。
但是,您必须注意,如果进行大量异步套接字操作,是要付出代价的。针对每次操作,都必须创建一个IAsyncResult对象,而且该对象不能被重复使用。由于大量使用对象分配和垃圾收集,这会影响性能。为了解决这个问题,新版本提供了另一个使用套接字上执行异步I/O的方法模式。这种新模式并不要求为每个套接字操作分配操作上下文对象。

代码下载:http://www.gaodaima.com/ 这里的代码优化了的

目标

在上面微软提供的例子我觉得不是很完整,没有具体一个流程,只是受到客户端消息后发送相同内容给客户端,初学者不容易看懂流程,因为我花了一天的时间来实现一个功能齐全的IOCP服务器,

效果如下

代码

首先是ICOPServer.cs 这个类是IOCP服务器的核心类,目前这个类是网络上比较全的代码,MSDN上面的例子都没有我的全

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Net.Sockets;using System.Net;using System.Threading;namespace ServerTest{    /// <summary>    /// IOCP SOCKET服务器    /// </summary>    public class IOCPServer : IDisposable    {        const int opsToPreAlloc = 2;        #region Fields        /// <summary>        /// 服务器程序允许的最大客户端连接数        /// </summary>        private int _maxClient;        /// <summary>        /// 监听Socket,用于接受客户端的连接请求        /// </summary>        private Socket _serverSock;        /// <summary>        /// 当前的连接的客户端数        /// </summary>        private int _clientCount;        /// <summary>        /// 用于每个I/O Socket操作的缓冲区大小        /// </summary>        private int _bufferSize = 1024;        /// <summary>        /// 信号量        /// </summary>        Semaphore _maxAcceptedClients;        /// <summary>        /// 缓冲区管理        /// </summary>        BufferManager _bufferManager;        /// <summary>        /// 对象池        /// </summary>        SocketAsyncEventArgsPool _objectPool;        private bool disposed = false;        #endregion        #region Properties        /// <summary>        /// 服务器是否正在运行        /// </summary>        public bool IsRunning { get; private set; }        /// <summary>        /// 监听的IP地址        /// </summary>        public IPAddress Address { get; private set; }        /// <summary>        /// 监听的端口        /// </summary>        public int Port { get; private set; }        /// <summary>        /// 通信使用的编码        /// </summary>        public Encoding Encoding { get; set; }        #endregion        #region Ctors        /// <summary>        /// 异步IOCP SOCKET服务器        /// </summary>        /// <param name="listenPort">监听的端口</param>        /// <param name="maxClient">最大的客户端数量</param>        public IOCPServer(int listenPort,int maxClient)            : this(IPAddress.Any, listenPort, maxClient)        {        }        /// <summary>        /// 异步Socket TCP服务器        /// </summary>        /// <param name="localEP">监听的终结点</param>        /// <param name="maxClient">最大客户端数量</param>        public IOCPServer(IPEndPoint localEP, int maxClient)            : this(localEP.Address, localEP.Port,maxClient)        {        }        /// <summary>        /// 异步Socket TCP服务器        /// </summary>        /// <param name="localIPAddress">监听的IP地址</param>        /// <param name="listenPort">监听的端口</param>        /// <param name="maxClient">最大客户端数量</param>        public IOCPServer(IPAddress localIPAddress, int listenPort, int maxClient)        {            this.Address = localIPAddress;            this.Port = listenPort;            this.Encoding = Encoding.Default;            _maxClient = maxClient;            _serverSock = new Socket(localIPAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);            _bufferManager = new BufferManager(_bufferSize * _maxClient * opsToPreAlloc,_bufferSize);            _objectPool = new SocketAsyncEventArgsPool(_maxClient);            _maxAcceptedClients = new Semaphore(_maxClient, _maxClient);         }        #endregion        #region 初始化        /// <summary>        /// 初始化函数        /// </summary>        public void Init()        {            // Allocates one large byte buffer which all I/O operations use a piece of.  This gaurds             // against memory fragmentation            _bufferManager.InitBuffer();            // preallocate pool of SocketAsyncEventArgs objects            SocketAsyncEventArgs readWriteEventArg;            for (int i = 0; i < _maxClient; i++)            {                //Pre-allocate a set of reusable SocketAsyncEventArgs                readWriteEventArg = new SocketAsyncEventArgs();                readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);                readWriteEventArg.UserToken = null;                // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object                _bufferManager.SetBuffer(readWriteEventArg);                // add SocketAsyncEventArg to the pool                _objectPool.Push(readWriteEventArg);            }        }        #endregion        #region Start       <div>本文来源gaodai.ma#com搞##代!^码7网</div> /// <summary>        /// 启动        /// </summary>        public void Start()        {            if (!IsRunning)            {                Init();                IsRunning = true;                IPEndPoint localEndPoint = new IPEndPoint(Address, Port);                // 创建监听socket                _serverSock = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);                //_serverSock.ReceiveBufferSize = _bufferSize;                //_serverSock.SendBufferSize = _bufferSize;                if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6)                {                    // 配置监听socket为 dual-mode (IPv4 & IPv6)                     // 27 is equivalent to IPV6_V6ONLY socket option in the winsock snippet below,                    _serverSock.SetSocketOption(SocketOptionLevel.IPv6, (SocketOptionName)27, false);                    _serverSock.Bind(new IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port));                }                else                {                    _serverSock.Bind(localEndPoint);                }                // 开始监听                _serverSock.Listen(this._maxClient);                // 在监听Socket上投递一个接受请求。                StartAccept(null);            }        }        #endregion        #region Stop        /// <summary>        /// 停止服务        /// </summary>        public void Stop()        {            if (IsRunning)            {                IsRunning = false;                _serverSock.Close();                //TODO 关闭对所有客户端的连接            }        }        #endregion        #region Accept        /// <summary>        /// 从客户端开始接受一个连接操作        /// </summary>        private void StartAccept(SocketAsyncEventArgs asyniar)        {            if (asyniar == null)            {                asyniar = new SocketAsyncEventArgs();                asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);            }            else            {                //socket must be cleared since the context object is being reused                asyniar.AcceptSocket = null;            }            _maxAcceptedClients.WaitOne();            if (!_serverSock.AcceptAsync(asyniar))            {                ProcessAccept(asyniar);                //如果I/O挂起等待异步则触发AcceptAsyn_Asyn_Completed事件                //此时I/O操作同步完成,不会触发Asyn_Completed事件,所以指定BeginAccept()方法            }        }        /// <summary>        /// accept 操作完成时回调函数        /// </summary>        /// <param name="sender">Object who raised the event.</param>        /// <param name="e">SocketAsyncEventArg associated with the completed accept operation.</param>        private void OnAcceptCompleted(object sender, SocketAsyncEventArgs e)        {            ProcessAccept(e);        }        /// <summary>        /// 监听Socket接受处理        /// </summary>        /// <param name="e">SocketAsyncEventArg associated with the completed accept operation.</param>        private void ProcessAccept(SocketAsyncEventArgs e)        {            if (e.SocketError == SocketError.Success)            {                Socket s = e.AcceptSocket;//和客户端关联的socket                if (s.Connected)                {                    try                    {                                                Interlocked.Increment(ref _clientCount);//原子操作加1                        SocketAsyncEventArgs asyniar = _objectPool.Pop();                        asyniar.UserToken = s;                        Log4Debug(String.Format("客户 {0} 连入, 共有 {1} 个连接。", s.RemoteEndPoint.ToString(), _clientCount));                                                if (!s.ReceiveAsync(asyniar))//投递接收请求                        {                            ProcessReceive(asyniar);                        }                    }                    catch (SocketException ex)                    {                        Log4Debug(String.Format("接收客户 {0} 数据出错, 异常信息: {1} 。", s.RemoteEndPoint, ex.ToString()));                        //TODO 异常处理                    }                    //投递下一个接受请求                    StartAccept(e);                }            }        }        #endregion        #region 发送数据        /// <summary>        /// 异步的发送数据        /// </summary>        /// <param name="e"></param>        /// <param name="data"></param>        public void Send(SocketAsyncEventArgs e, byte[] data)        {            if (e.SocketError == SocketError.Success)            {                Socket s = e.AcceptSocket;//和客户端关联的socket                if (s.Connected)                {                    Array.Copy(data, 0, e.Buffer, 0, data.Length);//设置发送数据                    //e.SetBuffer(data, 0, data.Length); //设置发送数据                    if (!s.SendAsync(e))//投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件                    {                        // 同步发送时处理发送完成事件                        ProcessSend(e);                    }                    else                    {                        CloseClientSocket(e);                    }                }            }        }        /// <summary>        /// 同步的使用socket发送数据        /// </summary>        /// <param name="socket"></param>        /// <param name="buffer"></param>        /// <param name="offset"></param>        /// <param name="size"></param>        /// <param name="timeout"></param>        public void Send(Socket socket, byte[] buffer, int offset, int size, int timeout)        {            socket.SendTimeout = 0;            int startTickCount = Environment.TickCount;            int sent = 0; // how many bytes is already sent            do            {                if (Environment.TickCount > startTickCount + timeout)                {                    //throw new Exception("Timeout.");                }                try                {                    sent += socket.Send(buffer, offset + sent, size - sent, SocketFlags.None);                }                catch (SocketException ex)                {                    if (ex.SocketErrorCode == SocketError.WouldBlock ||                    ex.SocketErrorCode == SocketError.IOPending ||                    ex.SocketErrorCode == SocketError.NoBufferSpaceAvailable)                    {                        // socket buffer is probably full, wait and try again                        Thread.Sleep(30);                    }                    else                    {                        throw ex; // any serious error occurr                    }                }            } while (sent < size);        }        /// <summary>        /// 发送完成时处理函数        /// </summary>        /// <param name="e">与发送完成操作相关联的SocketAsyncEventArg对象</param>        private void ProcessSend(SocketAsyncEventArgs e)        {            if (e.SocketError == SocketError.Success)            {                Socket s = (Socket)e.UserToken;                //TODO            }            else            {                CloseClientSocket(e);            }        }        #endregion        #region 接收数据        /// <summary>        ///接收完成时处理函数        /// </summary>        /// <param name="e">与接收完成操作相关联的SocketAsyncEventArg对象</param>        private void ProcessReceive(SocketAsyncEventArgs e)        {            if (e.SocketError == SocketError.Success)//if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)            {                // 检查远程主机是否关闭连接                if (e.BytesTransferred > 0)                {                    Socket s = (Socket)e.UserToken;                    //判断所有需接收的数据是否已经完成                    if (s.Available == 0)                    {                        //从侦听者获取接收到的消息。                         //String received = Encoding.ASCII.GetString(e.Buffer, e.Offset, e.BytesTransferred);                        //echo the data received back to the client                        //e.SetBuffer(e.Offset, e.BytesTransferred);                        byte[] data = new byte[e.BytesTransferred];                        Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//从e.Buffer块中复制数据出来,保证它可重用                        string info=Encoding.Default.GetString(data);                        Log4Debug(String.Format("收到 {0} 数据为 {1}",s.RemoteEndPoint.ToString(),info));                        //TODO 处理数据                        //增加服务器接收的总字节数。                    }                    if (!s.ReceiveAsync(e))//为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件                    {                        //同步接收时处理接收完成事件                        ProcessReceive(e);                    }                }            }            else            {                CloseClientSocket(e);            }        }        #endregion        #region 回调函数        /// <summary>        /// 当Socket上的发送或接收请求被完成时,调用此函数        /// </summary>        /// <param name="sender">激发事件的对象</param>        /// <param name="e">与发送或接收完成操作相关联的SocketAsyncEventArg对象</param>        private void OnIOCompleted(object sender, SocketAsyncEventArgs e)        {            // Determine which type of operation just completed and call the associated handler.            switch (e.LastOperation)            {                case SocketAsyncOperation.Accept:                    ProcessAccept(e);                    break;                case SocketAsyncOperation.Receive:                    ProcessReceive(e);                    break;                default:                    throw new ArgumentException("The last operation completed on the socket was not a receive or send");            }        }        #endregion        #region Close        /// <summary>        /// 关闭socket连接        /// </summary>        /// <param name="e">SocketAsyncEventArg associated with the completed send/receive operation.</param>        private void CloseClientSocket(SocketAsyncEventArgs e)        {            Log4Debug(String.Format("客户 {0} 断开连接!",((Socket)e.UserToken).RemoteEndPoint.ToString()));            Socket s = e.UserToken as Socket;            CloseClientSocket(s, e);        }        /// <summary>        /// 关闭socket连接        /// </summary>        /// <param name="s"></param>        /// <param name="e"></param>        private void CloseClientSocket(Socket s, SocketAsyncEventArgs e)        {            try            {                s.Shutdown(SocketShutdown.Send);            }            catch (Exception)            {                // Throw if client has closed, so it is not necessary to catch.            }            finally            {                s.Close();            }            Interlocked.Decrement(ref _clientCount);            _maxAcceptedClients.Release();            _objectPool.Push(e);//SocketAsyncEventArg 对象被释放,压入可重用队列。        }        #endregion        #region Dispose        /// <summary>        /// Performs application-defined tasks associated with freeing,         /// releasing, or resetting unmanaged resources.        /// </summary>        public void Dispose()        {            Dispose(true);            GC.SuppressFinalize(this);        }        /// <summary>        /// Releases unmanaged and - optionally - managed resources        /// </summary>        /// <param name="disposing"><c>true</c> to release         /// both managed and unmanaged resources; <c>false</c>         /// to release only unmanaged resources.</param>        protected virtual void Dispose(bool disposing)        {            if (!this.disposed)            {                if (disposing)                {                    try                    {                        Stop();                        if (_serverSock != null)                        {                            _serverSock = null;                        }                    }                    catch (SocketException ex)                    {                        //TODO 事件                    }                }                disposed = true;            }        }        #endregion        public void Log4Debug(string msg)        {            Console.WriteLine("notice:"+msg);        }    }}

BufferManager.cs 这个类是缓存管理类,是采用MSDN上面的例子一样的 地址: http://www.gaodaima.com/

SocketAsyncEventArgsPool.cs 这个类也是来自MSDN的 地址:http://www.gaodaima.com/

需要的话自己到MSDN网站上去取,我就不贴出来了

服务器端

static void Main(string[] args)        {            IOCPServer server = new IOCPServer(8088, 1024);            server.Start();            Console.WriteLine("服务器已启动....");            System.Console.ReadLine();        }

客户端

客户端代码也是很简单

static void Main(string[] args)        {            IPAddress remote=IPAddress.Parse("192.168.3.4");            client c = new client(8088,remote);            c.connect();            Console.WriteLine("服务器连接成功!");            while (true)            {                Console.Write("send>");                string msg=Console.ReadLine();                if (msg == "exit")                    break;                c.send(msg);            }            c.disconnect();            Console.ReadLine();        }

client.cs

public class client    {        public TcpClient _client;        public int port;        public IPAddress remote;        public client(int port,IPAddress remote)        {            this.port = port;            this.remote = remote;        }        public void connect()        {            this._client=new TcpClient();            _client.Connect(remote, port);        }        public void disconnect()        {            _client.Close();        }        public void send(string msg)        {            byte[] data=Encoding.Default.GetBytes(msg);            _client.GetStream().Write(data, 0, data.Length);        }    }

IOCPClient类,使用SocketAsyncEventArgs类建立一个Socket客户端。虽然MSDN说这个类特别设计给网络服务器应用,但也没有限制在客户端代码中使用APM。下面给出了IOCPClient类的样例代码:

 public class IOCPClient    {        /// <summary>        /// 连接服务器的socket        /// </summary>        private Socket _clientSock;        /// <summary>        /// 用于服务器执行的互斥同步对象        /// </summary>        private static Mutex mutex = new Mutex();        /// <summary>        /// Socket连接标志        /// </summary>        private Boolean _connected = false;        private const int ReceiveOperation = 1, SendOperation = 0;        private static AutoResetEvent[]                 autoSendReceiveEvents = new AutoResetEvent[]         {             new AutoResetEvent(false),             new AutoResetEvent(false)         };        /// <summary>        /// 服务器监听端点        /// </summary>        private IPEndPoint _remoteEndPoint;        public IOCPClient(IPEndPoint local,IPEndPoint remote)        {            _clientSock = new Socket(local.AddressFamily,SocketType.Stream, ProtocolType.Tcp);            _remoteEndPoint = remote;        }        #region 连接服务器        /// <summary>        /// 连接远程服务器        /// </summary>        public void Connect()        {            SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs();            connectArgs.UserToken = _clientSock;            connectArgs.RemoteEndPoint = _remoteEndPoint;            connectArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnConnected);            mutex.WaitOne();            if (!_clientSock.ConnectAsync(connectArgs))//异步连接            {                ProcessConnected(connectArgs);            }                    }        /// <summary>        /// 连接上的事件        /// </summary>        /// <param name="sender"></param>        /// <param name="e"></param>        void OnConnected(object sender, SocketAsyncEventArgs e)        {            mutex.ReleaseMutex();            //设置Socket已连接标志。             _connected = (e.SocketError == SocketError.Success);        }        /// <summary>        /// 处理连接服务器        /// </summary>        /// <param name="e"></param>        private void ProcessConnected(SocketAsyncEventArgs e)        {            //TODO        }        #endregion        #region 发送消息        /// <summary>        /// 向服务器发送消息        /// </summary>        /// <param name="data"></param>        public void Send(byte[] data)        {            SocketAsyncEventArgs asyniar = new SocketAsyncEventArgs();            asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendComplete);            asyniar.SetBuffer(data, 0, data.Length);            asyniar.UserToken = _clientSock;            asyniar.RemoteEndPoint = _remoteEndPoint;            autoSendReceiveEvents[SendOperation].WaitOne();            if (!_clientSock.SendAsync(asyniar))//投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件            {                // 同步发送时处理发送完成事件                ProcessSend(asyniar);            }        }        /// <summary>        /// 发送操作的回调方法        /// </summary>        /// <param name="sender"></param>        /// <param name="e"></param>        private void OnSendComplete(object sender, SocketAsyncEventArgs e)        {            //发出发送完成信号。             autoSendReceiveEvents[SendOperation].Set();            ProcessSend(e);        }        /// <summary>        /// 发送完成时处理函数        /// </summary>        /// <param name="e">与发送完成操作相关联的SocketAsyncEventArg对象</param>        private void ProcessSend(SocketAsyncEventArgs e)        {            //TODO        }        #endregion        #region 接收消息        /// <summary>        /// 开始监听服务端数据        /// </summary>        /// <param name="e"></param>        public void StartRecive(SocketAsyncEventArgs e)        {            //准备接收。             Socket s = e.UserToken as Socket;            byte[] receiveBuffer = new byte[255];            e.SetBuffer(receiveBuffer, 0, receiveBuffer.Length);            e.Completed += new EventHandler<SocketAsyncEventArgs>(OnReceiveComplete);            autoSendReceiveEvents[ReceiveOperation].WaitOne();            if (!s.ReceiveAsync(e))            {                ProcessReceive(e);            }        }        /// <summary>        /// 接收操作的回调方法        /// </summary>        /// <param name="sender"></param>        /// <param name="e"></param>        private void OnReceiveComplete(object sender, SocketAsyncEventArgs e)        {            //发出接收完成信号。             autoSendReceiveEvents[ReceiveOperation].Set();            ProcessReceive(e);        }        /// <summary>        ///接收完成时处理函数        /// </summary>        /// <param name="e">与接收完成操作相关联的SocketAsyncEventArg对象</param>        private void ProcessReceive(SocketAsyncEventArgs e)        {            if (e.SocketError == SocketError.Success)            {                // 检查远程主机是否关闭连接                if (e.BytesTransferred > 0)                {                    Socket s = (Socket)e.UserToken;                    //判断所有需接收的数据是否已经完成                    if (s.Available == 0)                    {                        byte[] data = new byte[e.BytesTransferred];                        Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//从e.Buffer块中复制数据出来,保证它可重用                        //TODO 处理数据                    }                    if (!s.ReceiveAsync(e))//为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件                    {                        //同步接收时处理接收完成事件                        ProcessReceive(e);                    }                }            }        }        #endregion        public void Close()        {            _clientSock.Disconnect(false);        }        /// <summary>        /// 失败时关闭Socket,根据SocketError抛出异常。        /// </summary>        /// <param name="e"></param>         private void ProcessError(SocketAsyncEventArgs e)        {            Socket s = e.UserToken as Socket;            if (s.Connected)            {                //关闭与客户端关联的Socket                try                {                    s.Shutdown(SocketShutdown.Both);                }                catch (Exception)                {                    //如果客户端处理已经关闭,抛出异常                 }                finally                {                    if (s.Connected)                    {                        s.Close();                    }                }            }            //抛出SocketException             throw new SocketException((Int32)e.SocketError);        }        /// <summary>        /// 释放SocketClient实例        /// </summary>        public void Dispose()        {            mutex.Close();            autoSendReceiveEvents[SendOperation].Close();            autoSendReceiveEvents[ReceiveOperation].Close();            if (_clientSock.Connected)            {                _clientSock.Close();            }        }    }

这个类我没有测试,但是理论上是没问题的。

以上就是C#,SocketAsyncEventArgs,服务器的内容,更多相关内容请关注搞代码(www.gaodaima.com)!


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:C#高性能Socket服务器SocketAsyncEventArgs的实现(IOCP)

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址