转载

【MINA】用mina做业务服之间的通信,实现业务负载均衡思路

学习mina目的还是搭建通信架构,学完mina我们了解了如何实现客户端和服务端,也就是一个正常channel我们是知道怎么建立的

但是问题是,我们应用环境通信分为两种

1.前后端通信

其实这个比较好实现,提供一个mina server端,供前端语言通过socket建连接就行,这个通信就算是ok了,编解码等通信解析的细节这里不讲了

以前的游戏服务端架构业务多用短连接,聊天用长连接,聊天的部分其实就是上面表述的情况

现在是长连接的天下,聊天依旧是长连接,业务也做成长连接,实现了真正意义上的长连接游戏架构,这其实就表述了一种当下典型架构,

就是后端提供两个开放的通信端口【即两个mina server】,供前端的socket连接,一个负责聊天,登录,注册,另一个负责其他业务,这样就实现了协议通信的负载均衡

2.后端的业务服通信【这是本文的重点】

那么后端的业务就不需要负载均衡吗?比如job,异步更新db,活动副本等

当然也是需要的,怎么做那,先拿1中的做个解释

mainserevr[聊天,登录,注册]---nodeserver[其他业务]

这两个mina sever端已经建立起来了,但是两个server之间还不能通信,我们有两个选择,要么在mainserevr上起个mina client去连nodeserver,要么在nodeserver

上起个mina client去连mainserevr,思路肯定是这样的,一旦这个通道建立了,其实互为server和client的,会有一个iosession被通道持有,只要有这个iosession,

就可以主动write,当然对于通道的另一端可以response,也可以通过取得iosession来主动写

实现方式,我们在nodeserevr上提供一个mainserverClient这样一个spring的bean去连接mainserver,这样在nodeserver上就可以向mainserevr发消息了

3.带着这个思路设计一下

我把游戏中的业务分为

 public static final String SERVER_TYPE_NODE_STR = "nodeserver";// game node  public static final String SERVER_TYPE_MAIN_STR = "mainserver";// 主server  public static final String SERVER_TYPE_JOB_STR = "jobserver";// job server  public static final String SERVER_TYPE_ASYNCDB_STR = "asyncdbserver";// 异步DB  public static final String SERVER_TYPE_ACTIVE_STR = "activityserver";// 活动  public static final String SERVER_TYPE_OTHER_STR = "other";// 其他  public static final String SERVER_TYPE_GM_STR = "GM";//管理端 

每次启动一种server时,首先启动一次mina serevr,然后启动多个mina client去连接其他的mina server,

比如启动nodeserevr 服务端,然后启动多个client分别连接mainserevr,jobserevr等的服务端,这样我就可以

在nodeserver上给其他业务serevr发请求了,具体启动哪些client看需要

搞一个启动server类型的方法

public static ClassPathXmlApplicationContext start(String serverTypeStr) {         try {   //关闭连接池的钩子线程  ProxoolFacade.disableShutdownHook();   //spring 的核心配置文件  String xmlFile = "applicationContext.xml";  ....  log.info("启动 {} server................", serverTypeName);  // 设置到系统环境变量  System.setProperty(NodeSessionMgr.SERVER_TYPE_KEY, serverType + "");  System.setProperty(NodeSessionMgr.SERVER_TYPE_NAME_KEY,          serverTypeName);  // final ClassPathXmlApplicationContext parent = new  // ClassPathXmlApplicationContext(  // xmlFile);  String fileName = null;    //这是把spring的住配置文件拆分了一部分内容出来,目前是只加载本server需要的bean  if (serverType == NodeSessionMgr.SERVER_TYPE_NODE) {      fileName = "wolf/app_nodeserver.xml";  } else {      fileName = "wolf/app_server.xml";  }  //手动启动spring  final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(          new String[] { xmlFile, fileName });  if (context != null) {      ServiceLocator.getInstance().setApplicationContext(context);  }  // 启动socket server  final WolfServer server = (WolfServer) ServiceLocator          .getSpringBean("wolf_server");  server.setServerType(serverType);   //这个调用就是我们熟悉的启动mina server端  server.start();  //这个动用做两件事,选区需要的serevr类型建立mina client连接  startClient(server);   //钩子线程用来监听应用停止,为了做停止时的后续处理  Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {      public void run() {          _shutdown();      }  }, "shutdownHookThread"));    //为了支持web,springMVC,内置一个web server  if (NodeSessionMgr.SERVER_TYPE_MAIN_STR          .equalsIgnoreCase(serverTypeStr)) {      JettyServer jettyServer = (JettyServer) ServiceLocator   .getSpringBean("jettyServer");      jettyServer.start();  }  log.info("start {} end................", serverTypeName);  return context;         } catch (Exception e) {  e.printStackTrace();  shutdown();         } finally {         }         return null;     } 

在看下startClient(server);

private static void startClient(WolfServer server) {  // asyncdbServer只会被连接,不会主动连接其他server   // 这部分目的是过滤那些不需要主动连比人的serevr,比武我这里的异步db,和活动服  if (server.getServerType() == NodeSessionMgr.SERVER_TYPE_ASYNCDB   || server.getServerType() == NodeSessionMgr.SERVER_TYPE_ACTIVE) {      return;  }  // 发送game Server ip port到mainserver  Map<String, Object> params = new HashMap<String, Object>();  params.put("nodeServerIp", server.getIp());  params.put("nodeServerPort", server.getPort());  params.put("serverType", server.getServerType());  //我需要mainserevr的client,就弄个bean在本服  final IWolfClientService mainServerClient = (IWolfClientService) ServiceLocator   .getSpringBean("mainServerClient");  //这个位置其实就是mina的client连server端  mainServerClient.init();  Object localAddress = mainServerClient.registerNode(params);   //同上,需要jobserevr的client  final IWolfClientService jobServerClient = (IWolfClientService) ServiceLocator   .getSpringBean("jobServerClient");  if (jobServerClient != null) {      jobServerClient.init();      Map<String, Object> params1 = new HashMap<String, Object>();      params1.putAll(params);      jobServerClient.registerNode(params1);  }  // }   .....     } 

再看下WolfClientService.init()

public void init() {   if (start)    return;   if (wolfClient == null) {    log.error("wolf client is null");    return;   }    //mina 的client 连接 mina server   wolfClient.start();   if (wolfClient.isConnected())    start = true;  } 

再看下wolfclient.start()

/**      * 连接一个服务器,并指定处理接收到的消息的处理方法      *       */     public void start() {  // this.context.put("resultMgr", this.resultMgr);   logger.info(com.youxigu.dynasty2.i18n.MarkupMessages   .getString("WolfClient_9"), processorNum);  logger.info(com.youxigu.dynasty2.i18n.MarkupMessages   .getString("WolfClient_0"), corePoolSize);  logger.info(com.youxigu.dynasty2.i18n.MarkupMessages   .getString("WolfClient_4"), maxPoolSize);  if (this.serverIp == null || this.serverIp.equals("")) {      logger.error(clientName + "没有配置serverIp,不启动.........");      return;  }  String threadPrefix = clientName + "[" + this.serverIp + ":"   + this.serverPort + "]";  // exector = Executors.newCachedThreadPool(new  // NamingThreadFactory(threadPrefix));  processor = new SimpleIoProcessorPool<NioSession>(NioProcessor.class,   processorNum);  // connector = new NioSocketConnector((Executor) exector, processor);  connector = new NioSocketConnector(processor);  // connector.getSessionConfig().setReuseAddress(true);  DefaultIoFilterChainBuilder chain = connector.getFilterChain();  if (useLogFilter == 2) {      chain.addLast("logging", new LoggingFilter());  }  // codec filter要放在ExecutorFilter前,因为读写同一个socket connection的socket  // buf不能并发(事实上主要是读,写操作mina已经封装成一个write Queue)  chain.addLast("codec", new ProtocolCodecFilter(codecFactory)); // 设置编码过滤器  // 添加心跳过滤器,客户端只接受服务端的心跳请求,不发送心跳请求  // connector.getSessionConfig().setReaderIdleTime(readIdleTimeOut);  // 这里的KeepAliveFilter必须在codec之后,因为KeepAliveMessageFactoryImpl返回的是Object,如果KeepAliveMessageFactoryImpl返回的是IOBuffer,则可以在codec之前  // KeepAliveFilter到底在ExecutorFilter之前好还是之后好,我也不确定  KeepAliveFilter filter = new KeepAliveFilter(   new KeepAliveMessageFactoryImpl(keepAliveRequestInterval <= 0),   IdleStatus.READER_IDLE, new RequestTimeoutCloseHandler(),   keepAliveRequestInterval <= 0 ? 600 : keepAliveRequestInterval,   30);  chain.addLast("ping", filter);  // 添加执行线程池  executor = new UnorderedThreadPoolExecutor(corePoolSize, maxPoolSize,   keepAliveTime, TimeUnit.SECONDS, new NamingThreadFactory(    threadPrefix));  // 这里是预先启动corePoolSize个处理线程  executor.prestartAllCoreThreads();  chain.addLast("exec", new ExecutorFilter(executor,   IoEventType.EXCEPTION_CAUGHT, IoEventType.MESSAGE_RECEIVED,   IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE,   IoEventType.SESSION_OPENED));  if (useWriteThreadPool) {      executorWrite = new UnorderedThreadPoolExecutor(corePoolSize,       maxPoolSize, keepAliveTime, TimeUnit.SECONDS,       new NamingThreadFactory(threadPrefix + "write"));      executorWrite.prestartAllCoreThreads();      chain.addLast("execWrite", new ExecutorFilter(executorWrite,       IoEventType.WRITE, IoEventType.MESSAGE_SENT));  }  // ,logger.isDebugEnabled() ? new  // LoggingIoEventQueueHandler("execWrite") : nulls  // 配置handler的 logger,在codec之后,打印的是decode前或者encode后的消息的log  // 可以配置在ExecutorFilter之后:是为了在工作线程中打印log,不是在NioProcessor中打印  if (useLogFilter == 1) {      chain.addLast("logging", new LoggingFilter());  }  connector.setHandler(handler);  connector.getSessionConfig().setReuseAddress(true);  connector.getSessionConfig().setTcpNoDelay(tcpNoDelay);  logger.info(com.youxigu.dynasty2.i18n.MarkupMessages   .getString("WolfClient_1")   + serverIp + ":" + serverPort);  ConnectFuture cf = null;  long start = System.currentTimeMillis();  while (true) {    //这地很关键,是个无线循环,每10秒连接一次,直到可以和服务端建立连接,否则一支循环下去      cf = connector.connect(serverAddress);// 建立连接      cf.awaitUninterruptibly(10000L);      if (!cf.isConnected()) {   if ((System.currentTimeMillis() - start) > timeout) {       throw new RuntimeException(        com.youxigu.dynasty2.i18n.MarkupMessages         .getString("WolfClient_5")         + serverIp + ":" + serverPort);   }   if (cf.getException() != null) {       logger.error(com.youxigu.dynasty2.i18n.MarkupMessages        .getString("WolfClient_6"), serverIp + ":"        + serverPort, cf.getException().getMessage());   }   try {       Thread.sleep(10000);   } catch (Exception e) {   }   continue;      }    //这就是终极目标了,我们的目的就是在serevr的客户端的bean里,可以拿到这个iosession      this.setSession(cf.getSession());      logger.info(com.youxigu.dynasty2.i18n.MarkupMessages       .getString("WolfClient_10")       + serverIp + ":" + serverPort);      shutDown = false;      if (handler instanceof WolfMessageChain) {   WolfMessageChain wmc = WolfMessageChain.class.cast(handler);   wmc.init(context);      }      break;  }     } 

这样后端的业务通信网就可以轻松的建立起来,之后想怎么通信就看你的了

正文到此结束
Loading...