In Flink, data is emitted through the collect method of Collector:
public interface Collector<T> {

    /**
     * Emits a record.
     *
     * @param record The record to collect.
     */
    void collect(T record);

    /**
     * Closes the collector. If any data was buffered, that data will be flushed.
     */
    void close();
}
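To make the contract concrete, here is a minimal sketch of a Collector implementation. The interface is re-declared locally so the example compiles without Flink on the classpath, and BufferingCollector is a hypothetical class (not part of Flink) that buffers records and flushes them only when close() is called, as the Javadoc above describes.

```java
import java.util.ArrayList;
import java.util.List;

// Re-declared locally; same shape as Flink's Collector<T> shown above.
interface Collector<T> {
    void collect(T record);

    void close();
}

// Hypothetical collector: buffers records and flushes them to a target list on close().
class BufferingCollector<T> implements Collector<T> {
    private final List<T> buffer = new ArrayList<>();
    private final List<T> target;

    BufferingCollector(List<T> target) {
        this.target = target;
    }

    @Override
    public void collect(T record) {
        buffer.add(record); // emitting a record just hands it to the collector
    }

    @Override
    public void close() {
        target.addAll(buffer); // flush whatever was buffered
        buffer.clear();
    }
}

class CollectorDemo {
    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        Collector<String> collector = new BufferingCollector<>(out);
        // A flatMap-style function splits a line into words and emits each one.
        for (String word : "to be or not".split(" ")) {
            collector.collect(word);
        }
        System.out.println(out.size()); // 0: nothing flushed yet
        collector.close();
        System.out.println(out); // [to, be, or, not]
    }
}
```

User code only ever sees the Collector side of this contract; where the records actually go is decided by the implementation, which in a running job is the Output chain described next.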
Output extends Collector:
public interface Output<T> extends Collector<T> {

    /**
     * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
     * operators.
     *
     * <p>A watermark specifies that no element with a timestamp lower or equal to the watermark
     * timestamp will be emitted in the future.
     */
    void emitWatermark(Watermark mark);

    /**
     * Emits a record to the side output identified by the given {@link OutputTag}.
     *
     * @param record The record to collect.
     */
    <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record);

    void emitLatencyMarker(LatencyMarker latencyMarker);
}
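The watermark contract in the Javadoc can be illustrated from the receiver's side. WatermarkCheckingOutput below is hypothetical and uses plain long timestamps instead of StreamRecord and Watermark; it only shows what "no element with a timestamp lower or equal to the watermark" means, not how Flink operators actually handle late data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical receiver that classifies records against the last watermark it saw.
class WatermarkCheckingOutput {
    private long currentWatermark = Long.MIN_VALUE;
    private final List<Long> onTime = new ArrayList<>();
    private final List<Long> late = new ArrayList<>();

    void collect(long timestamp) {
        if (timestamp <= currentWatermark) {
            late.add(timestamp); // violates the contract: timestamp <= watermark
        } else {
            onTime.add(timestamp);
        }
    }

    void emitWatermark(long watermark) {
        // Watermarks only move forward.
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    List<Long> onTime() {
        return onTime;
    }

    List<Long> late() {
        return late;
    }
}

class WatermarkDemo {
    public static void main(String[] args) {
        WatermarkCheckingOutput out = new WatermarkCheckingOutput();
        out.collect(5);
        out.emitWatermark(10);
        out.collect(12);
        out.collect(10); // equal to the watermark, so it counts as late
        System.out.println(out.onTime()); // [5, 12]
        System.out.println(out.late());   // [10]
    }
}
```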
One class that implements Output is RecordWriterOutput:
public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {

    .....

    private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

    private SerializationDelegate<StreamElement> serializationDelegate;

    private final StreamStatusProvider streamStatusProvider;

    private final OutputTag outputTag;

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
            // we are only responsible for emitting to the side-output specified by our
            // OutputTag.
            return;
        }

        pushToRecordWriter(record);
    }

    private <X> void pushToRecordWriter(StreamRecord<X> record) {
        serializationDelegate.setInstance(record);

        try {
            recordWriter.emit(serializationDelegate);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
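The tag check means an operator can hand every record to all of its outputs and let each output decide whether the record is its responsibility. The sketch below models this with a hypothetical TaggedOutput class, using a String in place of OutputTag and a List in place of the record writer. The main-output check (a tagged output ignores main-output records) is assumed to mirror the collect(StreamRecord) method elided from the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified RecordWriterOutput: the OutputTag is a String id and the
// record writer is a List that collects whatever this output is responsible for.
class TaggedOutput<T> {
    private final String outputTag; // null means "main output"
    final List<T> written = new ArrayList<>();

    TaggedOutput(String outputTag) {
        this.outputTag = outputTag;
    }

    // Main-output records: only an untagged output writes them.
    void collect(T record) {
        if (outputTag != null) {
            return;
        }
        written.add(record);
    }

    // Side-output records: only the output whose tag matches writes them.
    void collect(String tag, T record) {
        if (outputTag == null || !outputTag.equals(tag)) {
            return;
        }
        written.add(record);
    }
}

class SideOutputDemo {
    public static void main(String[] args) {
        TaggedOutput<Integer> mainOut = new TaggedOutput<>(null);
        TaggedOutput<Integer> lateOut = new TaggedOutput<>("late");
        TaggedOutput<Integer> errorOut = new TaggedOutput<>("errors");
        // The operator hands each record to every output; each output filters by tag.
        for (TaggedOutput<Integer> output : List.of(mainOut, lateOut, errorOut)) {
            output.collect(1);
            output.collect("late", 2);
        }
        System.out.println(mainOut.written);  // [1]
        System.out.println(lateOut.written);  // [2]
        System.out.println(errorOut.written); // []
    }
}
```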
StreamRecordWriter
public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {

    @Override
    public void emit(T record) throws IOException, InterruptedException {
        checkErroneous();
        super.emit(record);
        if (flushAlways) {
            flush();
        }
    }
}
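flushAlways trades throughput for latency: when it is set, every emitted record is pushed out immediately instead of waiting for the buffer to fill or for a later flush. As far as I know, in the Flink versions this code comes from it is enabled when the buffer timeout is 0. The hypothetical FlushingWriter below models only that switch, with lists standing in for network buffers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical writer that models only the flushAlways switch of StreamRecordWriter.emit.
class FlushingWriter {
    private final boolean flushAlways;
    private final List<String> pending = new ArrayList<>();   // serialized but not yet sent
    final List<List<String>> sentBatches = new ArrayList<>(); // one entry per flush

    FlushingWriter(boolean flushAlways) {
        this.flushAlways = flushAlways;
    }

    void emit(String record) {
        pending.add(record); // stands in for super.emit(record)
        if (flushAlways) {
            flush(); // ship immediately: lowest latency, smallest batches
        }
    }

    void flush() {
        if (!pending.isEmpty()) {
            sentBatches.add(new ArrayList<>(pending));
            pending.clear();
        }
    }
}

class FlushDemo {
    public static void main(String[] args) {
        FlushingWriter always = new FlushingWriter(true);
        FlushingWriter batched = new FlushingWriter(false);
        for (String r : new String[] {"a", "b", "c"}) {
            always.emit(r);
            batched.emit(r);
        }
        batched.flush(); // e.g. triggered later by a timer or when the buffer fills up
        System.out.println(always.sentBatches);  // [[a], [b], [c]]
        System.out.println(batched.sentBatches); // [[a, b, c]]
    }
}
```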
RecordWriter
public class RecordWriter<T extends IOReadableWritable> {

    protected final ResultPartitionWriter targetPartition; // does the actual writing into each partition

    private final ChannelSelector<T> channelSelector; // selects which channel(s) a record is sent to

    /** {@link RecordSerializer} per outgoing channel */
    private final RecordSerializer<T>[] serializers; // one serializer per channel

    // ... (other fields, e.g. numChannels and numBytesOut, omitted)

    public void emit(T record) throws IOException, InterruptedException {
        for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
            sendToTarget(record, targetChannel);
        }
    }

    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

        synchronized (serializer) { // a channel's serializer must not be used concurrently
            SerializationResult result = serializer.addRecord(record);

            while (result.isFullBuffer()) { // the buffer (backed by a MemorySegment) is full
                Buffer buffer = serializer.getCurrentBuffer(); // take the current buffer out

                if (buffer != null) {
                    numBytesOut.inc(buffer.getSizeUnsafe());
                    writeAndClearBuffer(buffer, targetChannel, serializer);

                    // If this was a full record, we are done. Not breaking
                    // out of the loop at this point will lead to another
                    // buffer request before breaking out (that would not be
                    // a problem per se, but it can lead to stalls in the
                    // pipeline).
                    if (result.isFullRecord()) {
                        break;
                    }
                } else {
                    // no current buffer: request one, blocking until it is available
                    buffer = targetPartition.getBufferProvider().requestBufferBlocking();
                    result = serializer.setNextBuffer(buffer);
                }
            }
        }
    }

    private void writeAndClearBuffer(
            Buffer buffer, int targetChannel, RecordSerializer<T> serializer) throws IOException {
        try {
            targetPartition.writeBuffer(buffer, targetChannel);
        } finally {
            serializer.clearCurrentBuffer();
        }
    }
}
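The loop in sendToTarget is easiest to follow with a concrete model: a record whose serialized bytes do not fit in the space left in the current buffer is split across several buffers, and every buffer that fills up is written to the channel. MiniRecordWriter below is a hypothetical, single-threaded simplification: buffers are plain byte arrays, the channel selector is a hash of the record, and it leaves out the length prefix the real serializer writes, the per-serializer lock and the blocking buffer pool.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of RecordWriter.emit/sendToTarget with fixed-size buffers.
class MiniRecordWriter {
    private final int bufferSize;
    private final byte[][] current;   // current buffer per channel (null = none yet)
    private final int[] position;     // write position inside each current buffer
    final List<List<byte[]>> written; // full buffers handed to each channel

    MiniRecordWriter(int numChannels, int bufferSize) {
        this.bufferSize = bufferSize;
        this.current = new byte[numChannels][];
        this.position = new int[numChannels];
        this.written = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            written.add(new ArrayList<>());
        }
    }

    // Stand-in for a hash-based ChannelSelector: exactly one target channel per record.
    int selectChannel(String record) {
        return Math.floorMod(record.hashCode(), current.length);
    }

    void emit(String record) {
        sendToTarget(record.getBytes(StandardCharsets.UTF_8), selectChannel(record));
    }

    private void sendToTarget(byte[] record, int channel) {
        int offset = 0;
        while (offset < record.length) {
            if (current[channel] == null) { // like requestBufferBlocking()
                current[channel] = new byte[bufferSize];
                position[channel] = 0;
            }
            int n = Math.min(bufferSize - position[channel], record.length - offset);
            System.arraycopy(record, offset, current[channel], position[channel], n);
            position[channel] += n;
            offset += n;
            if (position[channel] == bufferSize) { // buffer full: write it and clear it
                written.get(channel).add(current[channel]);
                current[channel] = null;
            }
        }
        // A new buffer is only requested when bytes remain, matching the early
        // break on isFullRecord() in the original loop.
    }
}

class RecordWriterDemo {
    public static void main(String[] args) {
        MiniRecordWriter writer = new MiniRecordWriter(1, 4); // one channel, 4-byte buffers
        writer.emit("abcdefghij"); // 10 bytes span three buffers
        for (byte[] buffer : writer.written.get(0)) {
            System.out.println(new String(buffer, StandardCharsets.UTF_8));
        }
        // prints "abcd" and "efgh"; "ij" stays in the current, not-yet-full buffer
    }
}
```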
writeAndClearBuffer does the actual write through the writeBuffer method of ResultPartitionWriter:
public class ResultPartitionWriter implements EventListener<TaskEvent> {

    private final ResultPartition partition; // the partition being written to

    public void writeBuffer(Buffer buffer, int targetChannel) throws IOException {
        partition.add(buffer, targetChannel);
    }
}
ResultPartition then simply adds the buffer to the target subpartition:
public class ResultPartition implements BufferPoolOwner {

    // How a buffer is added depends on the subpartition type, pipelined or spillable;
    // these are two different mechanisms:
    //  - PipelinedSubpartition: "A pipelined in-memory only subpartition, which can be consumed once."
    //  - SpillableSubpartition: "A spillable sub partition starts out in-memory and spills to disk
    //    if asked to do so."
    // In short, one keeps data only in memory, while the other can flush it to disk;
    // the in-memory (pipelined) data can be consumed only once.
    private final ResultSubpartition[] subpartitions;

    // ... (other fields, e.g. totalNumberOfBuffers and totalNumberOfBytes, omitted)

    public void add(Buffer buffer, int subpartitionIndex) throws IOException {
        boolean success = false;

        try {
            checkInProduceState();

            final ResultSubpartition subpartition = subpartitions[subpartitionIndex];

            synchronized (subpartition) {
                success = subpartition.add(buffer);

                // Update statistics
                totalNumberOfBuffers++;
                totalNumberOfBytes += buffer.getSize();
            }
        } finally {
            if (success) {
                notifyPipelinedConsumers();
            } else {
                buffer.recycle();
            }
        }
    }
}
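The try/finally in ResultPartition.add guarantees that a buffer is never leaked: if the subpartition does not accept it, or an exception is thrown before it is accepted, the buffer is recycled back to its pool. The sketch below isolates that pattern. Buf, SubPart and MiniResultPartition are hypothetical stand-ins rather than Flink classes; in this sketch a subpartition rejects buffers once it is finished, and the statistics and produce-state check are left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for Buffer: only tracks whether it was recycled.
class Buf {
    boolean recycled;

    void recycle() {
        recycled = true; // in Flink, recycling returns the memory segment to its buffer pool
    }
}

// Hypothetical stand-in for ResultSubpartition.
class SubPart {
    final Queue<Buf> buffers = new ArrayDeque<>();
    boolean finished;

    boolean add(Buf buffer) {
        if (finished) {
            return false; // a finished subpartition rejects further data
        }
        buffers.add(buffer);
        return true;
    }
}

class MiniResultPartition {
    final SubPart[] subpartitions;
    int notifications; // counts calls to the stand-in for notifyPipelinedConsumers()

    MiniResultPartition(int numSubpartitions) {
        subpartitions = new SubPart[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) {
            subpartitions[i] = new SubPart();
        }
    }

    void add(Buf buffer, int subpartitionIndex) {
        boolean success = false;
        try {
            SubPart subpartition = subpartitions[subpartitionIndex]; // may throw
            synchronized (subpartition) {
                success = subpartition.add(buffer);
            }
        } finally {
            if (success) {
                notifications++;  // data is available: wake up pipelined consumers
            } else {
                buffer.recycle(); // rejected or failed: give the buffer back
            }
        }
    }
}

class PartitionDemo {
    public static void main(String[] args) {
        MiniResultPartition partition = new MiniResultPartition(2);
        Buf accepted = new Buf();
        partition.add(accepted, 0);

        partition.subpartitions[1].finished = true;
        Buf rejected = new Buf();
        partition.add(rejected, 1);

        System.out.println(accepted.recycled + " " + rejected.recycled); // false true
        System.out.println(partition.notifications); // 1
    }
}
```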
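When a buffer is added to a ResultSubpartition, it is put into a queue and the reader is notified so that it can consume it. A SpillableSubpartition that has been asked to spill instead writes the buffer to disk and then returns.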
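The difference between the two subpartition flavors can be sketched as follows. PipelinedSub, SpillableSub and AvailabilityListener are hypothetical simplifications, not Flink's API: buffers are byte arrays, and "disk" is a list so the example stays self-contained. The pipelined variant queues buffers and notifies its reader; the spillable variant keeps buffers in memory until spill() is called, after which new buffers go straight to "disk".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical callback standing in for the subpartition reader's notification.
interface AvailabilityListener {
    void notifyDataAvailable();
}

class PipelinedSub {
    private final Queue<byte[]> buffers = new ArrayDeque<>();
    private AvailabilityListener reader;

    void setReader(AvailabilityListener reader) {
        this.reader = reader;
    }

    synchronized boolean add(byte[] buffer) {
        buffers.add(buffer);
        if (reader != null) {
            reader.notifyDataAvailable(); // wake the consumer
        }
        return true;
    }

    // Consuming removes the buffer: in-memory data can be read only once.
    synchronized byte[] poll() {
        return buffers.poll();
    }
}

class SpillableSub {
    private final List<byte[]> inMemory = new ArrayList<>();
    final List<byte[]> disk = new ArrayList<>(); // stand-in for a spill file
    private boolean spilled;

    synchronized boolean add(byte[] buffer) {
        if (spilled) {
            disk.add(buffer);     // after spilling, new buffers go straight to "disk"
        } else {
            inMemory.add(buffer); // otherwise keep them in memory
        }
        return true;
    }

    // Called when memory is needed: move everything buffered so far to "disk".
    synchronized void spill() {
        disk.addAll(inMemory);
        inMemory.clear();
        spilled = true;
    }

    synchronized int inMemoryCount() {
        return inMemory.size();
    }
}

class SubpartitionDemo {
    public static void main(String[] args) {
        PipelinedSub pipelined = new PipelinedSub();
        pipelined.setReader(() -> System.out.println("reader notified"));
        pipelined.add(new byte[] {1});                // prints "reader notified"
        System.out.println(pipelined.poll().length);  // 1
        System.out.println(pipelined.poll() == null); // true: already consumed

        SpillableSub spillable = new SpillableSub();
        spillable.add(new byte[] {1});
        spillable.spill();             // asked to spill
        spillable.add(new byte[] {2}); // goes straight to "disk"
        System.out.println(spillable.inMemoryCount() + " in memory, "
                + spillable.disk.size() + " on disk"); // 0 in memory, 2 on disk
    }
}
```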