Kaa项目学习有一个月时间, 由于是代码量非常庞大的开源项目, 上手起来还是比较困难的, 本文章主要是要和大家分享我的学习过程,遇到的困难,以及我们应该从哪里入手阅读代码比较好。
部署Kaa官方虚拟机Demo,里面包含很多很有用的事例, 可以直接运行, 会有很直观的感受、方便了解Kaa的各个业务, 起到很好的入门作用。
根据官方文档从编译代码开始, 到安装部署调试环境。在搭建环境中我们可以了解到Kaa是运行在JVM环境中的, 具体使用的外部组件有:Zookeeper、PostgreSQL 、MongoDB。Kaa本身有4个服务, kaa-bootstrap(集群)、 kaa-operations(集群)、 kaa-control(主备)、 kaa-admin(用户管理界面)。我们需要了解这些服务的 具体分工 。了解PostgreSQL、MongoDB的基本查询操作。
阅读Kaa的 设计文档 ,了解各个业务类型的。
从Notification Demo入手(下载源码), 从官方给的源码,我们可以分析出客户端程序运行的入口。其实很简单, 只需要如下两行代码, 就可以将KaaClient启动起来。
/** new KaaClient 实例。 new DesktopKaaPlatformContext初始化Kaa上下文信息,主要是将sdk中client.properties信息同步到对象中; * 还有初始化4个Executor(KaaClient start时候, 执行init()才将线程实例化)。他们分别是: * lifeCycleExecutor(Kaa启动依赖本线程,如果该线程执行stop其他所有任务将停止,所以说他是kaaClient的生命周期), * apiExecutor(), * callbackExecutor(), * scheduledExecutor(延时任务,典型例子DefaultChannelManager.onServerFailed()) **/ kaaClient = Kaa.newClient(new DesktopKaaPlatformContext()); kaaClient.start(); // 将其启动起来
我们阅读KaaClient的代码时, 要抛弃读WebServer代码的思想。因为这里没有Spring, 没有依赖注入, 只有setter,对象会传来传去, 传到很深的地方,所以我们会看到很多set对象的地方。客户端是多线程的,我们能看到很多创建线程的地方, 我们要能够把其串联起来。
Kaa.newClient主要执行的代码逻辑在其父类 AbstractKaaClient
AbstractKaaClient(KaaClientPlatformContext context, KaaClientStateListener listener) throws IOException, GeneralSecurityException { /* * KaaClientPlatformContext 中包含: * 3个线程 + 1个定时任务线程 (Java Executor), 但未启动 SimpleExecutorContext * */ this.context = context; this.stateListener = listener; if (context.getProperties() != null) { this.properties = context.getProperties(); } else { // 客户端SDK示例代码 仅仅new了一个 DesktopKaaPlatformContext, 需要初始化client properties /* * 是对client.properties 文件进行解析 * 1.获取client.properties * 2.解析bootstrap server 列表 * 3.等其他sdk中包含的信息 */ this.properties = new KaaClientProperties(); } // 传递 Base64 编解码能力, BootstrapServers信息是Base64编码记录的 this.properties.setBase64(context.getBase64()); // 获取bootstrapServers信息 Map<TransportProtocolId, List<TransportConnectionInfo>> bootstrapServers = this.properties.getBootstrapServers(); if (bootstrapServers == null || bootstrapServers.isEmpty()) { throw new RuntimeException("Unable to obtain list of bootstrap servers."); // NOSONAR } // 对bootstrapServer列表进行打乱操作 for (Map.Entry<TransportProtocolId, List<TransportConnectionInfo>> cursor : bootstrapServers.entrySet()) { Collections.shuffle(cursor.getValue()); } /* * kaaClientState 数据初始化. kaaClient第一次启动是没有status.properties文件的, 当应用关闭时, Kaa status信息会持久化到status.properties文件中 * */ kaaClientState = new KaaClientPropertiesState(context.createPersistentStorage(), context.getBase64(), this.properties); /* * transportContext 实例化, 每个transport都有的功能是生成sync请求信息, 和接收sync请求服务器端的返回信息后,交给对应的manger处理具体业务. * notes: sync操作就是客户端向服务器发起请求的操作. 当请求内容要有定制的是, 这里是修改点. * */ TransportContext transportContext = buildTransportContext(properties, kaaClientState); /* * bootstrapManager主要功能是, 向bootstrapServer获取 operationServer的服务器列表, 和对operationServer列表的管理 * */ bootstrapManager = buildBootstrapManager(properties, kaaClientState, transportContext); /* * channelManager是对channel的管理, 目前有两个channel, bootStrapChannel(HTTP协议, 客户端和bootstrap交互的消息通道), operationChannel(MQTT协议, 客户端和operationServer交互的消息通道) * notes: mqtt协议是包装在TCP协议上次的协议, 实际编程就是sock编程, 只是多了一次用mqtt协议编解码的过程. * */ channelManager = buildChannelManager(bootstrapManager, bootstrapServers); /* * failoverManager是对channel发生故障后进行的业务操作 * */ failoverManager = buildFailoverManager(channelManager); /* * channelManager 和 failoverManager是相互依赖的 * */ channelManager.setFailoverManager(failoverManager); /* * 初始化 bootstrapChannel,operationsChannel. 将这两个channel添加到channelManger中管理. * 添加channel到channelManager的channel list中之后, 每添加一个channel会启动一个使用该channel的线程, 线程会去处理一个阻塞队列. 也就是说会有会有地方去往这个队列中放消息的. * 其实这两个阻塞队列就是sync操作的中转站. 当需要向服务端获取信息的时候, 向队列中放不同类型TransportType的SyncTask就会自动去执行sync操作了. * */ initializeChannels(channelManager, transportContext); bootstrapManager.setChannelManager(channelManager); bootstrapManager.setFailoverManager(failoverManager); /* * 如下manger是处理来着各自transport的操作. transport是来着 * */ profileManager = buildProfileManager(properties, kaaClientState, transportContext); notificationManager = buildNotificationManager(properties, kaaClientState, transportContext); eventManager = buildEventManager(properties, kaaClientState, transportContext); endpointRegistrationManager = buildRegistrationManager(properties, kaaClientState, transportContext); logCollector = buildLogCollector(properties, kaaClientState, transportContext); configurationManager = buildConfigurationManager(properties, kaaClientState, transportContext, context.getExecutorContext()); /* * 将manager注入到transport中, 为了将manger的业务处理能力透传给transport * */ transportContext.getRedirectionTransport().setBootstrapManager(bootstrapManager); transportContext.getBootstrapTransport().setBootstrapManager(bootstrapManager); transportContext.getProfileTransport().setProfileManager(profileManager); transportContext.getEventTransport().setEventManager(eventManager); transportContext.getNotificationTransport().setNotificationProcessor(notificationManager); transportContext.getConfigurationTransport().setConfigurationHashContainer(configurationManager.getConfigurationHashContainer()); transportContext.getConfigurationTransport().setConfigurationProcessor(configurationManager.getConfigurationProcessor()); transportContext.getUserTransport().setEndpointRegistrationProcessor(endpointRegistrationManager); transportContext.getLogTransport().setLogProcessor(logCollector); transportContext.initTransports(this.channelManager, this.kaaClientState); /* * 构建EventFamily, 给客户端编程暴露接口 * */ eventFamilyFactory = new EventFamilyFactory(eventManager, context.getExecutorContext()); }