Introduction to MapReduce for .NET Developers

.NET, Software Development May 6th, 2009

The basic model for MapReduce derives from the map and reduce concept in functional languages like Lisp.
In Lisp, a map takes as input a function and a sequence of values and applies the function to each value in the sequence.
A reduce takes as input a sequence of elements and combines all the elements using a binary operation (for example, it can use “+” to sum all the elements in the sequence).
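C# has the same two ideas built into LINQ: `Select` plays the role of map and `Aggregate` plays the role of reduce. Here is a minimal sketch; the numbers are arbitrary sample data.

```csharp
using System;
using System.Linq;

int[] numbers = { 1, 2, 3, 4 };

// map: apply a function to every element of the sequence
int[] squares = numbers.Select(n => n * n).ToArray();

// reduce: combine all elements with a binary operation, here "+"
int sum = squares.Aggregate((acc, n) => acc + n);

Console.WriteLine($"squares = [{string.Join(", ", squares)}], sum = {sum}");
```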

MapReduce, inspired by these concepts, was developed as a method for writing processing algorithms for large amounts of raw data. The amount of data is so large that it can’t be stored on a single machine and must be distributed across many machines in order to be processed in a reasonable time.
In systems with such data distribution, traditional centralized processing algorithms are impractical: just moving the data to the single CPU running the algorithm incurs huge network costs and can mean months (!) spent transferring data from the distributed machines.
Processing distributed data at this scale therefore calls for parallel computing, which lets us run the required computation “close” to where the data is located.
MapReduce is an abstraction that allows engineers to write such processing algorithms in a way that is easy to parallelize, while hiding the complexities of parallelization, data distribution, fault tolerance, etc.

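This value proposition for MapReduce is outlined in the abstract of Google’s research paper on the topic, “MapReduce: Simplified Data Processing on Large Clusters” (Jeffrey Dean and Sanjay Ghemawat, OSDI 2004):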

MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Many real world tasks are expressible in this model, as shown in the paper.

Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines. The run-time system takes care of the details of partitioning the input data, scheduling the program’s execution across a set of machines, handling machine failures, and managing the required inter-machine communication. This allows programmers without any experience with parallel and distributed systems to easily utilize the resources of a large distributed system.

Our implementation of MapReduce runs on a large cluster of commodity machines and is highly scalable: a typical MapReduce computation processes many terabytes of data on thousands of machines. Programmers find the system easy to use: hundreds of MapReduce programs have been implemented and upwards of one thousand MapReduce jobs are executed on Google’s clusters every day.

The MapReduce Programming Model

As explained earlier, the purpose of MapReduce is to abstract parallel algorithms into map and reduce functions that can then be executed on a large-scale distributed system.
To understand this concept better, let’s look at a concrete MapReduce example – consider the problem of counting the number of occurrences of each word in a large collection of documents:

map(String key, String value):
  // key: document name
  // value: document contents
  for each word w in value:
    EmitIntermediate(w, "1");

reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));

The map function goes over the document text and emits each word with an associated value of “1”.

The reduce function sums together all the values for each word, producing the number of occurrences of that word as its result.

First comes the map phase, which goes over the input data and creates intermediate values as follows:

  • Records from the data source (lines of files, rows of a database, etc.) are fed into the map function as <key,value> pairs, for example <filename, file content>.
  • From each input pair, the map function produces a set of intermediate values, each associated with an output key.

After the mapping phase is over, we go through the reduce phase to process the intermediate values:

  • All the intermediate values for a given output key are grouped together into a list and fed to the reduce function.
  • The reduce function combines those intermediate values into one or more final values for that same output key.

Notice that both the map and the reduce functions run on independent sets of input data. Each run of the map function processes its own data source, and each run of the reduce function processes the values of a different intermediate key.

Therefore, both phases can be parallelized; the only bottleneck is that the map phase has to finish before the reduce phase can start.
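To see why the two phases parallelize independently, here is a single-machine sketch that uses PLINQ (`AsParallel`) as a stand-in for a cluster of workers; the documents are made-up sample data. Materializing the map output with `ToList()` plays the role of the barrier between the phases.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical input: each "document" is processed by an independent map task.
var documents = new Dictionary<string, string>
{
    ["doc1"] = "the quick brown fox",
    ["doc2"] = "the lazy dog",
    ["doc3"] = "the fox",
};

// Map phase: documents are mapped independently, so PLINQ may process them in parallel.
// ToList() forces the whole map phase to complete before any reducing starts.
List<KeyValuePair<string, int>> intermediate = documents
    .AsParallel()
    .SelectMany(doc => doc.Value.Split(' ', StringSplitOptions.RemoveEmptyEntries)
                              .Select(word => new KeyValuePair<string, int>(word, 1)))
    .ToList();

// Reduce phase: each key's group is reduced independently, so groups can run in parallel too.
Dictionary<string, int> counts = intermediate
    .AsParallel()
    .GroupBy(pair => pair.Key)
    .ToDictionary(g => g.Key, g => g.Sum(pair => pair.Value));

foreach (var (word, count) in counts.OrderBy(kv => kv.Key))
    Console.WriteLine($"{word}: {count}");
```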

The underlying system that runs these methods takes care of:

  • Initializing a set of workers that can run tasks – map or reduce functions.
  • Taking the input data (in our case, lots of document filenames) and sending it to the workers to map.
  • Streaming the values emitted by the map function to the worker (or workers) doing the reduce. A map run doesn’t have to finish going over its entire file before its emitted values are sent to the reducer, so the system can prepare the data for the reducer while the map function is still running (in Hadoop, this means sending the map output to the reducer node and handling the grouping by key).
  • Handling errors – supporting a reliable, fault-tolerant process, since workers may fail, the network may go down and prevent workers from communicating results, etc.
  • Providing status and monitoring tools.
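Routing each emitted value to the right reducer is typically done with a partitioning function; the Google paper’s default is hash(key) mod R, where R is the number of reduce tasks. Below is a sketch of that idea. The FNV-1a hash and the `StableHash`/`PartitionFor` helpers are hypothetical choices for illustration: a stable hash is used because .NET Core randomizes `string.GetHashCode()` per process, while every map worker must send a given key to the same reducer.

```csharp
using System;
using System.Linq;

// Hypothetical helper: a stable 32-bit FNV-1a hash, identical on every machine.
static uint StableHash(string key)
{
    uint hash = 2166136261;
    foreach (char c in key)
    {
        hash ^= c;
        hash *= 16777619;   // unchecked by default, so it wraps around
    }
    return hash;
}

// hash(key) mod R: every occurrence of a key is routed to the same reducer.
static int PartitionFor(string key, int reducerCount) =>
    (int)(StableHash(key) % (uint)reducerCount);

const int R = 3;  // assumed number of reduce workers
var emitted = new[] { "the", "fox", "the", "dog", "fox", "the" };

// Group the emitted keys by the reducer they would be sent to.
var byReducer = emitted
    .GroupBy(word => PartitionFor(word, R))
    .OrderBy(g => g.Key);

foreach (var g in byReducer)
    Console.WriteLine($"reducer {g.Key}: {string.Join(", ", g)}");
```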

A Naive Implementation in C#

Let’s see how we can build a naive MapReduce implementation in C#.

First, we define a generic class to manage our Map-Reduce process:

public class NaiveMapReduceProgram<K1, V1, K2, V2, V3>

The generic types are used the following way:

  • (K1, V1) – key-value types for the input data
  • (K2, V2) – key-value types for the intermediate results (the output of our Map function)
  • V3 – The type of the result for the entire Map-Reduce process

Next, we’ll define the delegate types for our Map and Reduce functions and a constructor that accepts them:

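// Map: takes one input <K1, V1> pair and emits a set of intermediate <K2, V2> pairs
public delegate IEnumerable<KeyValuePair<K2, V2>> MapFunction(K1 key, V1 value);
// Reduce: takes an intermediate key with all of its values and produces the final V3 values
public delegate IEnumerable<V3> ReduceFunction(K2 key, IEnumerable<V2> values);

private readonly MapFunction _map;
private readonly ReduceFunction _reduce;

public NaiveMapReduceProgram(MapFunction mapFunction, ReduceFunction reduceFunction)
{
    _map = mapFunction;
    _reduce = reduceFunction;
}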

(Yes, I realize I could use .NET’s Func<T1,T2,TResult> instead but that would just result in horribly long ugly code…)

Now for the actual program execution. The execution flow is as follows: we take the input values, pass them through the map function to get intermediate values, group those values by key, and pass them to the reduce function to get the result values.

So first, let’s look at the mapping step:

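private IEnumerable<KeyValuePair<K2, V2>> Map(IEnumerable<KeyValuePair<K1, V1>> input)
{
    // Apply _map to every input pair and flatten all emitted pairs into one sequence
    // (the two "from" clauses compile to a SelectMany call)
    var q = from pair in input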
            from mapped in _map(pair.Key, pair.Value)
            select mapped;

    return q;
}

Once we have the mapped intermediate values, we want to reduce them. The Reduce function expects a key and all of its mapped values as input, so we first group the intermediate values by key and then call the Reduce function once for each key.

The output of this process is a sequence of (K2, V3) pairs: the V3 values returned by the reduce function for each of the intermediate K2 keys:

private IEnumerable<KeyValuePair<K2, V3>> Reduce(IEnumerable<KeyValuePair<K2, V2>> intermediateValues)
{
    // First, group intermediate values by key
    var groups = from pair in intermediateValues
                 group pair.Value by pair.Key into g
                 select g;

    // Reduce on each group
    var reduced = from g in groups
                  let k2 = g.Key
                  from reducedValue in _reduce(k2, g)
                  select new KeyValuePair<K2, V3>(k2, reducedValue);

    return reduced;
}

Now that we have the code for both steps, the execution itself is simply defined as Reduce(Map(input)):

public IEnumerable<KeyValuePair<K2, V3>> Execute(IEnumerable<KeyValuePair<K1, V1>> input)
{
    return Reduce(Map(input));
}
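To put the pieces together in one runnable file, here is a compact sketch of the same Reduce(Map(input)) pipeline. It is a restatement rather than the original downloadable source: it uses Func delegates in a single generic local function (the longer-winded form mentioned above) and runs a word count over two made-up documents.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// K1/V1: input types, K2/V2: intermediate types, V3: result type (as in NaiveMapReduceProgram).
static IEnumerable<KeyValuePair<K2, V3>> Execute<K1, V1, K2, V2, V3>(
    IEnumerable<KeyValuePair<K1, V1>> input,
    Func<K1, V1, IEnumerable<KeyValuePair<K2, V2>>> map,
    Func<K2, IEnumerable<V2>, IEnumerable<V3>> reduce)
{
    // Map: apply the map function to every input pair and flatten the results.
    var intermediate = input.SelectMany(pair => map(pair.Key, pair.Value));

    // Group the intermediate values by key, then reduce each group.
    return intermediate
        .GroupBy(pair => pair.Key, pair => pair.Value)
        .SelectMany(g => reduce(g.Key, g).Select(v3 => new KeyValuePair<K2, V3>(g.Key, v3)));
}

var input = new[]
{
    new KeyValuePair<string, string>("a.txt", "to be or not to be"),
    new KeyValuePair<string, string>("b.txt", "to do"),
};

var result = Execute<string, string, string, int, int>(
        input,
        (name, text) => text.Split(' ').Select(w => new KeyValuePair<string, int>(w, 1)),
        (word, ones) => new[] { ones.Sum() })
    .ToDictionary(kv => kv.Key, kv => kv.Value);

Console.WriteLine(string.Join(", ", result.Select(kv => $"{kv.Key}={kv.Value}")));
```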

The full source code and tests can be downloaded from here:

Map-Reduce Word Counting Sample – Revisited

Let’s go back to the word-counting pseudo code and write it in C#.

The following Map function gets a key and a text value and emits a <word, 1> key-value pair for each word in the text:

public IList<KeyValuePair<string, int>> MapFromMem(string key, string value)
{
    var result = new List<KeyValuePair<string, int>>();
    // Split on whitespace and drop the empty entries produced by consecutive separators
    foreach (var word in value.Split(new[] { ' ', '\t', '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries))
    {
        result.Add(new KeyValuePair<string, int>(word, 1));
    }
    return result;
}

Having emitted a <word, 1> pair for each word in each input source, we can group the results by word, and our Reduce function can then sum the values (which are all 1 in this case) for each word:

public IEnumerable<int> Reduce(string key, IEnumerable<int> values)
{
    int sum = 0;
    foreach (int value in values)
    {
        sum += value;
    }

    return new int[1] { sum };
}

Our program code looks like this:

// inputData: an IEnumerable<KeyValuePair<string, string>> of <document name, document text> pairs
var master = new NaiveMapReduceProgram<string, string, string, int, int>(MapFromMem, Reduce);
var result = master.Execute(inputData).ToDictionary(pair => pair.Key, pair => pair.Value);

The result dictionary contains <word, number-of-occurrences> pairs.

Other Examples

Distributed LINQ Queries. One of the POCs I’m working on, using the naive LINQ-based implementation above, runs a distributed LINQ query. Imagine a system where the raw data is distributed across several SQL Servers. Our map function can run a LINQ-to-SQL query on multiple DataContexts in parallel (the map function’s value input, V1, can be a DataContext), and the results can then be reduced into a single result set. This is probably a naive/simplified version of what the folks on Microsoft’s Dryad team are doing.
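A rough sketch of that idea, with in-memory arrays standing in for the tables behind separate DataContexts (an assumption so the example runs without SQL Server): each map task runs the same LINQ query against its shard in parallel, and the reduce step merges the partial per-customer totals. The customer and amount data is made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shards: in-memory arrays stand in for tables behind separate DataContexts.
var shards = new[]
{
    new[] { (Customer: "alice", Amount: 10m), (Customer: "bob", Amount: 5m) },
    new[] { (Customer: "alice", Amount: 7m), (Customer: "carol", Amount: 3m) },
};

// Map: run the same LINQ query against every shard in parallel,
// producing partial per-customer totals for that shard.
var partials = shards
    .AsParallel()
    .SelectMany(shard => shard
        .GroupBy(order => order.Customer)
        .Select(g => new KeyValuePair<string, decimal>(g.Key, g.Sum(order => order.Amount))))
    .ToList();

// Reduce: merge the partial totals for each customer into a single result set.
var totals = partials
    .GroupBy(pair => pair.Key, pair => pair.Value)
    .ToDictionary(g => g.Key, g => g.Sum());

foreach (var (customer, total) in totals.OrderBy(kv => kv.Key))
    Console.WriteLine($"{customer}: {total}");
```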

Count URL Visits. Suppose you have several web servers and want to count the number of visits to each page on your site. This works much the same way as the word-counting example: the map function parses a log file and produces a <URL, 1> intermediate value for each request, and the reduce function then sums the values for each URL and emits <URL, number of visits>.
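A sketch of the map and reduce functions for this job. It assumes the logs use the Common Log Format, where the request line is quoted (for example "GET /index.html HTTP/1.1"); `MapLog`, `SumVisits`, and the sample log lines are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Map: parse one log file and emit <URL, 1> for every request line.
// Assumed line format: 10.0.0.1 - - [06/May/2009:10:00:00 +0000] "GET /index.html HTTP/1.1" 200 512
static IEnumerable<KeyValuePair<string, int>> MapLog(string fileName, string contents)
{
    foreach (var line in contents.Split('\n', StringSplitOptions.RemoveEmptyEntries))
    {
        int start = line.IndexOf('"');
        int end = start < 0 ? -1 : line.IndexOf('"', start + 1);
        if (end < 0) continue;   // skip malformed lines

        // The quoted request line is: METHOD URL PROTOCOL
        var parts = line.Substring(start + 1, end - start - 1).Split(' ');
        if (parts.Length >= 2)
            yield return new KeyValuePair<string, int>(parts[1], 1);
    }
}

// Reduce: sum the 1s emitted for each URL.
static IEnumerable<int> SumVisits(string url, IEnumerable<int> ones)
{
    yield return ones.Sum();
}

var logs = new Dictionary<string, string>
{
    ["web1.log"] = "10.0.0.1 - - [06/May/2009:10:00:00 +0000] \"GET /index.html HTTP/1.1\" 200 512\n" +
                   "10.0.0.2 - - [06/May/2009:10:00:01 +0000] \"GET /about.html HTTP/1.1\" 200 128\n",
    ["web2.log"] = "10.0.0.3 - - [06/May/2009:10:00:02 +0000] \"GET /index.html HTTP/1.1\" 200 512\n",
};

var visitCounts = logs
    .SelectMany(log => MapLog(log.Key, log.Value))
    .GroupBy(pair => pair.Key, pair => pair.Value)
    .ToDictionary(g => g.Key, g => SumVisits(g.Key, g).Single());

foreach (var (url, count) in visitCounts)
    Console.WriteLine($"{url}: {count}");
```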

Distributed Grep. You can run a grep search over a large number of files by having the map function emit a line if it matches a given pattern. The reduce function in this case is just an identity function that copies the supplied intermediate data to the output.
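A minimal sketch. Keying each matching line by its file name is an assumption made here so the output shows where each match came from; the pattern and file contents are made-up sample data.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

// Assumed search pattern for the example.
var pattern = new Regex("error", RegexOptions.IgnoreCase);

// Map: emit <fileName, line> for every line that matches the pattern.
IEnumerable<KeyValuePair<string, string>> GrepMap(string fileName, string contents) =>
    contents.Split('\n')
            .Where(line => pattern.IsMatch(line))
            .Select(line => new KeyValuePair<string, string>(fileName, line));

// Reduce: the identity function, copying intermediate values to the output unchanged.
static IEnumerable<string> IdentityReduce(string key, IEnumerable<string> lines) => lines;

var files = new Dictionary<string, string>
{
    ["app1.log"] = "started\nERROR: disk full\nstopped",
    ["app2.log"] = "error: timeout\nok",
};

var matches = files
    .SelectMany(f => GrepMap(f.Key, f.Value))
    .GroupBy(pair => pair.Key, pair => pair.Value)
    .SelectMany(g => IdentityReduce(g.Key, g).Select(line => $"{g.Key}: {line}"))
    .ToList();

matches.ForEach(Console.WriteLine);
```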

Map-Reduce in the Real World

The real complexity and sophistication of MapReduce lies in the underlying system that runs and manages the execution of MapReduce jobs. Real-world MapReduce implementations, like Google’s system, Hadoop, or Dryad, have to go beyond the naive implementation shown here and take care of things like resource monitoring, reliability, and fault tolerance (for example, handling cases where nodes running map/reduce jobs crash or go offline due to network problems).

The following resources are worth checking out:


Keep Believing!

Humor October 11th, 2008

I don’t get it. A bunch of tech ‘elites’ partying in Cyprus made a funny video, and suddenly they’ve become scapegoats for the entire web industry?

I guess they’re also to blame for the housing-led recession and the over-leveraged financials run by bank managers…

Oh well…


Scaling Web Applications – Recommended Readings

Software Architecture August 13th, 2008

Designing for scale is one of the greatest challenges when building web applications for the Internet. The huge scale of the Internet and the number of potential users require applications to handle huge amounts of data and traffic.

Today’s Internet applications have to be designed with large scale in mind:

  • They have to be able to accommodate increased usage.
  • They have to be able to accommodate increased data volumes.
  • They have to be maintainable.

While the need seems obvious, implementing a working solution is not trivial, and so we see a lot of new companies fail to handle the load (Cuil, Twitter, …).

Joining Nuconomy‘s ranks recently has introduced me to the world of web scalability, so I’ve done quite a lot of reading over the past couple of weeks.

I’ve compiled a list of the best resources I came across:

  • The following presentation by Cal Henderson provides a detailed overview of common patterns and approaches for building applications for high availability and scale (you might also want to check out his book):

SlideShare Link
(I removed their player embed as it was throwing internal exceptions. So much for SlideShare’s QA…)

  • Google’s “Paxos Made Live - An Engineering Perspective” paper describes the experience of turning the Paxos consensus algorithm into a production system:

We describe our experience building a fault-tolerant data-base using the Paxos consensus algorithm. Despite the existing literature in the field, building such a database proved to be non-trivial. We describe selected algorithmic and engineering problems encountered, and the solutions we found for them. Our measurements indicate that we have built a competitive system.

We used the Paxos algorithm (“Paxos”) as the base for a framework that implements a fault-tolerant log. We then relied on that framework to build a fault-tolerant database. Despite the existing literature on the subject, building a production system turned out to be a non-trivial task for a variety of reasons: While Paxos can be described with a page of pseudo-code, our complete implementation contains several thousand lines of C++ code. The blow-up is not due simply to the fact that we used C++ instead of pseudo notation, nor because our code style may have been verbose. Converting the algorithm into a practical, production-ready system involved implementing many features and optimizations – some published in the literature and some not.

Got some more interesting scalability resources to share? Feel free to leave a comment…


Introducing Kampyle – The Next Generation of Online Feedback

Software Industry February 19th, 2008

My friend Eran from Kampyle invited me to be the first to try out and review their new service, now in private beta.
The company, based in Ramat Gan, offers a new service that helps website owners collect and manage their users’ feedback.

Reinventing the Feedback Form

According to Eran, most companies today use simple email forms to collect feedback from end users. These mailboxes tend to get flooded: companies fail to handle the overwhelming amount of feedback and lack the human resources required to read and analyze all the responses, act on them, and respond to them.

Kampyle aims to offer a service that provides insights usually available only to companies that can employ a large customer support organization – the ability to track, analyze, manage, act on, and respond to a large amount of feedback without significant human effort.

Making Sense of it All

Managing Feedback

One of the most important aspects of making sense of a large number of comments is identifying similarities and grouping feedback by topic.
Besides the obvious grouping by feedback type and sub-type, the Kampyle team is working on an algorithm that can also detect similarities and group feedback by its actual content. It can also provide an automatically generated summary for a group of feedback items.


Besides the actual feedback, Kampyle also collects contextual information (like resolution, OS, browser version…) which may prove useful in understanding the feedback.


Analyzing Feedback

One of the important aspects of managing feedback is the ability to prioritize the most important issues and analyze their possible causes.
Kampyle’s Feedback Analytics dashboard provides an overview of the site’s feedback. Besides the regular analytics features (the amount of feedback and the rate at which it’s received, and breakdowns by grade or by type), the Feedback Analytics screen displays information to help with decision making: which topics are the most important (most reported) and require attention, and an analysis of their possible causes…

Acting on and Responding to Feedback

The whole purpose of the management and analytics screens is to let you figure out what your end users are trying to tell you, and act on that information. End users like being listened to, and what better way to let them know you care about their feedback (which no longer gets thrown into a flooded, unmonitored mailbox) than to respond to it?

Using Kampyle you can quickly respond to a group of users who gave feedback on an issue.

Final Thoughts

Every site and every new startup wants to gather feedback from its users. Developing such a system and dealing with the processing complexities is, by definition, not part of the company’s core business. So I think a lot of site owners will appreciate Kampyle for taking that task off their shoulders and providing an out-of-the-box service that delivers both the actual feedback and insights, allowing them to concentrate on their core product – their site.

By the way, I’ve added Kampyle’s feedback button to my blog’s side panel on the right and to the bottom of this post.
Let’s hear some feedback about this site… What do you think about the content?  Did you notice the new design?  :-)


Microsoft May Have Won the Battle but…

Software Industry June 2nd, 2006

We’ve seen many new technologies at Microsoft’s TechEd Eilat 2006.
Microsoft rules our desktops. It keeps pushing new technologies that make development and deployment of desktop clients easier and provide a richer user experience and shorter time-to-market.

As Microsoft keeps concentrating on desktop clients, its competitors, such as Google and Yahoo!, are turning to Internet-based services as a way to provide all sorts of services to users. By providing basic services that are hosted, maintained, and usually free or low-cost, those companies are leading the trend of web-based services over desktop software, and that means big competition for Microsoft’s desktop platform.

Although AJAX and web-based services are heavily hyped nowadays, Microsoft was actually doing this long ago with Outlook Web Access and SharePoint, but unfortunately failed to realize their full potential.

There is a very interesting article in Fortune describing the changes Microsoft is undergoing in order to compete in this new market of web-based services: http://money.cnn.com/magazines/fortune/fortune_archive/2006/05/01/8375454/index.htm

Note that this is not the first time Microsoft has failed to capture the potential of the web.
And some of us still remember a company called Netscape that ruled the web before the giant from Redmond woke up…

The future of this battle should be interesting…
