Flink Sort-Shuffle读简析

news/2024/7/3 9:02:06

文章目录

  • 1、SortMergeResultPartition的创建使用
  • 2、PartitionedFileReader
    • 2.1、moveToNextReadableRegion
    • 2.2、readCurrentRegion
    • 2.3、hasRemaining
  • 3、读操作的调用
  • 4、数据返回
    • 4.1、读入缓存
    • 4.2、buffersRead读取


1、SortMergeResultPartition的创建使用

  首先是一个读过程的一个调用链

PartitionRequestServerHandler.channelRead0()
    ->CreditBasedSequenceNumberingViewReader.requestSubpartitionView()
        ->ResultPartitionManager.createSubpartitionView()
            ->SortMergeResultPartition.createSubpartitionView()
                ->SortMergeResultPartitionReadScheduler.crateSubpartitionReader()
                    ->createFileReader()->new PartitionedFileReader()

  SortMergeResultPartition的创建,由上一篇写出篇可知,SortMergeResultPartition是在ResultPartitionFactory创建的。首先SortMergeResultPartition对象的创建调用链:

new Task()
    ->NettyShuffleEnvironment.createResultPartitionWriters()
        ->ResultPartitionFactory.create()

  之后调用ConsumableNotifyingResultPartitionWriterDecorator.decorate()封装进Task的成员consumableNotifyingPartitionWriters,再之后是注册管理这个SortMergeResultPartition:

Task.doRun()
    ->setupPartitionsAndGates()
        ->consumableNotifyingPartitionWriters: SortMergeResultPartition.setup()
            ->ResultPartition.setup()
                ->ResultPartitionManager.registerResultPartition(this)
                    ->registeredPartitions.put()

  以上注册进了registeredPartitions的列表当中(registeredPartitions是ResultPartitionManager的成员变量),再根据第一个调用链,在createSubpartitionView()的时候从列表获取使用

  有一点需要注意的是,shuffle文件在任务结束的时候才会完成全部写出(主要是index文件),也就是PartitionedFile在Task结束才会创建,之后文件跟随TaskManager的统一管理,也就是ResultPartitionManager。也就是说,这里的读过程并不是下游来上游任务读的过程,而是对上游输出的读的一个处理。

  整个管理相关的链路如下:

TaskManagerServices.createShuffleEnvironment()
    ->NettyShuffleServiceFactory.createShuffleEnvironment()
        ->createNettyShuffleEnvironment()-> new ResultPartitionManager()
            ->new NettyConnectionManager()
                ->new NettyProtocol() -> 成员 partitionProvider
                    ->new PartitionRequestServerHandler(partitionProvider,...)

  最后跟第一个链路关联上了,partitionProvider即第一个链路中的ResultPartitionManager

  PartitionedFile是在任务结束的时候完成对象的创建的,如下在Task.doRun()中,会调用完成ResultPartition的输出

// finish the produced partitions. if this fails, we consider the execution failed.
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
    if (partitionWriter != null) {
        partitionWriter.finish();
    }
}

  最终调用到PartitionedFileWriter的finish()接口,完成PartitionedFile对象的创建

public PartitionedFile finish() throws IOException {
    ......
    ......
    return new PartitionedFile(
            numRegions,
            numSubpartitions,
            dataFilePath,
            indexFilePath,
            dataFileSize,
            indexFileSize,
            numBuffers,
            indexEntryCache);
}

2、PartitionedFileReader

  这个类读取的原理可以解释原理章节描述的信息,即Flink一个分区的文件写在多个region中,写完之后并没有再耗费资源重新进行排序将分区数据聚合,而是在读取的时候,通过手段将跨region的数据一起读出来。

  这个类是sort shuffle文件最下层的文件阅读器,负责从shuffle文件中读取数据返回上层,主要有三个方法。几个重要的读取使用的标志位成员变量如下。注意其中的targetSubpartition成员,该变量是final的,在初始化赋值以后只读不改变,也就是说,每个PartitionedFileReader对应读取一个分区的数据

/** Target subpartition to read. */
private final int targetSubpartition;

/** Next data region to be read. */
private int nextRegionToRead;

/** Next file offset to be read. */
private long nextOffsetToRead;

/** Number of remaining buffers in the current data region read. */
private int currentRegionRemainingBuffers;

2.1、moveToNextReadableRegion

  功能是将阅读器的各项指标设置到下一个可读的region。使用这个类的时候,第一次读取会有一轮空读,然后调用到这个接口,完成各项指标的指向,之后才开始读取数据。

while (currentRegionRemainingBuffers <= 0
        && nextRegionToRead < partitionedFile.getNumRegions()) {
    partitionedFile.getIndexEntry(
            indexFileChannel, indexEntryBuf, nextRegionToRead, targetSubpartition);
    nextOffsetToRead = indexEntryBuf.getLong();
    currentRegionRemainingBuffers = indexEntryBuf.getInt();
    ++nextRegionToRead;
}

  while的循环条件是两个:1、当前region读完;2、未达到最后的region。

  getIndexEntry方法用于获取索引,根据写流程的章节,只有buffer不足时才会将index写出到文件,也就是说,buffer没有用完的话,index是存储在buffer中的,不需要去文件中读。如下,根据cache条件,分别从内存或文件获取index

/**
 * Gets the index entry of the target region and subpartition either from the index data cache
 * or the index data file.
 */
void getIndexEntry(FileChannel indexFile, ByteBuffer target, int region, int subpartition)
        throws IOException {
    checkArgument(target.capacity() == INDEX_ENTRY_SIZE, "Illegal target buffer size.");

    target.clear();
    long indexEntryOffset = getIndexEntryOffset(region, subpartition);
    if (indexEntryCache != null) {
        for (int i = 0; i < INDEX_ENTRY_SIZE; ++i) {
            target.put(indexEntryCache.get((int) indexEntryOffset + i));
        }
    } else {
        indexFile.position(indexEntryOffset);
        BufferReaderWriterUtil.readByteBufferFully(indexFile, target);
    }
    target.flip();
}

  缓存读取根据index占位数,循环从缓存中读取对应的字节数;文件读取,先跳转到文件指定位置,然后由于提供的读数据的buffer大小为index的大小,所以buffer大小用完即表示读取了一个index

  getIndexEntryOffset方法用于获取当前需要的数据的index的位置,根据index存储规则,直接计算获得,如下根据region号、partition号以及index占位数直接获取结果

private long getIndexEntryOffset(int region, int subpartition) {
    checkArgument(region >= 0 && region < getNumRegions(), "Illegal target region.");
    checkArgument(
            subpartition >= 0 && subpartition < numSubpartitions,
            "Subpartition index out of bound.");

    return (((long) region) * numSubpartitions + subpartition) * INDEX_ENTRY_SIZE;
}

  index获取完成以后根据index的内容,更新相关读取指标:1、读取位置;2、读取数量

2.2、readCurrentRegion

  功能是从shuffle文件中读取对应分区的数据。根据相应的指标,定位到文件的具体位置,接着先解析元数据头,获取数据的相关信息,之后根据元数据中标明的数据大小,读取数据。

Buffer readCurrentRegion(MemorySegment target, BufferRecycler recycler) throws IOException {
    if (currentRegionRemainingBuffers == 0) {
        return null;
    }

    dataFileChannel.position(nextOffsetToRead);
    Buffer buffer = readFromByteChannel(dataFileChannel, headerBuf, target, recycler);
    nextOffsetToRead = dataFileChannel.position();
    --currentRegionRemainingBuffers;
    return buffer;
}

  其中的headerBuf是一个固定大小的ByteBuffer,大小是元数据head的大小,8 bytes。

  readFromByteChannel具体读数据的时候,首先获取元数据,然后解析出对应的元数据信息,之后正式读数据

isEvent = headerBuffer.getShort() == HEADER_VALUE_IS_EVENT;
isCompressed = headerBuffer.getShort() == BUFFER_IS_COMPRESSED;
size = headerBuffer.getInt();
targetBuf = memorySegment.wrap(0, size);

Buffer.DataType dataType =
        isEvent ? Buffer.DataType.EVENT_BUFFER : Buffer.DataType.DATA_BUFFER;
return new NetworkBuffer(memorySegment, bufferRecycler, dataType, isCompressed, size);

2.3、hasRemaining

  这个方法的功能就是判断是否已经将分区数据读取完了,同时会调用2.1的方法更新相应的指标。

boolean hasRemaining() throws IOException {
    moveToNextReadableRegion();
    return currentRegionRemainingBuffers > 0;
}

3、读操作的调用

  读取的调用链如下:

SortMergeResultPartitionReadScheduler.run()
    ->readData()
        ->SortMergeSubpartitionReader.readBuffers()
            ->PartitionedFileReader.readCurrentRegion()

  其中,SortMergeResultPartitionReadScheduler实现了Runnable类,也就是说,它是一个线程类,run方法就是按线程的调度方式。SortMergeResultPartitionReadScheduler有一个Executor成员,是一个线程执行类,SortMergeResultPartitionReadScheduler的执行基于这个成员

/** Executor to run the shuffle data reading task. */
private final Executor ioExecutor;mayTriggerReading()接口中,ExecutorSortMergeResultPartitionReadScheduler加入了执行

private void mayTriggerReading() {
    assert Thread.holdsLock(lock);

    if (!isRunning
            && !allReaders.isEmpty()
            && numRequestedBuffers + bufferPool.getNumBuffersPerRequest()
                    <= maxRequestedBuffers) {
        isRunning = true;
        ioExecutor.execute(this);
    }
}

  mayTriggerReading()接口的调用在第一章所述调用链的SortMergeResultPartitionReadScheduler.crateSubpartitionReader()当中,其中还包括了PartitionedFileReader的创建

PartitionedFileReader fileReader = createFileReader(resultFile, targetSubpartition);
SortMergeSubpartitionReader subpartitionReader =
        new SortMergeSubpartitionReader(availabilityListener, fileReader);
allReaders.add(subpartitionReader);
subpartitionReader
        .getReleaseFuture()
        .thenRun(() -> releaseSubpartitionReader(subpartitionReader));

mayTriggerReading();

  ioExecutor的成员变量最终来源是在在NettyShuffleServiceFactory当中,创建了一个batchShuffleReadIOExecutor的IO执行线程池,这个最终被接口层层传递到了SortMergeResultPartitionReadScheduler当中

// we create a separated IO executor pool here for batch shuffle instead of reusing the
// TaskManager IO executor pool directly to avoid the potential side effects of execution
// contention, for example, too long IO or waiting time leading to starvation or timeout
ExecutorService batchShuffleReadIOExecutor =
        Executors.newFixedThreadPool(
                Math.max(
                        1,
                        Math.min(
                                batchShuffleReadBufferPool.getMaxConcurrentRequests(),
                                4 * Hardware.getNumberCPUCores())),
                new ExecutorThreadFactory("blocking-shuffle-io"));

4、数据返回

  第二章PartitionedFileReader简析了从文件读出数据的操作,第三章简析了读操作的触发,此章简析数据如何返回。

4.1、读入缓存

  在读操作的调用链中,注意SortMergeSubpartitionReader.readBuffers()接口,此接口调用PartitionedFileReader.readCurrentRegion()完成shuffle数据读入buffer,之后将该buffer放入一个buffer列表。对应如下两项调用

((buffer = fileReader.readCurrentRegion(segment, recycler)) == null) {
    buffers.add(segment);
    break;
}
    
addBuffer(buffer);

  在addBuffer()接口中,完成了数据buffer加入列表的操作

buffersRead.add(buffer);

4.2、buffersRead读取

  由4.1可知,buffersRead存放了读入内存的shuffle数据,这一步放入操作是由blocking-shuffle-io线程完成的,此处简析buffersRead读取如何被下游获取。

PartitionRequestQueue.writeAndFlushNextMessageIfPossible()
    ->CreditBasedSequenceNumberingViewReader.getNextBuffer()
        ->SortMergeSubpartitionReader.getNextBuffer()
            ->buffersRead.poll()

  PartitionRequestQueue.writeAndFlushNextMessageIfPossible()的调用有多个上层分支,其中一个分支是在收到下游的AddCredit或者ResumeConsumption消息时会调用到,这两个消息都是表示开放下游传输的。


http://www.niftyadmin.cn/n/3018938.html

相关文章

信息社会

消息 通知 公告(简报) 新闻(深度分析文章) 历史(沉淀形成知识) 各部门的信息系统 陕西省住房和城乡建设厅 陕西省建筑市场监管平台陕西省质量安全监管信息系统陕西省标准定额协同管理平台陕西省房地产市场监管信息系统陕西省城市园林绿化企业信息管理系统陕西省执业资格注册人员…

java 实现方法_java常见代码(1)------常见实现方法

1.equals 和 hashcode方法class Students {String name;int age;byte[] idSequence;Overridepublic boolean equals(Object obj) {if (!(obj instanceof Students))return false;Students other (Students) obj;return name.equals(other.name)&& age other.age&…

JdbcSink 简析

文章目录1、JdbcSink1.1、参数1.2、返回2、JdbcBatchingOutputFormat2.1、参数2.2、open方法2.2.1、连接数据库2.2.2、JdbcExec2.2.3、scheduler2.3、writeRecord方法2.3.1、缓存数据2.3.2、flush1、JdbcSink 用于DataStream增加Jdbc的Sink输出&#xff0c;主要两个接口&#x…

机器学习(1)_R与神经网络之Neuralnet包

本篇博客将会介绍R中的一个神经网络算法包&#xff1a;Neuralnet&#xff0c;通过模拟一组数据&#xff0c;展现其在R中是如何使用&#xff0c;以及如何训练和预测。在介绍Neuranet之前&#xff0c;我们先简单介绍一下神经网络算法。 人工神经网络(ANN)&#xff0c;简称神经网络…

C# List.ForEach 方法

C#中List.ForEach 方法是对 List 的每个元素执行指定操作。 示例&#xff1a; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace AppExample {class Program{static void Main(string[] args){…

Adaptive调度器

文章目录1.前言2.测试3.配置启用4.其他配置参数4.1.主要配置4.2.其他可能相关的配置5.调用流程6.配置Adaptive调度器7.DefaultDeclarativeSlotPool7.1.NewSlotsListener7.2.offerSlots7.3.freeReservedSlot7.4.缩容触发8.AdaptiveScheduler8.1.使用条件8.2.计算并行度信息8.2.1…

jQuery触屏插件:Tap 代码

jQuery触屏插件&#xff1a;Tap&#xff0c;使用方法非常简单&#xff0c;例&#xff1a;$("#domid").tap(function(){alert("You tapped me! -- by"this.innerText);});依赖jquery 1.701$.fn.tap function(fn){02var collection this,03isTouch "…

e开头的java编辑器叫什么意思_java处理百度编辑器ueditor上传的图片等多媒体文件...

java处理百度编辑器ueditor上传的图片等多媒体文件开发项目过程中&#xff0c;一般会涉及到采用富文本编辑器处理“内容”之类的业务&#xff0c;而这内容中&#xff0c;难免会上传各种图片、视频等。而一般采用的富文本编辑器常见的有ueditor百度编辑器、widgEditor等等。我一…