生产者-消费者
模型用于解耦生产者与消费者,平衡两者之间的能力不平衡,该模型广泛应用于各个系统中,Hudi也使用了该模型控制对记录的处理,即记录会被生产者生产至队列中,然后由消费者从队列中消费,更具体一点,对于更新操作,生产者会将文件中老的记录放入队列中等待消费者消费,消费后交由 HoodieMergeHandle
处理;对于插入操作,生产者会将新记录放入队列中等待消费者消费,消费后交由 HandleCreateHandle
处理。
前面的文章中提到过无论是 HoodieCopyOnWriteTable#handleUpdate
处理更新时直接生成了一个 SparkBoundedInMemoryExecutor
对象,还是 HoodieCopyOnWriteTable#handleInsert
处理插入时生成了一个 CopyOnWriteLazyInsertIterable
对象,再迭代时调用该对象的 CopyOnWriteLazyInsertIterable#computeNext
方法生成 SparkBoundedInMemoryExecutor
对象。最后两者均会调用 SparkBoundedInMemoryExecutor#execute
开始记录的处理,该方法核心代码如下
public E execute() { try { ExecutorCompletionService<Boolean> producerService = startProducers(); Future<E> future = startConsumer(); // Wait for consumer to be done return future.get(); } catch (Exception e) { throw new HoodieException(e); } }
该方法会启动所有生产者和单个消费者进行处理。
Hudi定义了 BoundedInMemoryQueueProducer
接口表示生产者,其子类实现如下
Function
来生产记录,在合并日志log文件和数据parquet文件时使用,以便提供 RealTimeView
。 定义了 BoundedInMemoryQueueConsumer
类表示消费者,其主要子类实现如下
CopyOnWrite
表类型时的插入。
MergeOnRead
表类型时的插入,其为 CopyOnWriteInsertHandler
的子类。
CopyOnWrite
表类型时的更新。 整个生产消费相关的类继承结构非常清晰。
对于生产者的启动, startProducers
方法核心代码如下
public ExecutorCompletionService<Boolean> startProducers() { // Latch to control when and which producer thread will close the queue final CountDownLatch latch = new CountDownLatch(producers.size()); final ExecutorCompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(executorService); producers.stream().map(producer -> { return completionService.submit(() -> { try { preExecute(); producer.produce(queue); } catch (Exception e) { logger.error("error producing records", e); queue.markAsFailed(e); throw e; } finally { synchronized (latch) { latch.countDown(); if (latch.getCount() == 0) { // Mark production as done so that consumer will be able to exit queue.close(); } } } return true; }); }).collect(Collectors.toList()); return completionService; }
该方法使用 CountDownLatch
来协调生产者线程与消费者线程的退出动作,然后调用 produce
方法开始生产,对于插入更新时的 IteratorBasedQueueProducer
而言,其核心代码如下
public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception { ... while (inputIterator.hasNext()) { queue.insertRecord(inputIterator.next()); } ... }
可以看到只要迭代器还有记录(可能为插入时的新记录或者更新时的旧记录),就会往队列中不断写入。
对于消费者的启动, startConsumer
方法的核心代码如下
private Future<E> startConsumer() { return consumer.map(consumer -> { return executorService.submit(() -> { ... preExecute(); try { E result = consumer.consume(queue); return result; } catch (Exception e) { queue.markAsFailed(e); throw e; } }); }).orElse(CompletableFuture.completedFuture(null)); }
消费时会先进行执行前的准备,然后开始消费,其中 consume
方法的核心代码如下
public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception { Iterator<I> iterator = queue.iterator(); while (iterator.hasNext()) { consumeOneRecord(iterator.next()); } // Notifies done finish(); return getResult(); }
可以看到只要队列中还有记录,就可以获取该记录,然后调用不同 BoundedInMemoryQueueConsumer
子类的 consumeOneRecord
进行更新插入处理。
值得一提的是Hudi对队列进行了 流控 ,生产者不能无限制地将记录写入队列中,队列缓存的大小由用户配置,队列能放入记录的条数由采样的记录大小和队列缓存大小控制。
在生产时,会调用 BoundedInMemoryQueue#insertRecord
将记录写入队列,其核心代码如下
public void insertRecord(I t) throws Exception { ... rateLimiter.acquire(); // We are retrieving insert value in the record queueing thread to offload computation // around schema validation // and record creation to it. final O payload = transformFunction.apply(t); adjustBufferSizeIfNeeded(payload); queue.put(Option.of(payload)); }
首先获取一个许可( Semaphore
),未成功获取会被阻塞直至成功获取,然后获取记录的负载以便调整队列,然后放入内部队列( LinkedBlockingQueue
)中,其中 adjustBufferSizeIfNeeded
方法的核心代码如下
private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException { if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) { return; } final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload); final long newAvgRecordSizeInBytes = Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1)); final int newRateLimit = (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes)); // If there is any change in number of records to cache then we will either release (if it increased) or acquire // (if it decreased) to adjust rate limiting to newly computed value. if (newRateLimit > currentRateLimit) { rateLimiter.release(newRateLimit - currentRateLimit); } else if (newRateLimit < currentRateLimit) { rateLimiter.acquire(currentRateLimit - newRateLimit); } currentRateLimit = newRateLimit; avgRecordSizeInBytes = newAvgRecordSizeInBytes; numSamples++; }
首先看是否已经达到采样频率,然后计算新的记录平均大小和限流速率,如果新的限流速率大于当前速率,则可释放一些许可(供阻塞的生产者获取后继续生产),否则需要获取(回收)一些许可(许可变少后生产速率自然就降低了)。该操作可根据采样的记录大小动态调节速率,不至于在记录负载太大和记录负载太小时,放入同等个数,从而起到动态调节作用。
在消费时,会调用 BoundedInMemoryQueue#readNextRecord
读取记录,其核心代码如下
private Option<O> readNextRecord() { ... rateLimiter.release(); Option<O> newRecord = Option.empty(); while (expectMoreRecords()) { try { throwExceptionIfFailed(); newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS); if (newRecord != null) { break; } } catch (InterruptedException e) { throw new HoodieException(e); } } ... if (newRecord != null && newRecord.isPresent()) { return newRecord; } else { // We are done reading all the records from internal iterator. this.isReadDone.set(true); return Option.empty(); } }
可以看到首先会释放一个许可,然后判断是否还可以读取记录(还在生产或者停止生产但队列不为空都可读取),然后从内部队列获取记录或返回。
上述便是 生产者-消费者
在Hudi中应用的分析。
Hudi采用了 生产者-消费者
模型来控制记录的处理,与传统 多生产者-多消费者
模型不同的是,Hudi现在只支持 多生产者-单消费者
模型,单消费者意味着Hudi暂时不支持文件的并发写入。而对于生产消费的队列的实现,Hudi并未仅仅只是基于 LinkedBlockingQueue
,而是采用了更精细化的速率控制,保证速率会随着记录负载大小的变化和配置的队列缓存大小而动态变化,这也降低了系统发生OOM的概率。