博客概述
我是java线的工程师,但是技术栈有点全栈的意思。在某次项目中需要用到c#的socket通讯,查询之后惊喜的发现c#也有netty框架。
dotnetty,github托管地址: https://github.com/Azure/DotNetty
于是乎开始用netty框架来愉快的实现业务,过程中用到了c#的客户端和服务端,因为语言的差异性,关于同步异步多线程部分踩了不少坑。过程我就不多阐述了,这里贴上代码,有需要的同学可以拿去用。
场景与实现
这次c#的使用场景是WPF。
C#服务器端实现
启动netty服务,作为c#服务器端
// Host the Netty server on a dedicated thread: Start() is waited on synchronously,
// which would otherwise block the calling (UI) thread.
Thread th = new Thread(new ThreadStart(() =>
{
    try
    {
        var serverRun = NettyLuncher.getInstance().Start();
        serverRun.Wait();
    }
    catch (AggregateException)
    {
        // Any failure surfaced by Wait() is reported to the user with the
        // "App.PortOccupy" message, after which the process exits.
        Action notifyAndExit = new Action(() =>
        {
            MessageBoxX.Info(LanguageUtils.GetCurrentLanuageStrByKey("App.PortOccupy"));
            System.Environment.Exit(0);
        });

        // UI work has to be marshalled onto the application's dispatcher.
        App.Current.Dispatcher.Invoke(notifyAndExit);
    }
}));
th.Start();
c#的netty启动类,具体的业务handler因为保密的原因就不贴出了。代码里面用了单例的方法。
using DotNetty.Handlers.Logging;
using DotNetty.Handlers.Tls;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using System.Security.Cryptography.X509Certificates;
using spms.protocol;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using DotNetty.Codecs;
using DotNetty.Buffers;

namespace spms.server
{
    /// <summary>
    /// Singleton launcher for the DotNetty TCP server.
    /// Call <see cref="Start"/> to bind and serve, and <see cref="ShutdownGracefully"/>
    /// to release the thread that is parked inside <see cref="Start"/>.
    /// </summary>
    public class NettyLuncher
    {
        /// <summary>Port used when the "NettyPort" app setting is missing or invalid.</summary>
        private const int DefaultPort = 6860;

        // Eagerly created, never reassigned.
        private static readonly NettyLuncher instance = new NettyLuncher();

        // Signalled by ShutdownGracefully(); Start() waits on it after binding.
        private readonly ManualResetEvent _mainThread = new ManualResetEvent(false);

        private NettyLuncher()
        {
        }

        /// <summary>Returns the single shared launcher instance.</summary>
        public static NettyLuncher getInstance()
        {
            return instance;
        }

        /// <summary>
        /// Signals the running <see cref="Start"/> call to close the listening channel
        /// and shut the event loop groups down.
        /// </summary>
        public void ShutdownGracefully()
        {
            _mainThread.Set();
        }

        /// <summary>
        /// Binds the server to the configured port and then blocks until
        /// <see cref="ShutdownGracefully"/> is called.
        /// </summary>
        /// <returns>A task that completes once the server has been shut down.</returns>
        public async Task Start()
        {
            int port = ResolvePort();

            var bossGroup = new MultithreadEventLoopGroup(1);
            var workerGroup = new MultithreadEventLoopGroup();
            try
            {
                var bootstrap = new ServerBootstrap();
                bootstrap
                    .Group(bossGroup, workerGroup)
                    .Channel<TcpServerSocketChannel>()
                    .Option(ChannelOption.SoBacklog, 100)
                    .Handler(new LoggingHandler("SRV-LSTN"))
                    .ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel =>
                    {
                        IChannelPipeline pipeline = channel.Pipeline;

                        // Inbound bytes are framed on the single-byte delimiter 0x7E
                        // (decoder constructed with a limit of 1024).
                        IByteBuffer delimiter = Unpooled.Buffer();
                        delimiter.WriteByte((byte)0x7E);
                        pipeline.AddLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                        pipeline.AddLast(new LoggingHandler("SRV-CONN"));
                        pipeline.AddLast("tcpHandler", new ProtocolHandler());
                    }));

                // Bind the listening socket.
                IChannel boundChannel = await bootstrap.BindAsync(port);

                // Block here until ShutdownGracefully() signals the event.
                _mainThread.WaitOne();

                // Stop accepting connections.
                await boundChannel.CloseAsync();
            }
            finally
            {
                await Task.WhenAll(
                    bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
                    workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)));
            }
        }

        /// <summary>
        /// Reads the listening port from the "NettyPort" app setting.
        /// Falls back to <see cref="DefaultPort"/> when the setting is absent, empty,
        /// not a number, or outside the valid TCP port range.
        /// </summary>
        private static int ResolvePort()
        {
            string configured = ConfigurationManager.AppSettings["NettyPort"];

            int parsed;
            if (!string.IsNullOrEmpty(configured)
                && int.TryParse(configured, out parsed)
                && parsed > 0
                && parsed <= 65535)
            {
                return parsed;
            }

            return DefaultPort;
        }
    }
}
App 里随程序启动而运行的代码(启动心跳发送线程)
// Heartbeat sender: every 5 seconds build a heartbeat request and push it to the server.
Thread hbth = new Thread(() =>
{
    while (true)
    {
        try
        {
            BodyStrongMessage bodyStrongMessage = new BodyStrongMessage
            {
                MessageType = BodyStrongMessage.Types.MessageType.Heaerbeatreq,
                // NOTE(review): the original comment says this value may be null - confirm
                // how GetHeartBeatByCurrent() behaves before relying on it.
                HeartbeatRequest = TcpHeartBeatUtils.GetHeartBeatByCurrent()
            };

            // RunClientAsync is static, so no HeartbeatClient instance is needed.
            HeartbeatClient.RunClientAsync(bodyStrongMessage).Wait();
        }
        catch (Exception exception)
        {
            // Wait() wraps failures in an AggregateException whose own StackTrace hides
            // the real cause; the full exception text includes the inner exceptions.
            TcpHeartBeatUtils.WriteLogFile("连接云平台线程发送失败" + exception);
        }
        finally
        {
            Thread.Sleep(5000);
        }
    }
});
hbth.Start();
netty作为客户端的具体实现代码,主要用于心跳。目前的写法有弊端:每发送一次心跳都要重新创建一次 EventLoopGroup 并重新建立连接,以后有空会优化。
using Com.Bdl.Proto;
using DotNetty.Codecs.Protobuf;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace spms.heartbeat
{
    /// <summary>
    /// One-shot heartbeat client: each call opens a connection, sends a single
    /// message and tears everything down again.
    /// </summary>
    public class HeartbeatClient
    {
        private const string ServerHost = "192.168.1.109";
        private const int ServerPort = 60000;

        /// <summary>
        /// Connects to the heartbeat server, writes <paramref name="msg"/> and closes the connection.
        /// </summary>
        /// <param name="msg">The protobuf message to send.</param>
        /// <returns>A task that completes after the channel and event loop group are released.</returns>
        public static async Task RunClientAsync(BodyStrongMessage msg)
        {
            var group = new MultithreadEventLoopGroup();
            try
            {
                var bootstrap = new Bootstrap();
                bootstrap
                    .Group(group)
                    .Channel<TcpSocketChannel>()
                    .Option(ChannelOption.TcpNodelay, true)
                    .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
                    {
                        IChannelPipeline pipeline = channel.Pipeline;
                        pipeline.AddLast("frameDecoder", new ProtobufVarint32FrameDecoder());
                        pipeline.AddLast("decoder", new ProtobufDecoder(BodyStrongMessage.Parser));
                        pipeline.AddLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
                        pipeline.AddLast("encoder", new ProtobufEncoder());
                        pipeline.AddLast("tcpHandler", new HeartbeatHandler());
                    }));

                IChannel bootstrapChannel =
                    await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ServerHost), ServerPort));
                try
                {
                    await bootstrapChannel.WriteAndFlushAsync(msg);
                }
                finally
                {
                    // Close the channel even when the write fails, so it is never leaked.
                    await bootstrapChannel.CloseAsync();
                }
            }
            finally
            {
                // Awaited rather than blocked on: a blocking Wait() inside an async method
                // ties up a thread and can replace the original exception with its own.
                await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
            }
        }
    }
}
优化之后的netty客户端的写法,这一版本就对上面那个传一次创建一次的写法做了优化。经过测试没有问题。
using System;
using System.Threading;
using System.Threading.Tasks;
using AISports.HeartBeat;
using Com.Bdl.Proto;
using DotNetty.Codecs.Protobuf;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;

namespace AISports.HeartBeat
{
    /// <summary>Signature for subscribers of <see cref="ProtoBufSocket.ReceiveMessage"/>.</summary>
    public delegate void ReceiveMessageEvent(object message);

    /// <summary>
    /// Long-lived protobuf client: the bootstrap and event loop group are created once
    /// and the connection is reused across <see cref="SendMessage"/> calls.
    /// </summary>
    public class ProtoBufSocket
    {
        private const string ServerHost = "192.168.43.95";
        private const int ServerPort = 60000;
        private const int ReconnectDelayMilliseconds = 5000;

        // NOTE(review): nothing in this class raises the event; presumably a handler is
        // expected to do so - confirm.
        public event ReceiveMessageEvent ReceiveMessage;

        private AutoResetEvent ChannelInitilizedEvent = new AutoResetEvent(false);
        private Bootstrap SocketBootstrap = new Bootstrap();
        private MultithreadEventLoopGroup WorkGroup = new MultithreadEventLoopGroup();
        private volatile bool Connected = false;
        private IChannel Channel;

        public ProtoBufSocket()
        {
            InitBootstrap();
        }

        /// <summary>Configures the shared bootstrap and the per-channel pipeline.</summary>
        private void InitBootstrap()
        {
            SocketBootstrap.Group(WorkGroup)
                .Channel<TcpSocketChannel>()
                .Option(ChannelOption.TcpNodelay, true)
                .Option(ChannelOption.SoKeepalive, true)
                .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
                {
                    IChannelPipeline pipeline = channel.Pipeline;
                    pipeline.AddLast("frameDecoder", new ProtobufVarint32FrameDecoder());
                    pipeline.AddLast("decoder", new ProtobufDecoder(BodyStrongMessage.Parser));
                    pipeline.AddLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
                    pipeline.AddLast("encoder", new ProtobufEncoder());
                    pipeline.AddLast("tcpHandler", new HeartbeatHandler());
                }));
        }

        /// <summary>
        /// Blocks the calling thread until a connection is established,
        /// retrying every 5 seconds for as long as connecting fails.
        /// </summary>
        public void Connect()
        {
            DoConnect().Wait();
        }

        /// <summary>
        /// Starts shutting the event loop group down without waiting for completion.
        /// </summary>
        public void Disconnect()
        {
            Connected = false;
            WorkGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
        }

        /// <summary>
        /// Sends <paramref name="message"/>, (re)connecting first when there is no usable
        /// channel. The write itself is not awaited.
        /// </summary>
        /// <param name="message">The object handed to the channel pipeline for encoding.</param>
        public void SendMessage(object message)
        {
            IChannel channel = Channel;

            // Reconnect only when needed: never connected, or the channel has gone inactive.
            if (!Connected || channel == null || !channel.Active)
            {
                Connect();
                channel = Channel;
            }

            channel.WriteAndFlushAsync(message);
        }

        /// <summary>
        /// Replaces the current channel with a freshly connected one, retrying until it succeeds.
        /// </summary>
        private async Task DoConnect()
        {
            Connected = false;

            // Release the previous channel so reconnects do not accumulate open sockets.
            IChannel stale = Channel;
            Channel = null;
            if (stale != null)
            {
                try
                {
                    // ConfigureAwait(false): Connect() blocks its caller with Wait(), so
                    // continuations must not be posted back to that caller's context.
                    await stale.CloseAsync().ConfigureAwait(false);
                }
                catch (Exception)
                {
                    // Best effort: the old channel is being discarded anyway.
                }
            }

            while (true)
            {
                try
                {
                    Channel = await SocketBootstrap.ConnectAsync(ServerHost, ServerPort).ConfigureAwait(false);
                    ChannelInitilizedEvent.Set();
                    Connected = true;
                    return;
                }
                catch (Exception)
                {
                    Console.WriteLine("Reconnect server after 5 seconds...");
                }

                // Asynchronous delay instead of Thread.Sleep, so no thread is parked while waiting.
                await Task.Delay(ReconnectDelayMilliseconds).ConfigureAwait(false);
            }
        }
    }
}