数据平台目前通过Hive SQL的方式提供数据分析服务,系统使用多台HiveServer(JDBCServer)接收客户端连接请求,实际使用场景中频频出现HiveServer内存消耗过多导致机器Swap过高,需要重启HiveServer来缓解问题,但在某些任务比较集中的时间段往往会导致大量任务执行失败(我们配备了相应的任务重试机制,但也会带来一定的影响)。
在与运维同学的流通过程中,偶然提到是否可能某台HiveServer接收的连接请求过多,执行过程中负担过重引起的(暂不考虑其它可能因素),引出如下思考:
(1)目前平台内置两种HiveServer选择算法:随机选择器(通过随机数对HiveServer数目取余)、哈希选择器(通过UUID的哈希值对HiveServer数目取余),目前采用随机选择器;
(2)目前平台两个模块需要与HiveServer频繁交互:作业管理、数据查询,随着业务发展还会有其它逐步衍生其它模块;
问题:随机(哈希)选择算法仅仅考虑单个模块内连接的均衡,而且没有考虑HiveServer正在执行的语句条数(表示繁忙程度,粒度比较粗),在这样的场景下确实可能会出现某台HiveServer在特殊时段内负担相对较重。
思路
当需要与HiveServer建立连接时,应该有一个接口能够给出当前所有HiveServer各自的工作负载情况(根据当前正在执行的SQL语句条数判断),选择工作负载最低的HiveServer进行连接。
因为一次连接存在期间,可以执行多条SQL语句,而且多个模块的存在,在每个模块内置入代码是非常不方便的,因此考虑在HiveServer内部实现。
实现
要给出当前所有HiveServer各自的工作负载情况,就需要有数据源提供当前所有HiveServer各自正在执行哪些SQL语句(条数),因此HiveServer在执行语句之前、之后需要能够给出相应通知,并持久化到数据源,依据该数据源中的实时数据,即可计算工作负载。
假设数据源为MySQL,我们至少需要这样的一张表:
server | HiveServer名称,同一台机器可以开启多个HiveServer实例 |
hook | PRE_EXEC_HOOK、POST_EXEC_HOOK,分别表示语句执行开始之前、执行线束之后 |
queryId | HiveServer内部语句唯一ID |
queryStr | 具体执行语句,可根据语句复杂度计算工作负载 |
jobName | 平台自己设置,我们仅仅考虑会转化为MR任务的语句,jobName即为MR JobName |
preTime | 语句执行之前的时间戳 |
postTime | 语句执行之后的时间戳 |
问题:HiveServer如何在执行语句之前、之后发出相应的通知?
我们使用Hive Hook机制,需要实出接口ExecuteWithHookContext,核心代码如下:
QueryPlan queryPlan = hookContext.getQueryPlan(); HiveConf conf = hookContext.getConf(); String queryId = queryPlan.getQueryId(); if (StringUtils.isEmpty(queryId)) { LOGGER.warn("queryId is null or empty, return"); return; } LOGGER.info("queryId: " + queryId); String queryStr = URLEncoder.encode(queryPlan.getQueryStr(), CharEncoding.UTF_8); if (StringUtils.isEmpty(queryStr)) { LOGGER.warn("queryStr is null or empty, return"); return; } LOGGER.info("queryStr: " + queryStr); String jobName = conf.getVar(HiveConf.ConfVars.HADOOPJOBNAME); LOGGER.info("jobName: " + jobName); if (StringUtils.isEmpty(jobName)) { LOGGER.warn("jobName is null or empty, return"); return; } String server = conf.get("hiveserver.execute.hook.server"); if (StringUtils.isEmpty(server)) { LOGGER.warn("server is null or empty, return"); return; } LOGGER.info("server: " + server); String rest = conf.get("hiveserver.execute.hook.rest"); LOGGER.info("rest: " + rest); if (StringUtils.isEmpty(rest)) { LOGGER.warn("rest is null or empty, return"); return; } Map<String, String> params = new HashMap<String, String>(); params.put("server", server); params.put("hook", hookContext.getHookType().toString()); params.put("queryId", queryId); params.put("queryStr", queryStr); params.put("jobName", jobName); params.put("timestamp", String.valueOf(DatetimeUtil.now())); try { HttpClientUtil.doPost(rest, params); } catch (Exception e) { LOGGER.error("do post error: " + ExceptionUtils.getFullStackTrace(e)); }
可以看出,该Hook同时可以用于PreHook与PostHook,而且为了保持代码的“轻量级”,并没有直接进行数据的持久化,而是通过Rest API交由外部模块处理。
此外需要在hive-site.xml文件中进行相应的设置,如下:
启动HiveServer时,需要指定该实例的名称与具体的Rest API地址,如下:
hive --service hiveserver --hiveconf hiveserver.execute.hook.server=localhost:10000 --hiveconf hiveserver.execute.hook.rest=http://localhost:8088/rest/hiveserver/send
总结
通过以上的机制可以获取到所有(指定)HiveServer当前(时段内)正在执行的语句条数(如果只有PreTime值,而PostTime值为Null,则表示该条语句正在执行),依此可以计算各自的工作负载,从而选取工作负载最小的HiveServer进行连接。
后期进一步优化时还可以充分利用JobName,平台每次执行查询任务时均会设置mapred.job.name属性,用以统计一次查询(可以有多条SQL语句)所处理的数据量、消耗的时间等信息,可以利用这些历史数据信息,用于估计当前正在执行的SQL语句的可能消耗时间,进行优化工作负载的计算。