大数据系列——MapReduce分布式并行计算框架

讨论 MapReduce 之前,我们先从一个简单问题入手,一步步叙述 MapReduce 原理。

问题

我们有 2000 个文件共 50 TB,每一行内容都是数字,现在需要对这些数字做排序,使用什么方法可以简单高效的完成任务呢?由于文件太大,不能全部载入内存来做排序,而且一台机器的磁盘也很难容纳这么大的文件,可以使用多台机器并行执行排序任务,可快速的完成任务。使用多台机器并行计算时,需要控制哪些机器处理哪些数据。50TB 共 2000 个文件,平均每个文件也有 25.6 GB,因此也不能把一个文件全部载入内存做排序。可以先将一个文件按照 64MB 对其切割,每次读入一个 64MB 的切片并在内存里做排序,然后将排序结果写到一个中间文件,等到一个文件的全部切片都排序完成,此时磁盘已经有很多个已排序好的中间文件。我们可以再使用归并排序将这些有序切片文件合并生成一个大的有序文件,这样一来总共就有 2000 个有序的中间文件。由于这 2000 个有序中间文件只是局部有序,还需要对这 2000 个有序中间文件再次使用归并排序,输出多个有序的最终文件。

针对这个排序的问题,我们先去掉业务相关的排序功能,抽象通用功能,发现:

  • 文件切割;
  • 将数据送给业务去处理生成中间文件;
  • 合并中间文件再次送给业务去处理;
  • 控制任务分配,哪些机器跑哪些任务;
  • 分布式文件系统;

这些都属于通用功能,具体的业务处理代码就是分布式并行计算中留给用户实现的代码。这种模式由 Google 于 2003 年开发出来并发表论文 MapReduce : Simplified Data Processing on Large Clusters,Hadoop 也是基于这篇论文实现的 MapReduce。

原理

受到函数式编程语言 Lisp 的启发,将复杂业务场景使用的 map 和 reduce 两个函数来实现,map 计算生成中间结果,合并相同 key 的数据,reduce 对 map 结果再次处理,得到最终结果,可表示为图-1。

MapReduce类型

图-1 MapReduce 类型

先来看看 MapReduce 的执行流程,如图-2所示。

MapReduce执行流程

MapReduce 执行流程

每一步执行的操作是:

(0) MapReduce 框架首先将输入文件切分为逻辑上的多个分片文件,分片文件是MapReduce对文件进行处理和运算的输入单位。因为是逻辑分片,物理上并不会切分,只是记录了要处理数据的位置和长度。

(1) Master 将空闲的 map 任务分配在一台机器上执行。Master 负责管理 map 和 reduce 任务,通过 ping 的方式来检查任务健康行为。如果在规定时间内任务没有返回检查状态,Master 认为该任务已经死了,需要将该任务重置为空闲状态,然后重新分配执行。分片文件一般与 Map 任务同在一台机器或者在一个机房,叫做“计算向数据靠拢”,可以节省数据网络传输的时间。

(2) Map 任务读取分片文件的数据,将输入的 (k1,v1) 送给用户实现的代码,用户代码输出 (k2,v2),输出的数据会直接写入内存缓冲区。

(3) 内存缓冲区存放 map 输出数据 (k2,v2)。

(4) 当内存缓冲区写到一定程度时,后端线程会将内存数据按照 hash(k2) mod R 来分区并在内存中排序然后写入到临时的分区文件。随着内存数据不断的写入,磁盘会存在多个分区文件,比如分区编号为0的分区文件可能存在几十个,这时还需要使用归并排序,合并成一个文件。这种行为称为溢写、分区、排序、合并。当 Map 任务完成时,Map需要将这些文件的位置信息上报到 Master 节点,Master 根据这些信息安排 Reduce 任务从哪里取数据。

(5) 当所有 Map 任务执行完毕时,Master 分配 Reduce 任务,准备从远端读取 Map 中间有序分区文件。

(6) Reduce 任务通过 RPC 从多个 Map 获取多个分区文件。

(7) 由于分区文件来自不同的 Map ,多个文件只是局部有序,需要使用类似第4步的溢写、排序、归并排序,合并成一个大的有序文件。

(8) Reduce 任务从有序文件读取 (k2,list(v2)) 数据集,交给用户实现的 reduce 逻辑。

(9) 用户实现的 reduce 逻辑会输出 list(v2) 到最终文件。在输出到最终文件之前,同样会使用类似第4步溢写、归并排序生成一个大文件。当 reduce 任务结束时,向 Master 上报任务执行状态。

容错性

(1) map 任务的容错性:如果 master 通过 ping 检查机制发现 map 任务挂掉了,会将该任务重置为空闲状态,空闲状态的 map 任务会被 master 重新分配来执行新的计算。如果一台机器挂掉了,master 发现该机器存储了 map 已完成的中间有序文件,而且还没有被reduce 任务领取,master 会重新分配map任务重跑这些数据。因为 map 的中间文件都是本地存储,只有一个副本,机器挂掉了不能恢复时,文件内容也不能恢复。如果 master 收到了重复上报的已完成的中间有序文件,master 会忽略这个信息,只保留一份。

(2) reduce 容错性:如果 master 通过 ping 检查机制发现 reduce 任务挂掉了,会将该任务重置为空闲状态,空闲状态的 reduce 任务会被 master 重新分配来执行新的计算。由于 reduce 输出的文件存放在分布式文件系统,比如 GFS 或者 HDFS(对于 Hadoop 就是放在 HDFS 中),有多个副本,就算已完成的 reduce 任务所在的机器挂掉了,也不需要重新跑。

(3) Master 的容错性:Master 内部有检查点机制,检查点周期性的保存到磁盘,如果 master 挂掉,恢复时会从上一份检查点开始再次执行任务,这样会有部分数据的丢失,丢失的信息需要重跑补回来。一开始 Master 节点是单节点,无法做到高可用。后来在 Hadoop 中引入了备用的 master 节点,实时从活动的 Master 节点同步数据,避免单点故障。

(4) Semantics in the Presence of Failures:MapReduce 使用map 和 reduce 的原子提交来达到这种特征。每一个任务输出的文件首先是临时文件,比如 reduce 任务是一个输出文件,map 任务是 R 个输出文件。如果任务完成,使用 rename 的原子操作将文件重命名并提交到 master 节点,master 节点会忽略重复的信息。如果多个任务执行同样的 rename 操作,只有一个 rename 能成功,因为文件只能有一个。

应用

除了文中开头说的排序可以使用 MapReduce 来完成外,MapReduce 还可以使用在关系代数运算、分组与聚合运算等。

关系代数运算包括

(1) 关系的选择运算:对于关系的选择运算,只需要 Map 过程就能实现,对于关系R中的每个元组t,检测是否是满足条件的所需元组,如果满足条件,则输出键值对 (t,t),也就是说,键和值都是 t。这时的 Reduce 函数就是一个恒等式,对输入不作任何变换就直接输出。

(2) 关系的投影运算:假设对关系 R 投影后的属性集为 S,在 Map 函数中,对于 R 中的每个元组 t,提取出 t 中不属于 S 的字段,得到元组 t,输出键值对 (t,t)。对于 Map 任务产生的每个键 t,都可能存在一个或者多个键值对 (t,t),因此需要通过 Reduce 函数来剔除冗余,把属性完全相同的元组合并起来的到 (t,list(t)),剔除冗余后只输出一个 (t,t)。

(3) 关系的并、交、差运算。

(4) 关系的自然连接运算:将需要连接的公共字段作为键。