Data Science and Big Data Analytics
Copyright © 2014 EMC Corporation. All rights reserved.
Module 5: Advanced Analytics - Technology and Tools
Analytics for Unstructured Data - MapReduce and Hadoop
During this lesson the following topics are covered:
• MapReduce & Hadoop
• HDFS – the Hadoop Distributed File System
• YARN – Yet Another Resource Negotiator
Lesson 1 introduces the idea behind MapReduce processing and then describes how Hadoop implements this paradigm. It also covers the roles of the Hadoop Distributed File System (HDFS) and Yet Another Resource Negotiator (YARN).
This module covers data management: the frameworks developed to process unstructured data in the terabyte range, and extensions to Hadoop that build on its capabilities.
Lesson 2 covers:
• Hive and Pig – Hadoop query languages
• HBase – a BigTable workalike using Hadoop
• Mahout – machine learning algorithms and Hadoop MapReduce
Putting the Data Analytics Lifecycle into Practice
• MapReduce & Hadoop in the Data Analytics Lifecycle
✓ Phase 1: Discovery
✓ Phase 2: Data Preparation
• Phase 3: Model Planning
✓ Phase 4: Model Building
• Phase 5: Communicate Results
• Phase 6: Operationalize
• You have “big data”: how can you make it suitable for analysis?
• That is, how can you obtain results and key findings in a timely manner?
• MapReduce represents activities in Phases 1 and 2 of the lifecycle (the acquire/parse and filter stages) and also in Phase 4 (model building)
In the previous module, we discussed various ways to mine and analyze data for information. In this module, you’re going to step back and look at another acquire-parse-filter method.
This method is called MapReduce. It is a pattern for processing Big Data and extracting only the data you need.
The MapReduce paradigm fits well with various data analytics processes. In our Data Analytics Lifecycle, Map and Reduce represent activities from the first two phases, as well as model building in Phase 4. Following Ben Fry’s seven-stage model of data visualization, the actions would be part of the Acquire, Parse, and Filter stages (Map for Acquire/Parse and Reduce for Filter).
Big Data is the Next Big Thing
[Figure: Big Data sources: Online Search, Geo Sensor, Social Media, Mobile Game, Social Network, Online Review, YouTube]
YouTube: 72 hours of video are uploaded every minute
2,000,000+ searches per minute
130,000+ tweets per minute
The emerging data ecosystem points to a new economy built around data, one in which data itself has intrinsic value.
Shown are a variety of Big Data sources that are becoming prevalent. Organizations are mining data from these sources to drive new value. Real-time analysis of this data, coupled with advanced analytical methods, demands new architectures for storing and analyzing it.
Why MapReduce?
“In Pioneer days, they used oxen for heavy pulling.
When one ox couldn’t budge a log, they didn’t try to grow a larger ox…
We shouldn’t be trying to grow bigger computers, but to add more systems of computers.”
Grace Hopper
The MapReduce paradigm helps you add more oxen
By definition, big data is too large to handle by conventional means. Sooner or later, you just can’t scale up anymore.
Grace Hopper’s quote captures the fundamental idea of MapReduce: reduce the time needed to complete a task by breaking it into pieces and executing those pieces in parallel.
This approach is sometimes called the “master/slave” or “master/worker” pattern, and it has been known for a long time.
The definition of Big Data asserts that the data is simply too large to handle by conventional means. The usual example is when an RDBMS is initially used to store that data. As performance needs increase, organizations purchase more powerful hardware and then more systems to share the data retrieval and processing. And yet the data keeps on growing.
What to do?
What is MapReduce?
• A parallel programming model suitable for big data processing
Split data into distributable chunks (“shards”)
Define the steps to process those chunks
Run that process in parallel on the chunks
• Scalable by adding more machines to process chunks
Leverage commodity hardware to tackle big jobs
• The foundation for Hadoop
MapReduce is a parallel programming model
Hadoop is a concrete platform that implements MapReduce
As we’ve said, the idea of MapReduce isn’t new (more like old wine in new bottles). What is new in Google’s MapReduce is that the data, not just the computation, is distributed and processed in parallel.
Here, a set of worker tasks each work on a subset of the data, where each subset is (usually) physically and logically distinct from the others.
In the world of databases, this separation is often called “sharding.” Sharding partitions data so that I/O operations on one partition can proceed independently of work on other partitions, and queries against different partitions do not conflict in ways that require the data to be locked.
A classic example is partitioning a database by country of origin: queries against the US, Canada, and Mexico run independently. The difference in MapReduce is that the computation is distributed along with the data.
MapReduce also borrows some elements of functional programming. All data elements in MapReduce are immutable; changing an input (key, value) pair does not change the input files. All communication occurs by generating new output (key, value) pairs and then forwarding them to the next phase. At base, MapReduce programs transform lists of input data elements into lists of output data elements, and do so twice: once for the Map and once for the Reduce.
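The split/process/combine steps on this slide can be illustrated on a single machine. This is a minimal sketch in plain Python, not Hadoop code. The function names `map_shard`, `reduce_pairs`, and `run_job` are hypothetical. A thread pool stands in for a cluster of worker machines, and each shard is just a slice of an in-memory list. Each step builds new lists of (key, value) pairs instead of modifying its input, in the functional style described above.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Iterable, List, Tuple

Pair = Tuple[str, int]

def map_shard(lines: List[str]) -> List[Pair]:
    """Map step: turn one shard of text lines into new (word, 1) pairs."""
    return [(word, 1) for line in lines for word in line.split()]

def reduce_pairs(pairs: Iterable[Pair]) -> Dict[str, int]:
    """Reduce step: sum the values for each key."""
    totals: Dict[str, int] = {}
    for key, value in pairs:
        totals[key] = totals.get(key, 0) + value
    return totals

def run_job(lines: List[str], num_shards: int = 3) -> Dict[str, int]:
    # Split the input into non-overlapping shards of roughly equal size.
    shards = [lines[i::num_shards] for i in range(num_shards)]
    # The "master" hands each shard to a worker; workers run independently.
    with ThreadPoolExecutor(max_workers=num_shards) as pool:
        mapped = list(pool.map(map_shard, shards))
    # Flatten every worker's output and reduce it to one answer.
    return reduce_pairs(pair for shard_output in mapped for pair in shard_output)

if __name__ == "__main__":
    docs = ["to be or not to be", "to see or not to see", "be here now"]
    print(run_job(docs))
```

On a real cluster the shards would be HDFS blocks stored on different machines, and the framework, not the caller, would schedule the workers.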
When to Use MapReduce
• Problems that are “embarrassingly parallel”
• Examples
Word count
Reverse index
tf-idf
Distributed grep and distributed object recognition ("Where's Waldo?")
Distributed "associative" aggregation (marginalization, sum; mean if you track both numerator and denominator; min or max; count)
Hadoop calls them "combiners"
For which kinds of problems is MapReduce most suited? Very simply, problems that are “embarrassingly parallel.” In these situations, the problem can be broken into chunks that can be distributed, processed, and recombined without the tasks communicating with each other.
Examples of such problems are listed on the slide. We’ve met some of these functions before: reverse (inverted) index, tf-idf, and distributed grep (pattern matching).
Marginalization reduces the number of variables in an analysis by aggregating over the variables you want to remove, leaving aggregate statistics for the variables you keep.
Consider this example.
Assume we have a table, cats, where each row has the attributes "state", "climate" (wet, dry, temperate), and "% of cats in the US" (pct_cats). This table gives us the share of US cats broken out by climate and state. If we sum pct_cats over the climate values within each state, we obtain the marginal distribution of cats by state. If we sum pct_cats over the states within each climate value, we obtain the marginal distribution of cats by climate.
In statistical terms, the table gives you the joint distribution P(state, climate) for the cat population. Marginalization aggregates this table up to either P(state) or P(climate), depending on which variable is summed out. For the SQL inclined, the statement
SELECT state, SUM(pct_cats) FROM cats GROUP BY state;
would perform this action for the values of state.
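The same GROUP BY aggregation can be phrased as a map step and a reduce step. The map step re-keys each row by the variable being kept, and the reduce step sums the values for each key. This is a minimal sketch: the rows below are made-up illustrative values, not course data, and `marginalize` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical rows: (state, climate, pct_cats); values are illustrative only.
ROWS: List[Tuple[str, str, float]] = [
    ("CA", "dry", 4.0),
    ("CA", "temperate", 6.0),
    ("WA", "wet", 3.0),
    ("WA", "temperate", 2.0),
    ("NV", "dry", 1.0),
]

def marginalize(rows: List[Tuple[str, str, float]], keep: str) -> Dict[str, float]:
    """Sum pct_cats over every variable except `keep` ("state" or "climate")."""
    index = {"state": 0, "climate": 1}[keep]
    # Map: emit (value of the kept variable, pct_cats) for each row.
    mapped = [(row[index], row[2]) for row in rows]
    # Reduce: add up the values that share a key (the GROUP BY).
    totals: Dict[str, float] = defaultdict(float)
    for key, value in mapped:
        totals[key] += value
    return dict(totals)

print(marginalize(ROWS, "state"))    # same grouping as GROUP BY state
print(marginalize(ROWS, "climate"))  # same grouping as GROUP BY climate
```

Because addition is associative and commutative, the per-key sums could also be computed in pieces on different machines and then added together.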
The Map part of MapReduce
• Transform (Map) input values to output values: <k1,v1> → <k2,v2>
• Input – Key/Value Pairs
For instance, Key = line number, Value = text string
• Map Function
Steps to transform input pairs to output pairs
For example, count the different words in the input
• Output – Key/Value Pairs
For example, Key = <word>, Value = <count>
• Map output is the input to Reduce
The first phase of any MapReduce job is to, well, map.
In the mapping phase, raw data is transformed into a set of <key, value> pairs.
For example, the key might be a line number and the value a text string. The map function transforms this input into a series of output pairs: in this case, a record for each unique word in the input (the key) with a count of its occurrences (the value). So, for example, the input string “to be or not to be” would produce output records such as [to 2; be 2; or 1; not 1] (where each “;” represents a newline).
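Here is a minimal sketch of such a map function. It assumes the input pair is (line number, line text), as described above, and `map_line` is a hypothetical name. It counts words within a single input record, so its output matches the example. A mapper that instead emits one `(word, 1)` pair per occurrence, and leaves all counting to the Reducer, is an equally common design.

```python
from collections import Counter
from typing import List, Tuple

def map_line(line_number: int, text: str) -> List[Tuple[str, int]]:
    """Map one <line number, text> input pair to <word, count> output pairs.

    The line-number key is not needed to count words, so it is ignored.
    """
    counts = Counter(text.split())
    # Counter keeps first-seen order, so the output follows the input text.
    return list(counts.items())

for word, count in map_line(1, "to be or not to be"):
    print(f"{word}\t{count}")
```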
The Reduce Part of MapReduce
• Merge (Reduce) Values from the Map phase
Reduce is optional. Sometimes all the work is done in the Mapper
• Input
Values for a given Key from all the Mappers
• Reduce Function
Steps to combine (Sum?, Count?, Print?,…) the values
• Output
Print values?, load into a DB? send to the next MapReduce job?
Well, you’ve finished mapping, and now what?
The next task is to run the Reduce step on the input from the Mapper.
Sometimes all the work is done in the Mapper, and you’re done. Otherwise, you use the Reducer to combine the input from the Mapper. Maybe you count it, print it, load it into a database, or save it in a file and load it into R for more analysis.
The MapReduce framework supports a Combiner function as well. A Combiner takes the output of a single Mapper and performs a local reduce on it, producing a smaller stream for the Reducers.
For example, a word count Combiner could sum the <word, 1> pairs emitted by one Mapper into <word, n> pairs before they are sent to the Reducers. Combiners work well when Mapper output is voluminous: adding a Combiner step may decrease network traffic, resulting in better performance. Because Hadoop may run a Combiner zero, one, or more times, the combining operation must produce the same final result regardless. Associative and commutative operations such as sum, count, min, and max qualify.
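The effect of a Combiner on network traffic can be shown with a small in-memory model. This is a minimal sketch, not Hadoop's Combiner API. The names `mapper`, `combiner`, and `reducer` are hypothetical, and each input string stands in for the split handled by one Mapper.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def mapper(text: str) -> List[Tuple[str, int]]:
    """Emit one (word, 1) pair per word occurrence."""
    return [(word, 1) for word in text.split()]

def combiner(pairs: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """Locally sum one Mapper's pairs before they cross the network."""
    local: Dict[str, int] = defaultdict(int)
    for word, count in pairs:
        local[word] += count
    return list(local.items())

def reducer(pairs: List[Tuple[str, int]]) -> Dict[str, int]:
    """Sum all (possibly pre-combined) counts per word."""
    totals: Dict[str, int] = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return dict(totals)

splits = ["beach sun beach", "beach beach sand"]  # one string per Mapper
raw = [mapper(s) for s in splits]
combined = [combiner(p) for p in raw]

raw_sent = sum(len(p) for p in raw)            # pairs shipped without a Combiner
combined_sent = sum(len(p) for p in combined)  # pairs shipped with a Combiner
# The final answer is the same either way; only the traffic differs.
assert reducer([x for p in raw for x in p]) == reducer([x for p in combined for x in p])
print(raw_sent, combined_sent)
```

A mean can be combined this way only if each partial result is carried as a (sum, count) pair, as the earlier slide notes. Averaging partial averages would generally give the wrong answer.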
The “Word Count” example is the “hello, world” of the Data Analytics world. In this case, we have millions of documents and hundreds of machines. We want to count the number of times the word “beach” appears in these documents.
The key to understanding this problem is that the Mappers don’t attempt to aggregate the number of times the word “beach” appears in a single document. Instead, they simply output a key/value pair <beach, 1> for each occurrence.
It’s up to the Reducers to aggregate the final results and output the single key/value pair <beach, nnn>.
Motivating Example: Word Count
This is the “Hello World” of MapReduce
Distribute the text of millions of documents over hundreds of machines.
[Figure: In the Map stage, Mappers emit five (Beach, 1) pairs; a transition step combines them into (Beach, 2), (Beach, 1), and (Beach, 2); the Reduce stage finalizes the total as (Beach, 5).]
MAPPERS can be word-specific. They run through the stacks and shout “One!” every time they see the word “beach”.
REDUCERS listen to all the Mappers and total the counts for each word.
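The whole “beach” job (map, then shuffle, then reduce) can be simulated in memory to show what the framework does between the two phases. This is a minimal sketch under stated assumptions. The three documents are invented, the helper names are hypothetical, and lowercasing the text is a choice made here so that “Beach” and “beach” count as the same word.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

def map_doc(doc: str, target: str = "beach") -> List[Tuple[str, int]]:
    """Mapper: shout "One!" (emit (target, 1)) for every occurrence of target."""
    return [(target, 1) for word in doc.lower().split() if word == target]

def shuffle(pairs: Iterable[Tuple[str, int]]) -> Dict[str, List[int]]:
    """Group every value by key, as the framework does between Map and Reduce."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

def reduce_counts(key: str, values: List[int]) -> Tuple[str, int]:
    """Reducer: total the counts heard from all Mappers."""
    return key, sum(values)

docs = [
    "Beach day at the beach",
    "No sand here",
    "beach volleyball and beach towels and one more beach",
]
mapped = [pair for doc in docs for pair in map_doc(doc)]
result = [reduce_counts(k, v) for k, v in shuffle(mapped).items()]
print(result)
```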
e-Discovery can involve processing a huge number of documents. Consider the following example, where we need to process over ½ million email messages to determine the “social network” in the company, that is, the particular groups of people who communicate with each other.
Suppose you have 517,424 emails from an energy company under indictment.
The social network may be implied by all To:From pairings in the emails, with reflexivity accounted for.
Date: Thu, 4 May 2000 09:55:00 -0700 (PDT)
From: walt.zimmerman@enron.com
To: michael.burke@enron.com, dana.gibbs@enron.com, lori.maddox@enron.com, susan.ralph@enron.com
Subject: Update on Steve Todoroff Prosecution--CONFIDENTIAL/SUBJECT TO ATTORNEY-CLIENT PRIVILEGE
Cc: steve.duffy@enron.com, stanley.horton@enron.com, jdegeeter@velaw.com
Almost one month ago, Special Agent Carl Wake of the FBI called me about the Steve Todoroff investigation. He indicated that the FBI had recently learned of the article about
EOTT's NGL theft that appeared in the business section of the Houston Chronicle. Mr. Wake said it might be a matter the FBI would like to investigate. I told Mr. Wake that EOTT
was currently working with the Harris County District Attorney on the prosecution of this matter, and I thanked him for the FBI's interest. He told me that the FBI might want to work
with the Harris County District Attorney in investigating this matter, and he stated that there may be investigative information that the FBI can obtain more quickly than the Harris
County District Attorney. Mr. Wake requested a copy of the materials we had provided to the Harris County District Attorney.
In order to avoid damage to the good rapport we have established with Assistant District Attorney Bill Moore, I asked John DeGeeter to call Bill Moore and advise him of the
contact that had been made by the FBI. Bill Moore agreed to call Carl Wake and work with Mr. Wake on his request for the materials provided by EOTT.
Carl Wake called me again yesterday. He has been working with Bill Moore. Mr. Wake stated it was too early to speculate as to what charges would be brought. He did say that
our materials clearly indicated federal wire fraud and possibly mail fraud. He said that where there is wire fraud, there is usually money laundering.
The purpose of Mr. Wake's call yesterday was to inquire about the status of some interview summaries that John DeGeeter and I have prepared and collected at the request of Bill
Moore. Mr. Wake requested that EOTT send a copy of the summaries to him when we sent the summaries to Bill Moore. Those summaries were sent out today.
I gathered from my calls with Carl Wake that the FBI is very interested in taking an active part in this investigation. In order to build on the relationship we have established with
Bill Moore, we will continue to direct our inquires about the investigation to Mr. Moore until he tells us to do otherwise.
Example: Social Triangles (e-discovery)
We build this job as a sequence of MapReduce passes through the data (we can assume that the intermediate output of each pass is written back into HDFS).
The output from Mapper1 is a key/value pair where the key is the sender of the message and the value is a list of the recipients.
Reducer1 takes as input the sender/recipients pairs and creates a single record for each sender with the aggregated list of all recipients. These are all the people to whom Walt sent email:
<Walt, [Michael, Dan, Lori, Susan]> + <Walt, [Lori, Susan, Jeff, Ken]> → <Walt, [Dan, Jeff, Ken, Lori, Michael, Susan]>
Mapper1
Maps two regular expression searches:
To: Michael, Dan, Lori, Susan
From: Walt
Emits the outbound directed edge of the social graph:
<Key, Value> = <Walt, [Michael, Dan, Lori, Susan]>
Reducer1
Gets the output from the mapper with different values
<Key, Value> = <Walt, [Michael, Dan, Lori, Susan]>
<Key, Value> = <Walt, [Lori, Susan, Jeff, Ken]>
Unions the values for the second directed edge:
<Key, Value> = <Walt, [Dan, Jeff, Ken, Lori, Michael, Susan]>
Social Triangle: First Directed Edge
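The first pass can be sketched with Python's `re` module doing the two regular expression searches. This is a minimal sketch under several assumptions. The message text is simplified (bare names instead of addresses, and Cc: lines are ignored), and `mapper1` and `reducer1` are hypothetical names rather than Hadoop classes.

```python
import re
from collections import defaultdict
from typing import Dict, List, Set, Tuple

FROM_RE = re.compile(r"^From:\s*(.+)$", re.MULTILINE)
TO_RE = re.compile(r"^To:\s*(.+)$", re.MULTILINE)

def mapper1(message: str) -> List[Tuple[str, List[str]]]:
    """Emit <sender, [recipients]>: the outbound edges of one message."""
    sender = FROM_RE.search(message)
    to_line = TO_RE.search(message)
    if not sender or not to_line:
        return []  # skip messages missing either header
    recipients = [r.strip() for r in to_line.group(1).split(",") if r.strip()]
    return [(sender.group(1).strip(), recipients)]

def reducer1(pairs: List[Tuple[str, List[str]]]) -> Dict[str, List[str]]:
    """Union all recipient lists per sender."""
    outbound: Dict[str, Set[str]] = defaultdict(set)
    for sender, recipients in pairs:
        outbound[sender].update(recipients)
    return {sender: sorted(people) for sender, people in outbound.items()}

emails = [
    "From: Walt\nTo: Michael, Dan, Lori, Susan\nSubject: update",
    "From: Walt\nTo: Lori, Susan, Jeff, Ken\nSubject: follow-up",
]
pairs = [pair for msg in emails for pair in mapper1(msg)]
print(reducer1(pairs))
```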
Mapper2 does the opposite.
For each email recipient, it creates a key/value pair of the form <recipient, sender>.
Reducer2 is similar to Reducer1: for each recipient, it creates a single output record of the form <recipient, [list of senders]>. These are the people from whom that person received mail.
For example, Susan received email from [Jeff, Ken, Walt].
Mapper2
Reverses the previous Map:
To: Michael, Dan, Lori, Susan
From: Walt
Emits the inbound directed edge of the social graph:
<Key, Value> = <Susan, Walt>; <Lori, Walt>; <Dan, Walt>; etc.
Reducer2
Gets the output from the mapper with different values
<Key, Value> = <Susan, Walt>
<Key, Value> = <Susan, Jeff>
Unions the values for the third directed edge:
<Key, Value> = <Susan, [Jeff, Ken, Walt]>
Social Triangle: Second Directed Edge
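The second pass only reverses each edge and regroups by the new key. This is a minimal sketch: the (sender, recipients) records are hypothetical stand-ins for parsed To:/From: headers, and `mapper2` and `reducer2` are illustrative names.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

def mapper2(sender: str, recipients: List[str]) -> List[Tuple[str, str]]:
    """Reverse each outbound edge: emit <recipient, sender> (the inbound edge)."""
    return [(recipient, sender) for recipient in recipients]

def reducer2(pairs: List[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Union the senders per recipient: everyone this person received mail from."""
    inbound: Dict[str, Set[str]] = defaultdict(set)
    for recipient, sender in pairs:
        inbound[recipient].add(sender)
    return {person: sorted(senders) for person, senders in inbound.items()}

# Hypothetical (sender, recipients) records, e.g. parsed To:/From: headers.
messages = [
    ("Walt", ["Michael", "Dan", "Lori", "Susan"]),
    ("Jeff", ["Susan", "Walt"]),
    ("Ken", ["Susan"]),
]
pairs = [pair for sender, to in messages for pair in mapper2(sender, to)]
inbound = reducer2(pairs)
print(inbound["Susan"])
```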
The third time is the charm.
Mapper3 joins each person’s inbound list (people who sent mail to that person) and outbound list (people that person sent mail to) and outputs a key/value pair of the form <Sender:Recipient, relationship>.
In this instance, the relationships are defined as reciprocal (the sender both sent mail to and received mail from the recipient) or directed (the sender only sent mail to the recipient).
Mapper3
Join [inbound] and [outbound] lists by Key
Walt, [Jeff, Ken, Lori, Susan], [Jeff, Lori, Stanley]
Emits <Person, Person> pair with level of association:
<Key, Value> = <Walt:Jeff, reciprocal>; <Walt:Stanley, directed>; etc.
Reducer3
Reducer unions the output of the mappers and presents rules:
<Key, Value> = <Walt::Jeff, reciprocal>
<Key, Value> = <Walt::Stanley, directed>
The third reducer can shape the data any way that serves the business objective.
Social Triangle: Third Directed Edge
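The third pass can be sketched as a per-key join of the two lists. It uses the inbound and outbound example lists from the slide. `mapper3` and `reducer3` are hypothetical names, and a real Reducer3 could shape the output differently. Only outbound contacts are labelled here. Assuming every sender appears as a key, a contact that is inbound-only for one person shows up as a directed edge when the other person's key is processed.

```python
from typing import Dict, List, Tuple

def mapper3(person: str, inbound: List[str], outbound: List[str]) -> List[Tuple[str, str]]:
    """Join one person's inbound and outbound lists and label each pairing.

    'reciprocal': mail went both ways; 'directed': person only sent mail.
    """
    received_from = set(inbound)
    return [
        (f"{person}::{other}", "reciprocal" if other in received_from else "directed")
        for other in sorted(set(outbound))
    ]

def reducer3(pairs: List[Tuple[str, str]]) -> Dict[str, str]:
    """Collect the labelled pairs; a real job could reshape them further."""
    return dict(pairs)

# Joined lists for one key, using the example values from the slide.
pairs = mapper3("Walt", ["Jeff", "Ken", "Lori", "Susan"], ["Jeff", "Lori", "Stanley"])
print(reducer3(pairs))
```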
What is Hadoop?
People use “Hadoop” to mean one of four things:
• MapReduce paradigm (idea)
• Massive unstructured data storage on commodity hardware (idea)
• HDFS: The Hadoop distributed file system (actual Hadoop)
• Java Classes for HDFS types and MapReduce job management (actual Hadoop)
With Hadoop, you can do MapReduce jobs quickly and efficiently.
Unfortunately, people may use the word “Hadoop” to mean multiple things. They may use it to describe the MapReduce paradigm, or they may use it to describe massive unstructured data storage using commodity hardware (although commodity doesn’t mean inexpensive).
On the other hand, they may be referring to the Java classes provided by Hadoop that support HDFS file types or provide MapReduce job management.
Or they may be referring to HDFS: the Hadoop distributed file system. And they might mean both HDFS and MapReduce.
The point is that Hadoop enables the Data Scientist to create MapReduce jobs quickly and efficiently. As we shall see, one can utilize Hadoop at multiple levels: writing MapReduce modules in Java, leveraging streaming mode to write such functions in one of several scripting languages, or utilizing a higher level interface such as Pig or Hive.
The Apache Hadoop project (http://hadoop.apache.org/) provides a solid foundation for unstructured data mining and management.
What Do We Mean by Hadoop?
• A framework for performing big data analytics
An implementation of the MapReduce paradigm
Hadoop glues the storage and analytics together and provides reliability, scalability, and management
Two Main Components
Storage (Big Data)
HDFS – Hadoop Distributed File System
Reliable, redundant, distributed file system optimized for large files
MapReduce (Analytics)
Programming model for processing sets of data
Mapping inputs to outputs and reducing the output of multiple Mappers to one (or a few) answer(s)
So what exactly is Hadoop anyway?
The quick answer is that Hadoop is a framework for performing Big Data Analytics, and as such is an implementation of the MapReduce programming model.
Hadoop consists of two main components: HDFS for storing big data and MapReduce for big data analytics.
The storage function is provided by HDFS (Hadoop Distributed File System), a reliable, redundant, distributed file system optimized for large files.
The analytics functions are provided by MapReduce, which consists of a Java API as well as the software that implements the services Hadoop needs to function.
Hadoop glues the storage and analytics together in a framework that provides reliability, scalability, and management of the data.
Hadoop and HDFS
Let’s look a little deeper at the HDFS.
Between MapReduce and HDFS, Hadoop supports four different node types (a node is a particular machine on the network): NameNode, DataNode, JobTracker, and TaskTracker.
The NameNode and the DataNode are part of the HDFS implementation. Apache Hadoop has one NameNode and multiple DataNodes (there may be a secondary NameNode as well, but we won’t consider that here).
The NameNode service in Hadoop acts as a regulator/resolver between a client and the various DataNode servers. The NameNode manages the file system namespace by determining which DataNode contains the data requested by the client and redirecting the client to that particular DataNode. DataNodes in HDFS are (oddly enough) where the data is actually stored.
Hadoop is “rack aware”: the NameNode and the JobTracker use a data structure that determines which DataNode is preferred based on the “network distance” between nodes. Closer nodes are preferred, so a node on the same rack is chosen over a node on a different rack in the same datacenter.
The data itself is replicated across racks: this means that a failure in one rack will not halt data access at the expense of possibly slower response. Since HDFS isn’t suitable for near real-time access, this is acceptable in the majority of cases.
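Replica selection by network distance can be modelled with a toy topology. This is a minimal sketch, not Hadoop's NetworkTopology class. Nodes are written as hypothetical paths of the form /datacenter/rack/node, and the block map is invented. Distance counts the steps from each node up to their closest common ancestor. This gives the convention commonly described for Hadoop: 0 for the same node, 2 for the same rack, 4 for another rack in the same datacenter, and 6 for another datacenter.

```python
from typing import Dict, List

def distance(a: str, b: str) -> int:
    """Hop count between two nodes written as '/datacenter/rack/node'."""
    pa, pb = a.strip("/").split("/"), b.strip("/").split("/")
    common = 0
    while common < min(len(pa), len(pb)) and pa[common] == pb[common]:
        common += 1
    # Steps from each side up to the closest common ancestor.
    return (len(pa) - common) + (len(pb) - common)

def closest_replica(client: str, replicas: List[str]) -> str:
    """Pick the replica with the smallest network distance to the client."""
    return min(replicas, key=lambda node: distance(client, node))

# Hypothetical block map kept by a NameNode: block id -> DataNodes holding it.
block_locations: Dict[str, List[str]] = {
    "blk_001": ["/dc1/rack2/node7", "/dc1/rack1/node3", "/dc2/rack1/node1"],
}
client = "/dc1/rack1/node5"
print(closest_replica(client, block_locations["blk_001"]))
```

If the replica on the client's own rack fails, the same rule falls back to the next-closest copy. This is the "slower response but no halt" behavior described above.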
Hadoop Operational Modes
• Java MapReduce Mode
Write Mapper, Combiner, Reducer functions in Java using Hadoop Java APIs
Read records one at a time
• Streaming Mode
Uses *nix pipes and standard input and output streams
Any language (Python, Ruby, C, Perl, Tcl/Tk, etc.)
Input can be a line at a time, or a stream at a time
Hadoop provides two operational modes, where each mode supports a type of interaction with MapReduce and HDFS. Since Hadoop is written in Java and provides Java classes and APIs to access them, Java MapReduce mode writes the mapper, combiner and reducer functions in Java. In the Java MapReduce mode, input data is made available to each function a record at a time.
In contrast, streaming mode uses standard *nix streams (stdin, stdout) and the *nix pipe mechanism. This means that the MapReduce functions can be written in any programming or scripting language that can read standard input and write standard output (C, Ruby, Python, Perl, etc.). Although input can be read a line at a time, some languages support “slurping” the entire input stream into memory (Perl is one example).
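Hadoop Classes in Java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordCount {  // enclosing class assumed; the slide shows only the nested Mapper
  // Mapper for the org.apache.hadoop.mapred API: emits <word, 1> for every token
  public static class MapClass extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();  // reused to avoid allocating a Text per token
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);  // emit <word, 1>
      }
    }
  }
}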
• 4 type parameters (LongWritable, Text, Text, IntWritable): input key, input value, output key, output value
• Defined as a Java class with Hadoop types
• Emits via the output.collect(key, value) function
• Standard Java coding paradigm
Hadoop defines a set of classes that wrap the basic Java types (examples: IntWritable, Text).
Hadoop offers a number of base classes to provide a framework for jobs.
This Mapper incorporates the MapReduceBase, Reporter, and OutputCollector classes explicitly.
Here’s an example of defining a Map class, MapClass, that inherits from the MapReduceBase class and implements the Mapper interface. Its map function takes as input a LongWritable key, a Text value, an OutputCollector, and a Reporter.
Hadoop defines Java classes that stand in for the standard Java types: for example, IntWritable instead of int, and Text instead of String.
This is done to optimize I/O: Writable types are serialized compactly when data moves between nodes and to and from disk. Hadoop also defines a series of base classes and interfaces that enable the developer to write Mapper and Reducer classes and focus more closely on the analysis code than on the underlying technical implementation.
In this example, MapClass uses the MapReduceBase class, the Mapper interface, and the OutputCollector and Reporter interfaces to perform its functions. Programming Hadoop in Java means that the developer has to learn the appropriate use of these classes and their associated methods.
Example: Hadoop Streaming Mode
Script to invoke Hadoop
1 hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
2   -input input/ \
3   -output output \
4   -mapper mapper.py \
5   -reducer reducer.py \
6   -file scripts/mapper.py \
7   -file scripts/reducer.py
(The input/ and output paths on lines 2 and 3 are relative to HDFS.)
This example shows a Hadoop invocation that uses the streaming interface. Let’s go through each numbered line. Our Mapper and Reducer functions are written in Python.
1. Hadoop implements the streaming function as a .jar (Java archive) file. This line names the jar file that implements the streaming framework.
2. The -input argument defines the location of the input data in HDFS.
3. Likewise, the -output argument defines the location for the output data in HDFS.
4. The -mapper argument defines the program to run that implements the Map function. This can be a single command or a Unix pipeline command.
5. The -reducer argument names the program to run that implements the Reduce function.
6. When running on a fully distributed cluster, the -file arguments denote which programs must be copied to the cluster nodes for execution.
7. A separate -file argument is required for each program.
Simple Example of a Mapper: mapper.py
#!/usr/bin/env python3
import sys

for line in sys.stdin:               # input comes from STDIN
    line = line.strip()              # strip leading/trailing whitespace
    words = line.split()             # split line based on whitespace
    for word in words:               # for each word in the collection words
        print('%s\t%s' % (word, 1))  # write word and a count of 1, tab delimited
Here are our two functions, written in Python, that perform the map and reduce functions for our word count example.
The Mapper function simply reads the data from stdin (standard input) a line at a time, splits each line into tokens (words) based on whitespace (blank, tab, etc.), and then writes each word to standard output (stdout) with a count of one.
This isn’t a robust word count program (for example, it treats “Beach” and “beach,” as different words), but it’s sufficient as an example.
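Simple Example of a Reducer: reducer.py
#!/usr/bin/env python3
import sys

cur_word = None                           # word currently being aggregated
cur_count = 0                             # running total for cur_word
for line in sys.stdin:                    # input arrives sorted by word
    line = line.strip()
    try:
        word, count = line.split("\t", 1)  # tab-delimited <word, count>
        count = int(count)
    except ValueError:                    # skip malformed lines
        continue
    if cur_word == word:                  # same word: keep adding
        cur_count += count
    else:
        if cur_word is not None:          # new word: emit the finished total
            print('%s\t%s' % (cur_word, cur_count))
        cur_count, cur_word = count, word
if cur_word is not None:                  # emit the last word
    print('%s\t%s' % (cur_word, cur_count))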
The reduce function, reducer.py, is only a little more complicated. It reads a tab-delimited string from the standard input, and aggregates the total values for each word (Hadoop guarantees that the input to a reducer is sorted). For each different word, it outputs the word and the aggregated count. So input of “soft 1; soft 1; soft 1” would be output as “soft 3”.
Another benefit of using streaming mode is that test harnesses are easily constructed. The following *nix shell pipeline tests the execution of mapper.py and reducer.py (both scripts must be executable):
cat test.input | ./mapper.py | sort | ./reducer.py
Here, sort stands in for the “shuffle and sort” processing that Hadoop adds automatically within its framework.
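Sorted input lets reducer.py work in a single streaming pass, because all lines for one word arrive next to each other. The sketch below shows the same idea with `itertools.groupby`. It emulates the test pipeline in memory, with `sorted()` playing the role of the shell `sort`. The function names are hypothetical, and the snippet is an illustration rather than a replacement for reducer.py.

```python
from itertools import groupby
from operator import itemgetter
from typing import Iterable, Iterator, Tuple

def parse(lines: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """Turn 'word<TAB>count' lines into (word, count) pairs, skipping bad lines."""
    for line in lines:
        word, _, count = line.strip().partition("\t")
        try:
            value = int(count)
        except ValueError:
            continue  # no tab or non-numeric count
        yield word, value

def reduce_sorted(lines: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """Sum the counts of each run of equal keys in key-sorted input.

    Only the current group is held in memory at any time.
    """
    for word, group in groupby(parse(lines), key=itemgetter(0)):
        yield word, sum(count for _, count in group)

# Emulate: cat test.input | mapper.py | sort | reducer.py
text = ["soft kitty warm kitty", "soft soft"]
mapped = [f"{w}\t1" for line in text for w in line.split()]  # mapper.py output
for word, total in reduce_sorted(sorted(mapped)):             # sorted() plays sort
    print(f"{word}\t{total}")
```

If the sort step is left out, the same word can show up in several separate runs. The reducer then emits several partial totals for it, which is why the framework always sorts by key before the Reduce phase.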
Putting it all Together: MapReduce and HDFS
[Figure: A Client/Dev machine submits a job to the JobTracker, which consults the NameNode; TaskTrackers on the Hadoop Distributed File System (HDFS) nodes run the Map Tasks and Reduce Tasks, with Shuffle and Sort between them. The numbered arrows correspond to steps 1 through 7 below.]
In the original Hadoop releases, cluster resource management was embedded in Hadoop’s MapReduce functionality. The diagram illustrates an overview of the execution of a MapReduce job. The major steps are:
1. The client submits a MapReduce job to the JobTracker which is responsible for the scheduling and monitoring of MapReduce jobs. The JobTracker determines when the necessary resources are available to begin a MapReduce job.
2. From the NameNode, the JobTracker determines where to run the map tasks. Whenever possible, the map tasks are run on the nodes where the specified input files are stored as blocks in HDFS.
3. The JobTracker assigns the map tasks to the TaskTracker running on the identified nodes.
4. A TaskTracker runs its assigned map tasks and reports to the JobTracker the status of the various tasks.
5. As the map tasks complete, the map output, consisting of key/value pairs, is stored to current node’s local disk space in preparation for the shuffle and sort process, which merges and sorts the map output, based on the key.
6. The JobTracker identifies which nodes to run the reduce tasks and informs the proper TaskTracker. Based on the assigned keys, the output from the shuffle and sort process is provided to the appropriate reduce task.
7. Finally, the reduce task output is written to HDFS.
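Steps 2 and 3 depend on data locality. The following Python sketch is a simplified, greedy illustration of that preference. It is not the JobTracker's actual scheduler, which hands out tasks as TaskTrackers check in. The block IDs, host names, and slot counts are hypothetical.

```python
def assign_map_tasks(block_locations, free_slots):
    """Greedily assign one map task per input block.

    block_locations: {block_id: [hosts holding a replica]}, as reported by the NameNode
    free_slots:      {host: free map slots}, as reported by the TaskTrackers
    Returns {block_id: (host, data_local)}; blocks with no free slot are left out.
    """
    slots = dict(free_slots)  # work on a copy; don't mutate the caller's dict
    assignment = {}
    for block, hosts in sorted(block_locations.items()):
        # Prefer a TaskTracker that already stores this block (data-local).
        local = [h for h in hosts if slots.get(h, 0) > 0]
        if local:
            host, data_local = local[0], True
        else:
            # Otherwise fall back to any host with a free slot (data must move).
            remote = [h for h, n in sorted(slots.items()) if n > 0]
            if not remote:
                continue  # no capacity now; the task would wait
            host, data_local = remote[0], False
        slots[host] -= 1
        assignment[block] = (host, data_local)
    return assignment


if __name__ == "__main__":
    blocks = {"b1": ["node1", "node2"], "b2": ["node2"], "b3": ["node3"]}
    slots = {"node1": 1, "node2": 1, "node4": 1}
    print(assign_map_tasks(blocks, slots))
    # {'b1': ('node1', True), 'b2': ('node2', True), 'b3': ('node4', False)}
```

Block b3 lives only on node3, which has no free slot, so its data has to travel to node4. Moving computation to the data avoids exactly this kind of network cost.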
As covered in the lab associated with this lesson, Hadoop provides a Web-based GUI for the NameNode, JobTracker, and TaskTracker.
Hadoop Features and Enhancements
• HDFS
  - Secondary NameNode – assists the NameNode, not a backup
  - High Availability option – active/standby NameNodes
  - NameNode federation – multiple, independent NameNodes
• Yet Another Resource Negotiator (YARN)
  - Separates resource management from MapReduce
  - Increases scalability of Hadoop clusters
  - Enables frameworks beyond MapReduce to be run on the cluster
  - Use case: social network analysis
There are several HDFS configuration options. The Secondary NameNode periodically merges the NameNode’s edit log into the latest file system image (fsimage), which keeps the edit log at a manageable size. If the NameNode fails, the restarted NameNode loads the latest fsimage. It then applies any edits not yet included in that image, which fully restores the file system metadata. To reduce the chance of an outage, recent Hadoop releases provide a High Availability option in which two NameNodes are maintained. If the active NameNode fails, the standby NameNode can quickly take over.
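The checkpoint-and-replay cycle is easier to follow with a concrete model. The following Python sketch assumes a toy namespace of {path: size} and an edit log of tuples. These formats and the function names are hypothetical and do not match HDFS's on-disk fsimage or edits files.

```python
def apply_edits(fsimage, edits):
    """Replay edit-log operations on a copy of a namespace image ({path: size})."""
    image = dict(fsimage)  # never modify the image we were given
    for op, path, *args in edits:
        if op == "create":
            image[path] = args[0]
        elif op == "delete":
            image.pop(path, None)
        elif op == "rename":
            image[args[0]] = image.pop(path)
        else:
            raise ValueError(f"unknown edit operation: {op}")
    return image


def checkpoint(fsimage, edit_log):
    """Secondary-NameNode-style merge: fold the edit log into a new fsimage
    and start over with an empty edit log."""
    return apply_edits(fsimage, edit_log), []


def recover(latest_fsimage, edits_since_checkpoint):
    """NameNode restart: load the latest fsimage, then replay the edits it lacks."""
    return apply_edits(latest_fsimage, edits_since_checkpoint)


if __name__ == "__main__":
    image, log = checkpoint({}, [("create", "/a", 10), ("create", "/b", 20)])
    log += [("rename", "/a", "/c"), ("delete", "/b")]  # edits after the checkpoint
    print(recover(image, log))  # {'/c': 10}
```

Frequent checkpoints keep the replay step short. This is the sense in which the Secondary NameNode assists the NameNode without acting as a backup.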
As the number of stored files grows, the NameNode must hold more metadata, because it tracks the locations of the replicated blocks of every HDFS file. NameNode federation addresses this by establishing multiple NameNodes, each handling specific HDFS directories assigned during configuration and setup. Because each NameNode is responsible for a non-overlapping part of the HDFS directory structure, the NameNodes operate independently of each other. The DataNodes can store data for any of the federated NameNodes.
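With federation, something has to decide which NameNode owns a given path. The following Python sketch models that decision as a client-side mount table with longest-prefix matching on whole path components. The table contents, the NameNode names nn1 to nn3, and the route function are hypothetical. This is an illustration, not Hadoop's configuration format.

```python
def route(path, mount_table):
    """Return the NameNode that owns `path`, using the longest matching prefix.

    Prefixes match whole path components, so "/data" owns "/data/x"
    but not "/database/x".
    """
    best, best_len = None, -1
    for prefix, namenode in mount_table.items():
        p = prefix.rstrip("/") or "/"
        matches = p == "/" or path == p or path.startswith(p + "/")
        if matches and len(p) > best_len:
            best, best_len = namenode, len(p)
    if best is None:
        raise KeyError(f"no NameNode is configured for {path}")
    return best


if __name__ == "__main__":
    # Hypothetical mount table: each NameNode owns a non-overlapping subtree
    # (here /data/logs is carved out of /data and owned by a separate NameNode).
    table = {"/user": "nn1", "/data": "nn2", "/data/logs": "nn3"}
    print(route("/data/logs/2014/app.log", table))  # nn3
    print(route("/data/sales.csv", table))          # nn2
```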
In the original Hadoop releases, cluster resource management was embedded in Hadoop’s MapReduce functionality. With the introduction of Yet Another Resource Negotiator (YARN), resource management responsibilities are separated from the MapReduce functionality. This separation is transparent to most MapReduce developers. It has allowed larger Hadoop clusters to be built, and it has enabled frameworks other than MapReduce to run on the cluster. Several new frameworks are currently under development (see http://wiki.apache.org/hadoop/PoweredByYarn for examples).
Next, we’ll examine how YARN handles the execution of a MapReduce job.
Putting it all Together: MapReduce and HDFS (YARN)
Diagram: MapReduce job execution under YARN (steps 1 to 9), showing the Client/Dev, the ResourceManager (Scheduler and AppsMgr), NameNode, AppMaster, NodeMgrs running Map and Reduce tasks, the Shuffle and Sort, and the Hadoop Distributed File System (HDFS).
Under YARN, the content and structure of a MapReduce job are unchanged, but the job is scheduled and managed quite differently. The JobTracker functionality is now shared by the ResourceManager and the ApplicationMaster (AppMaster). The key steps are:
1. The client submits a MapReduce job to the ResourceManager, which schedules the job based on cluster activity.
2. When the Scheduler decides to begin the MapReduce job, the ApplicationsManager (AppsMgr) starts the ApplicationMaster.
3. Using information from the NameNode, the ApplicationMaster determines which nodes store the job’s input HDFS blocks. It then builds an execution plan along with its resource requirements.
4. The ApplicationMaster requests resources from the ResourceManager, including RAM and specific node or rack names to minimize data transfer. The ResourceManager then informs the ApplicationMaster of the granted resources.
5. The ApplicationMaster instructs the NodeManagers (NodeMgr) to dedicate the allocated resources to each Map or Reduce task.
6. The ApplicationMaster starts the Map tasks and monitors their status.
7. The shuffle and sort occurs.
8. The ApplicationMaster starts the Reduce tasks and monitors their status.
9. Finally, the reduce task output is written to HDFS. The job-specific ApplicationMaster then shuts down and releases its resources.
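Step 4 is where YARN differs most from the JobTracker model. Requests are sized by resources such as RAM and carry locality preferences. The following Python sketch is a toy first-fit model of that negotiation and is not YARN's Capacity or Fair Scheduler. The task names, memory sizes, and the rule for relaxing locality to any node are all assumptions.

```python
def allocate(requests, node_free_mb):
    """Toy first-fit container allocation.

    requests:      [(task, memory_mb, [preferred nodes])], in request order
    node_free_mb:  {node: free memory in MB} reported by the NodeManagers
    A request goes to the first preferred node with enough free memory,
    otherwise to any node with enough (relaxed locality); if nothing fits,
    the task stays pending. Returns (granted {task: node}, pending [task]).
    """
    free = dict(node_free_mb)
    granted, pending = {}, []
    for task, mem, preferred in requests:
        candidates = [n for n in preferred if free.get(n, 0) >= mem]
        candidates += [n for n in sorted(free) if free[n] >= mem and n not in candidates]
        if candidates:
            node = candidates[0]
            free[node] -= mem
            granted[task] = node
        else:
            pending.append(task)
    return granted, pending


if __name__ == "__main__":
    reqs = [("map-0", 1024, ["node1"]), ("map-1", 1024, ["node1"]), ("reduce-0", 2048, [])]
    print(allocate(reqs, {"node1": 1536, "node2": 2048}))
    # ({'map-0': 'node1', 'map-1': 'node2'}, ['reduce-0'])
```

The reduce request stays pending because no single node has 2048 MB free after the map containers are placed. In a real cluster, it would be granted once earlier containers finish and release their memory.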
Using R with Hadoop
• The brute force way
  Reading
    rcon <- pipe("hadoop fs -cat 'output/d99/*'", open="r")
    lines <- readLines(rcon)
    close(rcon)
  Writing
    wcon <- pipe("hadoop fs -put - dir1/dir2/out.txt", open="w")
    cat(…, file=wcon)
    close(wcon)
  MapReduce
    system("MapReduceJob.sh", wait=TRUE)  # blocks until done
You can access data stored in HDFS and run MapReduce jobs using standard R functions. Reading and writing are supported by pipe(), which opens a connection to a shell command. In read mode (open="r"), R receives the command’s output. In write mode (open="w"), R sends data to the command’s input. A MapReduce job can be run as a shell script with system(), either synchronously (wait=TRUE blocks until the job finishes) or asynchronously (wait=FALSE). In both cases, stdout and stderr can be redirected and captured if required.
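The same brute-force approach carries over to Python through the standard subprocess module. The following sketch is a loose analogue of the R calls, not an RHadoop or Hadoop API. The helper names are hypothetical, and the commented usage assumes a configured `hadoop` client on the PATH. Passing an argument list bypasses the local shell, so the `*` reaches `hadoop fs` unexpanded and is matched against HDFS.

```python
import subprocess


def read_command_lines(cmd):
    """Run `cmd` and return its stdout as a list of lines (like readLines on a read pipe)."""
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout.splitlines()


def write_command_input(cmd, text):
    """Send `text` to the stdin of `cmd` (like cat() on a write pipe)."""
    subprocess.run(cmd, input=text, text=True, check=True)


def start_job(cmd):
    """Start `cmd` without waiting (like system(..., wait=FALSE)); call .wait() later."""
    return subprocess.Popen(cmd)


# Usage (assumes a configured `hadoop` client on the PATH; paths from the slide):
#   lines = read_command_lines(["hadoop", "fs", "-cat", "output/d99/*"])
#   write_command_input(["hadoop", "fs", "-put", "-", "dir1/dir2/out.txt"], "text\n")
#   job = start_job(["sh", "MapReduceJob.sh"]); job.wait()
```

With check=True, a failing command raises CalledProcessError rather than returning partial output silently.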
Using RHadoop
• 3 packages courtesy of Revolution Analytics
  - rhdfs – access to HDFS functions
  - rhbase – access to HBase
  - rmr – run MapReduce jobs
• HDFS: hdfs.read(), …
• HBase: hb.list.tables(), …
• MapReduce: mapreduce(input_dir, output_dir, RMapFunction, RReduceFunction, …)
Revolution Analytics has released a set of R packages called RHadoop that allow direct access to Hadoop HDFS, HBase, and MapReduce. The HBase and HDFS interfaces provide a number of access functions, and the goal of the project was to mirror the Java interfaces as closely as possible.
mapreduce() is the exception. The input and output directories are specified as they would be for a Hadoop MapReduce job, but the Map function and the Reduce function are both written in R.
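The contract behind mapreduce() is the usual Hadoop one. The user supplies only a map function and a reduce function, and the framework handles grouping by key. The following Python sketch shows that contract with a purely local, in-memory driver. It does not use or reproduce rmr's API, and local_mapreduce, word_map, and word_reduce are hypothetical names.

```python
from collections import defaultdict


def local_mapreduce(records, map_fn, reduce_fn):
    """Run map_fn over every (key, value) record, group the emitted pairs by
    key, and call reduce_fn(key, values) once per key, in key order."""
    groups = defaultdict(list)
    for key, value in records:
        for out_key, out_value in map_fn(key, value):
            groups[out_key].append(out_value)
    return [pair for k in sorted(groups) for pair in reduce_fn(k, groups[k])]


def word_map(_, line):
    """Emit (word, 1) for each word in a line of text."""
    for word in line.split():
        yield word.lower(), 1


def word_reduce(word, counts):
    """Emit the total count for one word."""
    yield word, sum(counts)


if __name__ == "__main__":
    lines = [(0, "Soft data"), (1, "soft soft")]
    print(local_mapreduce(lines, word_map, word_reduce))  # [('data', 1), ('soft', 3)]
```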
The rhdfs functions are:
• File Manipulations – hdfs.copy, hdfs.move, hdfs.rename, hdfs.delete, hdfs.rm, hdfs.del, hdfs.chown, hdfs.put, hdfs.get
• File Read/Write – hdfs.file, hdfs.write, hdfs.close, hdfs.flush, hdfs.read, hdfs.seek, hdfs.tell, hdfs.line.reader, hdfs.read.text.file
• Directory – hdfs.dircreate, hdfs.mkdir
• Utility – hdfs.ls, hdfs.list.files, hdfs.file.info, hdfs.exists
• Initialization – hdfs.init, hdfs.defaults
The HBase functions are:
• Table Manipulation – hb.new.table, hb.delete.table, hb.describe.table, hb.set.table.mode, hb.regions.table
• Row Read/Write – hb.insert, hb.get, hb.delete, hb.insert.data.frame, hb.get.data.frame, hb.scan
• Utility – hb.list.tables
• Initialization – hb.defaults, hb.init
Check Your Knowledge
1. Why is a combiner function optional in the MapReduce framework?
2. What is the purpose of the NameNode in HDFS?
3. Identify an “embarrassingly parallel” situation from your current work.
4. What are two benefits of YARN?
Please take a moment to answer these questions. Record your answers here.
Summary
Advanced Analytics - Technology and Tools
During this lesson the following topics were covered:
• Applying the MapReduce/Hadoop processing framework to big data analytics problems for unstructured data
• Differentiating among the various elements of Hadoop
• Naming the components of HDFS
• Identifying the parts of a Hadoop streaming script
Lesson 1 introduced the idea behind MapReduce processing, and then described how Hadoop implements this algorithm.
This lesson covered Data Management: the processing and development of frameworks to work on unstructured data in the terabyte range, and presented extensions to Hadoop that leverage its capabilities.