本文主要研究一下carrera的GroovyScriptAction
DDMQ/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/Action.java
public interface Action { enum Status { FAIL, CONTINUE, FINISH, ASYNCHRONIZED } class UnsupportedDataType extends RuntimeException { } default Status act(UpstreamJob job) { Object data = job.getData(); if (data instanceof byte[]) { return act(job, (byte[]) data); } else if (data instanceof JSONObject) { return act(job, (JSONObject) data); } else { throw new UnsupportedDataType(); } } default Status act(UpstreamJob job, byte[] bytes) { throw new UnsupportedDataType(); } default Status act(UpstreamJob job, JSONObject jsonObject) { throw new UnsupportedDataType(); } default void shutdown() { // DO NOTHING BY DEFAULT } default void logMetrics() { // DO NOTHING BY DEFAULT } }
DDMQ/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/GroovyScriptAction.java
public class GroovyScriptAction implements Action { private final static String CARRERA_GROOVY_CONTEXT = "carreraContext"; @SuppressWarnings("rawtypes") private final static LoadingCache<String, Class> cache = CacheBuilder .newBuilder() .expireAfterAccess(1, TimeUnit.HOURS) .build(new CacheLoader<String, Class>() { private final AtomicLong al = new AtomicLong(0); @Override public Class load(String key) throws Exception { try (GroovyClassLoader groovyLoader = new GroovyClassLoader()) { GroovyCodeSource gcs = AccessController.doPrivileged((PrivilegedAction<GroovyCodeSource>) () -> new GroovyCodeSource(key, "Script" + al.getAndIncrement() + ".groovy", "/groovy/shell")); Class clazz = groovyLoader.parseClass(gcs, false); return clazz; } catch (Throwable e) { LogUtils.logErrorInfo("GroovyScript_error", "[GroovyErr]", e); return null; } } }); @Override public Status act(UpstreamJob job, JSONObject jsonObject) { String groovyText = job.getUpstreamTopic().getGroovyScript(); if (StringUtils.isBlank(groovyText)) { return Status.FINISH; } try { @SuppressWarnings("rawtypes") Class groovyScript = cache.get(groovyText); if (groovyScript == null) { MetricUtils.qpsAndFilterMetric(job, MetricUtils.ConsumeResult.INVALID); return Status.FINISH; } jsonObject.put(CARRERA_GROOVY_CONTEXT, new GroovyContext(job)); Script script = InvokerHelper.createScript(groovyScript, new Binding(jsonObject)); Object scriptRet = script.run(); if (scriptRet instanceof Boolean) { if ((Boolean) scriptRet) { jsonObject.remove(CARRERA_GROOVY_CONTEXT); return Status.CONTINUE; } } } catch (MissingPropertyException e) { LogUtils.logErrorInfo("GroovyScript_error", "missing property exception, jsonObject:{}, job:{}, e.msg:{}", JsonUtils.toJsonString(jsonObject), job.info(), e.getMessageWithoutLocationText()); } catch (Throwable e) { LogUtils.logErrorInfo("GroovyScript_error", "error when running groovy script, job={}, e={}", job, e.getMessage()); } MetricUtils.qpsAndFilterMetric(job, MetricUtils.ConsumeResult.INVALID); return Status.FINISH; } }
GroovyScriptAction实现了Action接口,它使用guava的LoadingCache定义了groovy class的缓存,其CacheLoader的load方法会创建GroovyClassLoader,然后解析指定GroovyCodeSource的class;其act方法从job.getUpstreamTopic().getGroovyScript()获取groovyText,然后再根据groovyText从cache获取指定的Class,之后通过InvokerHelper.createScript(groovyScript, new Binding(jsonObject))创建Script,然后执行script.run()获取返回值