map-reduce:简化大集群数据处理

概要

MapReduce是程序模型和处理和生成大数据集的相关实现。用户指定map函数,该函数处理键值对以生成一组中间键值对和reduce函数,该函数合并了相同与同意中间键所有中间值的reduce函数。一些真实任务可以被显式建模,如论文所示。

这个风格下书写的程序是在通用机器的大集群上自动并形执行的。运行时系统关心输入数据分区的详情,调度程序在不同机器上的执行,处理机器异常,和机器之间的连接。这允许没有并行和分布式经验的程序员简单的在大分布式系统上利用资源。

我们的MapReduce实现运行了大规模的通用计算设备并且是可扩展的:典型的MapReduce在上千台机器上计算处理大量TB的数据。程序员发现系统易于使用:已经实施了上百个MapReduce程序,每天在谷歌的集群上执行1000多个MapReduce作业。

1介绍

在过去的五年里,作者和谷歌的其他人已经实现了上百种特殊用途的计算,他们处理大量的原始数据,比如爬虫爬到的文档,web请求日志等,为了计算来自不同类型的数据,比如倒排索引,web文档的图形结构的各种表示,每台主机抓取的页面的摘要,在给定一天中最频繁的查询集合等。大部分这样的计算概念上是简单的。然而,输入数据通常很大计算分布在成百上千个机器上以确保在合理的时间内完成他们。问题来自如何并行计算,分发数据,和处理失败共同导致了原来简单的计算变得模糊不清,需要用大量复杂的代码处理这些问题。

为了反应这些复杂性,我们设计了新的抽象,允许我们表达简单的计算,我们尝试执行但是隐藏并行化,错误忍受,数据分布和库中负载均衡中麻烦的细节。我们的抽象灵感来自于lisp中的map和reduce原语还有其他的实用的与语言。我们意识到我们大部分的计算涉及到对在每个我们输入逻辑"记录"进行映射操作以计算一组中间键值对,然后对每个值应用规约(reduce)操作让他们共享一个键,已实现组合衍生数据。我们在用户空间的映射中使用了函数模型与规约操作让我们更简单的并行化大计算,并使用再执行作为主要的容错机制。

这项工作的主要的贡献是简单并强化了接口,它使用了自动并行和分发大尺度的计算,并结合了这些接口的实现,它在大规模集群和通用pc上实现了高性能。

章节2描述了基础的执行设计模型并给了几个例子。章节3描述了map-reduce接口的实现基于我们基于集群的计算环境。章节4描述了几个精炼的程序模型,我们发现这很有用。章节5有我们的实现在多样化任务表现评估。章节6探索了map-reduce在Google中的使用,包括我们使用它作为经验的基础。

重写我们产品索引系统。章节7讨论了相关的未来工作。

2程序模型

计算接收一组键值对,并产生一组输出键值对。用户的MapReduce库用俩个方法体现了计算:map和reduce(映射和规约)。

映射是由用户编写的,接收输入对输出中间键值对,MapReduce库将相同中间键I相关的所有中间值组合在一起,然后传递给规约函数。

规约函数也是用户编写的,接收中间键I和一组键值。它将这些值合并到一起,形成一个更小的集合。通常每次reduce调用只产生零个或一个输出值。中间的迭代器提供给用户reduce函数。这允许我们处理内存无法容纳的值列表。

2.1例子

考虑一个单词数在每个一些大文档中出现统计问题。用户将书写简单的下列伪代码

map(String key, String value):
    // key: 文档名
    // value: 文档内容
    for each word w in value:
        EmitIntermediate(w, "1");
reduce(String key, Iterator values):
    // key: 单词
    // values: 计数列表
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));

映射函数提交每个单词外加相关出现的数量(在这个简单的例子中只有1)reduce函数对所有针对特定单词发出的计数求和。

此外,用户编写的代码用文件的输入输出名词来填充MapReduce规范的对象,并操作调整参数。然后用户调用map reduce函数,通过它的特殊对象。用户代码和其他MapReduce库(实现自C++)连接在一起。附录A包括了这个例子的全部代码

2.2类型

尽管前面的伪代码是根据做分词输入和输出编写的,概念上映射和规约函数是根据用户有的类型提供的

map $\;\left( {\mathrm{k}1,\mathrm{v}1}\right)$ $\rightarrow$ list $\left( {\mathrm{k}2,\mathrm{v}2}\right)$

reduce $\;\left( {\mathrm{k}2,1\text{ist}\left( {\mathrm{v}2}\right) }\right) \; \rightarrow 1$ ist $\left( {\mathrm{v}2}\right)$

即输入的键和值来自与输出键值不同的域。此外,中间键和值与输出键和值来自同一域。

我们的C++实现在用户定义的函数之间传递字符串,并将其留给用户代码在字符串和适当类型之间进行转换。

2.3更多例子

这里是我们有趣程序的简单例子,它可以简单的表达MapReduce的计算

恒等函数:f(x)=x

反向web链接图是一种数据结构被用来索引和搜索引擎计数。它代表基于链接的网页之间的关系。在标准的web连接图中每个节点(web页面)指向它连接的其他节点。其中边代表超链接。这种结构对用来爬取网页和了解那些页面链接到其他页面非常有用。

然而反向web链接图,逆转了这种关系,它列出了指向他的所有节点(被指向)而不是自身所指向的节点这种技术尝被用来搜索引擎排名,rank算法。

  • 分发程序:如果匹配上了提供的模式,映射函数会提交一行。规约函数是一个恒等函数它只将提供的中间函数复制到输出。
  • URL访问频率计数:映射函数处理web页面请求的日志并输出 $\langle \mathrm{{URL}},1\rangle$ 。reduce函数加和所有的相同URL的值和 $\langle$ URL, 总数 $\rangle$ 对
  • 反向web链接图:映射函数对每个链接输出 $\langle$ 目标, 来源 $\rangle$ 对,使目标URL找到源头页面的名字。规约函数将给定目标URL关联的所有源URL列表连接起来,并发出一对: $\langle$ 目标, $\operatorname{list}\left( \text{来源}\right) \rangle$
  • 每个主机的术语向量:术语向量将文档或一组文档中出现的最重要的单词总结为⟨单词、频率⟩对的列表。map函数提交每个输入文档的(主机名,术语向量)对(其中主机名提取自文档的URL)。reduce函数传递给定主机的每个文档文档向量。
  • 倒排索引:映射函数解析每个文档,提交一系列的(单词,文档ID)对。规约函数接收所有的给定单词的对,排序对应的文档ID并且提交(单词,list(文档id))对。所有集合形成一个简单的倒排索引。
  • 分布式排序:map函数摘取了每个记录中的key,并提交(key,记录)对。规约函数发出所有对不变。这个计算取决于分隔设施在4.1章节,排序属性在4.2章中描述。

3 实现

NUMA代表 Non-Uniform Memory Access 即非统一内存访问

在NUMA系统中每个程序或者节点有自己的本地内存,每个处理器或节点都有自己的本地内存,它可以比附加到其他节点的内存更快地访问本地内存。一般来说有本地内存和远程内存,远程内存访问需要更多的时间。

NUMA系统使用了高速连接器(如QPI,UPI或者NUMA)链接不同的节点到一起,允许处理器传递和访问远程内存。

MapReduce接口有许多不同的实现方式。正确的选择依赖于环境。例如,其中一个实现可能适合小共享内存的机器,另一个适合NUMA的多处理器,还有一种适用于更大网络机器的集合。

本章描述了在谷歌广泛使用的计算环境的目标实现:大集群通用PC使用交换式以太网连接在一起。在我们的环境中:

  1. 机器是典型的双处理器x86架构运行Linux,使用2-4G内存
  2. 使用的是一般的网络硬件——通常在机器级别上是每秒 100 兆比特或每秒 1 吉比特的速度。
  3. 集群由成百上千个机器,因此机器的失败是很常见的。
  4. 存储是由直接连接到机器的廉价IDE磁盘提供。内部的分布式系统开发被用于管理数据在磁盘上的存储。文件系统在不可靠的硬件上使用复制已提供可用性与可靠性。
  5. 用户提交任务到调度系统中。每一个工作由一组任务构成,根据调度器进行映射到一组集群内可用的机器。

执行预览

通过自动将输入数据划分为一组M个分割,Map调用分布在多台机器上。通过使用分隔函数(即hash(key) mod R)将中间秘钥空间分割成R个部分,Reduce进行分布。分区的数量和分区函数是用户指定的。

图1执行预览

图1展示了我们实现中MapReduce操作的数据流动。当用户程序调用MapReduce函数,之后行动发生的顺序,图1中标签与下表中的数字相对应。

  1. 用户程序中MapReduce库受限切分了输入文件成16MB-60MB的M段(用户可以通过参数进行控制)。然后,它在一组机器上启动许多副本。
  2. 其中一个程序副本是特殊的-master。其他的机器被master指派工作。这里有M个map和R个reduce任务指派。master选择空闲的机器分配1个map任务或者1个reduce任务。
  3. 被分配到map任务的机器读取相应的输入拆分的内容。它解析输入数据的键值对输出,传递每个对到用户定义的map函数。map函数产生中间键值对在内存缓冲中。
  4. 缓冲键值对被定期写到本地磁盘中,通过分区函数划分为R区域。这些缓冲对在磁盘上的位置是被传递回master,master负责转发这些位置给reduce机器
  5. 当主通知reduce机器这些位置时,它使用远程过程调用(RPC)去读来自本map机器地磁盘的缓存数据。当reduce机器读取了所有中间数据时,根据中间键对它进行排序,从而将相同秘钥的所有出现分组在一起。排序是必要的,因为通常许多不同的键映射到同一个reduce任务。如果中间的数据量太大无法放入内存则使用外部排序。
  6. reduce迭代已经排序完毕的中间数据,并对遇到的每一个特定的中间键,它通过键和对应的一组中间值应用用户的Reduce函数。reduce函数的输出将附加到此reduce分区的最终输出文件中。
  7. 当所有的map任务和reduce任务完成,master唤醒用户程序,此时mapreduce调用返回给用户代码

在成功完成后,MapReduce执行输出在R输出文件中可用(每个reduce任务一个文件,文件名用户指定)。通常用户不需要结合R个输出文件,用户通常将这些文件输入传递给另一个MapReduce调用, 或者从另一个分布式应用中使用他们,这个应用通常能够处理有多个文件的输入。

3.2Master数据结构

master保存了多个数据结构。对于每个映射任务和规约任务,它存储了状态(空闲,执行中,执行完成),以及机器的身份。

主是管道通过它本地的中间文件从map任务传递到reduce任务。因此,对于每个完成的map任务,主存储了map任务生成的本地和R个中间文件的位置和大小。map任务完成后,更新这些地址和大小信息,这些信息被增量推送到正在处理reduce任务的工作节点。

3.3异常恢复

因为MapReduce库被设计来帮助使用成百上千台机器处理非常大规模的数据,库必须优雅的处理机器故障。

工作节点异常

主节点周期性的ping每一个工作节点。如果在特点时间没有收到工作节点的响应,主标记工作节点为故障。由工作者完成的任何映射任务都被重置回其初始空置状态,因此,因此有资格安排其他工作节点。同样的,任何在故障节点上处理过程中的map任务或者reduce任务,也被重置为空闲,并有资格被重新调度。

在故障机器上完成的map任务将会被重新执行,因为它的输出被存储在故障机器上的本地磁盘上,因此不能获取。完成的reduce任务不需要重新执行,因为他们的文件已经在全局文件系统中了。

MapReduce对大尺度的工作故障具有弹性。例如,在MapReduce的操作中,运行的机器上进行了网络维护,这导致了80个机器的一个组在几分钟内不能访问。MapReduce主只是重新执行了无法访问的工作机器所做的任务,并继续向前处理,最终完成MapReduce操作。

master异常

很容易使master定期写对上述master数据结构描述写检查点。如果master死亡,新的复制从最后一个检查点的状态开始。然而,鉴于只有一个master,如果它不太可能失败;如果master故障了,我们当前的实现终止了MapReduce的计算。如果想的话,用户可以检查条件并重试MapReduce操作。

出现故障情况下的语义

当用户提供了map和reduce操作是根据输入值决定的时候,我们分布式实现所产生的输出与整个无故障顺序执行下所产生的输出相同。

我们依赖于map和reduce任务输出的原子提交来实现这一特性。每个进程内任务输出到私有的临时文件中。reduce任务产生这样一个文件(每个reduce任务一个),map任务产生R个这样的文件。当map任务完成,工作节点发送信息到master其中包括了R个临时文件的名字在信息中。如果master接收到来自已经结束的map任务完成消息,它将忽视这个任务,否则,它在主节点中记录R文件的名称。

当reduce函数完成,reduce工作节点将临时输出文件原子化的重命名为最终输出文件。如果相同的reduce任务在多个机器上执行,将最终输出文件执行多个重命名调用。我们根据底层文件系统提供原子的重命名来保证操作,最终的文件系统只包含reduce任务被执行了一次产生的最终文件状态。

绝大数我们的map和reduce操作是确定性的,事实上,我们的语义等于连续执行,在这个例子中程序员很容易对程序的行为进行推理。当map 和reduce操作是非确定性的,我们提供较弱但是合理的语义。在存在非确定性的情况下特定reduce任务的输出${R}_{1}$等于输出由非确定性程序顺序执行产生的$R_1$。然而不同reduce任务$R_2$可能对应着$R_2$由非确定性的程序产生的。

考虑map任务M和reduce任务R1和R2,设$e\left( {R}_{i}\right)$成为$R_i$的执行提交(只有一次这样的执行)。若语义的出现是因为 $e\left( {R}_{1}\right)$可能读取了M的某次执行中产生的输出,而$e\left( {R}_{2}\right)$可能读取了由M的另一次不同执行产生的输出。

3.4局部性

在我们的计算环境中网络带宽是相对缺乏的资源。我们使用数据输入(由GFS管理)存储在集群的机器本地磁盘这一事实事实来节省网络带宽。GFS设备每个文件分为64MB的块,每个块存储几个副本(通常是三个)在不同的机器上。MapReduce主将考虑输入文件的位置信息,尝试在包含响应输入数据副本的机器上调度映射任务,如果不行的话,它尝试它会尝试在输入数据的副本附近安排映射任务(即,在与包含数据机器位于同一交换机的网络上)。当在工作集群中重要的部分运行大MapReduce操作,大部分输入数据读取本地并不占用网络带宽

3.5任务粒度

我们将map阶段分为M片并将reduce细分为R片,如上所述。理想情况,M和R应该是大于工作节点。让每个工作节点执行不同的任务提高动态的负载均衡,和当工作异常时的恢复速度:它完成的许多映射任务可以分散到各种机器上。

在我们的实现中,M和R是有实际限制的,因为主必须做出$O\left( {M + R}\right)$调度决策和保留$O\left( {M * R}\right)$ 个状态在内存中 就如上述所属一样。(内存使用中的常数因子分非常小,然而 $O\left( {M * R}\right)$ 个状态片段是由大约一个字节的map任务和reduce任务对数据组成的)。

更进一步,R通常受限于用户,因为每一个reduce任务的输入最终会出现在单独的输出文件中。事实上,我们倾向于选择M,这样每个单独的任务大约16MB-64MB的输入数据(因此上述的局部优化描述是有效的),我们让R是一个我们期望使用的工作节点的一个小倍数。我们通常在M=200000同时R=5000时执行MapReduce任务,使用2000个工作节点。

3.6备份任务

导致MapReduce总操作花费时间边长的原因是“落后者”:机器要花费特别长时间去完成计算中map或者reduce的最后几个过程。落后者的出现由很多原因。例如,由一个机器磁盘不好,可能需要频繁的纠错它的读性能从30MB/s降低到了1MB/s。集群调度系统可能调度了机器上其他的任务,导致了它执行MapReduce代码更慢,因为cpu,内存或者网络带宽。最近的问题我们遇到了机器初始化代码中的一个bug,导致处理器缓存被禁用了:受影响的机器上的计算速度下降了100多倍。

我们有监督机制去减轻这个掉队问题。当MapReduce操作接近完成时,master调度备份执行剩下的正在执行中的任务。当主要或者备份机器执行完成时任务被标记为完成。我们已经调整了这些机器,使它通常指增加操作所使用计算资源的几个百分点。例如,当备份任务被禁用时,5.3中描述的排序程序需要44%更长长的时间才能完成。

4精炼

虽然简单编写map和reduce函数提供了基础的功能,能够满足大部分需要,我们发现了有用的扩展。它们将在这节被描述。

4.1分区函数

MapReduce的用户指定他们想要的reduce任务和输出文件数量R。数据在这些任务之间进行分区,默认的分区函数使用hash(即hash(key) mod R)。 往往会产生相对均衡的分区。然而在一些例子中,通过键的其他函数对数据进行分区是很有用的。例如,有时输出的键是URL,我们休息完单个主机所有的条目最终都在相同的输出文件中,为了支持这种情况,MapReduce库的用户可以提供特殊的分区函数,比如使用Hash(主机名(urlKey)) mod R作为来自同一主机的所有URL分区函数最终让相同的输出文件中。

4.2顺序保证

我们确保在给定的分区中,中间键值对是按递增顺序处理。这种排序保证使得为每个分区生成已排序的输出文件变得容易,这对于输出文件格式需要支持通过键进行高效的随机访问查找,或是输出使用者发现排序数据便于使用的情况非常有用。

4.3组合器函数

Zipf 分布是一种离散概率分布,常用来描述自然语言中词频的分布规律以及其他许多现实世界的现象。

在一些例子中,每个map任务的中间键产生中存在显著的重复,并且用户指定的reduce函数是可交换和关联的。一个好的例子是我们在章节2.1中提到的单词计数。因为词频遵循与Zipf分布,每个map任务将产生成百上千个任务。所有的计数将被用网络发送到一个统计任务上,根据reduce函数然后相加求和产生一个数。我们通过用户指定的可选的组合器函数,这个函数在数据发送到网络之前对其进行部分归并。

组合函数在每台执行map的机器上运行。通常是使用相同的代码来实现这个组合器和reduce函数。在reduce和组合函数之间唯一的不同是MapReduce库如何处理输出函数。reduce的函数的输出被写为最终输出文件。组合函数的输出的中间文件将被发送到reduce任务。

不分的组合显著加块了某些类型的MapReduce操作的速度。附录包含了使用的组合例子。

4.4输入和输出类型

MapReduce函数支持以几种不同的格式读取数据。例如,text模式输入将每一行作为一个键值对:key是文件的偏移量value是包含的行。另一个常见的支持格式存储按键排序排序的键值对。每个输入类型的实现知道如何将自身划为有意义的范围,以便于处理单独的map任务(即 文本的范围分割保证了它范围分割只发生在行边界处)。用户可以提供简单的读接口的视线以添加对心的输入类型的支持,虽然大部分用户只使用依稀哦啊哦部分的预定义输入类型之一。

读取器不一定需要从文件读取数据。例如,很容易定义一个在数据库或者其他数据结构在内存中映射的读取器。

类似的方式,我们支持了一组输出类型以产生不同格式数据,它简化了用户编写支持新的输出类型的代码。

4.5副作用

在一些例子中,MapReduce用户发现,从他们map和reduce操作符生成服主文件作为附加输出是很方便的。我们依靠编写的应用程序使这些副作用原子化和幂等。通常应用输出到临时文件,并且一旦这个文件生成,原子化的重命名这个文件。

我们不提供单个任务产出的多个文件的原子二阶段提交。因此,产生具有跨文件一致性要求的多个输出文件的任务应该是确定的。这个限制在实践中从没有出现问题

4.6跳过问题记录

一些时候用户代码中的bug会导致map或者reduce函数崩溃在特定的记录上。这样的问题阻止了MapReduce操作。通常的做法是修复这些bug,但是一些时候它是不可能的。这些可能的bug可能在不能获取到源代码的第三方库中。同样,一些时候忽略一些小的记录它是可以接受的,例如当做大数据集统计分析时。我们提供执行的操作模型,其中MapReduce库发现记录导致了确定性的崩溃,并跳过这个记录以实现继续运行程序。

每个工作进程安装一个信号处理器,用于捕获分段错误(segmentation violations)和总线错误。在执行用户的map或者reduce操作之前,MapReduce将参数的序列号存储在一个全局变量中。如果用户代码生成了信号,信号处理发送一个“最后喘息”UDP数据包到MapReduce主节点。当主节点在一个特定的记录中看到了不止一次的失败时,就表明了在下一次重新执行对应的map或者reduce任务时这个记录应该被跳过。

4.7本地执行

在MapReduce中尽显debug是麻烦的,因为真实的计算发生在分布式的系统中,通常是几千条台机器,工作分派决策是master动态进行的。为了更容易debug,分析,和小尺度的测试,我们开发了可供替代的MapReduce库实现,它按顺序执行所有MapReduce上的操作在本地的机器上。以便为用户计算有限的特定map任务提供了控制。用户使用特定的标记执行他们的程序,然后可以轻松的使用一些debug或者测试工具(如gdb)

4.8状态信息

主运行了内部HTTP服务并导出了一组可以被消费的状态页。状态页展示了计算的进度,比如多少个任务被完成了,多少还在进行中,输入的字节,中间数据的字节,输出的字节,执行率等。这个页页包含了每个任务生成的标注错误和标准输出文件的链接。用户可以使用这个数据以预测计算将花费多少时间,以及是否需要添加更多的计算资源。这些页页可以用来被判断计算是不是远低于预期。

此外,顶级状态页展示哪些工作节点失败了,以及这些map和reduce任务在失败时处理哪些任务。当尝试诊断用户代码中的bug时。

4.9计数器

MapReduce库提供了计数器以计算不同事件的发生。例如,用户代码可能想要计算单词的总数或者德语文档的索引数量。

要使用这个设施,用户代码创建名叫counter的对象,然后适当的增加计数器在map或者reduce函数中,例如

Counter* uppercase;
uppercase = GetCounter("uppercase");

map(String name, String contents):
    for each word w in contents:
        if (IsCapitalized(w)):
            uppercase->Increment();
        EmitIntermediate(w, "1");

来自单独工作机器的计数值是定期传播到master(借住ping进行响应)。当MapReduce操作结束时,master从成功执行的map函数和reduce函数中聚合这些值并返回他们到用户代码。当前计数器值也展示了master的状态页上,以便人们可以实时观看计算进度。在聚合计数器值,master消除了重复执行相同map或reduce任务的影响,以避免了多次计算(重复执行可能来自我们使用的备份任务和由于失败而重新执行任务)

一些计数器值使用MapReduce库自动保持,例如,处理输入键值对的数量和生成的键值对数量。

用户发现计数器功能对于检查MapReduce操作行为时非常有用。例如,再一些MapReduce操作中,用户代码可能想要确保产生的输出对的数量恰好等于处理输入对的数量,或者德语文件的处理比例在文件总数的某个可容忍比例之内。

5性能

在这一章我们衡量了MapReduce在大型机器集群上的性能。一个特定的计算搜索大约1TB的数据来查找特定的模式。其他的计算排序大约1个TB的数据。

这俩个程序代表了用户编写的真实程序中的一个大子集-一类程序将数据从一种表示变换到另一种表示,另一种类型从大数据集中摘取小量的感兴趣数据。

5.1集群配置

IDE是Integrated Drive Electronics,集成驱动电路。它是一种被用来连接硬盘驱动链接到计算机的主板。

所有的这些程序由一个大约1800台机器组成的集群上运行。每个机器上有2GHz的intelXeon处理器开启了超线程计数,4GB的内存,俩个160GB的IDE磁盘和千兆以太网。机器被布置了一个2层的树形交换网络,其中根节点大概有100-200Gbps的总贷款可以使用。所有机器都来自相同的托管设施中,任何一对机器的往返时间都不超过1毫秒。

内存不足4gb,大约1-1.5G被集群上的其他任务所保留。程序是在一个周末的下午被执行的,此时CPU,磁盘和网络最大程度的空闲。

5.2查询

查询程序扫描了${10}^{10}{100}$ -byte的记录,查询一个相对函件的三字符模式(个模式出现在92339个记录中)。这个输入被分成64MB每片(M=15000),所有的输出被放置在一个文件上(R=1)

图2.随着时间变化的数据传输率


图二展示了随着时间的进展。y轴表示了扫描输入数据扫描的速率,这个速率逐渐提升这是因为一些机器被指派到了MapReduce计算,顶峰超过了30GB/s此时1764个工作节点被指派到了任务。当map任务结束时,速率开始下降大约80秒以后达到0。整个计算从开始到结束大约花费了150秒。这包括了大约一分钟的启动开销。这个开销是由于传播程序到所有的机器和GFS交互打开1000个输入文件并获得局部性最优化所需的信息延迟。

5.3排序

排序程序排序了 ${10}^{10}{100}$ -byte 的记录(接近1TB的数据)。这个程序模型是在TeraSort基准测试之后建模的。

排序代码包括了不到50行的用户代码。3行的map函数从文本行中摘取了10byte排序key兵提交key和源文本行作为中间键值对。我们使用了恒等函数作为reduce操作。这个函数通过中间键值对作为输出键值对不加更改的进行传递。最终的排序输出被写到了入一组双向复制的GFS文件。(写入了2TB的输出文件程序)

在那之后,输入数据被划分成64MB一片(M=15000)。我们将排序后的输出为4000个文件(R=4000)。分区函数使用键的初始字节将其隔离位R块中的一个。

这个基准测试的分区函数内置了键分布的知识。在一般的排序程序中,我们需要添加预传递MapReduce操作,它将收集键样本,他收集键的样本,使用采样的秘钥分布、来计算最终排序过程的分割点。

图3:不同的执行程序随着时间数据变换率速率


图3展示了正常执行排序程序的进程。左上角的图展示了输入的读取率。这个速率顶峰在13GB/s并且很快就消失了,因为所有的map任务在200秒后结束。记住输入率是小于查询的。这是因为排序map任务速度花费了大概一般时间,I/O带宽写入中间输出到本地磁盘。grep对应的中间输出大小可以忽略不记。

中间左边的图展示了从map任务发送到reduce任务的网络速率。一旦map任务完成洗牌任务就开始了。在在图中的第一个凸起是因为第一批接近1700的reduce任务(整个MapReduce任务指定了大约1700台机器,每个机器执行最多一个reduce任务)。大概300秒之后,第一批reduce任务中的一些完成了,我们开始为剩下的reduce变换数据。所有的洗牌大约在600秒后完成。

左下角的图片展示了排序的数据被reduce任务写入到最终输出文件的速率。他在第一次洗牌结束和写入期开始之前有一个延迟,因为机器忙于排序中间数据。些操作保持2-4GB每秒的速率一段时间。所有写入操作在计算完成后大约850秒完成。包括启动开销,整个计算花费了891秒。这与TeraSort基准测试报告的1057秒的最佳结果相似。

有几点需要注意:输入率是高于洗牌率的,并且输入率因为我们的本地优化-大部分的数据是从本地磁盘并绕过我们相关的带宽限制。洗牌率是高于输出率的,因为输出写了俩个排序数据的副本(我们因为可用性与可靠性的原因制作输出的俩个副本)。我们编写两个副本,因为这是底层文件系统提供的可靠性和可用性机制。如果底层文件系统使用擦除编码[14]而不是复制,则写入数据所需的网络带宽将会减少。5.

5.4备份任务的效果

在图3b中,我们展示了在备份任务被禁用时排序程序的执行。这个执行流程接近图3a中展示的,除了有一个很长的尾巴其中几乎没有写活动发生。在960秒后,除了5个reduce任务都已经完成了。然而这些掉队者在300秒后才完成任务。整个计算任务花费了1283 秒,运行时间增加了44%。

5.5机器故障

在图3c中,我们展示了排序程序的执行其中,在计算过程开始的几分钟后,我们有意的在1748个工作进程中杀死了200个进程。底层的机器立即重新启动这些机器上的心工作进程。(因为只有进程被杀死了,机器还正常工作)

工作节点的死亡数展示了负面的输入率,因为一些一些先前的计算的map任务消失了(因为对应的map工作进程被杀死了)需要重做。再次执行map任务相对较快。整个计算在933秒结束包括了启动开销在内。(只是你正常执行时间增加了5%)。

6经验

我们些的第以个mapreduce版本是在2003年的2月,并在2003年的8月对其进行了重大的改进,包括了本地优化,任务在不同工作节点之间的动态负载均衡等。自那时起,我们惊喜的发现,mapreduce库对于我们处理各种问题具有广泛的适用性。它已经被广泛用于谷歌的各个领域。包括了:

  • 大尺度的机器学习问题
  • 谷歌新闻和购物产品中的问题
  • 提取用于热门查询的数据比如(谷歌Zeitgeist)
  • 为新实验和新产品提取网页属性(例如:从大量网页语料库中提取地理位置,用于本地化搜索)
  • 大尺度的图计算
Number of jobs29,423
Average job completion time634 secs
Machine days used79,186 days
Input data read3,288 TB
Intermediate data produced758 TB
Output data written193 TB
Average worker machines per job157
Average worker deaths per job1.2
Average map tasks per job3,351
Average reduce tasks per job55
Unique map implementations395
Unique reduce implementations269
Unique map/reduce combinations426
图4MapReduce随着时间增加的实例。 * 图4展示了随着时间的推移,检入主源代码管理系统独立MapReduce程序数量显著的增长,从2003年初的零到2004年9月底的近900个独立案例。MapReduce之所以如此成功,是因为它使得编写一个简单的程序称为可能,在半小时内在数以千计的机器上运行,大大加块了开发速度和设计周期。进一步,它允许程序员没有分布式和并行系统的经验下开发大量的资源变得简单。 在每项工作结束时,MapReduce库日志统计作业使用资源信息。在表1中,我们展示了2004年8月在Google运行的MapReduce作业子集的一些统计数据。 ### 6.1大规模索引 MapReduce最重要的用途之一是完全重写了生产索引系统,它产生的数据结构被谷歌用来做web检索服务。索引系统将爬取到的大量文档作为输入,存储为一组GFS文件。原始内容超过了20TB的数据。索引过程以5-10个MapReduce操作顺序运行。使用MapReduce(而不是以前版本的索引系统中的ad-hoc分布式传递)提供了几个好处: - 索引代码很简单,小巧,易于理解,因为处理容错分发,和并行的代码被隐藏起到MapReduce库中。例如,计算的一个阶段大大减小了,从接近3800行c++代码到用MapReduce表达时的接近700行。 - MapReduce库的性能已经足够好了,它可以保持概念上无关的概念分开,而不是将他们混合在一起以避免数据进行额外的传递。这使得索引的改变过程变得容易。例如,在我们的老索引系统中,一个改变花费了几个月的时间,在新系统中只需要今天。 - 索引进程的操作变得更简单,因为大部分问题是由机器故障,慢的机器,和网络抖动导致的,这些由MapReduce库自动处理,不需要操作 干预。此外,通过添加新的机器到索引集群中提升索引进程的性能容易。 ## 7相关工作 许多系统提供了受限的编程模型,并利用这些约束自动并行化计算。例如,可以在 N个处理器上使用并行前缀计算,在 log⁡N时间内计算出N元素数组的所有前缀上的关联函数。更重要的是,我们提供的容错实现可以扩展到数千个处理器。作为对比,大部分并行程序才刚刚实现小尺度壁并把机器故障处理的细节留给程序员。 > MPI:(Message Passing Interface )它代表了信息传递接口,它是一个标准的再并行计算进程之间通信的协议。MPI原语是指,MPI库提供的基础的构建模块和基本的函数,程序员在程序中使用它来实现通信和同步。 批量同步编程和一些MPI原语提供了高级的抽象,这使得简单的并行程序更容易被程序员书写。这些系统和MapReduce之间的一个关键区别是,MapReduce利用受限的编程模型来自动的并行用户程序并提供透明的容错。 我们的区域优化灵感主要来自于active disks技术,其中计算被推入处理单元,它靠近本地磁盘以减少I/O子系统以及网络的数据发送量。我们在连接小数量的磁盘上的通用处理器上运行,而不是之间在磁盘控制器处理器上直接运行,但是总体的方法是类似的。 我们的备份机制类似于在Charlotte 系统上的急切调度机制。一个急切调度机制的毛病是,如果给定的任务重复失败,整个计算将失败。我们修补了一些问题的事例,在我们的方法中可以跳过坏掉的记录。 MapReduce的实现依赖于内部集群管理系统,它负责在大型共享机器上分发和运行用户任务。虽然不是本文的重点,集群管理系统在精神上与Condor等其他系统相似。 MapReduce库中部分的设施很接近NOW-Sort 的运作。资源机器(map工作节点)将数据分割排序并发送到R个reduce工作节点上。每个reduce机器本地排序数据(如果可以的话在内存中)。当然NOW-Sort没有用户定义的Map和Reduce函数,这使得我们库具有广泛的适用性。 River提供了进程模型其中进程之间通过发送数据到分布式队列进行彼此相互连接。就像MapReduce,River系统尝试提供好的平均案例表现,甚至在异构硬件的非均匀型或者系统扰动下。River通过磁盘的仔细调度和网络传输已实现平衡的完成时间。MapReduce有不同的方式。通过限制程序模型,MapReduce框架可以将任务分区到大数量的细粒度任务只。这些任务是在可用工作节点上动态调度的,以至于更快的工作节点会处理更多的任务。受限的编程模型还允许我们安排在作业接近尾声时重复执行任务,它大大减少了不一致存在的时间(比如很慢或者卡主的机器)。 BAD-FS和MapReduce有非常不一样的执行模型,与MapReduce不同,目标在执行穿过大网络区域的作业。然而,有俩个根本相似之处。俩个系统使用了荣誉的执行以恢复来自故障导致的数据丢失。俩者都用了本地化软件调度以减少数据在阻塞的网络链接中中的发送量。 TACC是一个设计用来简化高可用网络服务构建的系统。就像MapReduce,它依赖于重新执行作为实现容错的机制。 ## 8结论 MapReduce程序模型已经成功的备用在了谷歌许多不同的目标上。我们将这些成功归功于几个原因。第一,模型易于使用,甚至对于没有并行和分布式系统经验的程序员来说,因为它隐藏了并行,容错,本地优化和负载均衡的细节。第二,多样化的问题可以简单的表示为MapReduce计算。例如MapReduce被用来生成谷歌的搜索引擎生产数据,排序,数据采集,机器学习和一些其他的系统中。第三,我们开发的MapReduce的实现,它可以缩放到上千台机器的大规模的机器集群。这个实现让这些机器资源更有效率,因此合适在更多谷歌遭遇的大型的计算问题上使用。 在这个工作中我们学习到了几件事。第一,受限的程序模型使得它很简单的并行和分发计算以及进行计算容错。第二,网络带宽是稀缺资源。我们系统中的许多优化都是针对减少数据在网络中的发送量:本地优化通过我们从本地磁盘中读取数据,并写入到单个副本的中间数据到本地磁盘中意节省网络带宽。第三,冗余的执行可以用来减少慢机器的影响,并处理机器的故障和数据的丢失。
Last modification:November 14, 2024
如果觉得我的文章对你有用,请随意赞赏