转载

HiveServer连接优化

引言

数据平台目前通过Hive SQL的方式提供数据分析服务,系统使用多台HiveServer(JDBCServer)接收客户端连接请求,实际使用场景中频频出现HiveServer内存消耗过多导致机器Swap过高,需要重启HiveServer来缓解问题,但在某些任务比较集中的时间段往往会导致大量任务执行失败(我们配备了相应的任务重试机制,但也会带来一定的影响)。

在与运维同学的流通过程中,偶然提到是否可能某台HiveServer接收的连接请求过多,执行过程中负担过重引起的(暂不考虑其它可能因素),引出如下思考:

(1)目前平台内置两种HiveServer选择算法:随机选择器(通过随机数对HiveServer数目取余)、哈希选择器(通过UUID的哈希值对HiveServer数目取余),目前采用随机选择器;

(2)目前平台两个模块需要与HiveServer频繁交互:作业管理、数据查询,随着业务发展还会有其它逐步衍生其它模块;

问题:随机(哈希)选择算法仅仅考虑单个模块内连接的均衡,而且没有考虑HiveServer正在执行的语句条数(表示繁忙程度,粒度比较粗),在这样的场景下确实可能会出现某台HiveServer在特殊时段内负担相对较重。

思路

当需要与HiveServer建立连接时,应该有一个接口能够给出当前所有HiveServer各自的工作负载情况(根据当前正在执行的SQL语句条数判断),选择工作负载最低的HiveServer进行连接。

因为一次连接存在期间,可以执行多条SQL语句,而且多个模块的存在,在每个模块内置入代码是非常不方便的,因此考虑在HiveServer内部实现。

实现

要给出当前所有HiveServer各自的工作负载情况,就需要有数据源提供当前所有HiveServer各自正在执行哪些SQL语句(条数),因此HiveServer在执行语句之前、之后需要能够给出相应通知,并持久化到数据源,依据该数据源中的实时数据,即可计算工作负载。

假设数据源为MySQL,我们至少需要这样的一张表:

server HiveServer名称,同一台机器可以开启多个HiveServer实例
hook PRE_EXEC_HOOK、POST_EXEC_HOOK,分别表示语句执行开始之前、执行线束之后
queryId HiveServer内部语句唯一ID
queryStr 具体执行语句,可根据语句复杂度计算工作负载
jobName 平台自己设置,我们仅仅考虑会转化为MR任务的语句,jobName即为MR JobName
preTime 语句执行之前的时间戳
postTime 语句执行之后的时间戳

问题:HiveServer如何在执行语句之前、之后发出相应的通知?

我们使用Hive Hook机制,需要实出接口ExecuteWithHookContext,核心代码如下:

         QueryPlan queryPlan = hookContext.getQueryPlan();    HiveConf conf = hookContext.getConf();    String queryId = queryPlan.getQueryId();    if (StringUtils.isEmpty(queryId)) {    LOGGER.warn("queryId is null or empty, return");     return;   }    LOGGER.info("queryId: " + queryId);    String queryStr = URLEncoder.encode(queryPlan.getQueryStr(),     CharEncoding.UTF_8);    if (StringUtils.isEmpty(queryStr)) {    LOGGER.warn("queryStr is null or empty, return");     return;   }    LOGGER.info("queryStr: " + queryStr);    String jobName = conf.getVar(HiveConf.ConfVars.HADOOPJOBNAME);    LOGGER.info("jobName: " + jobName);    if (StringUtils.isEmpty(jobName)) {    LOGGER.warn("jobName is null or empty, return");     return;   }    String server = conf.get("hiveserver.execute.hook.server");    if (StringUtils.isEmpty(server)) {    LOGGER.warn("server is null or empty, return");     return;   }    LOGGER.info("server: " + server);    String rest = conf.get("hiveserver.execute.hook.rest");    LOGGER.info("rest: " + rest);    if (StringUtils.isEmpty(rest)) {    LOGGER.warn("rest is null or empty, return");     return;   }    Map<String, String> params = new HashMap<String, String>();    params.put("server", server);    params.put("hook", hookContext.getHookType().toString());    params.put("queryId", queryId);    params.put("queryStr", queryStr);    params.put("jobName", jobName);    params.put("timestamp", String.valueOf(DatetimeUtil.now()));    try {    HttpClientUtil.doPost(rest, params);   } catch (Exception e) {    LOGGER.error("do post error: "      + ExceptionUtils.getFullStackTrace(e));   }

可以看出,该Hook同时可以用于PreHook与PostHook,而且为了保持代码的“轻量级”,并没有直接进行数据的持久化,而是通过Rest API交由外部模块处理。

此外需要在hive-site.xml文件中进行相应的设置,如下:

HiveServer连接优化

HiveServer连接优化

启动HiveServer时,需要指定该实例的名称与具体的Rest API地址,如下:

hive --service hiveserver --hiveconf hiveserver.execute.hook.server=localhost:10000 --hiveconf hiveserver.execute.hook.rest=http://localhost:8088/rest/hiveserver/send

总结

 

通过以上的机制可以获取到所有(指定)HiveServer当前(时段内)正在执行的语句条数(如果只有PreTime值,而PostTime值为Null,则表示该条语句正在执行),依此可以计算各自的工作负载,从而选取工作负载最小的HiveServer进行连接。

后期进一步优化时还可以充分利用JobName,平台每次执行查询任务时均会设置mapred.job.name属性,用以统计一次查询(可以有多条SQL语句)所处理的数据量、消耗的时间等信息,可以利用这些历史数据信息,用于估计当前正在执行的SQL语句的可能消耗时间,进行优化工作负载的计算。

正文到此结束
Loading...