分布式系统-MapReduce 阅读笔记

MapReduce 根据使用者的不同,会产生不同的视角:

  • 分布式系统的使用者:重点关注 Map&Reduce 两类函数的编写,无需关心具体的分布式框架。
  • 分布式系统的开发者:需要管理 Map&Reduce 如何分发到机器上以便降低网络传输需求,同时关注冗余和备份以便顺利且迅速的完成计算任务。

接下来,本文作为 MapReduce 论文和 Mit6.824 的阅读笔记将以分布式系统的开发者的视角分析如何构建一个 MapReduce 系统。

MapReduce 过程概述

根据 MapReduce 论文原文,其工作流程可总结为以下核心阶段:

  1. 初始化与任务分配

    • Master 节点接收用户提交的 Job(任务),解析输入文件并将其切分为 M 个 split(数据分片)
    • 通过 GFS(Google 文件系统)确定 split 的物理位置后,将 Map 任务分发到存储对应 split 的 worker 节点
  2. Map 阶段执行

    • 每个 worker 进程执行用户定义的 Map 函数,输入参数为 split 的文件名和内容
    • Map 函数通过 emit(key, value)生成中间键值对,这些数据被写入本地磁盘的临时文件
    • 例如单词计数器场景中,输入"a b"会被拆解为两个键值对:<a,1>和<b,1>
  3. Shuffle 阶段(核心数据交换)

    • Master 通知所有 worker 进行 shuffle 操作,该阶段包含三个关键动作:

      1
      2
      3
      4
      5
      6
      
      # 伪代码示例
      partitioned_data = partition(intermediate_data, R)  # 按key分区
      sorted_data = sort(partitioned_data)               # 区内排序
      for each key in sorted_data:
          remote_workers = find_workers_with_key(key)     # 定位key位置
          transfer_data(remote_workers)                   # 网络传输
      
    • 所有 Map 节点将中间数据按 key 哈希分到 R 个分区,每个分区对应一个 Reduce 任务

    • Reduce worker 通过 RPC 从所有 Map 节点拉取特定 key 的中间值,这个过程涉及跨节点网络传输

  4. Reduce 阶段执行

    • 每个 Reduce 任务合并所有相同 key 的 value 列表,执行用户定义的 Reduce 函数
    • 继续单词计数案例,输入["a", [1,1]]会输出<a,2>
    • 最终输出结果写入 GFS 等共享存储系统
  5. 作业完成与容错

    • Master 监控所有任务状态,若发现 worker 失效(如 60 秒无响应),则重新调度该任务到其他节点
    • 完整的 Job 包含 M 个 Map Task 和 R 个 Reduce Task,两者均可并行执行

典型执行流程图示:

1
2
3
4
[输入文件] → 分片 → [Map Task] → [本地写入] → [Shuffle网络传输] → [Reduce Task] → [输出结果]
          ↘                ↗
           ↖            ↙
             [Master协调]

关键设计要点:

  • 数据本地性优化:优先将 Map 任务调度到存储对应 split 的节点
  • 推测执行:当某些节点进度落后时,Master 会启动备份任务加速整体完成

Map 阶段

根据 MapReduce 论文,Map 阶段包含以下核心流程:

  1. 输入分片(Input Splitting)
  • 输入文件被划分为多个 Input Split,每个 Split 由一个 Map 任务处理。
  • Map 节点优先调度到存储 Split 的物理节点(数据本地性优化)。
  1. 执行 Map 函数
  • 用户定义的 Map(key, value)函数逐行处理 Split 数据。
  • 例如单词计数任务中,输入"a b a"会被拆解为三个中间键值对:("a", 1)、("b", 1)、("a", 1)。
  1. 内存缓存与分区(Partitioning)
  • 中间键值对写入内存缓冲区,当缓冲区满时触发溢写(Spill)到磁盘。
  • 分区函数(默认 hash(key) mod R)决定每个键值对属于哪个 Reduce 任务(共 R 个分区)。
  1. 本地磁盘写入(Spill to Disk)
  • 溢写时,每个分区的数据按 Key 排序,并压缩(可选)。
  • 多个溢写文件最终合并为单个分区文件(Partition File)。

Map 与 Shuffle 的协同机制

Shuffle 阶段的触发:Map 任务完成后,Worker 通知 Master“本任务完成”,Master 将分区文件的存储位置(IP+路径)告知对应的 Reduce 任务。同时,论文中提及 Map 阶段可以先对 Map 节点自己的数据进行排序和聚合,以减轻通信的压力。

Shuffle 阶段

Shuffle 阶段是 MapReduce 框架中连接 Map 任务和 Reduce 任务的核心环节,负责将 Map 生成的中间键值对按规则分发到对应的 Reduce 任务。其具体任务包括以下四个关键步骤:

  1. Map 端的中间数据写入
  • 输出分区与本地存储
    • 每个 Map 任务生成中间键值对后,根据用户指定的分区函数(如hash(key) % R)将数据划分为R个分区(对应R个 Reduce 任务)。
    • 每个分区数据写入本地磁盘的临时文件,直到 Map 任务成功完成后再提交到最终路径,确保数据一致性。
  1. Reduce 任务的数据拉取(Pull)
  • 动态数据位置感知
    • Reduce 任务从所有 Map 节点的对应分区文件中拉取数据(Pull 机制)。例如,第i号 Reduce 任务会从每个 Map 节点的第i个分区文件中读取数据。
    • Master 节点记录每个 Map 任务的中间数据位置,并在 Map 节点故障或重试时动态更新这些信息,确保 Reduce 任务始终访问最新的数据源。
  1. 数据合并与排序(Merge & Sort)
  • 按键排序与分组
    • Reduce 任务从多个 Map 节点拉取的中间数据会被合并(Merge)按键排序,使得相同键的所有值连续存储。这是 Reduce 函数正确执行的前提条件(例如单词计数需聚合所有("apple", 1)的值列表)。
    • 排序过程可能涉及外部排序(External Sort),以应对内存不足的大规模数据。
  1. 网络传输与容错
  • 高并发网络通信
    • Shuffle 阶段涉及大量的网络数据传输
  • 容错机制
    • 若某个 Map 节点故障,Master 会重新调度该任务到其他节点,并通知依赖此数据的 Reduce 任务从新节点拉取数据。
    • 对于“拖尾”任务(执行缓慢的 Map 任务),Master 会启动推测执行(Speculative Execution)副本,优先使用先完成的任务结果。
1
2
3
4
[Map任务开始] → [生成中间键值对] → [按R分区写入本地文件]
          ↘                          ↗
           ↗ (Map任务完成)          ↘
[Reduce任务启动] → [从所有Map节点拉取对应分区数据] → [合并+排序→输入Reduce函数]
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# Map任务写入中间数据
def map_task(input_split):
    partitions = [[] for _ in range(R)]  # 初始化R个分区
    for key, value in input_split:
        partition_idx = hash(key) % R
        partitions[partition_idx].append((key, value))
    for i, records in enumerate(partitions):
        write_to_local_file(f"partition_{i}.tmp", records)  # 写入临时文件
    commit_files()  # 提交到最终路径

# Reduce任务拉取与处理数据
def reduce_task(partition_id):
    all_data = []
    for map_task in all_map_tasks:
        worker_ip = master.get_map_output_location(map_task)  # 获取数据位置
        data = http_get(worker_ip, f"partition_{partition_id}.tmp")  # 拉取数据
        all_data.extend(data)
    sorted_data = sorted(all_data, key=lambda x: x.key)  # 排序
    grouped_data = group_by_key(sorted_data)  # 分组
    for key, values in grouped_data.items():
        emit(key, reduce_function(values))  # 执行Reduce函数

常见误区

  • 误区:Shuffle 仅涉及数据从 Map 到 Reduce 的简单传输。

  • 真相:Shuffle 包含分区、拉取、合并、排序等复杂操作,且占整个作业执行时间的大部分。

  • 容错设计:临时文件提交、任务重试与推测执行是分布式系统中保障 Shuffle 阶段可靠性的通用模式。

进一步思考

Shuffle 阶段和 Reduce 阶段的关系是什么?启动 Shuffle 是否意味着 Reduce 开始启动?

Shuffle 阶段的启动不等于 Reduce 函数执行!

  • Shuffle 阶段的启动仅表示 Reduce 任务开始拉取已完成 Map 任务的中间数据,但 Reduce 函数的执行必须等待所有 Map 任务完成。
  • 这是因为 Reduce 函数需要合并所有 Map 任务的中间数据并按键排序,才能正确执行聚合操作(例如单词计数需合并所有("apple", 1)的值列表)。

当某个 Map 任务的分区文件因节点故障或性能问题丢失/延迟时,Shuffle 阶段如何保证数据可用性?

根据论文描述,MapReduce 通过以下机制应对 Map 任务异常导致的数据不可用问题:

  1. 主节点监控与任务重试

    • Master 定期向 Worker 发送心跳(ping),若某 Map 节点超时未响应,则标记为失败。
    • Master 重新调度该节点未完成的 Map 任务到其他可用节点,并更新任务状态。
  2. 分区文件位置的动态更新

    • 每个 Map 任务完成后,Master 会记录其生成的R个分区文件的位置(IP+路径)。
    • 若原始 Map 节点故障,Master 会将新生成的分区文件位置推送给所有依赖此数据的 Reduce 任务。
  3. 任务原子提交与临时文件

    • Map 任务在执行期间将输出写入本地临时文件,仅在任务成功完成后才提交(commit)到最终路径,确保 Reduce 任务读取的数据始终是完整且一致的。
  4. 推测执行(Speculative Execution)

    • 对于执行缓慢的 Map 任务,Master 会启动一个备份任务(speculative copy)在其他节点上并行执行,以避免“拖尾”任务影响整体进度。

数据可用性的保障流程

  • 故障场景

    • 假设 Map 节点 A 发生故障,其生成的分区文件无法访问。
    • Master 检测到 A 失败后,重新调度相同的 Map 任务到节点 B。
    • Reduce 任务收到通知,从节点 B 的分区文件中读取数据,而非原始节点 A。
  • 缓慢任务场景

    • 若 Map 节点 C 的处理速度显著低于其他节点,Master 启动推测任务到节点 D。
    • Reduce 任务优先从先完成的节点(C 或 D)读取数据,忽略较慢的副本。

容错流程图

1
2
3
4
[Map节点A执行任务] → [生成分区文件] → [Master记录位置]
          ↘                            ↗
           ↗ (节点A故障/超时)          ↘
[Map节点B重试任务] → [生成新分区文件] → [Master更新位置] → [Reduce从B读取]

注意:Shuffle 在第一个 Map 任务完成后即可启动,但 Reduce 任务需确保最终获取完整的分区数据。

如果省略排序步骤,Reduce 函数如何处理分散的相同键数据?可能会导致哪些问题?

回答这个问题,需要知道 Reduce 如何取得输入。首先,要明确一点:当前 MapReduce 框架中排序是正确性的必要条件!而不仅仅出于性能考虑。

核心概念解析

Map 与 Reduce 协作的正确流程

  • 阶段顺序

    • Map 并行执行:所有 Map 任务独立处理输入 Split,生成 R 个分区文件(每个对应一个 Reduce 任务)。
    • Reduce 启动时机:当所有 Map 任务完成后,Reduce 任务才开始执行。
  • 数据分发机制

    • Hash 分区:每个 Map 任务按hash(key) % R将中间键值对分配到 R 个分区文件。
    • Reduce 拉取数据:第 i 号 Reduce 任务从所有 Map 节点的第 i 号分区文件中拉取数据。

为何需要排序?——正确性的本质原因

  • Reduce 函数的输入要求

    • Reduce 函数的设计假设:所有相同键的值必须连续存储
    • 例如,在单词计数任务中,Reduce 需要一次性接收所有("apple", 1)的值列表[1,1,1],而非分散的单个值。
  • Hash 分区的局限性

    • 保证键在同一个 Reduce 任务,但不保证键的值连续
      • 即使所有"apple"键的数据被分配到 Reduce 1,这些数据可能分散在多个 Map 节点的分区文件中,且每个文件中"apple"的位置无序。
      • Reduce 拉取所有数据后,若不排序,"apple"的值会分散在多个位置,导致 Reduce 函数多次处理同一键的不同部分,结果错误。

总结来说,Reduce 会认为对于每一个 Key 均只需要处理一次,每重新启动一个 Reduce 均默认是从未处理过的新 Key。

那么,假设出于某种考虑,无论如何都不想进行排序操作,那么 shuffle 阶段将更加复杂:需要对所有 Map 产生的中间文件进行聚合操作,以确保 Reduce 一次性接受所有 key 对应的数据。同时,后续的容错处理也将十分复杂。

实际场景示例

  • 场景 1:无排序导致错误

    • 输入:["a b a", "a c c"]
    • Map 输出:
      • Map1: [("a",1), ("b",1), ("a",1)]
      • Map2: [("a",1), ("c",1), ("c",1)]
    • Reduce 拉取分区后数据无序:
      • ["a",1], ["c",1], ["a",1], ["b",1], ["c",1], ["a",1]
    • Reduce 函数逐个处理键,可能输出"a":1, "c":1, "a":1等错误结果。
  • 场景 2:排序后正确分组

    • 排序后数据:["a",1], ["a",1], ["a",1], ["b",1], ["c",1], ["c",1]
    • Reduce 函数按组处理:"a":[1,1,1], "b":, "c":[1,1],结果正确。
updatedupdated2025-07-272025-07-27