转载

Netty框架的C#版本

博客概述

我是java线的工程师,但是技术栈有点全栈的意思。在某次项目中需要用到c#的socket通讯,查询之后惊喜的发现c#也有netty框架。

dotnetty,github托管地址: https://github.com/Azure/DotNetty

于是乎开始用netty框架来愉快的实现业务,过程中用到了c#的客户端和服务端,因为语言的差异性,关于同步异步多线程部分踩了不少坑。过程我就不多阐述了,这里贴上代码,有需要的同学可以拿去用。

场景与实现

这次c#的使用场景是WPF。

C#服务器端实现

启动netty服务,作为c#服务器端

Thread th = new Thread(() =>
            {
                try
                {
                    NettyLuncher.getInstance().Start().Wait();

                }
                catch (AggregateException ee)
                {
                    App.Current.Dispatcher.Invoke(new Action(() =>
                    {
                        MessageBoxX.Info(LanguageUtils.GetCurrentLanuageStrByKey("App.PortOccupy"));
                        System.Environment.Exit(0);
                    }));
                }
            });
            th.Start();

c#的netty启动类,具体的业务handler因为保密的原因就不贴出了。代码里面用了单例的方法。

using DotNetty.Handlers.Logging;
using DotNetty.Handlers.Tls;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using System.Security.Cryptography.X509Certificates;
using spms.protocol;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using DotNetty.Codecs;
using DotNetty.Buffers;

namespace spms.server
{
    public class NettyLuncher
    {
        private volatile static NettyLuncher instance = new NettyLuncher();

        public static NettyLuncher getInstance()
        {
            return instance;
        }
        private NettyLuncher()
        {

        }

        private ManualResetEvent _mainThread = new ManualResetEvent(false);
        public void ShutdownGracefully()
        {
            _mainThread.Set();
        }

        public async Task Start()
        {

            int port = 6860;
            string p = ConfigurationManager.AppSettings["NettyPort"];
            if (p == null || "".Equals(p))
            {
                port = int.Parse(p);
            }
        
            var bossGroup = new MultithreadEventLoopGroup(1);
            var workerGroup = new MultithreadEventLoopGroup();
            try
            {
                var bootstrap = new ServerBootstrap();
                bootstrap
                    .Group(bossGroup, workerGroup) //  
                    .Channel<TcpServerSocketChannel>() // 
                    .Option(ChannelOption.SoBacklog, 100) //  
                    .Handler(new LoggingHandler("SRV-LSTN")) // 
                    .ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel =>
                    { // 
                        IChannelPipeline pipeline = channel.Pipeline;
                      
                         IByteBuffer delimiter = Unpooled.Buffer(); ;
                        delimiter.WriteByte((byte)0x7E);
                        pipeline.AddLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                        pipeline.AddLast(new LoggingHandler("SRV-CONN"));
                        pipeline.AddLast("tcpHandler", new ProtocolHandler());
                    }));

                // bootstrap bind port 
                IChannel boundChannel = await bootstrap.BindAsync(port);
                //线程阻塞在这
               
                _mainThread.WaitOne();
                //关闭服务
                await boundChannel.CloseAsync();
            }
            finally
            {
                await Task.WhenAll(
                    bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
                    workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)));
            }
        }
    }
}

C#客户端的写法

app里面的随机启动代码

Thread hbth = new Thread(() =>
            {     
                    HeartbeatClient heartbeatClient = new HeartbeatClient();
                    while (true)
                    {
                        try
                        {
                            BodyStrongMessage bodyStrongMessage = new BodyStrongMessage
                            {
                                MessageType = BodyStrongMessage.Types.MessageType.Heaerbeatreq,
                                //可能为null
                                HeartbeatRequest = TcpHeartBeatUtils.GetHeartBeatByCurrent()
                            };

                       
                        HeartbeatClient.RunClientAsync(bodyStrongMessage).Wait();
                   
                        

                    }
                     catch (Exception exception)
                        {
                            //exception.Message();
                            TcpHeartBeatUtils.WriteLogFile("连接云平台线程发送失败"+ exception.StackTrace);
                        }
                        finally {
                            Thread.Sleep(5000);
                        }
                    }
 
            });
            hbth.Start();

netty作为客户端的具体实现代码,主要用于心跳,目前的代码写法有弊端,以后有空会优化。

using Com.Bdl.Proto;
using DotNetty.Codecs.Protobuf;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace spms.heartbeat
{
    public class HeartbeatClient
    {

        public static async Task RunClientAsync(BodyStrongMessage msg)
        {        
            var group = new MultithreadEventLoopGroup();
            try
            {
                var bootstrap = new Bootstrap();
                bootstrap
                    .Group(group)
                    .Channel<TcpSocketChannel>()
                    .Option(ChannelOption.TcpNodelay, true)
                    .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
                    { 
                        IChannelPipeline pipeline = channel.Pipeline;
                        pipeline.AddLast("frameDecoder", new ProtobufVarint32FrameDecoder());
                        pipeline.AddLast("decoder", new ProtobufDecoder(BodyStrongMessage.Parser));
                        pipeline.AddLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
                        pipeline.AddLast("encoder", new ProtobufEncoder());
                        pipeline.AddLast("tcpHandler", new HeartbeatHandler());

                    }));

                IChannel bootstrapChannel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse("192.168.1.109"), 60000));

                await bootstrapChannel.WriteAndFlushAsync(msg);
                //Console.ReadLine();

                await bootstrapChannel.CloseAsync();
            }
            finally
            {
                group.ShutdownGracefullyAsync().Wait(1000);
            }
        }
    }
  
}

优化之后的netty客户端的写法,这一版本就对上面那个传一次创建一次的写法做了优化。经过测试没有问题。

using System;
using System.Threading;
using System.Threading.Tasks;
using AISports.HeartBeat;
using Com.Bdl.Proto;
using DotNetty.Codecs.Protobuf;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;

namespace AISports.HeartBeat
{
    public delegate void ReceiveMessageEvent(object message);


    public class ProtoBufSocket
    {
        public event ReceiveMessageEvent ReceiveMessage;

        private AutoResetEvent ChannelInitilizedEvent = new AutoResetEvent(false);
        private Bootstrap SocketBootstrap = new Bootstrap();
        private MultithreadEventLoopGroup WorkGroup = new MultithreadEventLoopGroup();
        private volatile bool Connected = false;
        private IChannel Channel;

        public ProtoBufSocket()
        {
            InitBootstrap();
        }

        private void InitBootstrap()
        {
            SocketBootstrap.Group(WorkGroup)
                .Channel<TcpSocketChannel>()
                .Option(ChannelOption.TcpNodelay, true)
                .Option(ChannelOption.SoKeepalive, true)
                .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
                {
                    IChannelPipeline pipeline = channel.Pipeline;
                    pipeline.AddLast("frameDecoder", new ProtobufVarint32FrameDecoder());
                    pipeline.AddLast("decoder", new ProtobufDecoder(BodyStrongMessage.Parser));
                    pipeline.AddLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
                    pipeline.AddLast("encoder", new ProtobufEncoder());
                    pipeline.AddLast("tcpHandler", new HeartbeatHandler());
                }));
        }


        public void Connect()
        {
            //var thread = new Thread(new ThreadStart(DoConnect().Wait));
            //thread.Start();
            DoConnect().Wait();
        }
        public void Disconnect()
        {
            WorkGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
        }

        public void SendMessage(object message)
        {
            if (!Connected)
                //Connected = ChannelInitilizedEvent.WaitOne();
                Connect();

            Channel.WriteAndFlushAsync(message);
        }

        private async Task DoConnect()
        {
            Connected = false;
            var connected = false;
            do
            {
                try
                {
                    var clientChannel = await SocketBootstrap.ConnectAsync("192.168.43.95", 60000);
                    Channel = clientChannel;
                    ChannelInitilizedEvent.Set();
                    connected = true;
                }
                catch (Exception /*ce*/)
                {
                    //Console.WriteLine(ce.StackTrace);
                    Console.WriteLine("Reconnect server after 5 seconds...");
                    Thread.Sleep(5000);
                }
            } while (!connected);
        }

    }
}
原文  http://www.cnblogs.com/tuyile006/p/11597536.html
正文到此结束
Loading...