Quasar 是一个类似Scala的Akka的Java Actor模型实现框架,它使用 fiber(纤程)绿色线程实现类似Erlang的Actor模型,本站介绍结合Apache Kafka和ZeroMQ实现Quasar的分布式Actor模型。
当然, Galaxy 也是一种非常好的选择,它是一个快速in-memory数据网格,专门为数据本地化复制,可选的持久和分布式actor注册,甚至能够在节点服务器之间迁移actor。
Apache Kafka是目前流行的提供事件日志流的项目,它的API包括两种生产者:同步和异步,消费者只有一种:同步。一个Kafka生产者处理是线程安全的,易于使用。
Comsat项目 包括一个纤程友好fiber-friendly的 Kafka producer 集成, .我们使用它在actor内实现kafka的生产者,我们案例是演示从生产者发送几千个序列化的消息到消费者。
生产者代码如下:
final Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("client.id", "DemoProducer");
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
try (final FiberKafkaProducer<Integer, byte[]> producer = new FiberKafkaProducer<>(new KafkaProducer<>(producerConfig))) {
final byte[] myBytes = getMyBytes(); // ...
final Future<RecordMetaData> res = producer.send(new ProducerRecord<>("MyTopic", i, myBytes));
res.get(); // 可选,堵塞绿色线程直到记录被持久化,也可以 `producer.flush()`
}
我们使用Comsat的FiberKafkaProducer 包装了KafkaProducer 对象,这是为了获得fiber-blocking的future。
Kafka的消费处理不是线程安全的,且只有线程堵塞的同步方式:
final Properties producerConfig = new Properties();
consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
consumer.subscribe(Collections.singletonList(TOPIC));
final ConsumerRecords<Integer, byte[]> records = consumer.poll(1000L);
for (final ConsumerRecord<Integer, byte[]> record : records) {
final byte[] v = record.value();
useMyBytes(v); // ...
}
}
因为我们并不想堵塞fiber的底层线程池,这里就不能使用FiberAsync.runBlocking来将数据喂给固定大小的线程池了,而是在我们的actor的doRun中,我们使用一个异步任务,它会堵塞fiber会等到poll(会在指定池中执行)返回:
final ExecutorService e = Executors.newFixedThreadPool(2);
try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
consumer.subscribe(Collections.singletonList(TOPIC));
final ConsumerRecords<Integer, byte[]> records = call(e, () -> consumer.poll(1000L));
for (final ConsumerRecord<Integer, byte[]> record : records) {
final byte[] v = record.value();
useMyBytes(v); // ...
}
}
这里的call是一个工具方法,如下定义,如果没有 这个Java编译bug 是不必要的。
@Suspendable
public static <V> V call(ExecutorService es, Callable<V> c) throws InterruptedException, SuspendExecution {
try {
return runBlocking(es, (CheckedCallable<V, Exception>) c::call);
} catch (final InterruptedException | SuspendExecution e) {
throw e;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
完整案例代码见: complete example ,这是一个从生产者发送几千个序列化的消息到消费者。
ØMQ (或 ZeroMQ)是一个非集中的broker解决方案,有各种socket适合不同通讯模式(请求/应答或pub/sub发布/订阅),在我们的案例中我们使用最简单的请求应答模式,也是演示从生产者发送消息到消费者,生产者代码如下:
try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);
final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {
trgt.connect("tcp://localhost:8000");
final byte[] myBytes = getMyBytes(); // ...
trgt.send(baos.toByteArray(), 0 /* flags */)
trgt.recv(); // Reply, e.g. ACK
}
这里context实际作为一个socket工厂,其中有I/O线程数量作为context参数,这是因为ZeroMQ的socket不是基于connection-bound OS处理,而是一个简单前端,能够进行连接重试处理,或多个连接处理,或有效率的并发I/O和提供队列。者就是为什么send方法调用几乎从来不会堵塞,而recv方法不是基于连接的I/O调用,而是一个基于你的线程和指定I/O任务之间的同步调用,所谓指定I/O任务是指从一个或多个连接中接受进来的字节数据。
在Actor我们就不使用堵塞fiber的方式了,因为这里send不会堵塞,因此使用FiberAsync.runBlocking堵塞住read调用。
因此改写上面生产者代码如下,这是Actor的代码:
final ExecutorService ep = Executors.newFixedThreadPool(2);
try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);
final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {
exec(e, () -> trgt.connect("tcp://localhost:8000"));
final byte[] myBytes = getMyBytes(); // ...
call(e, trgt.send(myBytes, 0 /* flags */));
call(e, trgt::recv); // Reply, e.g. ACK
}
下面是消费者代码:
try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);
final ZMQ.Socket src = zmq.socket(ZMQ.REP)) {
exec(e, () -> src.bind("tcp://*:8000"));
final byte[] v = call(e, src::recv);
exec(e, () -> src.send("ACK"));
useMyBytes(v); // ...
}
这里exec是一个工具函数,类似之前的call,代码如下:
@Suspendable
public static void exec(ExecutorService es, Runnable r) throws InterruptedException, SuspendExecution {
try {
runBlocking(es, (CheckedCallable<Void, Exception>) () -> { r.run(); return null; });
} catch (final InterruptedException | SuspendExecution e) {
throw e;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
完整代码见: 这里example
更深入应用见: Distributed Quasar Actors with Kafka and ZeroMQ
源码项目: on GitHub
Quasar与Akka比较
Quasar专题
Actor专题
Reactive专题