第十一章:批处理

带有太强个人色彩的系统无法成功。当最初的设计完成并且相对稳健时,真正的考验才刚开始:此后会有许多持不同观点的人做出各自的实验。

高德纳

到目前为止,本书大部分内容都围绕着 请求(request)查询(query) 以及对应的 响应(response)结果(result) 展开。现代很多数据系统都默认采用这种处理方式:你发出请求或指令,系统尽快给出答案。

网页浏览器请求页面、服务调用远程 API、数据库、缓存、搜索索引,以及很多其他系统都如此运作。我们称这类系统为 在线系统(online systems)。它们通常以响应时间作为主要性能指标,并且往往需要良好的容错能力来保证高可用。

但有时候,你需要执行的计算比一次交互式请求大得多,或者要处理的数据量远超单次请求能承载的范围。例如训练 AI 模型、把海量数据从一种形式转换成另一种形式、或者在超大数据集上做分析计算。我们把这类任务称为 批处理(batch processing) 作业,有时也称为 离线系统(offline systems)

批处理作业读取一批输入数据(只读),并生成一批输出数据(每次运行都从头生成)。它通常不会像读写事务那样原地修改数据。因此,输出是由输入推导出的 派生数据(derived data)(见“记录系统与派生数据”):如果不满意输出,你可以直接删除它,修改作业逻辑,再跑一遍即可。把输入视为不可变并尽量避免副作用(例如直接写外部数据库),不仅有助于性能,也带来其他好处:

  • 如果你在代码中引入了 bug 导致输出错误或损坏,可以直接回滚代码并重跑作业,输出就会恢复正确。更简单的做法是把旧输出保留在另一个目录,直接切回旧版本。多数对象存储与开放表格式(见“云数据仓库”)都支持这种能力,通常称为 时间旅行(time travel)。大多数支持读写事务的数据库不具备这种特性:如果错误代码把坏数据写进数据库,仅回滚代码并不能修复已写入的数据。能够从错误代码中恢复的能力被称为 容忍人为失误 1

  • 因为回滚容易,功能开发能比“犯错会造成不可逆损害”的环境更快推进。这个 最小化不可逆性 的原则对敏捷开发非常有益 2

  • 同一组文件可以作为多种作业的输入,包括监控类作业:例如计算指标、验证输出是否符合预期(如与上一次结果比较并度量偏差)。

  • 批处理框架能更高效地利用计算资源。虽然也可以用 OLTP 数据库和应用服务器等在线系统做批处理,但资源成本通常显著更高。

批处理也有挑战。多数框架中,作业只有在整体完成后,其输出才能被下游进一步处理。批处理也可能低效:输入哪怕只变动一个字节,也可能需要重算整个输入数据集。尽管如此,批处理在大量场景中依然非常有用,我们会在“批处理用例”中回到这个话题。

批处理作业可能运行很久:几分钟、几小时甚至几天。很多作业是周期调度的(例如每天一次)。它的核心性能指标通常是吞吐量:单位时间能处理多少数据。有些批处理系统通过“中止并整体重启”应对故障,也有些具备更细粒度容错能力,可以在部分节点崩溃时仍让作业完成。

Note

批处理的另一种替代形态是 流处理(stream processing):作业不会在“处理完输入后结束”,而是持续监听输入,并在变化发生后很快处理。我们将在第十二章讨论流处理。

在线处理与批处理的边界并不总是清晰:一个运行很久的数据库查询,看起来也很像批处理过程。但批处理有一些独特特性,使其成为构建可靠、可伸缩、可维护应用的重要积木。例如,它常在 数据集成(data integration) 中发挥作用,即把多个数据系统组合起来完成单一系统做不到的事。ETL(见“数据仓库”)就是典型例子。

现代批处理深受 MapReduce 影响。Google 在 2004 年发表了这一批处理算法 3,随后 Hadoop、CouchDB、MongoDB 等开源系统都实现了它。MapReduce 是相对底层的编程模型,其能力不如数据仓库中的并行查询执行引擎成熟 4 5。它在诞生时确实让商用硬件上的处理规模跃升一大步,但今天已大体过时,Google 内部也不再使用 6 7

如今批处理更常通过 Spark、Flink 或数据仓库查询引擎完成。它们与 MapReduce 一样高度依赖分片(见第七章)和并行执行,但缓存与执行策略更成熟。随着这些系统走向成熟,运维问题已大幅缓解,重点转向可用性:数据流 API、查询语言、DataFrame API 得到广泛支持;任务与工作流编排也显著进化。以 Hadoop 为中心的 Oozie、Azkaban 等调度器,正被 Airflow、Dagster、Prefect 这类更通用方案替代,它们可协调多种批处理框架与云数据仓库。

云计算已无处不在。批处理存储层也正在从 HDFS、GlusterFS、CephFS 这类分布式文件系统(DFS)向 S3 等对象存储迁移。BigQuery、Snowflake 这类可伸缩云数据仓库,正在模糊“数据仓库”和“批处理系统”之间的边界。

为了建立直觉,本章先从单机 Unix 工具示例出发,再扩展到分布式多机处理。你会看到,分布式批处理框架在很多方面很像操作系统:它也有调度器和文件系统。随后我们会讨论编写批处理作业的几种处理模型,最后给出常见应用场景。

使用 Unix 工具的批处理

假设你有一台 Web 服务器,每处理一个请求就在日志文件末尾追加一行。例如,使用 nginx 默认访问日志格式,一行可能像这样:

216.58.210.78 - - [27/Jun/2025:17:55:11 +0000] "GET /css/typography.css HTTP/1.1"
200 3377 "https://martin.kleppmann.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X
10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"

(实际上这是一行,这里为了阅读方便换了行。)这一行包含了很多信息。要正确解释它,你需要日志格式定义:

$remote_addr - $remote_user [$time_local] "$request"
$status $body_bytes_sent "$http_referer" "$http_user_agent"

这表示:UTC 时间 2025 年 6 月 27 日 17:55:11,服务器收到来自客户端 IP 216.58.210.78/css/typography.css 的请求。用户未认证,因此 $remote_user 是连字符(-)。响应状态码是 200(成功),响应体大小 3,377 字节。浏览器是 Chrome 137,该文件是从页面 https://martin.kleppmann.com/ 引用而来。

看起来“解析日志”有点朴素,但它在现代科技公司里是核心能力之一,从广告流水线到支付处理都大量依赖。事实上,这也是 MapReduce 与“大数据”浪潮快速兴起的重要推动力。

简单日志分析

很多工具都能从日志生成漂亮的网站流量报告。这里为了练手,我们只用基础 Unix 工具自己做一个。比如你想找出网站最受欢迎的五个页面,可以在 shell 中这样做:

cat /var/log/nginx/access.log | #1
  awk '{print $7}' | #2
  sort             | #3
  uniq -c          | #4
  sort -r -n       | #5
  head -n 5          #6
  1. 读取日志文件。(严格说这里不需要 cat,可直接把文件作为 awk 参数;但这样写更直观看出线性管道。)
  2. 以空白字符切分每行,只输出第 7 个字段,也就是请求 URL。上面的样例中是 /css/typography.css
  3. 按字典序对 URL 排序。某个 URL 若出现 n 次,排序后会连续出现 n 行。
  4. uniq 通过比较相邻两行是否相同来去重。-c 让它输出计数:每个不同 URL 出现了多少次。
  5. 第二次 sort 按每行开头的数字(-n)排序,并用 -r 逆序,出现次数最多的排在最前。
  6. head 只保留前 5 行(-n 5),丢弃其余。

输出大致如下:

    4189 /favicon.ico
    3631 /2016/02/08/how-to-do-distributed-locking.html
    2124 /2020/11/18/distributed-systems-and-elliptic-curves.html
    1369 /
     915 /css/typography.css

如果你不熟悉 Unix 工具,这条命令看起来可能有点晦涩,但它威力很强。它能在几秒内处理 GB 级日志,而且修改分析逻辑也非常方便:例如要排除 CSS 文件,可把 awk 参数改成 '$7 !~ /\.css$/ {print $7}';若要统计访问最多的客户端 IP,把 awk 参数改成 '{print $1}' 即可。

本书篇幅有限,无法展开讲 Unix 工具,但它们非常值得学。令人惊讶的是,仅靠 awksedgrepsortuniqxargs 的组合,就能在几分钟内做出很多数据分析,并且性能相当好 8

命令链与自定义程序

你也可以不用 Unix 管道,而写个小程序完成同样的事。比如用 Python:

from collections import defaultdict

counts = defaultdict(int) #1

with open('/var/log/nginx/access.log', 'r') as file:
    for line in file:
        url = line.split()[6] #2
        counts[url] += 1 #3

top5 = sorted(((count, url) for url, count in counts.items()), reverse=True)[:5] #4

for count, url in top5:  #5
    print(f"{count} {url}")
  1. counts 是散列表,记录每个 URL 出现次数,默认值为 0。
  2. 每行按空白字符切分,取第 7 个字段作为 URL(Python 数组从 0 开始,所以索引是 6)。
  3. 当前行对应 URL 的计数器加一。
  4. 按计数降序排序,取前五项。
  5. 打印前五项。

这个程序不如 Unix 管道简洁,但可读性也不错,偏好取决于习惯。不过两者除了语法差异,执行流程也很不一样;在大文件上运行时,这种差异会很明显。

排序与内存聚合

Python 脚本在内存里维护了一个“URL -> 出现次数”的散列表。Unix 管道示例没有这种散列表,而是通过排序把同一 URL 的多次出现排到一起。

哪种方法更好?取决于不同 URL 的数量。对多数中小网站而言,通常可以把所有不同 URL 及其计数器放进(比如)1GB 内存。这个作业的 工作集(working set)(需要随机访问的内存规模)只取决于不同 URL 的个数:即便一百万条日志都指向同一 URL,散列表也只存一个 URL 和一个计数器。工作集足够小时,内存散列表很好用,笔记本都能跑。

但如果工作集大于可用内存,排序法就有优势:它能高效使用磁盘。这与“日志结构存储”中的原理一样:先在内存对数据块排序并写成段文件,再把多个有序段合并成更大的有序文件。归并排序的顺序访问模式对磁盘很友好(见“SSD 上的顺序写与随机写”)。

GNU Coreutils(Linux)中的 sort 能自动把超内存数据溢写到磁盘,并自动利用多核并行排序 9。这意味着前面的 Unix 命令链可以自然扩展到大数据集而不耗尽内存,瓶颈通常变成磁盘读取输入文件的速率。

Unix 工具的一个局限是它们只在单机运行。当数据大到单机内存或本地磁盘都放不下时,就需要分布式批处理框架。

分布式系统中的批处理

在前面的 Unix 示例中,单机有几个协同组件在处理日志:

  • 通过操作系统文件系统接口访问的存储设备。
  • 决定进程何时运行、如何分配 CPU 资源的调度器。
  • 一串通过管道把 stdin/stdout 连接起来的 Unix 程序。

分布式批处理框架也有对应组件。某种意义上,你可以把分布式处理框架看成“分布式操作系统”:它有文件系统、有任务调度器,还有通过文件系统或其他通道互相传递数据的程序。

分布式文件系统

操作系统提供的文件系统由多层组成:

  • 最底层是块设备驱动,直接与磁盘交互,向上层提供原始块读写。
  • 块层之上是页缓存,缓存最近访问块以提升读取速度。
  • 块 API 之上是文件系统层,负责把大文件切块,并维护 inode、目录、文件等元数据。Linux 常见实现如 ext4、XFS。
  • 最上层,操作系统通过统一 API(虚拟文件系统,VFS)向应用暴露不同文件系统,让应用以统一方式读写底层不同实现。

分布式文件系统(DFS)工作方式很类似:文件被切成块并分散到多台机器。DFS 的块通常比本地文件系统大得多:HDFS 默认 128MB,JuiceFS 和许多对象存储常用 4MB,而 ext4 默认块通常是 4096 字节。块越大,需要维护的元数据越少,这对 PB 级数据非常关键;同时寻道开销占比也更低。

大多数物理存储设备不能做“部分块写入”,即使数据不足一个块也得写满块。DFS 的块更大且通常构建在操作系统文件系统之上,因此一般没有这个约束。比如一个 900MB 文件在 128MB 分块下,会有 7 个 128MB 块和 1 个 4MB 块。

读取 DFS 块需要通过网络请求到持有该块的集群节点。每台机器都运行守护进程,对外提供 API,使远程进程能把本地文件系统中的块当作文件读写。HDFS 把这些守护进程叫 DataNode,GlusterFS 叫 glusterfsd。后文统称 数据节点(data node)

DFS 也实现了“分布式版本”的页缓存。因为 DFS 块作为文件存放在数据节点本地,读写会经过数据节点操作系统,自带内存页缓存,热门块会被缓存在内存中。某些 DFS 还提供更多缓存层,例如 JuiceFS 的客户端缓存和本地磁盘缓存。

像 ext4/XFS 这样的文件系统会维护空闲空间、块位置、目录结构、权限等元数据。DFS 同样需要记录“文件块分布在哪些机器”“权限如何”等信息。Hadoop 使用 NameNode 维护集群元数据;DeepSeek 的 3FS 使用元数据服务并把元数据持久化到 FoundationDB 之类键值存储。

在文件系统之上是 VFS。批处理系统里最接近它的是 DFS 协议:批处理框架需要通过协议/接口来读写存储。只要实现协议,就能作为可插拔存储接入。例如 S3 API 已被 MinIO、Cloudflare R2、Tigris、Backblaze B2 等大量系统兼容支持。具备 S3 支持的批处理系统通常可直接使用这些存储。

有些 DFS 还提供 POSIX 兼容文件系统,让操作系统 VFS 把它当普通文件系统。常见集成方式是 FUSE 或 NFS 协议。NFS 可能是最知名分布式文件系统协议,最初用于让多个客户端读写单个服务器上的数据。后来 AWS EFS、Archil 等提供了更可伸缩的 NFS 兼容实现。NFS 客户端虽仍连到一个端点,但底层会与分布式元数据服务和数据节点交互完成读写。

分布式文件系统与网络存储

分布式文件系统基于 无共享(shared-nothing) 原则(见“共享内存、共享磁盘与无共享架构”),与 NAS(网络附加存储)和 SAN(存储区域网络)等 共享磁盘 方案形成对照。共享磁盘通常依赖集中式存储设备、定制硬件和专用网络(如光纤通道);无共享方案不要求专用硬件,只需普通数据中心网络互联的机器。

很多 DFS 构建在商用硬件上,成本更低但故障率高于企业级专用硬件。为容忍机器和磁盘故障,文件块通常复制到多台机器。这也让调度器更容易均衡负载:任务可在任一持有输入副本的节点运行。复制可以是多副本(见第六章),也可以是 Reed-Solomon 等 纠删码 方案,以更低存储开销恢复丢失数据 10 11 12。这与 RAID 思想类似,只是 RAID 面向同一机器上的多块磁盘,而 DFS 是通过普通数据中心网络跨机器做访问和复制。

对象存储

Amazon S3、Google Cloud Storage、Azure Blob Storage、OpenStack Swift 等对象存储,已成为批处理场景中对 DFS 的主流替代。实际上两者边界越来越模糊:正如前一节和“由对象存储支撑的数据库”所述,FUSE 可以把 S3 这类对象存储“挂载成文件系统”;JuiceFS、Ceph 等系统也同时提供对象 API 与文件系统 API。但这些接口、性能、以及一致性保证差异很大,即便 API 看似兼容,也需要仔细验证行为是否符合预期。

对象存储中的每个对象有一个 URL,例如 s3://my-photo-bucket/2025/04/01/birthday.png。其中主机部分(my-photo-bucket)是 bucket 名,后半部分是对象 键(key)(示例里是 /2025/04/01/birthday.png)。bucket 名全局唯一;对象键在 bucket 内必须唯一。

对象读取用 get,写入用 put。与文件系统文件不同,对象写入后通常不可变;更新对象需要通过 put 全量重写,类似键值存储。Azure Blob Storage 和 S3 Express One Zone 支持追加,但多数对象存储不支持。它也没有 fopenfseek 这类文件句柄 API。

对象看起来像按目录组织,这很容易让人误解:对象存储并没有真正目录概念。所谓路径只是约定,斜杠也是 key 的一部分。这个约定允许你按前缀列出对象,类似“目录列表”,但与文件系统目录列举有两点不同:

  • 前缀 list 行为更像 Unix 的递归 ls -R:会返回所有以该前缀开头的对象,包括“子路径”下的对象。
  • 不存在“空目录”。如果你删除了 s3://my-photo-bucket/2025/04/01 下所有对象,再列 s3://my-photo-bucket/2025/04 时就看不到 01。常见做法是创建 0 字节对象表示空目录(如创建空对象 s3://my-photo-bucket/2025/04/01 以保留目录占位)。

DFS 常支持硬链接、符号链接、文件锁、原子重命名等文件系统操作,而对象存储通常缺失这些能力:链接和锁大多不支持;重命名也非原子,通常是“复制到新 key,再删除旧 key”。若要“重命名目录”,因为目录名是 key 的一部分,实际上要逐个对象重命名。

第四章讨论的键值存储通常面向小值(通常 KB 级)和高频低延迟读写。相比之下,DFS 和对象存储通常优化的是大对象(MB 到 GB)和低频大块读写。不过近年对象存储也在增强小对象高频访问能力,例如 S3 Express One Zone 已提供单毫秒级延迟,计费模型也更接近键值存储。

DFS 与对象存储另一个区别是:HDFS 等 DFS 可把计算任务调度到持有文件副本的机器上,让任务本地读文件,减少网络传输(当任务代码远小于待读文件时尤其划算)。对象存储通常把存储和计算解耦,虽然可能用更多带宽,但现代数据中心网络很快,通常可接受。同时这种解耦让 CPU/内存与存储容量可以独立扩展。

分布式作业编排

前面的“操作系统类比”同样适用于作业编排。在单机上跑 Unix 批处理任务时,总得有东西真正去执行 awksortuniqhead 进程;需要把一个进程输出送到另一个进程输入;要给每个进程分配内存;公平调度 CPU 指令;隔离内存与 I/O 边界,等等。单机里这由操作系统内核负责;分布式环境里,这就是作业编排器(orchestrator)的职责。

批处理框架会向编排器的调度器发起“运行作业”请求。请求通常包含如下元数据:

  • 需要执行的任务数量;
  • 每个任务所需内存、CPU、磁盘;
  • 作业标识符;
  • 访问凭据;
  • 输入输出等作业参数;
  • 所需硬件信息(如 GPU、磁盘类型);
  • 作业可执行代码的位置。

Kubernetes、Hadoop YARN(Yet Another Resource Negotiator)13 等编排器会结合这些请求与集群状态,依靠以下组件执行任务:

任务执行器(Task executors)

每个节点上运行执行器守护进程,例如 YARN 的 NodeManager 或 Kubernetes 的 kubelet。执行器负责拉起任务、通过心跳上报存活状态、跟踪节点上的任务状态与资源占用。收到“启动任务”请求后,执行器会获取作业代码并执行启动命令;随后持续监控进程直至结束或失败,并更新对应状态元数据。

很多执行器还配合操作系统实现安全与性能隔离,例如 YARN 和 Kubernetes 都会使用 Linux cgroups。这样可防止任务越权访问数据,或因资源滥用影响同机其他任务。

资源管理器(Resource Manager)

资源管理器维护各节点元数据:可用硬件(CPU、GPU、内存、磁盘等)、任务状态、网络位置、节点健康状态等,从而形成全局视图。其中心化特性可能成为可用性和可伸缩性瓶颈。YARN 借助 ZooKeeper,Kubernetes 借助 etcd 存储集群状态(见“协调服务”)。

调度器(Scheduler)

编排器通常包含中心化调度子系统,接收启动/停止作业与状态查询请求。例如收到“启动 10 个任务,使用指定 Docker 镜像,且必须运行在某类 GPU 节点上”的请求后,调度器会基于请求和资源管理器状态决定“哪些任务跑在哪些节点”,再通知执行器执行。

不同编排器命名各异,但几乎都具备这些核心组件。

Note

有些调度决策需要“应用特定调度器”参与,才能考虑更具体的业务约束,例如当查询量达到阈值时自动扩容只读副本。中心调度器与应用调度器协同决定如何执行任务。YARN 把这类子调度器称为 ApplicationMaster,Kubernetes 通常称为 operator

资源分配

调度器在编排系统中最具挑战的职责之一,就是在资源有限且作业需求冲突时,做出合理分配。它本质上是在公平与效率之间做平衡。

假设一个小集群有 5 个节点,共 160 个 CPU 核。调度器收到两个作业请求,每个都想要 100 核。怎么排最好?

  • 可以给每个作业先分 80 个任务,剩余 20 个等前面的任务结束后再启动。
  • 也可以先跑完其中一个作业,再等 100 核都空出来后跑另一个。这叫 gang scheduling(成组调度)。
  • 如果一个请求先到,调度器还要决定是立即把 100 核都给它,还是为未来请求预留一部分资源。

这是很简化的例子,但已经能看到艰难权衡。以成组调度为例,如果调度器为了凑齐 100 核而长期预留资源,节点会闲置,资源利用率下降,若其他作业也在抢占式预留,还可能死锁。

反过来,如果只是被动等 100 核“自然可用”,中间可能被别的作业拿走,导致长时间凑不齐,从而产生 饥饿(starvation)。调度器也可以 抢占(preempt) 一部分先到作业任务,把它们杀掉给后到作业腾资源;但被杀任务之后还要重跑,整体效率同样下降。

把这个问题放大到数百甚至数百万个请求,想求全局最优几乎不可行。事实上这是 NP-hard 问题:除了很小规模,很难在可接受时间内算出最优解 14 15

因此工程上调度器通常采用启发式方法,在非最优前提下做“足够好”的决策。常见算法包括 FIFO、主导资源公平(DRF)、优先级队列、容量/配额调度、各种装箱算法等。细节超出本书范围,但这是非常有趣的研究领域。

工作流调度

本章开头的 Unix 示例是多个命令串联。分布式批处理中同样常见:一个作业输出要成为一个或多个后续作业输入,而每个作业又可能依赖多个上游输入。这个依赖结构称为 工作流(workflow)有向无环图(DAG)

Note

我们在“持久化执行与工作流”中讨论过“按步骤执行 RPC”的工作流引擎;在批处理语境里,“工作流”指的是一串批处理过程:每一步读输入、产输出,通常不直接对外做 RPC。持久化执行引擎通常单次请求处理的数据量小于批处理系统,但两者边界并非绝对。

需要多作业工作流常见有以下原因:

  • 一个作业输出可能被多个团队维护的下游作业消费。此时先把输出写到公共位置更合理,下游可按“数据更新触发”或定时方式运行。
  • 你可能要在多个处理工具间传递数据。比如 Spark 作业写 HDFS,再由 Python 触发 Trino SQL 查询(见“云数据仓库”)继续处理并写入 S3。
  • 有些流水线内部天然需要多阶段。例如第一阶段按某键分片,下一阶段按另一键分片,那么第一阶段需要先产出符合第二阶段要求的数据布局。

在 Unix 里,管道用很小的内存缓冲连接前后命令,不落盘。若缓冲区满,上游必须等待下游消费,这是一种 背压(backpressure)。Spark、Flink 等批处理执行引擎也支持类似模式:一个任务输出直接传给下一任务(跨机时经网络传输)。

但在工作流中,更常见仍是“上游作业写 DFS/对象存储,下游再读”,这样可让作业在时间上解耦。若一个作业有多个输入,工作流调度器通常会等待所有上游输入生产成功后再启动它。

YARN ResourceManager 或 Spark 内置调度器主要做“作业内调度”,不负责整条工作流。为管理跨作业依赖,出现了 Airflow、Dagster、Prefect 等工作流调度器。它们在维护大量批作业时非常关键:包含 50~100 个作业的工作流并不罕见;大型组织内很多团队会跨系统互相消费输出。没有工具支撑,很难管理这种复杂数据流。

故障处理

批处理作业往往运行时间长。长时间运行且并行任务多的作业,在执行过程中遇到至少一次任务失败几乎是常态。正如“硬件与软件故障”“不可靠网络”所述,原因可能是硬件故障(商用硬件尤甚)、网络中断等。

任务无法完成的另一原因是被调度器主动抢占(kill)。当系统有多优先级队列时,这很常见:低优先级任务便宜、高优先级任务昂贵。低优先级任务可用空闲算力跑,但高优先级任务一到就可能把它们抢占掉。云厂商的对应产品名分别是:AWS 的 spot instances、Azure 的 spot virtual machines、GCP 的 preemptible instances 16

批处理很多时候对实时性要求不高,因此很适合利用低优先级资源/抢占式实例降成本:本质上它在“吃”否则会闲置的算力,提高集群利用率。但代价是更高的被杀概率:实际里抢占往往比硬件故障更常见 17

由于批处理每次都从头生成输出,任务失败比在线系统更容易处理:删掉失败任务的部分输出,把任务重新调度到别的机器重跑即可。若只因一个任务失败就重跑整个作业会非常浪费,因此 MapReduce 及其后继系统都尽量让并行任务彼此独立,从而把重试粒度降到单个任务 3

当一个任务输出成为另一任务输入(即在工作流内传递)时,容错更复杂。MapReduce 的做法是:中间数据总是写回 DFS,且只有写入任务成功后才允许下游读取。这个方案在频繁抢占环境中也能工作,但会带来大量 DFS 写入,效率不高。

Spark 更倾向把中间数据放内存或溢写本地磁盘,只把最终结果写 DFS;它还记录中间数据的计算血缘,丢失时可重算 18。Flink 则采用定期检查点快照机制 19。我们会在“数据流引擎”继续讨论。

批处理模型

前面我们讨论了分布式环境中批作业如何调度。现在转向“批处理框架如何处理数据”。最常见的两类模型是 MapReduce 与数据流引擎。尽管实践中数据流引擎已大面积替代 MapReduce,但理解 MapReduce 仍然重要,因为它深刻影响了现代批处理框架。

MapReduce 与数据流引擎都发展出多种编程接口:低层 API、关系查询语言、DataFrame API。它们让应用工程师、数据分析工程师、业务分析师乃至非技术人员都能参与数据处理。我们将在“批处理用例”中讨论这些用途。

MapReduce

MapReduce 的处理模式与“简单日志分析”几乎同构:

  1. 读取输入文件并切分为 记录(records)。在日志例子里,每条记录就是一行(\n 为记录分隔符)。在 Hadoop MapReduce 中,输入通常存放在 HDFS 或 S3 等对象存储,文件格式可能是 Parquet(列式,见“面向列存储”)或 Avro(行式,见“Avro”)。
  2. 调用 mapper,从每条输入记录中提取键和值。Unix 示例中 mapper 相当于 awk '{print $7}':URL($7)是键,值可留空。
  3. 按键排序所有键值对。日志示例中这一步对应第一次 sort
  4. 调用 reducer 遍历排序后的键值对。同键记录会相邻,因此可以在很小内存状态下合并。Unix 示例中 reducer 等价于 uniq -c,统计相邻同键记录数。

这四步就是一个 MapReduce 作业。第 2 步(map)与第 4 步(reduce)是你写业务逻辑的地方;第 1 步(文件切记录)由输入格式解析器完成;第 3 步排序在 MapReduce 中是隐式内置的,你无需手写。这一步是批处理的基础算法,我们会在“混洗数据”再讨论。

要创建 MapReduce 作业,你需实现两个回调:mapper 与 reducer,其行为如下。

Mapper

对每条输入记录调用一次。它从输入记录中提取键和值,并可为每条输入产生任意数量键值对(包括 0 条)。它不保留跨记录状态,每条记录独立处理。

Reducer

框架收集 mapper 产生的键值对,把同键值集合交给 reducer(以迭代器形式)。reducer 可输出结果记录(如同一 URL 的出现次数)。

在日志示例里,第 5 步还有一次 sort 用于按请求次数排名 URL。MapReduce 若要第二轮排序,通常要再写一个作业:前一个输出作为后一个输入。换个角度看,mapper 的作用是把数据整理成适合排序的形态;reducer 的作用是处理已排序数据。

MapReduce 与函数式编程

MapReduce 虽用于批处理,但其编程模型来自函数式编程。Lisp 把 mapreduce/fold 作为列表上的高阶函数引入,后来进入 Python、Rust、Java 等主流语言。包括 SQL 在内的大量数据处理操作都可在 MapReduce 之上表达。Map 和 reduce 以及函数式编程的一些特性恰好契合 MapReduce:可组合、天然适合数据处理链;map 还是典型“令人尴尬地并行”(每条输入独立处理);reduce 则可按不同键并行。

但用原始 MapReduce API 写复杂处理其实很费力,例如各种连接算法都要自己实现 20。MapReduce 相比现代批处理引擎也偏慢,一个重要原因是其“以文件为中心”的 I/O 让作业流水化困难:上游不结束,下游很难提前处理输出。

数据流引擎

为解决 MapReduce 的局限,出现了多种分布式批处理执行引擎,最著名的是 Spark 18 21 和 Flink 19。它们设计细节各异,但有一个共同点:把整条工作流当成一个作业处理,而不是拆成互相独立的小作业。

因为它们显式建模了跨多个处理阶段的数据流动,所以称为 数据流引擎(dataflow engines)。与 MapReduce 一样,它们提供低层 API(反复调用用户函数逐条处理记录),也提供更高层算子(如 joingroup by)。它们通过分片并行输入,并通过网络把一个任务输出传给另一个任务输入。与 MapReduce 不同,算子不必严格在 map/reduce 两类角色间交替,而可以更灵活组合。

这些 API 通常以关系风格构件表达计算:按字段值连接数据集、按键分组、按条件过滤、按计数或求和等函数聚合。内部实现依赖的正是下一节要讲的混洗算法。

这种处理引擎风格可追溯到 Dryad 22、Nephele 23 等研究系统。相比 MapReduce,它有几个优势:

  • 像排序这类昂贵操作只在“确实需要”的地方执行,而不是每个 map 与 reduce 阶段之间都默认做。
  • 连续多个不改变分片方式的算子(如 map/filter)可融合成一个任务,减少数据复制开销。
  • 由于工作流里的连接与数据依赖都显式声明,调度器能全局优化数据局部性。比如把“消费某数据”的任务放到“生产该数据”的同机上,用共享内存缓冲交换,而非走网络拷贝。
  • 算子间中间状态通常放内存或本地磁盘即可,比写 DFS/对象存储 I/O 更低(后者要多副本并落到多机磁盘)。MapReduce 仅对 mapper 输出做了这类优化,数据流引擎把它推广到所有中间状态。
  • 输入一就绪就能启动下游算子,无需等待整个上游阶段全部完成。
  • 可复用已有进程运行新算子,减少启动开销;MapReduce 往往为每个任务起一个新 JVM。

因此,数据流引擎能实现与 MapReduce 工作流同样的计算,但通常速度明显更快。

混洗数据

本章开头的 Unix 工具示例和 MapReduce 都建立在排序之上。批处理系统要能排序 PB 级数据,单机放不下,因此必须使用“输入与输出都分片”的分布式排序算法,这就是 混洗(shuffle)

混洗不是随机

“shuffle” 容易引发误解。洗牌会得到随机顺序;而这里的 shuffle 产出的是排序后的确定顺序,不含随机性。

混洗是批处理系统的基础算法,连接与聚合都依赖它。MapReduce、Spark、Flink、Daft、Dataflow、BigQuery 24 都实现了高可伸缩且高性能的混洗机制以处理大数据集。这里用 Hadoop MapReduce 的混洗实现做说明 25,但核心思想在其他系统同样适用。

图 11-1 展示了一个 MapReduce 作业的数据流。假设输入已分片,标记为 m1m2m3。例如每个分片可以是 HDFS 中一个文件,或对象存储中的一个对象;同一数据集的所有分片可以放在同一 HDFS 目录,或使用同一对象前缀。

图 11-1. 一个包含三个 mapper 和三个 reducer 的 MapReduce 作业。

框架会为每个输入分片启动一个 map 任务。任务读取分配到的文件,并逐条记录调用 mapper 回调。reduce 侧也会分片。map 任务数由输入分片数决定;reduce 任务数由作业作者配置(可与 map 数不同)。

mapper 输出是键值对。框架需要保证:若不同 mapper 输出了同一个键,这些键值对最终必须由同一个 reducer 处理。为此,每个 mapper 会在本地磁盘为每个 reducer 维护一个输出文件(例如图 11-1中的 m1,r2:由 mapper1 生成,目标是 reducer2)。mapper 每输出一条键值对,通常会按键的哈希决定写入哪个 reducer 文件(类似“按键哈希分片”)。

mapper 写这些文件的同时,也会在每个文件内部按键排序。可用的正是“日志结构存储”中的技术:先在内存有序结构里积累一批键值对,写成有序段文件,再把小段逐步合并成大段。

每个 mapper 完成后,reducer 会连接到 mapper,把属于自己的有序文件拷贝到本地磁盘。reducer 拿到所有 mapper 的对应分片后,再用归并排序方式合并它们并保持有序。同键记录即便来自不同 mapper,也会在合并后相邻。随后 reducer 以“每个键一次调用”的方式执行,每次拿到一个可迭代器,遍历该键所有值。

reducer 输出记录会顺序写入文件,每个 reduce 任务一个文件。图 11-1中的 r1r2r3 就是输出数据集的分片,最终写回 DFS 或对象存储。

MapReduce 在 map 与 reduce 之间执行混洗;现代数据流引擎和云数据仓库则更复杂。BigQuery 等系统已优化混洗,使数据尽量留在内存,并写入外部排序服务 24,以提升速度并通过复制增强韧性。

JOIN 与 GROUP BY

下面看“有序数据”如何简化分布式连接与聚合。为便于说明仍以 MapReduce 为例,但概念适用于大多数批处理系统。

批处理里常见连接场景见图 11-2。左边是用户活动日志(activity eventsclickstream data),右边是用户数据库。它可以看作星型模型的一部分(见“星型与雪花型:分析模式”):活动日志是事实表,用户库是维度表之一。

图 11-2. 用户活动日志与用户画像数据库的连接。

如果你要做“结合用户库信息的活动分析”(例如利用用户出生日期字段,判断哪些页面更受年轻或年长用户欢迎),就需要连接这两张表。若两边都大到必须分片,怎么做?

可利用 MapReduce 的关键特性:混洗会把同键键值对汇聚到同一个 reducer,无论它们最初在哪个分片。这里用户 ID 就可以作为键。因此可写一个 mapper 扫活动日志,输出“按用户 ID 键控的页面访问 URL”(见图 11-3);再写一个 mapper 按行扫描用户表,提取“用户 ID 作为键、出生日期作为值”。

图 11-3. 基于用户 ID 的排序合并连接。若输入数据集由多个文件分片组成,可并行启动多个 mapper 处理。

混洗保证 reducer 能同时拿到某用户的出生日期和该用户全部页面访问事件。MapReduce 甚至可以把记录进一步排成 reducer 先看到用户记录、再按时间戳看到活动事件,这称为 二次排序(secondary sort) 25

于是 reducer 很容易实现连接逻辑:先拿到出生日期并存入局部变量,再遍历同一用户 ID 的活动事件,输出“被访问 URL + 访问者出生日期”。因为 reducer 一次处理一个用户的全部记录,所以内存里只要保留一条用户记录,也无需发任何网络请求。这个算法称为 排序合并连接(sort-merge join):mapper 输出先按键排序,reducer 再把连接两侧有序记录合并。

工作流中的下一个 MapReduce 作业就可以继续计算“每个 URL 的访问者年龄分布”:先按 URL 做一次混洗,再在 reducer 中遍历同 URL 的所有访问记录(含出生日期),按年龄段维护计数并逐条累加,从而实现 group by 与聚合。

查询语言

这些年分布式批处理执行引擎不断成熟。如今在上万台机器的集群上存储并处理数 PB 数据,基础设施已足够稳健。随着“如何在这规模下把系统跑起来”基本被解决,重点开始转向编程模型的可用性。

MapReduce、数据流引擎、云数据仓库都把 SQL 作为批处理“通用语”。这很自然:传统数据仓库本就用 SQL,数据分析/ETL 工具都支持 SQL,几乎所有开发者和分析师也都熟悉 SQL。

相比手写 MapReduce,查询语言接口不仅代码更少,还支持交互式使用:可在终端或 GUI 里写分析 SQL 并直接执行。这种交互式查询对于业务分析、产品、销售、财务等角色探索数据非常高效。虽然它不完全是“经典批处理”形态,但 SQL 让探索式查询也能在分布式批处理系统中高效完成。

高级查询语言不只提升人的生产力,也提高机器执行效率。正如“云数据仓库”所述,查询引擎要把 SQL 转成在集群里执行的批处理作业。这个从查询到语法树再到物理算子的转换过程,让引擎有机会做优化。Hive、Trino、Spark、Flink 等查询引擎都具备代价优化器:它们可分析连接输入特征,自动选择更合适的连接算法,甚至重排连接顺序以减少中间状态 19 26 27 28

SQL 是最流行的通用批处理语言,但在一些细分场景中仍有其他语言。Apache Pig 提供了基于关系算子的逐步式数据流水线描述方式,而非“一个超大 SQL 查询”。DataFrame(下一节)有相似特征,Morel 则是受 Pig 影响的更现代语言。还有用户采用 jq、JMESPath、JsonPath 等 JSON 查询语言。

“图状数据模型”中,我们讨论了图建模与图查询语言如何遍历边和顶点。许多图处理框架也支持通过查询语言做批计算,例如 Apache TinkerPop 的 Gremlin。我们会在“批处理用例”继续看图处理场景。

批处理与云数据仓库正在收敛

历史上,数据仓库运行在专用硬件设备上,主要提供关系数据的 SQL 分析查询;而 MapReduce 等批处理框架强调更高可伸缩性与更高灵活性,允许使用通用编程语言写处理逻辑,并读写任意数据格式。

随着发展,两者越来越像。现代批处理框架已经支持 SQL,并借助 Parquet 等列式格式和优化执行引擎(见“查询执行:编译与向量化”)在关系查询上获得良好性能。与此同时,数据仓库通过云化(见“云数据仓库”)获得更强可伸缩能力,并实现了许多与分布式批处理框架相同的调度、容错和混洗技术,很多也使用分布式文件系统。

正如批处理系统采纳 SQL,云仓库也在采纳 DataFrame 等替代处理模型(下一节)。例如 BigQuery 提供 BigQuery DataFrames,Snowflake 的 Snowpark 能与 Pandas 集成。Airflow、Prefect、Dagster 等批处理工作流编排器也已广泛集成云仓库。

当然,并非所有批任务都容易用 SQL 表达。PageRank 等迭代图算法、复杂机器学习任务都很难用 SQL 写。涉及图像、视频、音频等非关系多模态数据的 AI 处理同样如此。

此外,云数据仓库在某些负载上并不理想。行级逐条计算与列式存储不匹配,效率较低,此时更适合使用仓库的其他 API 或批处理系统。云仓库通常也比其他批处理系统更贵,某些大作业放到 Spark/Flink 等系统可能更具成本优势。

因此,“用批处理系统还是数据仓库”最终要看成本、便利性、实现复杂度、可用性等综合因素。大型企业往往并存多套系统以保留选择空间;小公司通常一套系统也能跑起来。

DataFrames

随着数据科学家和统计学家开始用分布式批处理框架做机器学习,他们发现原有处理模型不够顺手,因为他们更习惯 R 与 Pandas 里的 DataFrame 数据模型(见“DataFrame、矩阵与数组”)。DataFrame 与关系库里的表很像:由多行组成,同一列值类型一致。它不是写一个超大 SQL,而是通过调用对应关系算子的函数来做过滤、连接、排序、分组等操作。

早期 DataFrame 操作大多在本地内存执行,因此只能处理单机装得下的数据集。数据科学家希望在批处理环境中,仍用熟悉的 DataFrame API 处理大数据。Spark、Flink、Daft 等分布式框架都因此提供了 DataFrame API。需要注意的是,本地 DataFrame 通常带索引且有顺序,而分布式 DataFrame 往往没有 29,迁移时可能出现性能“意外”。

DataFrame API 看起来和数据流 API 相似,但实现方式差别不小。Pandas 调用方法后通常立刻执行;Spark 则会先把 DataFrame API 调用翻译为查询计划,做查询优化后,再在分布式数据流引擎上执行,从而获得更好性能。

Daft 等框架甚至同时支持客户端与服务端计算:小规模内存操作在客户端执行,大数据与重计算在服务端执行。Apache Arrow 等列式格式提供统一数据模型,可被两侧执行引擎共享。

批处理用例

了解了批处理如何工作后,我们来看它在不同应用中的落地。批处理非常适合“海量数据的批量计算”,但不适合低延迟场景。因此,只要数据多且新鲜度要求不高,几乎都能看到批处理的身影。这听起来像限制,但现实里大量工作都符合这个模型:

  • 会计对账与库存核对:企业定期验证交易、银行账户与库存是否一致,常由批处理完成 30
  • 制造业需求预测:通常以周期性批任务计算 31
  • 电商、媒体、社交平台推荐模型训练:大量依赖批处理 32 33
  • 许多金融系统也是批处理驱动。例如美国银行网络几乎完全基于批任务运行 34

下面分别讨论几个几乎所有行业都常见的批处理用例。

提取-转换-加载(ETL)

“数据仓库”介绍了 ETL/ELT:从生产数据库抽取数据、进行转换,再加载到下游系统。本节用“ETL”统称这两类负载。尤其当下游是数据仓库时,ETL 常由批处理作业承载。

批处理天然并行,非常适合数据转换。很多转换任务都是“令人尴尬地并行”:过滤、字段投影及大量常见仓库转换都可并行完成。

批处理环境通常自带成熟工作流调度器,便于安排、编排和调试 ETL 流水线。发生故障时,调度器常会自动重试以覆盖瞬时问题;若持续失败,则明确标记失败,便于工程师快速定位流水线中断点。像 Airflow 还内置大量 source/sink/query 算子,可直接对接 MySQL、PostgreSQL、Snowflake、Spark、Flink 等数十种系统。调度器与数据处理系统的紧密集成显著简化了数据集成。

我们也看到,批处理在“出错后排障与修复”方面很友好,这对调试数据流水线极其关键。失败文件可直接检查,ETL 作业可修复后重跑。比如输入文件不再包含某个转换逻辑依赖字段,数据工程师就能据此更新转换逻辑或修复上游生产作业。

过去数据流水线往往由单一数据工程团队集中维护,因为让产品团队自行编写和维护复杂批流水线不太现实。近年随着处理模型和元数据管理改进,组织内更多团队都能参与并维护自己的流水线。data mesh 35 36data contract 37data fabric 38 等实践,正通过规范和工具帮助团队安全发布可被全组织消费的数据。

如今数据流水线与分析查询不仅共享处理模型,也常共享执行引擎。很多 ETL 作业与消费其输出的分析查询都运行在同一系统里:例如同样以 SparkSQL、Trino 或 DuckDB 查询执行。这样的架构进一步模糊了应用工程、数据工程、分析工程与业务分析之间的界限。

分析(Analytics)

“操作型系统与分析型系统”中我们看到,分析查询(OLAP)通常要扫描大量记录并做分组聚合。这类负载可以与其他批任务一起运行在批处理系统中。分析人员写 SQL,经查询引擎执行,读写底层 DFS 或对象存储。表到文件映射、名称、类型等表元数据通常由 Apache Iceberg 等表格式与 Unity 等 catalog 管理(见“云数据仓库”)。这种架构称为 数据湖仓(data lakehouse) 39

与 ETL 类似,SQL 接口改进让很多组织用 Spark 等批框架直接承载分析。常见模式有两类:

  • 预聚合查询:先把数据滚动聚合为 OLAP 立方体或数据集市,以提升查询速度(见“物化视图与数据立方”)。预聚合结果可在仓库查询,或推送到 Apache Druid、Apache Pinot 这类实时 OLAP 系统。预聚合通常按固定周期运行,通常由“工作流调度”中提到的调度器管理。
  • 临时查询(ad hoc):用户为回答具体业务问题、分析用户行为、排查运行问题等随时发起。该场景非常看重响应时间,分析师通常会根据每次结果继续迭代提问。执行快的批处理查询引擎可显著缩短等待。

SQL 支持还让批处理系统更易接入电子表格与可视化工具,如 Tableau、Power BI、Looker、Apache Superset。比如 Tableau 有 SparkSQL、Presto 连接器;Superset 支持 Trino、Hive、Spark SQL、Presto 等大量最终会触发批任务的数据系统。

机器学习

机器学习(ML)高度依赖批处理。数据科学家、ML 工程师、AI 工程师会用批处理框架探索数据模式、做数据转换、训练模型。常见用途包括:

  • 特征工程:把原始数据过滤并转换为可训练数据。预测模型往往要求数值特征,因此文本或离散值等数据需要先转成目标格式。
  • 模型训练:训练数据是批过程输入,训练后模型权重是输出。
  • 批量推理:当数据集很大且不要求实时结果时,可对整批数据做预测,也包括在测试集上评估模型预测效果。

很多框架为这些场景提供了专用工具。例如 Spark 的 MLlib、Flink 的 FlinkML 都内置丰富的特征工程工具、统计函数与分类器。

推荐系统和排序系统等 ML 应用也大量使用图处理(见“图状数据模型”)。许多图算法表达为“沿边逐步传播信息并反复迭代”:把一个顶点与相邻顶点连接,传递某些信息,重复直到满足停止条件,例如无边可继续,或某个指标收敛。

批同步并行(bulk synchronous parallel, BSP) 计算模型 40 已成为批图计算常用模型。Apache Giraph 20、Spark GraphX、Flink Gelly 41 等都实现了它。它也常被称为 Pregel 模型,因为 Google 的 Pregel 论文让这一方法广为人知 42

批处理同样是大语言模型(LLM)数据准备与训练的重要组成部分。网页等原始文本通常存放在 DFS 或对象存储中,必须先预处理才能用于训练。适合批处理框架的预处理步骤包括:

  • 从 HTML 中提取纯文本,并修复损坏文本;
  • 检测并清理低质量、无关或重复文档;
  • 对文本做分词并转换为嵌入向量(词或片段的数值表示)。

Kubeflow、Flyte、Ray 等框架就专为这类负载构建。以 OpenAI 为例,ChatGPT 训练流程中就使用了 Ray 43。这些框架通常内置与 PyTorch、TensorFlow、XGBoost 等 LLM/AI 库的集成,并支持特征工程、模型训练、批量推理、微调等能力。

最后,数据科学家常在 Jupyter、Hex 等交互式 Notebook 中实验数据。Notebook 由多个 cell 组成,每个 cell 是一小段 Markdown、Python 或 SQL;按顺序执行可得到表格、图表或数据结果。很多 Notebook 背后通过 DataFrame API 或 SQL 调用批处理系统。

对外提供派生数据

批处理常用于构建预计算/派生数据集,如商品推荐、面向用户的报表、机器学习特征等。这些数据通常由生产数据库、键值存储或搜索引擎对外服务。不论目标系统是什么,都需要把批处理环境中的 DFS/对象存储输出,回灌到线上服务数据库。

最直观的做法是:在批作业里直接使用数据库客户端库,一条条写生产数据库(假设防火墙允许)。这虽然能工作,但通常不是好主意,原因有三:

  • 每条记录一次网络请求,比批任务正常吞吐低几个数量级。即便客户端支持批写,性能通常也不理想。
  • 批处理框架常并行跑很多任务。若所有任务同时以批处理速率写同一数据库,很容易把数据库压垮,进而影响其在线查询性能,引发系统其他部分故障 44
  • 批作业通常提供清晰的“全有或全无”输出语义:作业成功时,结果等价于每个任务恰好执行一次;作业失败时,无有效输出。但如果在作业内直接写外部系统,就产生了外部可见副作用,难以隐藏:部分完成结果可能被其他系统看到,任务失败重启还可能造成重复写。

更好的方案是把预计算结果先推送到 Kafka 这类流系统(我们会在第十二章深入讨论)。Elasticsearch、Apache Pinot、Apache Druid、Venice 这类派生数据存储 45,以及 ClickHouse 等云数仓,都支持从 Kafka 摄入数据。通过流系统过渡可以改善前述问题:

  • 流系统针对顺序写优化,更适合批作业的大吞吐写入模式;
  • 流系统可在批作业与生产库间充当缓冲层,下游可按自身能力限速读取,避免影响线上流量;
  • 一个批作业输出可被多个下游系统同时消费;
  • 流系统还可作为批处理网络与生产网络之间的安全边界(可部署在 DMZ)。

但“经由流”并不会自动解决“全有或全无”语义。要实现这一点,批作业需要在完成后向下游发出“作业完成,可对外可见”的通知。流消费者需要像 读已提交(read committed) 事务那样,在收到完成通知前让新数据对查询不可见(见“读已提交”)。

另一种在数据库冷启动(bootstrap)时更常见的模式,是在批作业内直接构建一个全新数据库,再把文件从 DFS、对象存储或本地文件系统批量导入目标数据库。很多系统都提供这类批量导入工具,如 TiDB Lightning、Apache Pinot/Apache Druid 的 Hadoop 导入作业,RocksDB 也提供从批作业批量导入 SST 的 API。

“批构建 + 批导入”速度非常快,也更容易在不同数据版本间做原子切换。但对于需要持续增量更新的场景,这种“每次构建全新库”的方式会更难。很多系统采用混合策略,同时支持冷启动与增量加载。比如 Venice 就支持混合存储,可同时做基于行的批更新和全量数据集切换。

本章小结

本章讨论了批处理系统的设计与实现。我们先从经典 Unix 工具链(awk、sort、uniq 等)出发,说明了批处理的基础原语,例如排序和计数。

然后我们把视角扩展到分布式批处理系统。批处理以“不可变、有限(bounded)的输入数据集”为对象,生成输出数据,这使得重跑和调试可以不引入副作用。围绕这一模式,批处理框架通常包含三层核心能力:决定作业何时何地运行的编排层,负责持久化数据的存储层,以及执行实际计算的计算层。

我们看了分布式文件系统和对象存储如何通过分块复制、缓存和元数据服务管理大文件,也讨论了现代批处理框架如何通过可插拔 API 与这些存储交互。我们还讨论了编排器在大集群中如何调度任务、分配资源和处理故障,以及“按作业调度”的编排器与“按依赖图管理整组作业生命周期”的工作流编排器之间的区别。

在处理模型方面,我们回顾了 MapReduce 及其经典 map/reduce 函数,又介绍了 Spark、Flink 等更易用且性能更好的数据流引擎。为了理解批作业如何扩展到大规模,我们重点讲了混洗(shuffle)算法,它是实现分组、连接、聚合的基础操作。

随着批处理系统成熟,焦点转向可用性。高级查询语言(尤其 SQL)和 DataFrame API 让批处理作业更易编写,也更容易被优化器优化。查询优化器把声明式查询转换为高效执行计划。

最后我们回顾了批处理常见用例:

  • ETL 流水线:通过定时工作流在不同系统间提取、转换、加载数据;
  • 分析:既支持预聚合报表,也支持临时探索查询;
  • 机器学习:用于准备与处理大规模训练数据;
  • 把批处理输出灌入面向生产流量的系统:常通过流系统或批量导入工具,把派生数据提供给用户。

下一章我们将转向流处理。与批处理不同,流处理输入是 无界(unbounded) 的:作业仍在,但输入是持续不断的数据流,因此作业不会“完成”。我们会看到,流处理与批处理在一些方面很相似,但“输入无界”这一前提也会显著改变系统设计。

参考文献


  1. Nathan Marz. How to Beat the CAP Theorem. nathanmarz.com, October 2011. Archived at perma.cc/4BS9-R9A4 ↩︎

  2. Molly Bartlett Dishman and Martin Fowler. Agile Architecture. At O’Reilly Software Architecture Conference, March 2015. ↩︎

  3. Jeffrey Dean and Sanjay Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. At 6th USENIX Symposium on Operating System Design and Implementation (OSDI), December 2004. ↩︎ ↩︎

  4. Shivnath Babu and Herodotos Herodotou. Massively Parallel Databases and MapReduce Systems. Foundations and Trends in Databases, volume 5, issue 1, pages 1–104, November 2013. doi:10.1561/1900000036 ↩︎

  5. David J. DeWitt and Michael Stonebraker. MapReduce: A Major Step Backwards. Originally published at databasecolumn.vertica.com, January 2008. Archived at perma.cc/U8PA-K48V ↩︎

  6. Henry Robinson. The Elephant Was a Trojan Horse: On the Death of Map-Reduce at Google. the-paper-trail.org, June 2014. Archived at perma.cc/9FEM-X787 ↩︎

  7. Urs Hölzle. R.I.P. MapReduce. After having served us well since 2003, today we removed the remaining internal codebase for good. twitter.com, September 2019. Archived at perma.cc/B34T-LLY7 ↩︎

  8. Adam Drake. Command-Line Tools Can Be 235x Faster than Your Hadoop Cluster. aadrake.com, January 2014. Archived at perma.cc/87SP-ZMCY ↩︎

  9. sort: Sort text files. GNU Coreutils 9.7 Documentation, Free Software Foundation, Inc., 2025. ↩︎

  10. Michael Ovsiannikov, Silvius Rus, Damian Reeves, Paul Sutter, Sriram Rao, and Jim Kelly. The Quantcast File System. Proceedings of the VLDB Endowment, volume 6, issue 11, pages 1092–1101, August 2013. doi:10.14778/2536222.2536234 ↩︎

  11. Andrew Wang, Zhe Zhang, Kai Zheng, Uma Maheswara G., and Vinayakumar B. Introduction to HDFS Erasure Coding in Apache Hadoop. blog.cloudera.com, September 2015. Archived at archive.org ↩︎

  12. Andy Warfield. Building and operating a pretty big storage system called S3. allthingsdistributed.com, July 2023. Archived at perma.cc/7LPK-TP7V ↩︎

  13. Vinod Kumar Vavilapalli, Arun C. Murthy, Chris Douglas, Sharad Agarwal, Mahadev Konar, Robert Evans, Thomas Graves, Jason Lowe, Hitesh Shah, Siddharth Seth, Bikas Saha, Carlo Curino, Owen O’Malley, Sanjay Radia, Benjamin Reed, and Eric Baldeschwieler. Apache Hadoop YARN: Yet Another Resource Negotiator. At 4th Annual Symposium on Cloud Computing (SoCC), October 2013. doi:10.1145/2523616.2523633 ↩︎

  14. Richard M. Karp. Reducibility Among Combinatorial Problems. Complexity of Computer Computations. The IBM Research Symposia Series. Springer, 1972. doi:10.1007/978-1-4684-2001-2_9 ↩︎

  15. J. D. Ullman. NP-Complete Scheduling Problems. Journal of Computer and System Sciences, volume 10, issue 3, June 1975. doi:10.1016/S0022-0000(75)80008-0 ↩︎

  16. Gilad David Maayan. The complete guide to spot instances on AWS, Azure and GCP. datacenterdynamics.com, March 2021. Archived at archive.org ↩︎

  17. Abhishek Verma, Luis Pedrosa, Madhukar Korupolu, David Oppenheimer, Eric Tune, and John Wilkes. Large-Scale Cluster Management at Google with Borg. At 10th European Conference on Computer Systems (EuroSys), April 2015. doi:10.1145/2741948.2741964 ↩︎

  18. Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, and Ion Stoica. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. At 9th USENIX Symposium on Networked Systems Design and Implementation (NSDI), April 2012. ↩︎ ↩︎

  19. Paris Carbone, Stephan Ewen, Seif Haridi, Asterios Katsifodimos, Volker Markl, and Kostas Tzoumas. Apache Flink™: Stream and Batch Processing in a Single Engine. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, volume 38, issue 4, December 2015. Archived at perma.cc/G3N3-BKX5 ↩︎ ↩︎ ↩︎

  20. Mark Grover, Ted Malaska, Jonathan Seidman, and Gwen Shapira. Hadoop Application Architectures. O’Reilly Media, 2015. ISBN: 978-1-491-90004-8 ↩︎ ↩︎

  21. Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee. Learning Spark, 2nd Edition. O’Reilly Media, 2020. ISBN: 978-1492050049 ↩︎

  22. Michael Isard, Mihai Budiu, Yuan Yu, Andrew Birrell, and Dennis Fetterly. Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks. At 2nd European Conference on Computer Systems (EuroSys), March 2007. doi:10.1145/1272996.1273005 ↩︎

  23. Daniel Warneke and Odej Kao. Nephele: Efficient Parallel Data Processing in the Cloud. At 2nd Workshop on Many-Task Computing on Grids and Supercomputers (MTAGS), November 2009. doi:10.1145/1646468.1646476 ↩︎

  24. Hossein Ahmadi. In-memory query execution in Google BigQuery. cloud.google.com, August 2016. Archived at perma.cc/DGG2-FL9W ↩︎ ↩︎

  25. Tom White. Hadoop: The Definitive Guide, 4th edition. O’Reilly Media, 2015. ISBN: 978-1-491-90163-2 ↩︎ ↩︎

  26. Fabian Hüske. Peeking into Apache Flink’s Engine Room. flink.apache.org, March 2015. Archived at perma.cc/44BW-ALJX ↩︎

  27. Mostafa Mokhtar. Hive 0.14 Cost Based Optimizer (CBO) Technical Overview. hortonworks.com, March 2015. Archived on archive.org ↩︎

  28. Michael Armbrust, Reynold S. Xin, Cheng Lian, Yin Huai, Davies Liu, Joseph K. Bradley, Xiangrui Meng, Tomer Kaftan, Michael J. Franklin, Ali Ghodsi, and Matei Zaharia. Spark SQL: Relational Data Processing in Spark. At ACM International Conference on Management of Data (SIGMOD), June 2015. doi:10.1145/2723372.2742797 ↩︎

  29. Kaya Kupferschmidt. Spark vs Pandas, part 2 – Spark. towardsdatascience.com, October 2020. Archived at perma.cc/5BRK-G4N5 ↩︎

  30. Ammar Chalifah. Tracking payments at scale. bolt.eu.com, June 2025. Archived at perma.cc/Q4KX-8K3J ↩︎

  31. Nafi Ahmet Turgut, Hamza Akyıldız, Hasan Burak Yel, Mehmet İkbal Özmen, Mutlu Polatcan, Pinar Baki, and Esra Kayabali. Demand forecasting at Getir built with Amazon Forecast. aws.amazon.com.com, May 2023. Archived at perma.cc/H3H6-GNL7 ↩︎

  32. Jason (Siyu) Zhu. Enhancing homepage feed relevance by harnessing the power of large corpus sparse ID embeddings. linkedin.com, August 2023. Archived at archive.org ↩︎

  33. Avery Ching, Sital Kedia, and Shuojie Wang. Apache Spark @Scale: A 60 TB+ production use case. engineering.fb.com, August 2016. Archived at perma.cc/F7R5-YFAV ↩︎

  34. Edward Kim. How ACH works: A developer perspective — Part 1. engineering.gusto.com, April 2014. Archived at perma.cc/F67P-VBLK ↩︎

  35. Zhamak Dehghani. How to Move Beyond a Monolithic Data Lake to a Distributed Data Mesh. martinfowler.com, May 2019. Archived at perma.cc/LN2L-L4VC ↩︎

  36. Chris Riccomini. What the Heck is a Data Mesh?! cnr.sh, June 2021. Archived at perma.cc/NEJ2-BAX3 ↩︎

  37. Chad Sanderson, Mark Freeman, B. E. Schmidt. Data Contracts. O’Reilly Media, 2025. ISBN: 9781098157623 ↩︎

  38. Daniel Abadi. Data Fabric vs. Data Mesh: What’s the Difference? starburst.io, November 2021. Archived at perma.cc/RSK3-HXDK ↩︎

  39. Michael Armbrust, Ali Ghodsi, Reynold Xin, and Matei Zaharia. Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Advanced Analytics. At 11th Annual Conference on Innovative Data Systems Research (CIDR), January 2021. ↩︎

  40. Leslie G. Valiant. A Bridging Model for Parallel Computation. Communications of the ACM, volume 33, issue 8, pages 103–111, August 1990. doi:10.1145/79173.79181 ↩︎

  41. Stephan Ewen, Kostas Tzoumas, Moritz Kaufmann, and Volker Markl. Spinning Fast Iterative Data Flows. Proceedings of the VLDB Endowment, volume 5, issue 11, pages 1268-1279, July 2012. doi:10.14778/2350229.2350245 ↩︎

  42. Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, James C. Dehnert, Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. Pregel: A System for Large-Scale Graph Processing. At ACM International Conference on Management of Data (SIGMOD), June 2010. doi:10.1145/1807167.1807184 ↩︎

  43. Richard MacManus. OpenAI Chats about Scaling LLMs at Anyscale’s Ray Summit. thenewstack.io, September 2023. Archived at perma.cc/YJD6-KUXU ↩︎

  44. Jay Kreps. Why Local State is a Fundamental Primitive in Stream Processing. oreilly.com, July 2014. Archived at perma.cc/P8HU-R5LA ↩︎

  45. Félix GV. Open Sourcing Venice – LinkedIn’s Derived Data Platform. linkedin.com, September 2022. Archived at archive.org ↩︎

最后更新于