MapReduce 根据使用者的不同,会产生不同的视角:
- 分布式系统的使用者:重点关注 Map&Reduce 两类函数的编写,无需关心具体的分布式框架。
- 分布式系统的开发者:需要管理 Map&Reduce 如何分发到机器上以便降低网络传输需求,同时关注冗余和备份以便顺利且迅速的完成计算任务。
接下来,本文作为 MapReduce 论文和 Mit6.824 的阅读笔记将以分布式系统的开发者的视角分析如何构建一个 MapReduce 系统。
MapReduce 过程概述
根据 MapReduce 论文原文,其工作流程可总结为以下核心阶段:
-
初始化与任务分配
- Master 节点接收用户提交的 Job(任务),解析输入文件并将其切分为 M 个 split(数据分片)
- 通过 GFS(Google 文件系统)确定 split 的物理位置后,将 Map 任务分发到存储对应 split 的 worker 节点
-
Map 阶段执行
- 每个 worker 进程执行用户定义的 Map 函数,输入参数为 split 的文件名和内容
- Map 函数通过 emit(key, value)生成中间键值对,这些数据被写入本地磁盘的临时文件
- 例如单词计数器场景中,输入"a b"会被拆解为两个键值对:<a,1>和<b,1>
-
Shuffle 阶段(核心数据交换)
-
Master 通知所有 worker 进行 shuffle 操作,该阶段包含三个关键动作:
1 2 3 4 5 6# 伪代码示例 partitioned_data = partition(intermediate_data, R) # 按key分区 sorted_data = sort(partitioned_data) # 区内排序 for each key in sorted_data: remote_workers = find_workers_with_key(key) # 定位key位置 transfer_data(remote_workers) # 网络传输 -
所有 Map 节点将中间数据按 key 哈希分到 R 个分区,每个分区对应一个 Reduce 任务
-
Reduce worker 通过 RPC 从所有 Map 节点拉取特定 key 的中间值,这个过程涉及跨节点网络传输
-
-
Reduce 阶段执行
- 每个 Reduce 任务合并所有相同 key 的 value 列表,执行用户定义的 Reduce 函数
- 继续单词计数案例,输入
["a", [1,1]]会输出<a,2> - 最终输出结果写入 GFS 等共享存储系统
-
作业完成与容错
- Master 监控所有任务状态,若发现 worker 失效(如 60 秒无响应),则重新调度该任务到其他节点
- 完整的 Job 包含 M 个 Map Task 和 R 个 Reduce Task,两者均可并行执行
典型执行流程图示:
|
|
关键设计要点:
- 数据本地性优化:优先将 Map 任务调度到存储对应 split 的节点
- 推测执行:当某些节点进度落后时,Master 会启动备份任务加速整体完成
Map 阶段
根据 MapReduce 论文,Map 阶段包含以下核心流程:
- 输入分片(Input Splitting):
- 输入文件被划分为多个 Input Split,每个 Split 由一个 Map 任务处理。
- Map 节点优先调度到存储 Split 的物理节点(数据本地性优化)。
- 执行 Map 函数:
- 用户定义的 Map(key, value)函数逐行处理 Split 数据。
- 例如单词计数任务中,输入"a b a"会被拆解为三个中间键值对:("a", 1)、("b", 1)、("a", 1)。
- 内存缓存与分区(Partitioning):
- 中间键值对写入内存缓冲区,当缓冲区满时触发溢写(Spill)到磁盘。
- 分区函数(默认 hash(key) mod R)决定每个键值对属于哪个 Reduce 任务(共 R 个分区)。
- 本地磁盘写入(Spill to Disk):
- 溢写时,每个分区的数据按 Key 排序,并压缩(可选)。
- 多个溢写文件最终合并为单个分区文件(Partition File)。
Map 与 Shuffle 的协同机制:
Shuffle 阶段的触发:Map 任务完成后,Worker 通知 Master“本任务完成”,Master 将分区文件的存储位置(IP+路径)告知对应的 Reduce 任务。同时,论文中提及 Map 阶段可以先对 Map 节点自己的数据进行排序和聚合,以减轻通信的压力。
Shuffle 阶段
Shuffle 阶段是 MapReduce 框架中连接 Map 任务和 Reduce 任务的核心环节,负责将 Map 生成的中间键值对按规则分发到对应的 Reduce 任务。其具体任务包括以下四个关键步骤:
- Map 端的中间数据写入
- 输出分区与本地存储:
- 每个 Map 任务生成中间键值对后,根据用户指定的分区函数(如
hash(key) % R)将数据划分为R个分区(对应R个 Reduce 任务)。 - 每个分区数据写入本地磁盘的临时文件,直到 Map 任务成功完成后再提交到最终路径,确保数据一致性。
- 每个 Map 任务生成中间键值对后,根据用户指定的分区函数(如
- Reduce 任务的数据拉取(Pull)
- 动态数据位置感知:
- Reduce 任务从所有 Map 节点的对应分区文件中拉取数据(Pull 机制)。例如,第
i号 Reduce 任务会从每个 Map 节点的第i个分区文件中读取数据。 - Master 节点记录每个 Map 任务的中间数据位置,并在 Map 节点故障或重试时动态更新这些信息,确保 Reduce 任务始终访问最新的数据源。
- Reduce 任务从所有 Map 节点的对应分区文件中拉取数据(Pull 机制)。例如,第
- 数据合并与排序(Merge & Sort)
- 按键排序与分组:
- Reduce 任务从多个 Map 节点拉取的中间数据会被合并(Merge)并按键排序,使得相同键的所有值连续存储。这是 Reduce 函数正确执行的前提条件(例如单词计数需聚合所有
("apple", 1)的值列表)。 - 排序过程可能涉及外部排序(External Sort),以应对内存不足的大规模数据。
- Reduce 任务从多个 Map 节点拉取的中间数据会被合并(Merge)并按键排序,使得相同键的所有值连续存储。这是 Reduce 函数正确执行的前提条件(例如单词计数需聚合所有
- 网络传输与容错
- 高并发网络通信:
- Shuffle 阶段涉及大量的网络数据传输。
- 容错机制:
- 若某个 Map 节点故障,Master 会重新调度该任务到其他节点,并通知依赖此数据的 Reduce 任务从新节点拉取数据。
- 对于“拖尾”任务(执行缓慢的 Map 任务),Master 会启动推测执行(Speculative Execution)副本,优先使用先完成的任务结果。
|
|
|
|
常见误区:
-
误区:Shuffle 仅涉及数据从 Map 到 Reduce 的简单传输。
-
真相:Shuffle 包含分区、拉取、合并、排序等复杂操作,且占整个作业执行时间的大部分。
-
容错设计:临时文件提交、任务重试与推测执行是分布式系统中保障 Shuffle 阶段可靠性的通用模式。
进一步思考
Shuffle 阶段和 Reduce 阶段的关系是什么?启动 Shuffle 是否意味着 Reduce 开始启动?
Shuffle 阶段的启动不等于 Reduce 函数执行!
- Shuffle 阶段的启动仅表示 Reduce 任务开始拉取已完成 Map 任务的中间数据,但 Reduce 函数的执行必须等待所有 Map 任务完成。
- 这是因为 Reduce 函数需要合并所有 Map 任务的中间数据并按键排序,才能正确执行聚合操作(例如单词计数需合并所有("apple", 1)的值列表)。
当某个 Map 任务的分区文件因节点故障或性能问题丢失/延迟时,Shuffle 阶段如何保证数据可用性?
根据论文描述,MapReduce 通过以下机制应对 Map 任务异常导致的数据不可用问题:
-
主节点监控与任务重试
- Master 定期向 Worker 发送心跳(ping),若某 Map 节点超时未响应,则标记为失败。
- Master 重新调度该节点未完成的 Map 任务到其他可用节点,并更新任务状态。
-
分区文件位置的动态更新
- 每个 Map 任务完成后,Master 会记录其生成的
R个分区文件的位置(IP+路径)。 - 若原始 Map 节点故障,Master 会将新生成的分区文件位置推送给所有依赖此数据的 Reduce 任务。
- 每个 Map 任务完成后,Master 会记录其生成的
-
任务原子提交与临时文件
- Map 任务在执行期间将输出写入本地临时文件,仅在任务成功完成后才提交(commit)到最终路径,确保 Reduce 任务读取的数据始终是完整且一致的。
-
推测执行(Speculative Execution)
- 对于执行缓慢的 Map 任务,Master 会启动一个备份任务(speculative copy)在其他节点上并行执行,以避免“拖尾”任务影响整体进度。
数据可用性的保障流程:
-
故障场景:
- 假设 Map 节点 A 发生故障,其生成的分区文件无法访问。
- Master 检测到 A 失败后,重新调度相同的 Map 任务到节点 B。
- Reduce 任务收到通知,从节点 B 的分区文件中读取数据,而非原始节点 A。
-
缓慢任务场景:
- 若 Map 节点 C 的处理速度显著低于其他节点,Master 启动推测任务到节点 D。
- Reduce 任务优先从先完成的节点(C 或 D)读取数据,忽略较慢的副本。
容错流程图:
|
|
注意:Shuffle 在第一个 Map 任务完成后即可启动,但 Reduce 任务需确保最终获取完整的分区数据。
如果省略排序步骤,Reduce 函数如何处理分散的相同键数据?可能会导致哪些问题?
回答这个问题,需要知道 Reduce 如何取得输入。首先,要明确一点:当前 MapReduce 框架中排序是正确性的必要条件!而不仅仅出于性能考虑。
核心概念解析
Map 与 Reduce 协作的正确流程:
-
阶段顺序:
- Map 并行执行:所有 Map 任务独立处理输入 Split,生成 R 个分区文件(每个对应一个 Reduce 任务)。
- Reduce 启动时机:当所有 Map 任务完成后,Reduce 任务才开始执行。
-
数据分发机制:
- Hash 分区:每个 Map 任务按
hash(key) % R将中间键值对分配到 R 个分区文件。 - Reduce 拉取数据:第 i 号 Reduce 任务从所有 Map 节点的第 i 号分区文件中拉取数据。
- Hash 分区:每个 Map 任务按
为何需要排序?——正确性的本质原因
-
Reduce 函数的输入要求:
- Reduce 函数的设计假设:所有相同键的值必须连续存储。
- 例如,在单词计数任务中,Reduce 需要一次性接收所有
("apple", 1)的值列表[1,1,1],而非分散的单个值。
-
Hash 分区的局限性:
- 保证键在同一个 Reduce 任务,但不保证键的值连续:
- 即使所有
"apple"键的数据被分配到 Reduce 1,这些数据可能分散在多个 Map 节点的分区文件中,且每个文件中"apple"的位置无序。 - Reduce 拉取所有数据后,若不排序,
"apple"的值会分散在多个位置,导致 Reduce 函数多次处理同一键的不同部分,结果错误。
- 即使所有
- 保证键在同一个 Reduce 任务,但不保证键的值连续:
总结来说,Reduce 会认为对于每一个 Key 均只需要处理一次,每重新启动一个 Reduce 均默认是从未处理过的新 Key。
那么,假设出于某种考虑,无论如何都不想进行排序操作,那么 shuffle 阶段将更加复杂:需要对所有 Map 产生的中间文件进行聚合操作,以确保 Reduce 一次性接受所有 key 对应的数据。同时,后续的容错处理也将十分复杂。
实际场景示例
-
场景 1:无排序导致错误
- 输入:
["a b a", "a c c"] - Map 输出:
- Map1:
[("a",1), ("b",1), ("a",1)] - Map2:
[("a",1), ("c",1), ("c",1)]
- Map1:
- Reduce 拉取分区后数据无序:
["a",1], ["c",1], ["a",1], ["b",1], ["c",1], ["a",1]
- Reduce 函数逐个处理键,可能输出
"a":1,"c":1,"a":1等错误结果。
- 输入:
-
场景 2:排序后正确分组
- 排序后数据:
["a",1], ["a",1], ["a",1], ["b",1], ["c",1], ["c",1] - Reduce 函数按组处理:
"a":[1,1,1],"b":,"c":[1,1],结果正确。
- 排序后数据: