MapReduce Patterns, Algorithms, and Use Cases

Reposted from Ilya Katsov; translated by Juliashine. 2013-10-16 22:06

In this article I digested a number of MapReduce patterns and algorithms to give a systematic view of the different techniques that can be found on the web or in scientific articles. Several practical case studies are also provided. All descriptions and code snippets use the standard Hadoop MapReduce model with Mappers, Reducers, Combiners, Partitioners, and sorting. This framework is depicted in the figure below.

[Figure: MapReduce Framework]

Basic MapReduce Patterns

Counting and Summing

Problem Statement:

There are a number of documents, where each document is a set of terms. It is required to calculate the total number of occurrences of each term across all documents. Alternatively, it can be an arbitrary function of the terms. For instance, given a log file where each record contains a response time, it is required to calculate the average response time.

Solution:

Let's start with something really simple. The code snippet below shows a Mapper that simply emits "1" for each term it processes and a Reducer that goes through the list of ones and sums them up:

class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)

The obvious disadvantage of this approach is the high number of dummy counters emitted by the Mapper. The Mapper can reduce the number of counters by summing the counts within each document first:

class Mapper
   method Map(docid id, doc d)
      H = new AssociativeArray
      for all term t in doc d do
          H{t} = H{t} + 1
      for all term t in H do
         Emit(term t, count H{t})

In order to accumulate counters not only for one document, but for all documents processed by one Mapper node, it is possible to leverage Combiners:

class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Combiner
   method Combine(term t, [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)
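
To make the data flow concrete, here is a minimal local Python sketch of the same pipeline (illustrative only, not actual Hadoop code; the sample documents and function names are made up): the mapper emits (term, 1) pairs, a combiner pre-sums them, and the reducer sums the combined counts.

from collections import defaultdict

def map_doc(doc_id, doc):
    # Mapper: emit (term, 1) for every term in the document
    return [(term, 1) for term in doc.split()]

def combine(pairs):
    # Combiner: pre-aggregate the counts produced by a single mapper
    partial = defaultdict(int)
    for term, count in pairs:
        partial[term] += count
    return list(partial.items())

def shuffle(pairs):
    # the framework's shuffle: group values by key between map and reduce
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_counts(term, counts):
    # Reducer: sum the (already pre-summed) counts
    return (term, sum(counts))

docs = {1: "to be or not to be", 2: "to do or not to do"}
mapped = [pair for doc_id, doc in docs.items() for pair in map_doc(doc_id, doc)]
combined = combine(mapped)          # in a real job there is one combiner per mapper
result = [reduce_counts(t, cs) for t, cs in shuffle(combined).items()]
print(sorted(result))               # [('be', 2), ('do', 2), ('not', 2), ('or', 2), ('to', 4)]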

Applications:

Log Analysis, Data Querying

Collating

Problem Statement:

There is a set of items and some function of one item. It is required to save all items that have the same value of the function into one file, or to perform some other computation that requires all such items to be processed as a group. The most typical example is the building of inverted indexes.

Solution:

The solution is straightforward. The Mapper computes the given function for each item and emits the function value as a key and the item itself as a value. The Reducer obtains all items grouped by function value and processes or saves them. In the case of inverted indexes, items are terms (words) and the function value is the ID of the document where the term was found.

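As an illustration, here is a minimal local Python sketch of collating used to build an inverted index (not Hadoop code; the sample documents are made up): the mapper emits (term, doc_id) pairs, and the group collected for each term is its posting list.

from collections import defaultdict

def map_doc(doc_id, text):
    # Mapper: emit (term, doc_id) for every distinct term in the document
    return [(term, doc_id) for term in set(text.split())]

def shuffle(pairs):
    # group values by key, as the framework does between map and reduce
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_postings(term, doc_ids):
    # Reducer: the group itself is the posting list for the term
    return (term, sorted(set(doc_ids)))

docs = {1: "mapreduce is simple", 2: "mapreduce is powerful", 3: "hadoop runs mapreduce"}
mapped = [p for doc_id, text in docs.items() for p in map_doc(doc_id, text)]
index = dict(reduce_postings(t, ids) for t, ids in shuffle(mapped).items())
print(index["mapreduce"])   # [1, 2, 3]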

Applications:

Inverted Indexes, ETL

Filtering (“Grepping”), Parsing, and Validation

Problem Statement:

There is a set of records, and it is required to collect all records that meet some condition or to transform each record (independently of the other records) into another representation. The latter case includes such tasks as text parsing, value extraction, and conversion from one format to another.

Solution:

The solution is absolutely straightforward: the Mapper takes records one by one and emits accepted records or their transformed versions.

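A minimal sketch of the filtering case in Python (illustrative; the log lines and the predicate are made up): the mapper either drops a record or emits it, and no Reducer is needed.

# grep-like mapper: keep only records that satisfy a predicate
records = ["GET /index 200", "GET /admin 500", "POST /login 200", "GET /img 404"]

def map_record(record):
    # emit the record only if it looks like an error line; otherwise emit nothing
    if record.endswith(" 500") or record.endswith(" 404"):
        yield record

errors = [r for rec in records for r in map_record(rec)]
print(errors)   # ['GET /admin 500', 'GET /img 404']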

Applications:

Log Analysis, Data Querying, ETL, Data Validation

Distributed Task Execution

Problem Statement:

There is a large computational problem that can be divided into multiple parts, and the results from all parts can be combined together to obtain the final result.

Solution:

The problem description is split into a set of specifications, and the specifications are stored as input data for the Mappers. Each Mapper takes a specification, performs the corresponding computations, and emits results. The Reducer combines all emitted parts into the final result.

Case Study: Simulation of a Digital Communication System

There is a software simulator of a digital communication system, such as WiMAX, that passes some volume of random data through the system model and computes the error probability of the throughput. Each Mapper runs the simulation for a specified amount of data (1/N-th of the required sample size) and emits its error rate. The Reducer computes the average error rate.

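A local Python sketch of this case study (purely illustrative; the "simulation" is a random stand-in for a real channel model): each mapper simulates 1/N-th of the samples and emits an error rate, and the reducer averages the per-chunk rates.

import random

def run_simulation_chunk(num_samples, seed):
    # stand-in for the real system model: count randomly "corrupted" samples
    rng = random.Random(seed)
    errors = sum(1 for _ in range(num_samples) if rng.random() < 0.01)
    return errors / num_samples

def map_chunk(chunk_id, num_samples):
    # Mapper: simulate this chunk's share of the data and emit ("error_rate", rate)
    return ("error_rate", run_simulation_chunk(num_samples, seed=chunk_id))

def reduce_rates(rates):
    # Reducer: average the per-chunk error rates (chunks are equally sized here)
    return sum(rates) / len(rates)

num_mappers, samples_per_mapper = 8, 100_000
rates = [map_chunk(i, samples_per_mapper)[1] for i in range(num_mappers)]
print(round(reduce_rates(rates), 4))   # close to the simulated error probability of 0.01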

Applications:

Physical and Engineering Simulations, Numerical Analysis, Performance Testing

Sorting

Problem Statement:

There is a set of records and it is required to sort these records by some rule or process these records in a certain order.

Solution:

Simple sorting is absolutely straightforward: Mappers just emit all items as values associated with sorting keys that are assembled as a function of the items. Nevertheless, in practice sorting is often used in quite tricky ways, which is why it is said to be the heart of MapReduce (and Hadoop). In particular, it is very common to use composite keys to achieve secondary sorting and grouping.

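The composite-key idea can be sketched locally in Python (illustrative; in Hadoop this is normally done with a custom partitioner and grouping comparator rather than an explicit sort): records are keyed by (group, secondary field), so within each group the values reach the reducer already ordered by the secondary field.

from itertools import groupby

# records: (user, timestamp, action); we want each user's actions in time order
records = [("bob", 3, "buy"), ("alice", 2, "view"), ("bob", 1, "view"), ("alice", 5, "buy")]

# Mapper: emit a composite key (user, timestamp) with the record as the value
mapped = [((user, ts), (user, ts, action)) for user, ts, action in records]

# the framework sorts by the full composite key; grouping uses only the first part
mapped.sort(key=lambda kv: kv[0])
for user, group in groupby(mapped, key=lambda kv: kv[0][0]):
    ordered_actions = [value[2] for _key, value in group]
    print(user, ordered_actions)
# alice ['view', 'buy']
# bob ['view', 'buy']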

Sorting in MapReduce is originally intended for sorting of the emitted key-value pairs by key, but there exist techniques that leverage Hadoop implementation specifics to achieve sorting by values. See this blog for more details.

It is worth noting that if MapReduce is used for sorting the original (not intermediate) data, it is often a good idea to continuously maintain the data in a sorted state using BigTable concepts. In other words, it can be more efficient to sort the data once during insertion than to sort them for each MapReduce query.

Applications:

ETL, Data Analysis

Not-So-Basic MapReduce Patterns

Iterative Message Passing (Graph Processing)

Problem Statement:

There is a network of entities and relationships between them. It is required to calculate a state of each entity on the basis of the properties of the other entities in its neighborhood. This state can represent the distance to other nodes, an indication that there is a neighbor with certain properties, a characteristic of neighborhood density, and so on.

Solution:

A network is stored as a set of nodes, and each node contains a list of adjacent node IDs. Conceptually, MapReduce jobs are performed in an iterative way, and at each iteration each node sends messages to its neighbors. Each neighbor updates its state on the basis of the received messages. Iterations are terminated by some condition, like a fixed maximal number of iterations (say, the network diameter) or negligible changes in state between two consecutive iterations. From the technical point of view, the Mapper emits messages for each node using the ID of the adjacent node as a key. As a result, all messages are grouped by the receiving node, and the Reducer is able to recompute the state and rewrite the node with the new state. This algorithm is shown below:

class Mapper
   method Map(id n, object N)
      Emit(id n, object N)
      for all id m in N.OutgoingRelations do
         Emit(id m, message getMessage(N))

class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      messages = []
      for all s in [s1, s2,...] do
          if IsObject(s) then
             M = s
          else               // s is a message
             messages.add(s)
      M.State = calculateState(messages)
      Emit(id m, item M)
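
Below is a compact local Python sketch of one instance of this iteration (illustrative only; it computes breadth-first distances from a source node, which is one possible choice of getMessage and calculateState): every node emits its own object plus a message to each neighbor, and the "reducer" recomputes each node's state from the incoming messages.

INF = float("inf")

# adjacency lists plus per-node state (distance from node 'a')
graph = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
state = {"a": 0, "b": INF, "c": INF, "d": INF}

def map_node(node):
    # Mapper: emit the node itself plus a candidate distance to each neighbor
    yield (node, ("node", state[node]))
    for neighbor in graph[node]:
        yield (neighbor, ("msg", state[node] + 1))

def reduce_node(node, values):
    # Reducer: new state = min of the current state and all incoming candidates
    current = min(v for kind, v in values if kind == "node")
    messages = [v for kind, v in values if kind == "msg"]
    return min([current] + messages)

for _iteration in range(len(graph)):          # a fixed number of iterations for simplicity
    grouped = {}
    for node in graph:
        for key, value in map_node(node):
            grouped.setdefault(key, []).append(value)
    state = {node: reduce_node(node, values) for node, values in grouped.items()}

print(state)   # {'a': 0, 'b': 1, 'c': 1, 'd': 2}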

It should be emphasized that the state of one node rapidly propagates across the whole network if the network is not too sparse, because all nodes that were "infected" by this state start to "infect" all their neighbors. This process is illustrated in the figure below:

[Figure: propagation of state through the network]

Case Study: Availability Propagation Through The Tree of Categories

Problem Statement:

This problem is inspired by a real-life eCommerce task. There is a tree of categories that branches out from large categories (like Men, Women, Kids) to smaller ones (like Men Jeans or Women Dresses), and eventually to small end-of-line categories (like Men Blue Jeans). An end-of-line category is either available (contains products) or not. A higher-level category is available if there is at least one available end-of-line category in its subtree. The goal is to calculate the availabilities of all categories, given that the availabilities of the end-of-line categories are known.

Solution:

This problem can be solved using the framework that was described in the previous section. We define getMessage and calculateState methods as follows:

class N
   State in {True = 2, False = 1, null = 0}, initialized 1 or 2 for end-of-line categories, 0 otherwise

method getMessage(object N)
   return N.State

method calculateState(state s, data [d1, d2,...])
   return max( [d1, d2,...] )

Case Study: PageRank and Mapper-Side Data Aggregation

This algorithm was suggested by Google to calculate the relevance of a web page as a function of the authoritativeness (PageRank) of the pages that link to it. The real algorithm is quite complex, but at its core it is just a propagation of weights between nodes, where each node calculates its weight as the sum of the incoming weights, each incoming weight being the sender's weight divided by its number of outgoing links:

class N
    State is PageRank

method getMessage(object N)
    return N.State / N.OutgoingRelations.size()

method calculateState(state s, data [d1, d2,...])
    return ( sum([d1, d2,...]) )

It is worth mentioning that the schema we use is too generic and doesn't take advantage of the fact that the state is a numerical value. In most practical cases, we can perform aggregation of values on the Mapper side by virtue of this fact. This optimization is illustrated in the code snippet below (for the PageRank algorithm):

class Mapper
   method Initialize
      H = new AssociativeArray
   method Map(id n, object N)
      p = N.PageRank  / N.OutgoingRelations.size()
      Emit(id n, object N)
      for all id m in N.OutgoingRelations do
         H{m} = H{m} + p
   method Close
      for all id n in H do
         Emit(id n, value H{n})

class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      p = 0
      for all s in [s1, s2,...] do
          if IsObject(s) then
             M = s
          else
             p = p + s
      M.PageRank = p
      Emit(id m, item M)
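
A local Python sketch of the optimized flow (illustrative; a simplified PageRank without damping, run on a toy graph): the "mapper" accumulates partial contributions for destination nodes in an in-memory associative array and emits each partial sum once, and the reducer adds the partials into the new rank.

from collections import defaultdict

graph = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
rank = {node: 1.0 / len(graph) for node in graph}

def map_partition(nodes):
    # in-mapper aggregation: accumulate contributions per destination locally
    partial = defaultdict(float)
    emitted = []
    for node in nodes:
        emitted.append((node, ("node", graph[node])))    # pass the graph structure through
        share = rank[node] / len(graph[node])
        for dest in graph[node]:
            partial[dest] += share
    emitted.extend((dest, ("rank", p)) for dest, p in partial.items())
    return emitted

def reduce_node(values):
    # Reducer: keep the adjacency list and sum the partial rank contributions
    adjacency = next(v for kind, v in values if kind == "node")
    return adjacency, sum(v for kind, v in values if kind == "rank")

for _ in range(10):                                      # a few synchronous iterations
    grouped = defaultdict(list)
    for key, value in map_partition(list(graph)):        # a single "mapper" for simplicity
        grouped[key].append(value)
    new = {node: reduce_node(values) for node, values in grouped.items()}
    graph = {node: adj for node, (adj, _r) in new.items()}
    rank = {node: r for node, (_adj, r) in new.items()}

print({node: round(r, 3) for node, r in rank.items()})   # the ranks always sum to 1.0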

Applications:

Graph Analysis, Web Indexing

Distinct Values (Unique Items Counting)

Problem Statement:

There is a set of records that contain fields F and G. Count the total number of unique values of field F for each subset of records that have the same value of G (i.e., grouped by G).

The problem can be generalized a little bit and formulated in terms of faceted search (some eCommerce systems call it Narrow Search):

Problem Statement:

There is a set of records. Each record has a field F and an arbitrary number of category labels G = {G1, G2, ...}. Count the total number of unique values of field F for the subset of records associated with each label value. Example:

Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}

Result:
a -> 3   // F=1, F=2, F=3
b -> 2   // F=1, F=3
d -> 1   // F=2
e -> 1   // F=2

Solution I:

The first approach is to solve the problem in two stages. At the first stage, the Mapper emits dummy counters for each pair of F and G; the Reducer calculates the total number of occurrences for each such pair. The main goal of this phase is to guarantee the uniqueness of F values. At the second stage, pairs are grouped by G and the total number of items in each group is calculated.

Phase I:

class Mapper
   method Map(null, record [value f, categories [g1, g2,...]])
      for all category g in [g1, g2,...]
         Emit(record [g, f], count 1)

class Reducer
   method Reduce(record [g, f], counts [n1, n2, ...])
      Emit(record [g, f], null )

Phase II:

class Mapper
   method Map(record [g, f], null)
      Emit(value g, count 1)

class Reducer
   method Reduce(value g, counts [n1, n2,...])
      Emit(value g, sum( [n1, n2,...] ) )
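
Here is a minimal local Python sketch of the two-phase approach (illustrative, reusing the example records above): phase one deduplicates (g, f) pairs, phase two counts the surviving pairs per label.

from collections import defaultdict

records = [(1, ["a", "b"]), (2, ["a", "d", "e"]), (1, ["b"]), (3, ["a", "b"])]

# Phase I: the mapper emits ((g, f), 1); the reducer keeps each (g, f) pair exactly once
# (a Python set stands in for that reducer here)
phase1_pairs = {(g, f) for f, labels in records for g in labels}

# Phase II: the mapper emits (g, 1) for every surviving pair; the reducer sums per label
counts = defaultdict(int)
for g, _f in phase1_pairs:
    counts[g] += 1

print(dict(sorted(counts.items())))   # {'a': 3, 'b': 2, 'd': 1, 'e': 1}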

Solution II:

The second solution requires only one MapReduce job, but it is not really scalable and its applicability is limited. The algorithm is simple: the Mapper emits values and categories, and the Reducer excludes duplicates from the list of categories for each value and increments a counter for each category. The final step is to sum all counters emitted by the Reducers. This approach is applicable if the number of records with the same F value is not very high and the total number of categories is also limited. For instance, it is applicable to the processing of web logs and the classification of users: the total number of users is high, but the number of events for one user is limited, as is the number of categories to classify by. It is worth noting that Combiners can be used in this schema to exclude duplicates from the category lists before the data is transmitted to the Reducers.

class Mapper
   method Map(null, record [value f, categories [g1, g2,...] )
      for all category g in [g1, g2,...]
          Emit(value f, category g)

class Reducer
   method Initialize
      H = new AssociativeArray : category -> count
   method Reduce(value f, categories [g1, g2,...])
      [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
      for all category g in [g1', g2',...]
         H{g} = H{g} + 1
   method Close
      for all category g in H do
         Emit(category g, count H{g})

Applications:

Log Analysis, Unique Users Counting

Cross-Correlation

Problem Statement:

There is a set of tuples of items. For each possible pair of items, calculate the number of tuples where these items co-occur. If the total number of items is N, then N*N values should be reported.

This problem appears in text analysis (say, items are words and tuples are sentences) and in market analysis (customers who buy this tend to also buy that). If N*N is small enough that such a matrix can fit in the memory of a single machine, then the implementation is straightforward.

Pairs Approach

The first approach is to emit all pairs with dummy counters from the Mappers and to sum these counters on the Reducer. The shortcomings are:

  • The benefit from Combiners is limited, as it is likely that all pairs are distinct.

  • There is no in-memory accumulation.

class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         for all item j in [i1, i2,...]
            Emit(pair [i j], count 1)

class Reducer
   method Reduce(pair [i j], counts [c1, c2,...])
      s = sum([c1, c2,...])
      Emit(pair[i j], count s)
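
A local Python sketch of the Pairs approach (illustrative; unlike the pseudocode above, it skips pairing an item with itself): the mapper emits every pair of distinct items in each tuple with a count of 1, and the reducer sums the counters per pair.

from collections import defaultdict

tuples = [["milk", "bread", "beer"], ["milk", "bread"], ["bread", "beer"]]

def map_tuple(items):
    # Mapper: emit ((i, j), 1) for every ordered pair of distinct items in the tuple
    for i in items:
        for j in items:
            if i != j:
                yield ((i, j), 1)

counts = defaultdict(int)                 # Reducer: sum the dummy counters per pair
for t in tuples:
    for pair, c in map_tuple(t):
        counts[pair] += c

print(counts[("milk", "bread")], counts[("bread", "beer")])   # 2 2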

Stripes Approach

The second approach is to group data by the first item in pair and maintain an associative array (“stripe”) where counters for all adjacent items are accumulated. Reducer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.

  • Generates fewer intermediate keys. Hence the framework has less sorting to do.

  • Greatly benefits from Combiners.

  • Performs in-memory accumulation. This can lead to problems if not properly implemented.

  • More complex implementation.

  • In general, "stripes" is faster than "pairs".

class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         H = new AssociativeArray : item -> counter
         for all item j in [i1, i2,...]
            H{j} = H{j} + 1
         Emit(item i, stripe H)

class Reducer
   method Reduce(item i, stripes [H1, H2,...])
      H = new AssociativeArray : item -> counter
      H = merge-sum( [H1, H2,...] )
      for all item j in H.keys()
         Emit(pair [i j], H{j})
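
And a matching local sketch of the Stripes approach (same toy data, again skipping self-pairs): each mapper emits one associative array per leading item, and the reducer merge-sums the stripes before expanding them into pairs.

from collections import defaultdict

tuples = [["milk", "bread", "beer"], ["milk", "bread"], ["bread", "beer"]]

def map_tuple(items):
    # Mapper: emit (i, stripe), where the stripe counts the items co-occurring with i
    for i in items:
        stripe = defaultdict(int)
        for j in items:
            if i != j:
                stripe[j] += 1
        yield (i, stripe)

merged = defaultdict(lambda: defaultdict(int))   # Reducer: merge-sum the stripes per item
for t in tuples:
    for i, stripe in map_tuple(t):
        for j, c in stripe.items():
            merged[i][j] += c

print(dict(merged["bread"]))   # {'milk': 2, 'beer': 2}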

Applications:

Text Analysis, Market Analysis

References:

  1. Lin, J., Dyer, C. Data-Intensive Text Processing with MapReduce

Relational MapReduce Patterns

In this section we go through the main relational operators and discuss how these operators can be implemented in MapReduce terms.

Selection

class Mapper
   method Map(rowkey key, tuple t)
      if t satisfies the predicate
         Emit(tuple t, null)

Projection

Projection is just a little bit more complex than selection, but we should use a Reducer in this case to eliminate possible duplicates.

class Mapper
   method Map(rowkey key, tuple t)
      tuple g = project(t)  // extract required fields to tuple g
      Emit(tuple g, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of nulls
      Emit(tuple t, null)

Union

Mappers are fed all records of the two sets to be united. The Reducer is used to eliminate duplicates.

class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of one or two nulls
      Emit(tuple t, null)

Intersection

Mappers are fed all records of the two sets to be intersected. The Reducer emits only records that occurred twice. This is possible only if both sets contain a given record, because a record includes a primary key and can occur in each set only once.

class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of one or two nulls
      if n.size() = 2
          Emit(tuple t, null)

Difference

Let's say we have two sets of records, R and S, and we want to compute the difference R – S. The Mapper emits all tuples together with a tag, which is the name of the set the record came from. The Reducer emits only records that came from R but not from S.

class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, string t.SetName)    // t.SetName is either 'R' or 'S'

class Reducer
   method Reduce(tuple t, array n) // array n can be ['R'], ['S'], ['R' 'S'], or ['S', 'R']
      if n.size() = 1 and n[1] = 'R'
          Emit(tuple t, null)
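
A minimal local Python sketch of the tag-based difference (illustrative; R and S are small made-up sets): the mapper tags each tuple with the set it came from, and the reducer keeps only the tuples whose tag list is exactly ['R']. Union and intersection follow the same shape with a different reducer condition.

from collections import defaultdict

R = [("u1", "alice"), ("u2", "bob"), ("u3", "carol")]
S = [("u2", "bob"), ("u4", "dave")]

tags = defaultdict(list)
for t in R:
    tags[t].append("R")        # mapper over R: Emit(tuple, 'R')
for t in S:
    tags[t].append("S")        # mapper over S: Emit(tuple, 'S')

# Reducer: R - S keeps the tuples that were seen only in R
difference = [t for t, names in tags.items() if names == ["R"]]
print(sorted(difference))      # [('u1', 'alice'), ('u3', 'carol')]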

GroupBy and Aggregation

Grouping and aggregation can be performed in one MapReduce job as follows. The Mapper extracts from each tuple the values to group by and aggregate, and emits them. The Reducer receives the values to be aggregated, already grouped, and calculates the aggregation function. Typical aggregation functions like sum or max can be calculated in a streaming fashion and hence don't require handling all values simultaneously. Nevertheless, in some cases a two-phase MapReduce job may be required; see the Distinct Values pattern as an example.

class Mapper
   method Map(null, tuple [value GroupBy, value AggregateBy, value ...])
      Emit(value GroupBy, value AggregateBy)

class Reducer
   method Reduce(value GroupBy, [v1, v2,...])
      Emit(value GroupBy, aggregate( [v1, v2,...] ) )  // aggregate() : sum(), max(),...
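
A minimal local sketch of GroupBy with a streaming aggregate (illustrative data): the mapper emits (group, value) pairs, and the reducer folds the values into a running sum without ever materializing the whole group.

from collections import defaultdict

# tuples of (region, amount); group by region, aggregate with sum
sales = [("east", 10), ("west", 5), ("east", 7), ("west", 3), ("east", 1)]

groups = defaultdict(int)
for region, amount in sales:   # mapper emits (region, amount); the shuffle groups by region
    groups[region] += amount   # reducer: running (streaming) sum per group

print(dict(groups))            # {'east': 18, 'west': 8}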

Joining

Joins are perfectly possible in the MapReduce framework, but there exist a number of techniques that differ in efficiency and in the data volumes they are oriented to. In this section we study some basic approaches. The references section contains links to detailed studies of join techniques.

Repartition Join (Reduce Join, Sort-Merge Join)

This algorithm joins two sets, R and L, on some key k. The Mapper goes through all tuples from R and L, extracts the key k from each tuple, marks the tuple with a tag that indicates which set it came from ('R' or 'L'), and emits the tagged tuple using k as a key. The Reducer receives all tuples for a particular key k and puts them into two buckets, one for R and one for L. When the two buckets are filled, the Reducer runs a nested loop over them and emits a cross join of the buckets. Each emitted tuple is a concatenation of an R-tuple, an L-tuple, and the key k. This approach has the following disadvantages:

  • The Mapper emits absolutely all data, even for keys that occur only in one set and have no pair in the other.

  • The Reducer should hold all data for one key in memory. If the data doesn't fit in memory, it is the Reducer's responsibility to handle this by some kind of swap.

Nevertheless, the Repartition Join is the most generic technique and can be used successfully when other, more optimized techniques are not applicable.

class Mapper
   method Map(null, tuple [join_key k, value v1, value v2,...])
      Emit(join_key k, tagged_tuple [set_name tag, values [v1, v2, ...] ] )

class Reducer
   method Reduce(join_key k, tagged_tuples [t1, t2,...])
      H = new AssociativeArray : set_name -> values
      for all tagged_tuple t in [t1, t2,...]     // separate values into 2 arrays
         H{t.tag}.add(t.values)
      for all values r in H{'R'}            // produce a cross-join of the two arrays
         for all values l in H{'L'}
            Emit(null, [k r l] )
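
A local Python sketch of the repartition join (illustrative; R and L are small made-up tables): tuples are tagged and grouped by the join key, and for each key the reducer splits the group into two buckets and cross-joins them.

from collections import defaultdict

R = [(1, "alice"), (2, "bob")]                      # (user_id, name)
L = [(1, "login"), (1, "logout"), (3, "login")]     # (user_id, event)

# Mapper: emit (join_key, (tag, value)) for both inputs
mapped = [(k, ("R", v)) for k, v in R] + [(k, ("L", v)) for k, v in L]

groups = defaultdict(list)                          # shuffle: group tagged tuples by join key
for k, tagged in mapped:
    groups[k].append(tagged)

joined = []
for k, tagged_tuples in groups.items():             # Reducer: split into buckets, cross-join
    buckets = defaultdict(list)
    for tag, value in tagged_tuples:
        buckets[tag].append(value)
    for r in buckets["R"]:
        for l in buckets["L"]:
            joined.append((k, r, l))

print(joined)   # [(1, 'alice', 'login'), (1, 'alice', 'logout')]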

Replicated Join (Map Join, Hash Join)

In practice, it is typical to join a small set with a large one (say, a list of users with a list of log records). Let's assume that we join two sets, R and L, where R is relatively small. If so, R can be distributed to all Mappers, and each Mapper can load it and index it by the join key. The most common and efficient indexing technique here is a hash table. After this, the Mapper goes through the tuples of the set L and joins them with the corresponding tuples from R that are stored in the hash table. This approach is very effective because there is no need to sort or to transmit the set L over the network, but the set R should be quite small to be distributed to all Mappers.

class Mapper
   method Initialize
      H = new AssociativeArray : join_key -> tuple from R
      R = loadR()
      for all [ join_key k, tuple [r1, r2,...] ] in R
         H{k} = H{k}.append( [r1, r2,...] )

   method Map(join_key k, tuple l)
      for all tuple r in H{k}
         Emit(null, tuple [k r l] )
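
And a local sketch of the replicated (map-side hash) join (illustrative; the same toy tables): the small set R is loaded into a hash table up front, and the mapper streams over L, joining each tuple against the table with no shuffle at all.

from collections import defaultdict

R = [(1, "alice"), (2, "bob")]                      # small set, fits in memory
L = [(1, "login"), (1, "logout"), (3, "login")]     # large set, streamed through the mapper

# Initialize: build a hash table from the small set, keyed by the join key
H = defaultdict(list)
for k, r in R:
    H[k].append(r)

# Map: stream over L and join each tuple against the in-memory table
joined = [(k, r, l) for k, l in L for r in H[k]]
print(joined)   # [(1, 'alice', 'login'), (1, 'alice', 'logout')]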

References:

  1. Join Algorithms using Map/Reduce
  2. Optimizing Joins in a MapReduce Environment

http://highlyscalable.wordpress.com/2012/02/01/mapreduce-patterns/

http://www.yeolar.com/note/2013/10/16/mapreduce-patterns/
