The basic model for MapReduce derives from the map and reduce concept in functional languages like Lisp.
In Lisp, a map takes as input a function and a sequence of values and applies the function to each value in the sequence.
A reduce takes as input a sequence of elements and combines all the elements using a binary operation (for example, it can use “+” to sum all the elements in the sequence).
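For .NET developers, the closest everyday analogy is LINQ: Select is essentially a map, and Aggregate is essentially a reduce. A minimal sketch of the functional idea (assuming System.Linq is in scope; this illustrates the Lisp concept, not MapReduce itself):

var numbers = new[] { 1, 2, 3, 4 };
// "map": apply a function to every element of the sequence
var squares = numbers.Select(n => n * n);
// "reduce": combine all the elements using a binary operation ("+")
var sum = squares.Aggregate(0, (acc, n) => acc + n); // 30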

MapReduce, inspired by these concepts, was developed as a method for writing processing algorithms for large amounts of raw data. The amount of data is so large that it can’t be stored on a single machine and must be distributed across many machines in order to be processed in a reasonable time.
In systems with such data distribution, traditional centralized processing algorithms are useless: just getting the data to the single CPU running the algorithm implies huge network costs and months (!) spent transferring data from the distributed machines.
Therefore, processing distributed data at such a scale requires parallel computing that runs the computation “close” to where the data is located.
MapReduce is an abstraction that allows engineers to write such processing algorithms in a way that is easy to parallelize while hiding the complexities of parallelization, data distribution, fault tolerance, etc.

This value proposition for MapReduce is outlined in a Google research paper on the topic:

MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Many real world tasks are expressible in this model, as shown in the paper.

Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines. The run-time system takes care of the details of partitioning the input data, scheduling the program’s execution across a set of machines, handling machine failures, and managing the required inter-machine communication. This allows programmers without any experience with parallel and distributed systems to easily utilize the resources of a large distributed system.

Our implementation of MapReduce runs on a large cluster of commodity machines and is highly scalable: a typical MapReduce computation processes many terabytes of data on thousands of machines. Programmers find the system easy to use: hundreds of MapReduce programs have been implemented and upwards of one thousand MapReduce jobs are executed on Google’s clusters every day.

The MapReduce Programming Model

As explained earlier, the purpose of MapReduce is to abstract parallel algorithms into map and reduce functions that can then be executed on a large-scale distributed system.
To understand this concept better, let’s look at a concrete MapReduce example – consider the problem of counting the number of occurrences of each word in a large collection of documents:

map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
  EmitIntermediate(w, "1"); 

reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
  result += ParseInt(v);
Emit(AsString(result));

The map function goes over the document text and emits each word with an associated value of “1”.

The reduce function sums together all the values emitted for each word, producing the number of occurrences of that word as a result.

First we go through the map phase, where we iterate over the input data and create intermediate values as follows:

  • Records from the data source (lines out of files, rows of a database, etc.) are fed into the map function as <key, value> pairs. For example: <filename, file content>
  • The map function produces one or more intermediate values along with an output key from the input.

After the mapping phase is over, we go through the reduce phase to process the intermediate values:

  • All the intermediate values for a given output key are collected together into a list and fed to the reduce function.
  • The reduce function combines those intermediate values into one or more final values for that same output key (illustrated below).
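To make the flow concrete, here is roughly what the data looks like at each stage of the word-counting example (the document contents below are made up for illustration):

Input records (key = document name, value = contents):
  <"doc1", "the cat sat on the mat">
  <"doc2", "the dog sat">

Map output (intermediate <word, "1"> pairs):
  <"the","1"> <"cat","1"> <"sat","1"> <"on","1"> <"the","1"> <"mat","1">
  <"the","1"> <"dog","1"> <"sat","1">

Reduce input (grouped by intermediate key) -> Reduce output:
  <"the", ["1","1","1"]>  ->  3
  <"sat", ["1","1"]>      ->  2
  <"cat", ["1"]>          ->  1   (and similarly for "on", "mat", "dog")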

Notice that both the map and the reduce functions run on independent sets of input data: each run of the map function processes its own data source, and each run of the reduce function processes the values of a different intermediate key.

Therefore both phases can be parallelized with the only bottleneck being the fact that the map phase has to finish for the reduce phase to start.

The underlying system running these functions takes care of:

  • Initializing a set of workers that can run tasks – map or reduce functions.
  • Taking the input data (in our case, lots of document filenames) and sending it to the workers running map tasks.
  • Streaming the values emitted by the map function to the worker (or workers) doing the reduce. Note that we don’t have to wait for a given map run to finish going over its entire file before starting to send its emitted values to the reducer, so the system can prepare the data for the reducer while the map function is still running
    (in Hadoop – sending the map values to the reducer node and handling grouping by key).
  • Handling errors – supporting a reliable, fault-tolerant process, as workers may fail, the network can crash preventing workers from communicating results, etc.
  • Providing status and monitoring tools.

A Naive Implementation in C#

Let’s see how we can build a naive MapReduce implementation in C#.

First, we define a generic class to manage our Map-Reduce process:

public class NaiveMapReduceProgram<K1, V1, K2, V2, V3>

The generic types are used the following way:

  • (K1, V1) – key-value types for the input data
  • (K2, V2) – key-value types for the intermediate results (results of our Map function)
  • V3 – The type of the result for the entire Map-Reduce process

Next, we’ll define the delegates of our Map and Reduce functions:

public delegate IEnumerable<KeyValuePair<K2, V2>>   MapFunction(K1 key, V1 value);
public delegate IEnumerable<V3>                     ReduceFunction(K2 key, IEnumerable<V2> values);
private MapFunction _map;
private ReduceFunction _reduce;
public NaiveMapReduceProgram(MapFunction mapFunction, ReduceFunction reduceFunction)
{
    _map = mapFunction;
    _reduce = reduceFunction;
}

(Yes, I realize I could use .NET’s Func<T1,T2,TResult> instead but that would just result in horribly long ugly code…)
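For comparison, here is roughly what the same two fields would look like with Func – this is just to illustrate the tradeoff and is not part of the sample code:

private Func<K1, V1, IEnumerable<KeyValuePair<K2, V2>>> _map;
private Func<K2, IEnumerable<V2>, IEnumerable<V3>> _reduce;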

Now for the actual program execution. The execution flow is as follows: we take the input values and pass them through the map function to get intermediate values, then we group those values by key and pass each group to the reduce function to get the result values.

So first, let’s look at the mapping step:

private IEnumerable<KeyValuePair<K2, V2>> Map(IEnumerable<KeyValuePair<K1, V1>> input)
{
    var q = from pair in input
            from mapped in _map(pair.Key, pair.Value)
            select mapped;

    return q;
}
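For reference, the same query in method syntax is a single SelectMany. Since each map run is independent, this is also where one could, in principle, drop in PLINQ’s AsParallel() – that’s an assumption about a possible extension, not something this naive sample does:

private IEnumerable<KeyValuePair<K2, V2>> Map(IEnumerable<KeyValuePair<K1, V1>> input)
{
    // Equivalent to the query above; uncommenting AsParallel() would fan the map calls out across threads
    return input // .AsParallel()
        .SelectMany(pair => _map(pair.Key, pair.Value));
}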

Now that we have the mapped intermediate values, we want to reduce them. The Reduce function expects a key and all of its mapped values as input, so to do that efficiently we group the intermediate values by key first and then call the Reduce function for each key.

The output of this process is one or more V3 values for each of the intermediate K2 keys:

private IEnumerable<KeyValuePair<K2, V3>> Reduce(IEnumerable<KeyValuePair<K2, V2>> intermediateValues)
{
    // First, group intermediate values by key
    var groups = from pair in intermediateValues
                 group pair.Value by pair.Key into g
                 select g;

    // Reduce on each group
    var reduced = from g in groups
                  let k2 = g.Key
                  from reducedValue in _reduce(k2, g)
                  select new KeyValuePair<K2, V3>(k2, reducedValue);

    return reduced;
}

Now that we have the code for both steps, the execution itself is simply defined as Reduce(Map(input)):

public IEnumerable<KeyValuePair<K2, V3>> Execute(IEnumerable<KeyValuePair<K1, V1>> input)
{
    return Reduce(Map(input));
}

The full source code and tests can be downloaded from here:

Map-Reduce Word Counting Sample – Revisited

Let’s go back to the word-counting pseudocode and write it in C#.

The following Map function gets a key and a text value and emits a <word, 1> pair for each word in the text:

public IList<KeyValuePair<string, int>> MapFromMem(string key, string value)
{
    List<KeyValuePair<string, int>> result = new List<KeyValuePair<string, int>>();
    foreach (var word in value.Split(' '))
    {
        result.Add(new KeyValuePair<string, int>(word, 1));
    }
    return result;
}

Having produced a <word, 1> pair for each word in every input source, we can group the results by word, and then our Reduce function can sum the values (which are all 1 in this case) for each word:

public IEnumerable<int> Reduce(string key, IEnumerable<int> values)
{
    int sum = 0;
    foreach (int value in values)
    {
        sum += value;
    }

    return new int[1] { sum };
}
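As an aside, with LINQ this Reduce collapses to a one-liner with the same behavior (assuming System.Linq is referenced):

public IEnumerable<int> Reduce(string key, IEnumerable<int> values)
{
    // Sum all the 1's emitted for this word
    return new[] { values.Sum() };
}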

Our program code looks like this:

NaiveMapReduceProgram<string, string, string, int, int> master = new NaiveMapReduceProgram<string, string, string, int, int>(MapFromMem, Reduce);
var result = master.Execute(inputData).ToDictionary(key => key.Key, v => v.Value);

The result dictionary contains <word, number-of-occurrences> pairs.
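For completeness, the inputData passed to Execute is just a collection of <filename, file contents> pairs; a minimal sketch (the file names and contents are made up for illustration):

var inputData = new List<KeyValuePair<string, string>>
{
    new KeyValuePair<string, string>("doc1.txt", "the cat sat on the mat"),
    new KeyValuePair<string, string>("doc2.txt", "the dog sat")
};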

Other Examples

Distributed LINQ Queries. One of the POCs I’m working on using the above naive, LINQ-based implementation is running a distributed LINQ query. Imagine you have a system where raw data is distributed across several SQL Servers. We can have our map function run a LINQ-to-SQL query on multiple DataContexts in parallel (the value input for the map function – V1 – can be a DataContext) and then reduce the results to a single result set. This is probably a naive/simplified version of what the guys on Microsoft’s Dryad team are doing.

Count URL Visits. Suppose you have several web servers and you want to count the number of visits to each page on your site. You can do this in pretty much the same way the word-counting example works: the map function parses a log file and produces a <URL, 1> intermediate value, and the reduce function then sums the values for each URL and emits <URL, number of visits>.
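A map function for this scenario might look roughly like the following sketch – the log format and the ExtractUrl helper are assumptions for illustration, and the reduce function is the same summing Reduce used for word counting:

// Hypothetical: value is the raw text of one web server's log file
public IList<KeyValuePair<string, int>> MapLogFile(string serverName, string logContents)
{
    var result = new List<KeyValuePair<string, int>>();
    foreach (var line in logContents.Split('\n'))
    {
        string url = ExtractUrl(line); // hypothetical helper that pulls the requested URL out of a log line
        if (url != null)
            result.Add(new KeyValuePair<string, int>(url, 1));
    }
    return result;
}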

Distributed Grep. You can run a grep search on a large number of files by having the map function emit a line if it matches a given pattern. The reduce function in this case is just an identity function that copies the supplied intermediate data to the output.
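A sketch of what that might look like with the delegates defined above (the hard-coded Regex pattern is just for illustration):

// Map: emit <fileName, line> for every line that matches the pattern
public IList<KeyValuePair<string, string>> MapGrep(string fileName, string fileContents)
{
    var pattern = new System.Text.RegularExpressions.Regex("error");
    var result = new List<KeyValuePair<string, string>>();
    foreach (var line in fileContents.Split('\n'))
    {
        if (pattern.IsMatch(line))
            result.Add(new KeyValuePair<string, string>(fileName, line));
    }
    return result;
}

// Reduce: identity – just pass the matching lines through
public IEnumerable<string> ReduceGrep(string fileName, IEnumerable<string> matchingLines)
{
    return matchingLines;
}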

Map-Reduce in the Real World

The real complexity and sophistication in MapReduce is in the underlying system that takes care of running and managing the execution of MapReduce jobs. Real-world MapReduce implementations, like Google’s system, Hadoop or Dryad, have to go beyond the naive implementation shown here and take care of things like resource monitoring, reliability and fault tolerance (for example, handling cases where nodes running map/reduce jobs crash or go offline due to network problems).



16 Comments to “Introduction to MapReduce for .NET Developers”

  1. avi | May 12th, 2009 at 7:45 am

    thanks,
    great post, very interesting.

    BTW, the line
    MapReduceProgram<string, string, string, int, int> master = new MapReduceProgram<string, string, string, int, int>(MapFromMem, Reduce);
    is cut

  2. ekampf | May 12th, 2009 at 2:12 pm

    Thanks :)
    It’s not cut, just for some reason it won’t wrap like the other text :S will try to fix…

  3. Paul Batum | May 13th, 2009 at 5:41 am

    Great post, but there is one thing that confuses me.

    First you say:
    “Therefore both phases can be parallelized with the only bottleneck being the fact that the map phase has to finish for the reduce phase to start.”

    Then you say:
    “Streamline values emitted by map function to the worker (or workers) doing the reduce. Note that we don’t have to wait for a certain map run to finish going over the entire file in order to start sending its emitted values to the reducer, so the map and reduce functions can process data in parallel.”

    Am I missing something, or are these two statements contradictory?

  4. ekampf | May 13th, 2009 at 11:22 am

    Hi Paul,
    You’re right. Those two statements seem contradictory, but they’re actually both true – it just isn’t explained correctly in the post (which I’ll have to correct).

    The first statement is obviously correct because you can’t start running the reducer method before you have all the values for the intermediate key – which means the mapper has to finish running.

    However, if you download Hadoop and run one of its samples you’ll see that it starts running the Reducer tasks before the Mapper tasks finish.
    This happens because the reducer task is composed of two phases:
    1. Shuffle phase: collect the map outputs and group by key
    2. Reduce phase: the actual reducing

    Phase one of the reducer can execute in parallel with the mapping process and that is what the 2nd statement is referring to – you can start streaming and grouping values from the mappers to the reducers during the mapping process.

    Hope this clarifies your confusion…

    Regards,
    Eran

  5. Paul Batum | May 13th, 2009 at 1:31 pm

    Unless I am missing something, you can also start the actual reducing without all the data. Because the results of the reduce operation look the same as the results of the map operation, you can re-reduce a combination of previously reduced results and newly mapped results.

  6. ekampf | May 13th, 2009 at 1:35 pm

    The results of the reduce operation do not look like the results of the map operation. That’s why in the sample code I define the following:
    * (K1, V1) – key-value types for the input data
    * (K2, V2) – key value types for the intermediate results (results of our Map function)
    * V3 – The type of the result for the entire Map-Reduce process

    The reducer gets as input an intermediate-key and all the values mapped to it in the mapping phase.
    It runs over those values and outputs a result (V3).
    You can’t run the reduce until you have that list of all the values mapped to a certain key – so you have to wait for the mapping phase to finish.

  7. ekampf | May 13th, 2009 at 1:48 pm

    Oh yeah, depending on your algorithm you may be able to add another layer of reducers. They’re called Combiners in Hadoop (http://wiki.apache.org/hadoop/HadoopMapReduce):

    “When the map operation outputs its pairs they are already available in memory. For efficiency reasons, sometimes it makes sense to take advantage of this fact by supplying a combiner class to perform a reduce-type function. If a combiner is used then the map key-value pairs are not immediately written to the output. Instead they will be collected in lists, one list per each key value. When a certain number of key-value pairs have been written, this buffer is flushed by passing all the values of each key to the combiner’s reduce method and outputting the key-value pairs of the combine operation as if they were created by the original map operation.

    For example, a word count MapReduce application whose map operation outputs (word, 1) pairs as words are encountered in the input can use a combiner to speed up processing. A combine operation will start gathering the output in in-memory lists (instead of on disk), one list per word. Once a certain number of pairs is output, the combine operation will be called once per unique word with the list available as an iterator. The combiner then emits (word, count-in-this-part-of-the-input) pairs. From the viewpoint of the Reduce operation this contains the same information as the original Map output, but there should be far fewer pairs output to disk and read from disk. “

  8. Paul Batum | May 13th, 2009 at 1:50 pm

    Heh, I’m still confused.

    Can you explain why your MapFromMem and Reduce methods at the end don’t use the key argument they are passed?

  9. ekampf | May 13th, 2009 at 2:03 pm

    They don’t need to use the key in this sample.

    The Map function:
    Gets a key and a value where the value is a text to scan. It iterates over the words in the text and outputs <word, 1> values.

    The Reduce function:
    Gets a key (word) and a list of values (lots of 1’s) and returns a sum of the values, which is the number of times the word was encountered.
    It’s the code calling the reduce delegate function that uses the key:

    // Reduce on each group
    var reduced = from g in groups
    let k2 = g.Key
    from reducedValue in _reduce(k2, g)
    select new KeyValuePair<K2, V3>(k2, reducedValue);

    So the output of the entire process is a list of <K2, V3> pairs…

    The key matters to the result of each operation but not to the actual code of the map\reduce delegates as the framework running the delegates takes care of the work related to the keys (grouping etc.)

  10. George Dernikos | July 16th, 2009 at 8:10 pm

    @ekampf

    First of all hi there, nice post..

    As I was reading the post I didn’t quite understand the “(Yes, I realize I could use .NET’s Func<T1,T2,TResult> instead but that would just result in horribly long ugly code…)” part. I kind of disagree on that.

    Here’s my *naive* implementation of MapReduce to reason my disagreement, in which I use Func:

    public static IEnumerable<TReduceResult> MapReduce<TSource, TMapped, TKey, TReduceResult>(
        this IEnumerable<TSource> source,
        Func<TSource, TMapped> mapper,
        Func<TMapped, TKey> grouper,
        Func<IGrouping<TKey, TMapped>, TReduceResult> reducer)
    {
        return source.Select(mapper).GroupBy(grouper).Select(reducer);
    }

    that’s it..
    and a small example, say I want to get word counts from a list of documents:

    var documents = new[] {
        new {
            Name = "doc1.txt",
            Data = "word1 word2 word3 word4 word2 word2 word4 word4 word4"
        },
        new {
            Name = "doc2.txt",
            Data = "word5 word6 word7 word8 word6 word6 word7 word7 word7"
        }
    };

    var docs = documents.MapReduce(
        d => d,
        d => d.Name,
        d => new
        {
            FileName = d.Key,
            WordOccurences = d.Single().Data.Split(' ').MapReduce(
                w => new { Word = w, Occurence = 1 },
                w => w.Word,
                w => new { Word = w.Key, Occurence = w.Sum(wi => wi.Occurence) })
        });

    foreach (var d in docs)
    {
        Console.WriteLine(d.FileName);
        foreach (var w in d.WordOccurences)
            Console.WriteLine("\t{0}: {1}", w.Word, w.Occurence);
    }

    Note: I use MapReduce to iterate over the list of documents as well as over the list of words of each document.
    I don’t think you get long and ugly code, on the contrary, in my opinion it’s more beautiful.

    Anyhow, it was a nice article of yours, and if only we had a Queryable version of a MapReduce method that could run not only on a bunch of threads but also on a cluster, that would be great :).

  11. George Dernikos | July 16th, 2009 at 8:15 pm

    Oops! Something happened to the code..
    Anyhow, here’s the snippet: http://csharp.pastebin.com/f6f8e34a0

  12. ekampf | July 18th, 2009 at 11:36 pm

    Hi George,
    What I meant by that sentence is that I could have used Func etc. instead of defining my own delegate types: MapFunction, ReduceFunction which would have made the function signature longer and less readable.

    I like your LINQ implementation, which is also less explicit and provides more freedom. However, for my tutorial I thought more explicit code (specifying the key and value types for each step) would help convey the concept better…

    In any case, both codes are just samples and you really need the power of a distributed platform (Hadoop, Dryad etc…) to really make use of the advantages map-reduce has to offer…

    Regards,
    Eran

  13. CraigH | August 6th, 2009 at 4:23 pm

    Your key/value pair definitions are slightly different from the original Google spec.

    You specify
    Map -> Intermediates -> Reduce as
    (K1, V1) -> (K2, V2) -> V3

    where Google has
    Map -> Intermediates -> Reduce as
    (K1, V1) -> (K2, list(V2)) -> V2

    I thought Hadoop was the same as Google?

    See also a C++ implementation of MapReduce at http://www.craighenderson.co.uk/mapreduce which I hope to get through a submission to Boost.org sometime in the future.

  14. MohitT | January 15th, 2010 at 1:23 pm

    Hi,

    Would it be good if you completed this example for naive users like me, by deploying this in one of the cloud providers like Windows Azure?

    Regards
    Mohit
