20131211_BigDataIntro2

Werbung
Einführung in Big Data II
Kurt Stockinger
1
Zürcher Fachhochschule
Lernziele
• Kennen der Details des MapReduce Algorithmus
• Anwenden von MapReduce für Big Data Problemstellungen
• Kennen von Pig und Pig Latin
• Verstehen des Unterschieds zu SQL
2
Zürcher Fachhochschule
Wiederholung: MapReduce
3
Zürcher Fachhochschule
Wörterzählen: Einfacher Pseudo Code
define wordCount as Map<String,long>;
for each document in documentSet {
T = tokenize(document);
for each token in T {
wordCount[token]++;
}
}
display(wordCount);
Funktioniert für “kleine” Datenmengen
4
Zürcher Fachhochschule
Paralleles Programmieren
• Wie erweitert man den Code, sodass er auf mehreren Machinen
parallel läuft?
5
Zürcher Fachhochschule
Wörterzählen: MapReduce Pseudo Code
map(String input_key, String input_value):
// input_key: document name
// input_value: document contents
for each word w in input_value:
EmitIntermediate (w, "1");
reduce(String output_key, Iterator intermediate_values):
// output_key: a word
// output_values: a list of counts
int result = 0;
for each v in intermediate_values:
result += ParseInt(v);
Emit (AsString(result));
6
Zürcher Fachhochschule
Word Count with MR in Hadoop #1
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount {
public static class Map extends MapReduceBase implements Mapper<LongWritable,
Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
7
Zürcher Fachhochschule
Word Count with MR in Hadoop #2
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
8
Zürcher Fachhochschule
Word Count with MR in Hadoop #3
public static class Reduce extends MapReduceBase implements Reducer<Text,
IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
9
Zürcher Fachhochschule
Word Count with MR in Hadoop #4
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setInputPath(new Path(args[0]));
conf.setOutputPath(new Path(args[1]));
JobClient.runJob(conf);
}
10
Zürcher Fachhochschule
Details zum User Interface #1
• Mapper:
• Maps tranformieren Input Records in temporäre Records (Output Pairs)
• Zugriff auf Output Pairs mittles
OutputCollector.collect(WritableComparable,Writable).
• Reporter wird verwendet, um den Programmfortschritt zu loggen
• Alle temporären Werte für einen Output Key werden gruppiert und an
Reduce weitergereicht:
• Output wird partitioniert: #Partitionen entspricht #Reduce Tasks des Jobs
• Lokale Aggregation mittels JobConf.setCombinerClass(Class):
• Reduziert Netzwerkauslastung in Reduce-Phase
• #Maps?
• Entspricht Grösse der Inputdaten, #Blocks und #Inputdaten
• ~10-100 Maps/Knoten
11
Zürcher Fachhochschule
Details zum User Interface #2
• Reducer:
• Reduziert temporäre Werte zu kleinerer Anzahl.
• #Reducer ist definiert mittels JobConf.setNumReduceTasks(int).
• #Reduces?
• Zwischen 0.95 und 1.75 * #Knoten
12
Zürcher Fachhochschule
Pig
Teilweise basierend auf Slides von Dr. Bill Howe,
University of Washington
13
Zürcher Fachhochschule
Pig / Pig Latin
• Pig ist eine high-level Plattform, um MapReduce Jobs auf Hadoop zu
auszuführen:
• MapReduce Jobs werden automatisch generiert
• Gute Skalierbarkeit für Big Data
• Die Sprache der Plattform heisst Pig Latin:
• Ähnlich wie SQL
• Kein Java-basiertes “low-level” MapReduce notwendig
• Aber:
• User Defined Functions (UDF) können in Java, JavaScript, Python, etc.
geschrieben werden
• Pig wurde von Yahoo Research 2006 entwickelt
• Seit 2007 ist es ein Apache Projekt
14
Zürcher Fachhochschule
Datenmodell
• Atom:
• Integer, string, etc
• Tuple:
• Sequenz von Werten
• Jeder Wert hat einen bestimmten Typ z.B. integer, string
• Bag:
• Kollektion von Tuple
• Können von unterschiedlichem Typ sein
• Duplikate möglich (Unterschied zur Menge)
• Map:
• „Dictionary“
• String-Werte weisen auf einen beliebigen Wert
15
Zürcher Fachhochschule
Beispiel
f1: Atom
f2: Bag
f3: Map
t = < 7, { <1,2>, <3,4>, <5,6>, <3,4>}, [‘Huber’:’Student’] >
Ausdruck
Ergebnis
$0
1
f2
Bag{ <1,2>, <3,4>, <5,6>, <3,4>}
f2.$0
Bag{ <1>, <3>, <5>, <3>}
f3#’Huber’
Atom “Student”
sum(f2.$0)
1+3+5+3 = 12
16
Zürcher Fachhochschule
Wichtigste Operatoren
• LOAD/STORE:
• Laden/Speichern von Daten
• FILTER:
• Auslesen von Tupel (entspricht SQL-Select)
• FOREACH:
• Bearbeiten von Spalten (entspricht SQL-Projekt)
• GROUP:
• Gruppierung von Daten in eine einzige Relation
• COGROUP, inner Join, outer Join:
• Gruppierung bzw. Join in zwei oder mehrere Relationen
• UNION:
• Vereinigung von Relationen
• SPLIT:
• Aufteilung von Relationen
Zürcher Fachhochschule
17
LOAD
• Annahme:
• Datei ist ein Bag, d.h. jeder Datensatz wird als Tupel interpretiert
• Parsing mittles Funktion USING
• Schemadefintion mittles AS
T = LOAD ‘file.txt’ USING PigStorage(‘\t’) AS (f1, f2, f3)
<9, 2, 3>
<4, 2, 2>
<1, 2, 3>
<9, 3, 2>
18
Zürcher Fachhochschule
FILTER
• Werte ausfiltern
• Boolsche Operatoren möglich (AND, OR, etc)
• Reguläre Ausdrücke
Y = FILTER T BY f1 == ‘9’;
T= <9, 2, 3>
<4, 2, 2>
<1, 2, 3>
<9, 3, 2>
Y = <9, 2, 3>
<9, 3, 2>
19
Zürcher Fachhochschule
GROUP
Daten gruppieren
X = GROUP T BY f1;
T= <9, 2, 3>
<4, 2, 2>
<1, 2, 3>
<9, 3, 2>
Y = <1, {<1, 2, 3>}>
<4, {<4, 2, 2>}>
<9, {<9, 2, 3>, <9, 3, 2>}>
Name: “group”
Name: “T”
20
Zürcher Fachhochschule
DISTINCT
• Duplikate eliminieren
X1 = DISTINCT T1
T1=
<9, 2, 3>
<4, 2, 2>
<1, 2, 3>
<9, 2, 3>
X1 =
<9, 2, 3>
<4, 2, 2>
<1, 2, 3>
21
Zürcher Fachhochschule
FOREACH
• Jedes Tupel manipulieren
X = FOREACH T GENERATE f1, f2+f3
Projektion
Y = GROUP T BY f1;
Z = FOREACH Y GENERATE group, Y.($1,$2)
T = <9, 2, 3>
<4, 2, 2>
<1, 2, 3>
<9, 3, 2>
X = <9, 5>
<4, 4>
<1, 5>
<9, 5>
Z = <9, {<2, 3>, <3, 2>}>
<4, {<2 ,2>}>
<1, {<2, 3>}>
22
Zürcher Fachhochschule
COGROUP
C = COGROUP T f1 BY $0, S BY $0;
T = <9, 2, 3>
<4, 2, 2>
<1, 2, 3>
<9, 3, 2>
S = <4, 1>
<5, 2>
<6, 3>
C = <1, {<1, 2, 3>}, {}>
<4, {<4, 2, 2>}, {4,1}>
<5, {}, {<5, 2>} >
<6, {}, {<6, 3>} >
<9, {<9, 2, 3>, <9, 3, 2>}, {} >
23
Zürcher Fachhochschule
JOIN
C = JOIN T BY $0, S BY $1
T = <9, 2, 3>
<4, 2, 2>
<1, 2, 3>
<9, 3, 2>
S = <4, 1>
<5, 2>
<3, 9>
C = <9, 2, 3, 3>
<1, 2, 3, 4>
<9, 3, 2, 3>
24
Zürcher Fachhochschule
Join-Optimierung
• Join-Operator ist nicht automatisch optimiert wie in einer Datenbank
• Replikation (Broadcase Join):
• 1 Tabelle ist klein, andere ist gross
• Repliziere kleine Tabelle, um Netzwerkbandbreite für Reduze-Phase zu
reduzieren
25
Zürcher Fachhochschule
Pig Beispiel: Wörterzählen
input_lines = LOAD '/tmp/my-copy-of-all-pages-on-internet' AS (line:chararray);
-- Extract words from each line and put them into a pig bag
-- datatype, then flatten the bag to get one word on each row
words = FOREACH input_lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
-- filter out any words that are just white spaces
filtered_words = FILTER words BY word MATCHES '\\w+';
-- create a group for each word
word_groups = GROUP filtered_words BY word;
-- count the entries in each group
word_count = FOREACH word_groups GENERATE COUNT(filtered_words) AS count, group AS word;
-- order the records by count
ordered_word_count = ORDER word_count BY count DESC;
STORE ordered_word_count INTO '/tmp/number-of-words-on-internet';
Zürcher Fachhochschule
26
Pig vs. MapReduce
• MapReduce kombiniert drei primitive Operationen:
• Datensätze prozessieren
• Gruppierung
• Prozessiere gruppierte Datensätze
• In Pig sind diese Operation
• explizit
• unabhängig
• vollständig kombinierbar
• Pig hat zusätzliche primitive Operation für:
• Filter
• Projektion
• Vereinigung zwei oder mehrerer Datensätze
27
Zürcher Fachhochschule
Pig Latin vs. SQL
Pig Latin
SQL
Prozedural (Sequenz von Schritten)
Deklarativ (Sequenz von Constraints)
Schema-on-Read: Jeglicher Datensatz ohne
bestimmtes Schema kann gelesen werden
Schema-on-Write: Vordefiniertes
Datenschema
Bulk Lese- und Schreiboperationen, keine
Indizes und Transaktionen
Bulk und Random Lese- und
Schreibeoperationen, Indizes und
Transaktionen
Lazy evaluation: Evaluierung eines Ausdrucks
erst bei Verwendung
Eager evaluation
Verwendung für ETL
Verwendung vor allem für Queries
Implementierung kann spezifiert werden (z.B.
für Join)
User definiert Join, jedoch nicht welche
Implementierung verwendet wird (Query
Optimizer entscheidet)
Einfache Verwendung und Integration von
User Defined Functions (UDF)
UDFs nicht so gut integriert wie logische
Ausdrücke
28
Zürcher Fachhochschule
Pig Latin ist prozedural und erlaubt
Pipelining und Checkpointing
Aufgabe:
•
Join der Sourcen users und clicks
•
Join mit geoinfo
•
Aggregation und Speicherung in
ValuableClicksPerDMA
Pig Latin:
Users = load 'users' as (name, age, ipaddr);
Clicks = load 'clicks' as (user, url, value);
ValuableClicks = filter Clicks by value > 0;
UserClicks = join Users by name, ValuableClicks by user;
SQL:
Geoinfo = load 'geoinfo' as (ipaddr, dma);
UserGeo = join UserClicks by ipaddr, Geoinfo by ipaddr;
ByDMA = group UserGeo by dma;
insert into ValuableClicksPerDMA
select dma, count(*)
ValuableClicksPerDMA = foreach ByDMA generate group,
COUNT(UserGeo);
from geoinfo join (
store ValuableClicksPerDMA into 'ValuableClicksPerDMA';
select name, ipaddr from users join clicks on
(users.name = clicks.user) where value > 0;
) using ipaddr group by dma;
29
Zürcher Fachhochschule
Herunterladen