转载

Kaa学习指南(Java版)

Kaa项目学习有一个月时间, 由于是代码量非常庞大的开源项目, 上手起来还是比较困难的, 本文章主要是要和大家分享我的学习过程,遇到的困难,以及我们应该从哪里入手阅读代码比较好。

环境搭建

部署Kaa官方虚拟机Demo,里面包含很多很有用的事例, 可以直接运行, 会有很直观的感受、方便了解Kaa的各个业务, 起到很好的入门作用。

根据官方文档从编译代码开始, 到安装部署调试环境。在搭建环境中我们可以了解到Kaa是运行在JVM环境中的, 具体使用的外部组件有:Zookeeper、PostgreSQL 、MongoDB。Kaa本身有4个服务, kaa-bootstrap(集群)、 kaa-operations(集群)、 kaa-control(主备)、 kaa-admin(用户管理界面)。我们需要了解这些服务的 具体分工 。了解PostgreSQL、MongoDB的基本查询操作。

阅读文档

阅读Kaa的 设计文档 ,了解各个业务类型的。

阅读代码

从Notification Demo入手(下载源码), 从官方给的源码,我们可以分析出客户端程序运行的入口。其实很简单, 只需要如下两行代码, 就可以将KaaClient启动起来。

/** new KaaClient 实例。 new DesktopKaaPlatformContext初始化Kaa上下文信息,主要是将sdk中client.properties信息同步到对象中;   * 还有初始化4个Executor(KaaClient start时候, 执行init()才将线程实例化)。他们分别是:   * lifeCycleExecutor(Kaa启动依赖本线程,如果该线程执行stop其他所有任务将停止,所以说他是kaaClient的生命周期),   * apiExecutor(),   * callbackExecutor(),             * scheduledExecutor(延时任务,典型例子DefaultChannelManager.onServerFailed())  **/ kaaClient = Kaa.newClient(new DesktopKaaPlatformContext()); kaaClient.start(); // 将其启动起来

我们阅读KaaClient的代码时, 要抛弃读WebServer代码的思想。因为这里没有Spring, 没有依赖注入, 只有setter,对象会传来传去, 传到很深的地方,所以我们会看到很多set对象的地方。客户端是多线程的,我们能看到很多创建线程的地方, 我们要能够把其串联起来。

Kaa.newClient主要执行的代码逻辑在其父类 AbstractKaaClient

AbstractKaaClient(KaaClientPlatformContext context, KaaClientStateListener listener) throws IOException, GeneralSecurityException {         /*         * KaaClientPlatformContext 中包含:         * 3个线程 + 1个定时任务线程 (Java Executor), 但未启动 SimpleExecutorContext         * */         this.context = context;         this.stateListener = listener;         if (context.getProperties() != null) {             this.properties = context.getProperties();         } else { // 客户端SDK示例代码 仅仅new了一个 DesktopKaaPlatformContext, 需要初始化client properties             /*             *  是对client.properties 文件进行解析             *  1.获取client.properties             *  2.解析bootstrap server 列表             *  3.等其他sdk中包含的信息             */             this.properties = new KaaClientProperties();         }          // 传递 Base64 编解码能力, BootstrapServers信息是Base64编码记录的         this.properties.setBase64(context.getBase64());          // 获取bootstrapServers信息         Map<TransportProtocolId, List<TransportConnectionInfo>> bootstrapServers = this.properties.getBootstrapServers();         if (bootstrapServers == null || bootstrapServers.isEmpty()) {             throw new RuntimeException("Unable to obtain list of bootstrap servers."); // NOSONAR         }          // 对bootstrapServer列表进行打乱操作         for (Map.Entry<TransportProtocolId, List<TransportConnectionInfo>> cursor : bootstrapServers.entrySet()) {             Collections.shuffle(cursor.getValue());         }          /*         * kaaClientState 数据初始化. kaaClient第一次启动是没有status.properties文件的, 当应用关闭时, Kaa status信息会持久化到status.properties文件中         * */         kaaClientState = new KaaClientPropertiesState(context.createPersistentStorage(), context.getBase64(), this.properties);          /*         * transportContext 实例化, 每个transport都有的功能是生成sync请求信息, 和接收sync请求服务器端的返回信息后,交给对应的manger处理具体业务.         * notes: sync操作就是客户端向服务器发起请求的操作. 当请求内容要有定制的是, 这里是修改点.         * */         TransportContext transportContext = buildTransportContext(properties, kaaClientState);          /*         * bootstrapManager主要功能是, 向bootstrapServer获取 operationServer的服务器列表, 和对operationServer列表的管理         * */         bootstrapManager = buildBootstrapManager(properties, kaaClientState, transportContext);          /*         * channelManager是对channel的管理, 目前有两个channel, bootStrapChannel(HTTP协议, 客户端和bootstrap交互的消息通道), operationChannel(MQTT协议, 客户端和operationServer交互的消息通道)         * notes: mqtt协议是包装在TCP协议上次的协议, 实际编程就是sock编程, 只是多了一次用mqtt协议编解码的过程.         * */         channelManager = buildChannelManager(bootstrapManager, bootstrapServers);          /*         * failoverManager是对channel发生故障后进行的业务操作         * */         failoverManager = buildFailoverManager(channelManager);          /*         * channelManager 和 failoverManager是相互依赖的         * */         channelManager.setFailoverManager(failoverManager);          /*         *  初始化 bootstrapChannel,operationsChannel. 将这两个channel添加到channelManger中管理.         *  添加channel到channelManager的channel list中之后, 每添加一个channel会启动一个使用该channel的线程, 线程会去处理一个阻塞队列. 也就是说会有会有地方去往这个队列中放消息的.         *  其实这两个阻塞队列就是sync操作的中转站. 当需要向服务端获取信息的时候, 向队列中放不同类型TransportType的SyncTask就会自动去执行sync操作了.         * */         initializeChannels(channelManager, transportContext);          bootstrapManager.setChannelManager(channelManager);         bootstrapManager.setFailoverManager(failoverManager);          /*         * 如下manger是处理来着各自transport的操作. transport是来着         * */         profileManager = buildProfileManager(properties, kaaClientState, transportContext);         notificationManager = buildNotificationManager(properties, kaaClientState, transportContext);         eventManager = buildEventManager(properties, kaaClientState, transportContext);         endpointRegistrationManager = buildRegistrationManager(properties, kaaClientState, transportContext);         logCollector = buildLogCollector(properties, kaaClientState, transportContext);         configurationManager = buildConfigurationManager(properties, kaaClientState, transportContext, context.getExecutorContext());          /*         * 将manager注入到transport中, 为了将manger的业务处理能力透传给transport         * */         transportContext.getRedirectionTransport().setBootstrapManager(bootstrapManager);         transportContext.getBootstrapTransport().setBootstrapManager(bootstrapManager);         transportContext.getProfileTransport().setProfileManager(profileManager);         transportContext.getEventTransport().setEventManager(eventManager);         transportContext.getNotificationTransport().setNotificationProcessor(notificationManager);         transportContext.getConfigurationTransport().setConfigurationHashContainer(configurationManager.getConfigurationHashContainer());         transportContext.getConfigurationTransport().setConfigurationProcessor(configurationManager.getConfigurationProcessor());         transportContext.getUserTransport().setEndpointRegistrationProcessor(endpointRegistrationManager);         transportContext.getLogTransport().setLogProcessor(logCollector);         transportContext.initTransports(this.channelManager, this.kaaClientState);          /*         * 构建EventFamily, 给客户端编程暴露接口         * */         eventFamilyFactory = new EventFamilyFactory(eventManager, context.getExecutorContext()); }
原文  http://segmentfault.com/a/1190000004389762
正文到此结束
Loading...