跳到主要内容

Shuffle与排序 ⭐️⭐️⭐️

在一些情况下,多个reduce任务的数据流从map任务到reduce任务之间的数据流称为shuffle(混洗),因为每个reduce任务的输入都来自于多个map任务。

shuffle在MapReduce整个阶段中所处的工作阶段是map输出后到reduce接收前,具体可以分为map端和reduce端前后两个部分。

1.map 端

缓冲方式写入内存

每个map任务都有一个环形内存缓冲区,用于存储输出结果。默认情况下,缓冲区大小为100MB,但可以通过调整下面参数来进行调整。

特别说明:缓冲区大小会影响到 mapreduce 程序的执行效率,理论上,缓冲区越大,磁盘 io 的次数越少,执行效率就越快。生产中常用的配置是512MB,1024Mb。但是注意一点,不管数据多大,都是要落盘的

  • task.io.sort.mb

缓冲区溢出到磁盘

当缓冲区内容达到阈值(可由下面的参数控制,默认为0.80或80%)时,后台线程开始将内容溢出(spill)到磁盘。在溢出写入磁盘的过程中,map函数会继续将输出写入缓冲区,但如果缓冲区在此期间被填满,map函数会被阻塞,直到写入磁盘的过程完成。

  • map.sort.spill.percent

溢出写入过程会按轮询方式将缓冲区中的内容写入到指定目录下的溢出文件中,该目录由下面参数控制。

  • cluster.local.dir

数据分区和排序

在写入磁盘之前,线程首先根据数据最终要传递给的reducer将数据划分成相应的分区(partition)。在每个分区中,后台线程按键(key)进行内存中的排序操作。

Combiner函数运行

如果有定义combiner函数,它会在排序后的输出上运行。运行combiner函数可以使得map的输出结果更加紧凑,从而减少写入磁盘和传递给reducer的数据量。

注意:不是每种作业都可以做combine操作的,只有满足以下条件才可以。 reduce的输入输出类型都一样,因为combine本质上就是reduce操作。 计算逻辑上,combine操作后不会影响计算结果,像求和就不会影响。

溢出文件合并(merge)

每当内存缓冲区达到溢出阈值时,就会创建一个新的溢出文件。因此,在map任务写完最后一个输出记录之后,可能会有多个溢出文件。在任务完成之前,这些溢出文件会被合并成一个已分区且已排序的输出文件。下面参数控制该属性值,每次最多能合并文件数,默认值是10。

  • task.io.sort.factor

Combiner函数再次运行

如果至少存在3个溢出文件(通过下面参数控制),combiner函数会在输出文件写入磁盘之前再次运行。这是因为combiner函数可以在输入上进行多次运行,但不会影响最终结果。如果只有1或2个溢出文件,那么由于map输出规模减少,再次运行combiner函数带来的开销不值得,因此不会为该map输出再次运行combiner。

  • map.combine.minspills

压缩输出

在将压缩的map输出写入磁盘时,可以对数据进行压缩处理。这样可以提高写入磁盘的速度,节省磁盘空间,并减少传递给reducer的数据量。默认情况下,输出是不压缩的,将属性值①设置为true,就可以启用压缩功能。所使用的压缩库由属性②控制。

  • map.output.compress
  • map.output.compress.codec

reducer获取输出分区

reducer通过HTTP获取输出文件的分区。用于文件分区的工作线程数量由下面参数控制。默认将最大线程数设置为机器CPU核心数的两倍。

  • shuffle.max.threads

2.reduce 端

Reduce端大致流程为copy、merge、sort、reduce输入这几个阶段。

Copy阶段

简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。

由于每个Map任务的完成时间可能不同,因此在每个Map任务完成时,Reduce任务就开始复制其输出。Reduce任务有少量的复制线程,可以并行获取Map输出。并行线程数可以通过下面的参数来控制(默认值5)。

  • reduce.shuffle.parallelcopies
思考:Reducer如何知道要从哪台机器获取Map输出呢?

当Map任务成功完成后,它们会使用心跳机制通知它们的Application Master。因此,对于指定的作业,Application Master知道Map输出和主机位置之间的映射关系。

在Reducer中,有一个线程定期向Application Master询问,以获取Map输出主机的位置,直到获得所有输出位置。这样,Reducer就能知道从哪台机器获取Map输出。

需要注意的是:由于过程中Reducer可能会失败,因此主机不会立即从磁盘上删除Map输出。相反,主机会等待,直到Application Master告知它删除Map输出。这个操作会在作业完成后执行

merge和sort阶段

拉取过来的map输出数据会被复制到Reduce任务的JVM内存中(如果数据超过内存缓冲区大小会直接复制到本地磁盘)。这里也有一个内存缓冲区,内存缓冲区的大小由下面参数控制,该属性指定用于此目的的堆空间的百分比(减少磁盘交互,增加性能)。

  • reduce.shuffle.input.buffer.percent

当内存缓冲区中的数据量达到一定阈值时,会触发spill操作,将内存中的数据溢写到磁盘,并对这些数据做一次排序(如果来自不同Map Task的数据则需要合并排序merge-sort)。触发条件由下面两个参数控制。这个过程中,Reduce Task不断地读取、合并并排序从Map Task传来的数据块,直到所有的map输出都被处理过。

  • reduce.shuffle.merge.percent(内存缓冲区阈值)
  • reduce.merge.inmem.threshold(Map输出阈值)

如果在Map作业中指定了Combiner,那么在合并操作期间会运行Combiner函数以降低写入磁盘的数据量。

思考:Combiner在map阶段本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作? map端的combiner操作,不能跨map任务,只能处理单个map的数据;只有reduce可以接收多个map任务处理的数据(reduce会拉取不同map 端输出的数据放入一个缓冲区)。

注意: 如果Map输出启用了压缩(通过Map任务进行压缩),那么在进行合并之前,需要先将压缩的数据在内存中进行解压缩。

Reduce函数的输入

Reduce阶段,对于已经按照键进行排序的map输出,Reduce函数开始遍历这些有序数据,一次性为每个不同的key调用一次"reduce"函数进行处理。"Reduce"函数的目的是将具有相同键的数据进行合并、聚合或其他相关操作,并将生成的最终结果保存在类似HDFS的文件系统中。

提示:这里一般都是HDFS文件系统,也可以是其他的分布式文件系统。

MapReduce中的排序 ⭐️⭐️⭐️

重要程度:⭐️⭐️⭐️
面试公司:快手、阿里、美团、字节等

一、几次排序?

在Map任务和Reduce任务的过程中,一共发生了3次排序

1)当map函数产生输出时,会首先写入内存的环形缓冲区,当达到设定的阀值,在刷写磁盘之前,后台线程会将缓冲区的数据划分成相应的分区。在每个分区中进行内排序。

2)在Map任务完成之前,磁盘上存在多个已经分好区,并排好序的,大小和缓冲区一样的溢写文件,这时溢写文件将被合并成一个已分区且已排序的输出文件。由于溢写文件已经经过第一次排序,所有合并文件只需要再做一次归并排序即可使输出文件整体有序。

3)在reduce阶段,需要将多个Map任务的输出文件copy到ReduceTask中后合并,由于经过第二次排序,所以合并文件时只需再做一次归并排序即可使输出文件整体有序。

二、为什么要排序?

回答一:MR在reduce阶段需要分组,将key相同的放在一起进行规约,为了达到该目的,有两种算法:hashmap和sort,前者太耗内存,而排序通过外排可对任意数据量分组,只要磁盘够大就行。map端排序是为了减轻reduce端排序的压力。所以,本质上来讲,前两次的排序(快排+归并),都是为了进行第三次排序准备的。

回答二:因为在Reduce阶段是将相同Key的文件进行合并,如果不对key进行排序,需要一个一个判断是否是相同的key,加大reduce端排序的压力。效率不高;而按key排好序之后,只需要与后面的相比看是否是相同的key,如果不一样,直接将前面相同key的一组拿走进入到reduce,效率高。总之,排序后的数据对于后续的使用方便很多。

三、什么排序算法?(这里用到的排序算法代码要会写)

在这3次排序中第一次是内存缓冲区做的内排序,使用的算法是快速排序,第二次排序和第三次排序都是在文件合并阶段发生的,使用的是归并排序。

为什么第一次是快排,后面两次是归并排序?

1、在Map任务完成后,输出的数据首先会被存储在一个环形缓冲区中。当这个缓冲区的数据量达到一定阈值时,会触发一次排序操作。这里使用快速排序是因为它在平均情况下的时间复杂度为O(n log n),并且是一种原地排序算法,不需要额外的存储空间,非常适合用于内存中的排序。

2、归并排序适合处理已经在外部存储(如磁盘)上的大量已部分排序的数据。因为此时数据已经过大无法全部装入内存,而归并排序可以通过逐步合并两个或多个有序列表来创建一个更大的有序列表,这非常适合于外部排序场景。