项目中一些服务需要监听其他微服务的启动信息,需要监听到启动后主动向其发请求拉取一些配置等。
可是 eureka-client
并未提供监听其他服务启动的事件, eureka-server
倒是提供了事件,
可以在自己的 eureka-server
中监听服务启动,监听后发送服务启动信息到kafka这些消息队列,服务监听kafka消息,
这种方式要依赖消息队列: 源码
;
或者改造 eureka-client
, 由于需要改的代码不多,修改源码重新打成依赖自己维护不方便,这里通过javassist直接修改jar里的字节码实现。
public void init() { try { ClassPool classPool = new ClassPool(true); //添加com.netflix.discovery包的扫描路径 ClassClassPath classPath = new ClassClassPath(Applications.class); classPool.insertClassPath(classPath); //获取要修改Application类 CtClass ctClass = classPool.get(APPLICATION_PATH); //获取addInstance方法 CtMethod addInstanceMethod = ctClass.getDeclaredMethod("addInstance"); //修改addInstance方法 addInstanceMethod.setBody("{instancesMap.put($1.getId(), $1);" + "synchronized (instances) {me.flyleft.eureka.client.event.EurekaEventHandler.getInstance().eurekaAddInstance($1);" + "instances.remove($1);instances.add($1);isDirty = true;}}"); //获取removeInstance方法 CtMethod removeInstanceMethod = ctClass.getDeclaredMethod("removeInstance"); //修改removeInstance方法 removeInstanceMethod.setBody("{me.flyleft.eureka.client.event.EurekaEventHandler.getInstance().eurekaRemoveInstance($1);this.removeInstance($1, true);}"); //覆盖原有的Application类 ctClass.toClass(); //使用类加载器重新加载Application类 classPool.getClassLoader().loadClass(APPLICATION_PATH); Class.forName(APPLICATION_PATH); } catch (Exception e) { throw new EurekaEventException(e); } } 复制代码
@SpringBootApplication @EnableEurekaClient public class EurekaClientApplication { public static void main(String[] args) { //先执行修改字节码代码 EurekaEventHandler.getInstance().init(); new SpringApplicationBuilder(EurekaClientApplication.class).web(true).run(args); } } 复制代码
public class EurekaEventObservable extends Observable { public void sendEvent(EurekaEventPayload payload) { setChanged(); notifyObservers(payload); } } 复制代码
public abstract class AbstractEurekaEventObserver implements Observer, EurekaEventService { @Override public void update(Observable o, Object arg) { if (arg instanceof EurekaEventPayload) { EurekaEventPayload payload = (EurekaEventPayload) arg; if (InstanceInfo.InstanceStatus.UP.name().equals(payload.getStatus())) { LOGGER.info("Receive UP event, payload: {}", payload); } else { LOGGER.info("Receive DOWN event, payload: {}", payload); } putPayloadInCache(payload); consumerEventWithAutoRetry(payload); } } } 复制代码
接收到服务启动去执行一些操作,如果执行失败有异常则自动重试指定次数,每个一段事件重试一次,执行成功则不再执行
private void consumerEventWithAutoRetry(final EurekaEventPayload payload) { rx.Observable.just(payload) .map(t -> { // 此处为接收到服务启动去执行的一些操作 consumerEvent(payload); return payload; }).retryWhen(x -> x.zipWith(rx.Observable.range(1, retryTime), (t, retryCount) -> { //异常处理 if (retryCount >= retryTime) { if (t instanceof RemoteAccessException || t instanceof RestClientException) { LOGGER.warn("error.eurekaEventObserver.fetchError, payload {}", payload, t); } else { LOGGER.warn("error.eurekaEventObserver.consumerError, payload {}", payload, t); } } return retryCount; }).flatMap(y -> rx.Observable.timer(retryInterval, TimeUnit.SECONDS))) .subscribeOn(Schedulers.io()) .subscribe((EurekaEventPayload payload1) -> { }); } 复制代码
自动重试失败,可以手动重试,添加手动重试接口
@RestController @RequestMapping(value = "/v1/eureka/events") public class EurekaEventEndpoint { private EurekaEventService eurekaEventService; public EurekaEventEndpoint(EurekaEventService eurekaEventService) { this.eurekaEventService = eurekaEventService; } @Permission(permissionLogin = true) @ApiOperation(value = "获取未消费的事件列表") @GetMapping public List<EurekaEventPayload> list(@RequestParam(value = "service", required = false) String service) { return eurekaEventService.unfinishedEvents(service); } @Permission(permissionLogin = true) @ApiOperation(value = "手动重试未消费成功的事件") @PostMapping("retry") public List<EurekaEventPayload> retry(@RequestParam(value = "id", required = false) String id, @RequestParam(value = "service", required = false) String service) { return eurekaEventService.retryEvents(id, service); } } 复制代码