1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
|
这些计数器的值周期性的从各个单独的 worker 机器上传递给 maste(r 附加在 ping 的应答包中传递)。master 把执行成功的 Map 和 Reduce 任务的计数器值进行累计,当 MapReduce 操作完成之后,返回给用户代码。
计数器当前的值也会显示在 master 的状态页面上,这样用户就可以看到当前计算的进度。当累加计数器 的值的时候,master 要检查重复运行的 Map 或者 Reduce 任务,避免重复累加(之前提到的备用任务和失效 后重新执行任务这两种情况会导致相同的任务被多次执行)。
有些计数器的值是由 MapReduce 库自动维持的,比如已经处理的输入的 key/value pair 的数量、输出的 key/value pair 的数量等等。
计数器机制对于 MapReduce 操作的完整性检查非常有用。比如,在某些 MapReduce 操作中,用户需要确 保输出的 key value pair 精确的等于输入的 key value pair,或者处理的 German 文档数量在处理的整个文档数 量中属于合理范围。
{% note info %} ## 性能 {% endnote %}
本节我们用在一个大型集群上运行的两个计算来衡量 MapReduce 的性能。一个计算在大约 1TB 的数据中 进行特定的模式匹配,另一个计算对大约 1TB 的数据进行排序。
这两个程序在大量的使用 MapReduce 的实际应用中是非常典型的 — 一类是对数据格式进行转换,从一 种表现形式转换为另外一种表现形式;另一类是从海量数据中抽取少部分的用户感兴趣的数据。 ### 集群配置
所有这些程序都运行在一个大约由 1800 台机器构成的集群上。每台机器配置 2 个 2G 主频、支持超线程 的 Intel Xeon 处理器,4GB 的物理内存,两个 160GB 的 IDE 硬盘和一个千兆以太网卡。这些机器部署在一个 两层的树形交换网络中,在 root 节点大概有 100-200GBPS 的传输带宽。所有这些机器都采用相同的部署(对 等部署),因此任意两点之间的网络来回时间小于 1 毫秒。
在 4GB 内存里,大概有 1-1.5G 用于运行在集群上的其他任务。测试程序在周末下午开始执行,这时主机 的 CPU、磁盘和网络基本上处于空闲状态。
### GREP
这个分布式的 grep 程序需要扫描大概 10 的 10 次方个由 100 个字节组成的记录,查找出现概率较小的 3 个字符的模式(这个模式在 92337 个记录中出现)。输入数据被拆分成大约 64M 的 Block(M=15000),整个 输出数据存放在一个文件中(R=1)。 ![](http://oliji9s3j.bkt.clouddn.com/15133914679471.jpg)
图 2 显示了这个运算随时间的处理过程。其中 Y 轴表示输入数据的处理速度。处理速度随着参与 MapReduce 计算的机器数量的增加而增加,当 1764 台 worker 参与计算的时,处理速度达到了 30GB/s。当 Map 任务结束的时候,即在计算开始后 80 秒,输入的处理速度降到 0。整个计算过程从开始到结束一共花了 大概 150 秒。这包括了大约一分钟的初始启动阶段。初始启动阶段消耗的时间包括了是把这个程序传送到各 个 worker 机器上的时间、等待 GFS 文件系统打开 1000 个输入文件集合的时间、获取相关的文件本地位置优 化信息的时间。
### 排序
排序程序处理 10 的 10 次方个 100 个字节组成的记录(大概 1TB 的数据)。这个程序模仿 TeraSort
benchmark[10]。
排序程序由不到 50 行代码组成。只有三行的 Map 函数从文本行中解析出 10 个字节的 key 值作为排序的 key,并且把这个 key 和原始文本行作为中间的 key/value pair 值输出。我们使用了一个内置的恒等函数作为 Reduce 操作函数。这个函数把中间的 key/value pair 值不作任何改变输出。最终排序结果输出到两路复制的GFS 文件系统(也就是说,程序输出 2TB 的数据)。
如前所述,输入数据被分成 64MB 的 Block(M=15000)。我们把排序后的输出结果分区后存储到 4000
个文件(R=4000)。分区函数使用 key 的原始字节来把数据分区到 R 个片段中。
在这个 benchmark 测试中,我们使用的分区函数知道 key 的分区情况。通常对于排序程序来说,我们会
增加一个预处理的 MapReduce 操作用于采样 key 值的分布情况,通过采样的数据来计算对最终排序处理的分 区点。 ![](http://oliji9s3j.bkt.clouddn.com/15133915176963.jpg)
图三(a)显示了这个排序程序的正常执行过程。左上的图显示了输入数据读取的速度。数据读取速度峰 值会达到 13GB/s,并且所有 Map 任务完成之后,即大约 200 秒之后迅速滑落到 0。值得注意的是,排序程序 输入数据读取速度小于分布式 grep 程序。这是因为排序程序的 Map 任务花了大约一半的处理时间和 I/O 带宽 把中间输出结果写到本地硬盘。相应的分布式 grep 程序的中间结果输出几乎可以忽略不计。
左边中间的图显示了中间数据从 Map 任务发送到 Reduce 任务的网络速度。这个过程从第一个 Map 任务 完成之后就开始缓慢启动了。图示的第一个高峰是启动了第一批大概 1700 个 Reduce 任务(整个 MapReduce 分布到大概 1700 台机器上,每台机器 1 次最多执行 1 个 Reduce 任务)。排序程序运行大约 300 秒后,第一批 启动的 Reduce 任务有些完成了,我们开始执行剩下的 Reduce 任务。所有的处理在大约 600 秒后结束。
左下图表示 Reduce 任务把排序后的数据写到最终的输出文件的速度。在第一个排序阶段结束和数据开始 写入磁盘之间有一个小的延时,这是因为 worker 机器正在忙于排序中间数据。磁盘写入速度在 2-4GB/s 持续 一段时间。输出数据写入磁盘大约持续 850 秒。计入初始启动部分的时间,整个运算消耗了 891 秒。这个速
度和 TeraSort benchmark[18]的最高纪录 1057 秒相差不多。
还有一些值得注意的现象:输入数据的读取速度比排序速度和输出数据写入磁盘速度要高不少,这是因
为我们的输入数据本地化优化策略起了作用 — 绝大部分数据都是从本地硬盘读取的,从而节省了网络带宽。 排序速度比输出数据写入到磁盘的速度快,这是因为输出数据写了两份(我们使用了 2 路的 GFS 文件系统, 写入复制节点的原因是为了保证数据可靠性和可用性)。我们把输出数据写入到两个复制节点的原因是因为这 是底层文件系统的保证数据可靠性和可用性的实现机制。如果底层文件系统使用类似容错编码[14](erasure coding)的方式而不是复制的方式保证数据的可靠性和可用性,那么在输出数据写入磁盘的时候,就可以降低 网络带宽的使用。
### 高效的 backup 任务
图三(b)显示了关闭了备用任务后排序程序执行情况。执行的过程和图 3(a)很相似,除了输出数据写 磁盘的动作在时间上拖了一个很长的尾巴,而且在这段时间里,几乎没有什么写入动作。在 960 秒后,只有 5 个 Reduce 任务没有完成。这些拖后腿的任务又执行了 300 秒才完成。整个计算消耗了 1283 秒,多了 44% 的执行时间。
### 失效的机器
在图三(c)中演示的排序程序执行的过程中,我们在程序开始后几分钟有意的 kill 了 1746 个 worker 中 的 200 个。集群底层的调度立刻在这些机器上重新开始新的 worker 处理进程(因为只是 worker 机器上的处理 进程被 kill 了,机器本身还在工作)。
图三(c)显示出了一个“负”的输入数据读取速度,这是因为一些已经完成的 Map 任务丢失了(由于 相应的执行 Map 任务的 worker 进程被 kill 了),需要重新执行这些任务。相关 Map 任务很快就被重新执行了。 整个运算在 933 秒内完成,包括了初始启动时间(只比正常执行多消耗了 5%的时间)。
{% note info %} ## 经验 {% endnote %}
我们在 2003 年 1 月完成了第一个版本的 MapReduce 库,在 2003 年 8 月的版本有了显著的增强,这包括 了输入数据本地优化、worker 机器之间的动态负载均衡等等。从那以后,我们惊喜的发现,MapReduce 库能 广泛应用于我们日常工作中遇到的各类问题。它现在在 Google 内部各个领域得到广泛应用,包括:
1. 大规模机器学习问题 2. Google News 和 Froogle 产品的集群问题
3. 从公众查询产品(比如 Google 的 Zeitgeist)的报告中抽取数据。
4. 从大量的新应用和新产品的网页中提取有用信息(比如,从大量的位置搜索网页中抽取地理位置信息)。
5. 大规模的图形计算。
![](http://oliji9s3j.bkt.clouddn.com/15133916089336.jpg)
图四显示了在我们的源代码管理系统中,随着时间推移,独立的 MapReduce 程序数量的显著增加。从 2003 年早些时候的 0 个增长到 2004 年 9 月份的差不多 900 个不同的程序。MapReduce 的成功取决于采用 MapReduce 库能够在不到半个小时时间内写出一个简单的程序,这个简单的程序能够在上千台机器的组成的集群上做大 规模并发处理,这极大的加快了开发和原形设计的周期。另外,采用 MapReduce 库,可以让完全没有分布式 和/或并行系统开发经验的程序员很容易的利用大量的资源,开发出分布式和/或并行处理的应用。 ![](http://oliji9s3j.bkt.clouddn.com/15133916531743.jpg)
在每个任务结束的时候,MapReduce 库统计计算资源的使用状况。在表 1,我们列出了 2004 年 8 月份 MapReduce 运行的任务所占用的相关资源。
### 大规模索引
到目前为止,MapReduce 最成功的应用就是重写了 Google 网络搜索服务所使用到的 index 系统。索引系 统的输入数据是网络爬虫抓取回来的海量的文档,这些文档数据都保存在 GFS 文件系统里。这些文档原始内 容4的大小超过了 20TB。索引程序是通过一系列的 MapReduce 操作(大约 5 到 10 次)来建立索引。使用 MapReduce(替换上一个特别设计的、分布式处理的索引程序)带来这些好处:
实现索引部分的代码简单、小巧、容易理解,因为对于容错、分布式以及并行计算的处理都是 MapReduce 库提供的。比如,使用 MapReduce 库,计算的代码行数从原来的 3800 行 C++代码减少到大概 700 行代码。
MapReduce 库的性能已经足够好了,因此我们可以把在概念上不相关的计算步骤分开处理,而不是混在 一起以期减少数据传递的额外消耗。概念上不相关的计算步骤的隔离也使得我们可以很容易改变索引处理方 式。比如,对之前的索引系统的一个小更改可能要耗费好几个月的时间,但是在使用 MapReduce 的新系统上, 这样的更改只需要花几天时间就可以了。
索引系统的操作管理更容易了。因为由机器失效、机器处理速度缓慢、以及网络的瞬间阻塞等引起的绝 大部分问题都已经由 MapReduce 库解决了,不再需要操作人员的介入了。另外,我们可以通过在索引系统集 群中增加机器的简单方法提高整体处理性能。
{% note info %} ## 相关工作 {% endnote %} 很多系统都提供了严格的编程模式,并且通过对编程的严格限制来实现并行计算。例如,一个结合函数可以通过把 N 个元素的数组的前缀在 N 个处理器上使用并行前缀算法,在 log N 的时间内计算完[6,9,13]5。
MapReduce 可以看作是我们结合在真实环境下处理海量数据的经验,对这些经典模型进行简化和萃取的成果。 更加值得骄傲的是,我们还实现了基于上千台处理器的集群的容错处理。相比而言,大部分并发处理系统都 只在小规模的集群上实现,并且把容错处理交给了程序员。
Bulk Synchronous Programming[17]和一些 MPI 原语[11]提供了更高级别的并行处理抽象,可以更容易写 出并行处理的程序。MapReduce 和这些系统的关键不同之处在于,MapReduce 利用限制性编程模式实现了用 户程序的自动并发处理,并且提供了透明的容错处理。
我们数据本地优化策略的灵感来源于 active disks[12,15]等技术,在 active disks 中,计算任务是尽量推送 到数据存储的节点处理6,这样就减少了网络和 IO 子系统的吞吐量。我们在挂载几个硬盘的普通机器上执行 我们的运算,而不是在磁盘处理器上执行我们的工作,但是达到的目的一样的。
我们的备用任务机制和 Charlotte System[3]提出的 eager 调度机制比较类似。Eager 调度机制的一个缺点是 如果一个任务反复失效,那么整个计算就不能完成。我们通过忽略引起故障的记录的方式在某种程度上解决 了这个问题。
MapReduce 的实现依赖于一个内部的集群管理系统,这个集群管理系统负责在一个超大的、共享机器的 集群上分布和运行用户任务。虽然这个不是本论文的重点,但是有必要提一下,这个集群管理系统在理念上 和其它系统,如 Condor[16]是一样。
MapReduce 库的排序机制和 NOW-Sort[1]的操作上很类似。读取输入源的机器(map workers)把待排序 的数据进行分区后,发送到 R 个 Reduce worker 中的一个进行处理。每个 Reduce worker 在本地对数据进行排 序(尽可能在内存中排序)。当然,NOW-Sort 没有给用户自定义的 Map 和 Reduce 函数的机会,因此不具备 MapReduce 库广泛的实用性。
River[2]提供了一个编程模型:处理进程通过分布式队列传送数据的方式进行互相通讯。和 MapReduce 类似,River 系统尝试在不对等的硬件环境下,或者在系统颠簸的情况下也能提供近似平均的性能。River 是 通过精心调度硬盘和网络的通讯来平衡任务的完成时间。MapReduce 库采用了其它的方法。通过对编程模型 进行限制,MapReduce 框架把问题分解成为大量的“小”任务。这些任务在可用的 worker 集群上动态的调度, 这样快速的 worker 就可以执行更多的任务。通过对编程模型进行限制,我们可用在工作接近完成的时候调度 备用任务,缩短在硬件配置不均衡的情况下缩小整个操作完成的时间(比如有的机器性能差、或者机器被某 些操作阻塞了)。
BAD-FS[5]采用了和 MapReduce 完全不同的编程模式,它是面向广域网(alex 注:wide-area network)的。不过,这两个系统有两个基础功能很类似。
1. 两个系统采用重新执行的方式来防止由于失效导致的数据丢
失。 2. 两个都使用数据本地化调度策略,减少网络通讯的数据量。
TACC[7]是一个用于简化构造高可用性网络服务的系统。和 MapReduce 一样,它也依靠重新执行机制来
实现的容错处理。
{% note info %} ## 结束语 {% endnote %}
MapReduce 编程模型在 Google 内部成功应用于多个领域。我们把这种成功归结为几个方面:首先,由于 MapReduce 封装了并行处理、容错处理、数据本地化优化、负载均衡等等技术难点的细节,这使得 MapReduce 库易于使用。即便对于完全没有并行或者分布式系统开发经验的程序员而言;其次,大量不同类型的问题都 可以通过 MapReduce 简单的解决。比如,MapReduce 用于生成 Google 的网络搜索服务所需要的数据、用来 排序、用来数据挖掘、用于机器学习,以及很多其它的系统;第三,我们实现了一个在数千台计算机组成的 大型集群上灵活部署运行的 MapReduce。这个实现使得有效利用这些丰富的计算资源变得非常简单,因此也 适合用来解决 Google 遇到的其他很多需要大量计算的问题。
我们也从 MapReduce 开发过程中学到了不少东西。首先,约束编程模式使得并行和分布式计算非常容易, 也易于构造容错的计算环境;其次,网络带宽是稀有资源。大量的系统优化是针对减少网络传输量为目的的: 本地优化策略使大量的数据从本地磁盘读取,中间文件写入本地磁盘、并且只写一份中间文件也节约了网络 带宽;第三,多次执行相同的任务可以减少性能缓慢的机器带来的负面影响(alex 注:即硬件配置的不平衡), 同时解决了由于机器失效导致的数据丢失问题。
{% note info %} ## 感谢 {% endnote %}
Josh Levenberg has been instrumental in revising and extending the user-level MapReduce API with a number of new features based on his experience with using MapReduce and other people’s suggestions for enhancements. MapReduce reads its input from and writes its output to the Google File System [8]. We would like to thank Mohit Aron, Howard Gobioff, Markus Gutschke, David Kramer, Shun-Tak Leung, and Josh Redstone for their work in developing GFS. We would also like to thank Percy Liang and Olcan Sercinoglu for their work in developing the cluster management system used by MapReduce. Mike Burrows, Wilson Hsieh, Josh Levenberg, Sharon Perl, Rob Pike, and Debby Wallach provided helpful comments on earlier drafts of this paper.The anonymous OSDI reviewers, and our shepherd, Eric Brewer, provided many useful suggestions of areas where the paper could be improved. Finally, we thank all the users of MapReduce within Google’s engineering organization for providing helpful feedback, suggestions, and bug reports.
{% note info %} ## 参考资料 {% endnote %}
[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau,David E. Culler, Joseph M. Hellerstein, and David A. Patterson.High-performance sorting on networks of workstations.In Proceedings of the 1997 ACM SIGMOD InternationalConference on Management of Data, Tucson,Arizona, May 1997.
[2] Remzi H. Arpaci-Dusseau, Eric Anderson, NoahTreuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River:Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS ’99), pages 10.22, Atlanta, Georgia, May 1999.
[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996. [4] Luiz A. Barroso, Jeffrey Dean, and Urs H ̈olzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22.28, April 2003.
[5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.
[6] Guy E. Blelloch. Scans as primitive parallel operations.IEEE Transactions on Computers, C-38(11), November 1989.
[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78. 91, Saint-Malo, France, 1997.
[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29.43, Lake George, New York, 2003. To appear in OSDI 2004 12
[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par’96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401.408. Springer-Verlag, 1996.
[10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.
[11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.
[12] L. Huston, R. Sukthankar, R.Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A.
Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004USENIX File and Storage Technologies FAST Conference, April 2004.
[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831.838,
1980.
[14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal
of the ACM, 36(2):335.348, 1989.
[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data
processing. IEEE Computer, pages 68.74, June 2001.
[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor
experience. Concurrency and Computation: Practice and Experience, 2004.
[17] L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103.111,
1997.
[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf. {% note info %} ## 附录 A-单词频率统计 {% endnote %} 本节包含了一个完整的程序,用于统计在一组命令行指定的输入文件中,每一个不同的单词出现频率。
```python #include “mapreduce/mapreduce.h” // User’s map function
class WordCounter :public Mapper{ public: virtual void Map(const MapInput&input){ const string&text=input.value(); const int n=text.size(); for(int i=0;i<n; ){ // Skip past leading whitespace while((i<n)&&isspace(text[i]))i++; // Find word end int start=i; while((i<n)&&!isspace(text[i]))i++; if(start<i)Emit(text.substr(start,i-start),”1′′); }} }; REGISTER_MAPPER(WordCounter);
// User’s reduce function class Adder :public Reducer{ virtual void Reduce(ReduceInput*input){ // Iterate over all entries with the // same key and add the values int64 value=0; while(!input->done()){ value+=StringToInt(input->value());input->NextValue(); } // Emit sum for input->key() Emit(IntToString(value));} }; REGISTER_REDUCER(Adder); int main(int argc,char**argv){ParseCommandLineFlags(argc,argv); MapReduceSpecification spec; // Store list of input files into “spec” for(int i=1;i<argc; i++){ MapReduceInput*input=spec.add_input(); input->set_format(“text”); input->set_filepattern(argv[i]); input->set_mapper_class(“WordCounter”); } // Specify the output files: // /gfs/test/freq-00000-of-00100 // /gfs/test/freq-00001-of-00100 // ... MapReduceOutput*out=spec.output(); out->set_filebase(“/gfs/test/freq”); out->set_num_tasks(100); out->set_format(“text”); out->set_reducer_class(“Adder”); // Optional: do partial sums within map tasks to save network bandwidth out->set_combiner_class(“Adder”); // Tuning parameters: use at most 2000 // machines and 100 MB of memory per task spec.set_machines(2000); spec.set_map_megabytes(100); spec.set_reduce_megabytes(100); // Now run it MapReduceResult result; if(!MapReduce(spec,&result))abort(); // Done: ‘result’ structure contains info about counters, time taken, number of machines used, etc. return 0; }
|