当前项目需要一个拨测系统来检测服务是否正常运行,拨测系统需要满足以下需求:
拨测系统原理上就是定时检查服务,那是否可以偷懒,拿开源的定时任务系统来改造呢。基于这种想法,在研究多个开源项目之后,选择了 xxl-job
(当前版本 2.2.1
)。
以上是 xxl-job
添加定时任务的界面,先修改 jobinfo.index.ftl
文件,隐藏掉跟 监控
无关的字段,隐藏的字段相当于采用了默认值。效果如下:
这里保留了:
BEAN+JobHandler
即可满足, 当拨测的接口协议很复杂,无法使用通用的拨测方法时,这里可以选择 GUL
模式来自定义请求脚本。 拨测类别
,当前仅提供了 接口HTTP拨测
,后续会添加其他的 拨测类型
,比如 redis检查
。 拨测的URL
。 在 executor
项目开发处理器(xxl-job分为 admin
和 executor
两个项目, admin
是管理、分发, executor
执行定时任务业务逻辑),拨测URL等参数由 任务参数
来输入,这里选择 json
作为输入格式,定义了几个 json字段
:
POST
HTPP拨测Handler代码:
@Component @Slf4j public class DialTestHandler { private WebClient webClient; public DialTestHandler() { webClient = WebClient.builder() .clientConnector(new ReactorClientHttpConnector( // 允许重定向 HttpClient.create().followRedirect(true) )).build(); } /** * @param param * @return com.xxl.job.core.biz.model.ReturnT<java.lang.String> * @author * @date */ @XxlJob("httpDialTestHandler") public ReturnT<String> httpDialTestHandler(String param) { Stopwatch stopwatch = Stopwatch.createStarted(); XxlJobLogger.log("begin httpDialTestHandler, param: {}", param); try { DialTestConfig dialTestConfig = JSON.parseObject(param, DialTestConfig.class); if (StringUtils.isBlank(dialTestConfig.getUrl())) { throw new Exception("url required!"); } if (StringUtils.isBlank(dialTestConfig.getMethod())) { dialTestConfig.setMethod("POST"); } Mono<String> resultMono = webClient.method(HttpMethod.valueOf(dialTestConfig.getMethod())).uri(dialTestConfig.getUrl()) .contentType(MediaType.APPLICATION_JSON_UTF8) .accept(MediaType.APPLICATION_JSON_UTF8) .bodyValue(Optional.ofNullable(dialTestConfig.getPostBody()).orElse("")) .retrieve() .onStatus(status -> !status.is2xxSuccessful(), resp -> { Mono<String> body = resp.body(BodyExtractors.toMono(String.class)); return body.flatMap(str -> { log.error("request[{}]error,result: {}", dialTestConfig.getUrl(), str); XxlJobLogger.log("request[{}]error,result: {}", dialTestConfig.getUrl(), str); // 业务返回的错误 if (StringUtils.isNotBlank(str)) { throw new RequestException(str, resp.statusCode().value()); } throw new RequestException(resp.statusCode().getReasonPhrase(), resp.statusCode().value()); }); }).bodyToMono(String.class) .doOnError(Exception.class, err -> { throw new RequestException(err.getMessage(), -1); }); // 这里不做超时限制,由xxl-job任务的超时时间来打断(interrupt) // String result = resultMono.block(Duration.ofSeconds(2000)); String result = resultMono.block(); if (!CollectionUtils.isEmpty(dialTestConfig.getResultKeys())) { for (String key : dialTestConfig.getResultKeys()) { if (!result.contains(key)) { throw new Exception(String.format("request[%s]error,result not contain: %s", dialTestConfig.getUrl(), key)); } } } log.info("httpDialTestHandler success, param: {}", param); XxlJobLogger.log("httpDialTestHandler success, param: {}", param); return ReturnT.SUCCESS; } catch (RequestException re) { log.error("httpDialTestHandler RequestException, param: {}, code: {}", param, re.getCode(), re); XxlJobLogger.log("httpDialTestHandler error, param: {}, code: {}, RequestException: {}", param, re.getCode(), re.getMessage()); } catch (Exception e) { log.error("httpDialTestHandler Exception, param: {}", param, e); XxlJobLogger.log("httpDialTestHandler error, param: {}, Exception: {}", param, e.getMessage()); } finally { stopwatch.stop(); XxlJobLogger.log("finish httpDialTestHandler, param: {}, cost: {}", param, stopwatch.elapsed(TimeUnit.MILLISECONDS)); log.info("finish httpDialTestHandler, param: {}, cost: {}", param, stopwatch.elapsed(TimeUnit.MILLISECONDS)); } return ReturnT.FAIL; } } @Data public class DialTestConfig { private String url; /** * POST/GET/... */ private String method; /** * 返回结果关键字 */ private List<String> resultKeys; /** * POST Body */ private String postBody; } public class RequestException extends RuntimeException { public static final int DEFAULT_ERROR_CODE = HttpStatus.INTERNAL_SERVER_ERROR.value(); private int code = DEFAULT_ERROR_CODE; public RequestException(String message) { super(message); } public RequestException(int code) { this.code = code; } public RequestException(String message, int code) { super(message); this.code = code; } public RequestException(String message, Throwable cause, int code) { super(message, cause); this.code = code; } public RequestException(Throwable cause, int code) { super(cause); this.code = code; } public RequestException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace, int code) { super(message, cause, enableSuppression, writableStackTrace); this.code = code; } public int getCode() { return code; } public RequestException setCode(int code) { this.code = code; return this; } }
代码逻辑很简单:
resultKeys
,则轮询判断接口是否包含了 key
,如果没有包含,任务失败。 配置示例如下(为了展示的更直观,配置参数了包含了所有配置项):
这里报警邮件里填的并不是邮件,而是企业微信名,是因为后面也改造了监控,告警时同时发送企业微信和邮件,详细后面会讲到。
当拨测接口协议比较复杂时,可以自定义请求脚本,而不是使用现有的 BEAN:httpDialTestHandler
模式。在允许模式选择 GLUE(XXX)
,比如 GLUE(java)
,然后在列表页面选择 GLUE IDE
编辑脚本,脚本里可以引入 执行器executor
项目的类,因为类最终会是在 executor
项目上运行。
注:当前版本的 GLUE(java)
不支持 lambda
语法。
告警逻辑在 admin
项目, xxl在任务失败重试时,失败几次(含重试次数),就会告警几次,这里改造成重试的最后一次(含不重试的任务)才告警。
告警逻辑在 JobFailMonitorHelper
类。
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) { 改造成 if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0 && log.getExecutorFailRetryCount() == 0) {
log.getExecutorFailRetryCount()
返回的是 当前任务剩余重试次数
。
同样的告警内容里的 失败重试次数
的值从 剩余重试次数
改为 当前已重试次数
。
告警内容逻辑在 XxlJobTrigger.processTrigger
方法里。
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount); 改为 triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(jobInfo.getExecutorFailRetryCount() - finalFailRetryCount);
企业微信
xxl-job只有邮件告警,这里加上企业微信告警。
从 JobAlarmer
类看到 XxlJob
是通过 applicationContext.getBeansOfType(JobAlarm.class)
获取告警类别列表,所以只需要写一个自定义 JobAlarm
实现类并注册成spring的Bean既可,告警内容可以在邮件内容上做减法。
@Component public class RtxJobAlarm implements JobAlarm { private static Logger logger = LoggerFactory.getLogger(RtxJobAlarm.class); /** * fail alarm * * @param jobLog */ @Override public boolean doAlarm(XxlJobInfo info, XxlJobLog jobLog) { boolean alarmResult = true; if (info != null && info.getAlarmEmail() != null && info.getAlarmEmail().trim().length() > 0) { // alarmContent String alarmContent = "Alarm Job LogId=" + jobLog.getId(); // 这里xxl的triggerMsg里已经使用了<br>,所以追加内容时也先用<br>,后面再统一换成企业微信的/n alarmContent += "<br>TriggerMsg=<br>" + jobLog.getTriggerMsg(); if (jobLog.getHandleCode() > 0 && jobLog.getHandleCode() != ReturnT.SUCCESS_CODE) { alarmContent += "<br>HandleCode=" + jobLog.getHandleMsg(); } String title = I18nUtil.getString("jobconf_monitor"); List<String> mails = Arrays.asList(info.getAlarmEmail().split(",")); try { MessageUtil.postMessage(Lists.newArrayList(MessageTypeEnum.RTX), mails, title, info.getJobDesc() + "/n" + alarmContent.replaceAll("<br>", "/n")); } catch (Exception e) { logger.error(">>>>>>>>>>> xxl-job, job fail alarm rtx send error, JobLogId:{}", jobLog.getId(), e); alarmResult = false; } } return alarmResult; } }
邮件
前面添加任务时,有讲到 报警邮件
填的不是邮件,而是企业微信名,所以在发送邮件时,需要补上 企业邮箱后缀
。
参考:
分布式任务调度平台XXL-JOB