Sprachen, die nach MapReduce kompiliert werden

Werbung
Sprachen, die nach MapReduce kompiliert
werden
Hauptseminar: Multicore-Programmierung
Franz Pius Hübl
10. November 2011
Zusammenfassung
Ziel dieser Arbeit ist es, einen Einblick in die Arbeitsweise verschiedener Sprachen, die nach MapReduce kompiliert werden, zu geben. Hierfür
wurden zwei Beispielsprachen ausgewählt: Pig und Hive. Anhand eines
gemeinsamen Beispiels wird die unterschiedliche Verwendung, sowie Vorund Nachteile der Sprachen betrachtet.
ii
Inhaltsverzeichnis
1 Einleitung
1.1 Was ist MapReduce? . . . . . . . . . . . . . . . . . . . . . . . . .
1.2 Apache Hadoop . . . . . . . . . . . . . . . . . . . . . . . . . . . .
1
1
1
2 Apache Pig
2.1 Pig Latin . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
2.2 Beispiel . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
2.3 Bewertung . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
2
2
5
6
3 Apache Hive
3.1 Hive Architektur
3.2 HiveQL . . . . .
3.3 Beispiel . . . . .
3.4 Bewertung . . . .
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
8
9
9
10
11
4 Zusammenfassung und Ausblick
12
A Wörter zählen mit Apache Hadoop
14
iii
1
Einleitung
In dieser Ausarbeitung werden Sprachen betrachtet, die nach MapReduce kompiliert werden. Das sind Sprachen, die zunächst keine Ähnlichkeit zu MapReduce
Programmen haben und dann mit einem Compiler in eine Folge von MapReduce Programmen überführt werden. Dadurch abstrahiert man von den Folgen
von MapReduce Aufgaben und der Nutzer muss nur noch ein semantisches Programm definieren, ohne die Reihenfolge der MapReduce Schritte beachten zu
müssen. Dadurch können auch technisch weniger versierte Mitarbeiter Abfragen
erstellen, die sonst mit komplexen MapReduce Programmen realisiert werden
müssten. Hierfür gibt es viele Sprachen, von denen zwei, Pig und Hive, in den
folgenden Kapiteln näher betrachtet werden.
1.1
Was ist MapReduce?
Seit der Erfolgsgeschichte von Google existiert ein Verfahren, das von vielen Personen als das parallele Verfahren für die Verarbeitung großer Datenmengen betrachtet wird, MapReduce (Vgl. [DG08]). MapReduce stammt ursprünglich aus
der funktionalen Programmierung und bezeichnet zwei Funktionen, die nacheinander abgearbeitet werden. Zunächst wird mittels einer Map Funktion aus
den Eingabedaten „Key-Value“ Paare erzeugt. Diese Paare werden dann nach
dem Key sortiert und mit der Reduce Funktion kombiniert.
1.2
Apache Hadoop
Hadoop ist ein freies in Java geschriebenes Framework. Es verfügt über ein
eigenes verteiltes Dateisystem, HDFS. Hadoop ist dafür gedacht, große Datenmengen auf Computerclustern zu bearbeiten und Berechnungen mit diesen Daten durchzuführen. Seit 2008 wird Hadoop als Apache Top-Level Projekt geführt. Hadoop enthält eine Implementierung des MapReduce Algorithmus (Vgl.
[HAD]).
2008 und 2009 gewann das Hadoop Projekt den Terabyte Sort Benchmark
Preis als erstes Open-Source Programm. Die Programmiersprachen Pig und Hive
nutzen jeweils Hadoop als Plattform und lassen ihre kompilierten MapReduceProgramme auf der Hadoop Plattform ausführen. In den folgenden Kapiteln
wird zur Verdeutlichung das Beispiel „Wörter zählen“ verwendet. Um einen Vergleich mit einem in Hadoop implementierten Programm zu haben, wurde im
Anhang A das Beispiel als Hadoop Programm implementiert.
1
2
Apache Pig
Apache Pig ist eine Plattform, die dem Nutzer eine Highlevel Sprache (Pig Latin) zur Verfügung stellt, um mit einfachen Anweisungen ein Programm für
Apache Hadoop zu schreiben. Pig ist dafür gedacht, große Datensätze zu analysieren. Hierfür nutzt Pig einen Compiler, der aus den PigLatin Kommandos
Folgen von MapReduce Programmen erzeugt. Diese können dann in dem bereits
vorgestellten Framework Apache Hadoop berechnet werden. Für die Entwickler
von Pig stehen drei Schlüsselmerkmale im Vordergrund (Vgl. [PIG]):
• Einfache Programmierung
Das Programm sollte möglichst einfach zu entwickeln sein. Damit wird
von dem zugrundeliegenden Map Reduce System abstrahiert.
• Optimierungsmöglichkeiten
Nicht die Optimierungsmöglichkeiten, sondern die Semantik des Programms
stehen im Vordergrund. Die Optimierung wird dann vom System selbst
durchgeführt.
• Erweiterbarkeit
Der Nutzer soll mit sogenannten User Defined Functions (UDF) das Programm erweitern können. Dadurch können einfache PigSkripte komplexe
Operationen aufrufen, die beispielsweise als Java Funktion bereits implementiert wurden.
Pig ist als eine Datenflusssprache konzipiert und stellt standardmäßig Operationen bereit um Daten zu analysieren und zu verändern. Im nächsten Kapitel
wird hierauf genauer eingegangen. Zwar wurde Pig für Apache Hadoop entworfen, doch könnte es mit einem geeigneten Compiler auch für andere MapReduce
Frameworks verwendet werden. Im Jahr 2006 wurde Pig von Yahoo entwickelt.
Damals war es für Entwickler eine einfache Möglichkeit, schnell Abfragen auf
großen Datensätzen zu programmieren, ohne eine MapReducestruktur neu implementieren zu müssen. 2007 wurde das Projekt der Apache Software Foundation unterstellt und seit September 2010 wird Pig als Apache Top-Level Projekt
geführt.
2.1
Pig Latin
Pig kann über eine interaktive Shell, eingebettet in einer Host-Sprache (beispielsweise Java), oder über ScriptFiles verwendet werden. In all diesen Fällen
benötigt man die Pig eigene Sprache Pig Latin. Diese wird im folgenden Abschnitt näher beschrieben (Vgl. [ORS+ 08]).
Am Anfang jedes Pig Skriptes steht meist ein Lade Operation. Diese wird
mit dem Schlüsselwort LOAD eingeleitet. Anschließend wird die zu ladende
Datei mit dem Dateipfad angegeben. Diese Datei befindet sich typischerweise
innerhalb des HDFS Dateisystems von Hadoop. Besteht die Datei aus mittels
Tabulator separierten Feldern und aus einem Eintrag pro Zeile, so kann man
nun ein Schema angeben. Dadurch ist es möglich, den einzelnen Spalten Namen
zu geben und Datentypen dafür festzulegen. Im folgenden Beispiel soll die Datei
”file.txt” eingelesen werden und anschließend in File gespeichert werden. Analog
zum Befehl LOAD existiert auch der Befehl STORE, der Daten speichern kann.
2
file.txt:
Hallo
1
Welt!
2
Pig Skript:
File = LOAD ’file.txt’ AS (word:chararray, line:int);
Pig kennt viele Schlüsselworte, die meistens schon von anderen Sprachen bekannt sind und deshalb in ihrer Bedeutung leicht erschlossen werden können.
Beispielsweise wird JOIN, semantisch wie ein Join in einer SQL Sprache, und
FOREACH, semantisch wie eine Foreach-Schleife in Java, verwendet. Durch diese Übertragungen sind viele Kommandos von Pig leicht zu verstehen und bis
auf kleine Syntaxabweichungen leicht zu benutzen. Daher fällt dem Programmierer ein Umstieg auf Pig Latin leicht und man gewöhnt sich sehr schnell an
die Syntax.
A = FOREACH File GENERATE COUNT(word) as count;
C = JOIN A BY count FULL, B BY line;
Pig Latin arbeitet mit verschiedenen Datentypen. Darunter sind „simple Datentypen“ wie Integer, Long Integer, Float, Double, Chararray (ein Character
Array in Unicode UTF-8 Kodierung) und Bytearray. Aus dieses simplen Datentypen können nach Belieben komplexere Datentypen erzeugt werden. Pig
verwendet bei komplexen Datentypen Tupel, Bags (eine Liste von Tupeln) und
Maps (Key-Value Listen).
Tupel:
(1, 2, hallo)
Bag:
{(1,2),(1,3),(2,3)}
oder
(1,2)
(1,3)
(2,3)
Map:
[word#Hallo,line#1]
[word#Welt!,line#2]
Hinzu kommen zahlreiche Operatoren, die auf den verschiedenen Datenobjekten verwendet werden können, wie beispielsweise boolesche Operatoren
(==, <, >, ! =, <=, >=), arithmetische Operatoren (+, −, ∗, /, % ) oder Aggregatfunktionen ( SUM, AVG, MIN, MAX, COUNT). Zusätzlich kann man die
einzelnen Datenobjekte mit einem FILTER auswählen. Im folgenden Beispiel
enthält die Relation Y drei Spalten f1, f2 und f3, die jeweils vom Typ Integer
sind. Die Relation X enthält nach dieser Zeile alle Datenobjekte, die die gesuchte
Bedingung erfüllen. Anschließend werden in der Relation X alle Elemente nach
dem Wert von f1 gruppiert und in Z gespeichert.
3
X = FILTER Y BY (f1==8) OR (NOT (f2+f3 > f1));
Z = GROUP X BY f1;
Reichen die vielen integrierten Funktionen nicht aus, so kann man ganz einfach selbst eine Funktion schreiben und diese dann von Pig aufrufen. Dazu muss
zunächst die JAR-Datei mit dem Schlüsselwort REGISTER Pig mitgeteilt werden. Anschließend kann man die Funktionen der JAR-Datei einfach in dem Pig
Skript aufrufen.
REGISTER myudfs.jar;
A = LOAD ’student_data’ AS (name: chararray, age: int, gpa: float);
B = FOREACH A GENERATE myudfs.UPPER(name);
In der JAR-Datei existiert nun die unten angegebene Klasse UPPER. Die
Methode exec() in dieser Klasse implementiert dann die gewünschte Funktion.
Im Beispiel oben konvertiert UPPER jeden String in Großschreibung und gibt
diesen dann zurück.
1
2
3
4
package myudfs ;
import j a v a . i o . IOException ;
import o r g . apache . p i g . EvalFunc ;
import o r g . apache . p i g . data . Tuple ;
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
p u b l i c c l a s s UPPER e x t e n d s EvalFunc<S t r i n g >{
p u b l i c S t r i n g e x e c ( Tuple i n p u t ) throws IOException {
i f ( i n p u t == n u l l | | i n p u t . s i z e ( ) == 0 ) {
return null ;
} else {
try {
String s t r = ( String ) input . get (0) ;
r e t u r n s t r . toUpperCase ( ) ;
} catch ( Exception e ) {
throw new IOException ( " Caught e x c e p t i o n p r o c e s s i n g
i n p u t row " , e ) ;
}
}
}
}
Die Klasse UPPER (Quelle:[STL])
So ist es möglich, komplizierte Berechnung in eine Java Klasse auszulagern
oder von bereits vorhandenen Bibliotheken zu profitieren. Dadurch verringert
sich die Entwicklungszeit mit Pig enorm. Zusätzlich stehen in der Bibliothek von
Pig zahlreiche Interfaces und Klassen zur Verfügung, die bei der Entwicklung
von benutzerdefinierten Funktionen behilflich sind.
Weiterhin ist es möglich einen Mapreduce Job direkt mit Pig aufzurufen.
Gibt es also bereits fertige MapReduce Jobs, die mit Java und Apache Hadoop
implementiert wurden, so kann dieser Job in Pig direkt aufgerufen und mit
Daten versorgt werden. Hierbei kann jede JAR-Datei verwendet werden, die
auch mit hadoop jar mymr.jar params lauffähig ist. Hierzu verwendet man
das Schlüsselwort MAPREDUCE mit dem gewünschten MapReduce.jar und
zusätzlichen Eingabeparametern. Das Laden und Speichern kann von Pig oder
4
dem MapReduce Job selbst übernommen werden. Im folgenden Beispiel wird
ein MapReduceJob namens WordCount aufgerufen und mit dem Text aus A
befüllt.
A = LOAD ’WordcountInput.txt’;
B = MAPREDUCE ’wordcount.jar’ STORE A INTO ’inputDir’ LOAD ’outputDir’
AS (word:chararray, count: int)
’org.myorg.WordCount inputDir outputDir’;
In der Dokumentation von Pig sind noch viele weitere nützliche Sprachkonstrukte beschrieben, die Pig dem Nutzer bereitstellt (Vgl. [PIG]). Diese aber
alle aufzuzählen, würde den Rahmen dieser Arbeit eindeutig übersteigen. Die
vorgestellten Konstrukte und Mechanismen genügen jedoch, um ein einfaches
Beispiel mit Pig auf einem Hadoop-Cluster auszuprobieren und zu untersuchen.
2.2
Beispiel
Um nun die Leistungsfähigkeit im Vergleich mit Apache Hadoop zu untersuchen,
wird ein einfaches Beispiel (Wörter zählen) genutzt. Die Arbeitsweise von Pig ist
hierbei gut zu erkennen und die Vorteile gegenüber MapReduce mit Hadoop treten deutlich hervor. Der benötigte Java Programmcode für dieses Beispiel findet
sich im Anhang A. Insgesamt benötigt dieses Hadoop MapReduce Programm
circa 60 Zeilen Programmcode. Der Teil mit der eigentlichen Programmlogik
benötigt allerdings nur elf Zeilen. Die restlichen Zeilen sind allein Verwaltungsaufgaben ohne wirkliche Funktionalität, so genannter „glue code“. Im Vergleich
dazu wirkt der Programmcode eines Pig Skriptes ziemlich kurz.
1
2
3
4
5
6
7
A = load ’ input / f i l e . txt ’ ;
B = f o r e a c h A g e n e r a t e f l a t t e n (TOKENIZE( ( c h a r a r r a y ) $0 ) ) a s
word ;
C = f i l t e r B by word matches ’ \ \w+ ’;
D = group C by word ;
E = f o r e a c h D g e n e r a t e COUNT(C) a s count , group a s word ;
F = o r d e r E by count d e s c ;
s t o r e F i n t o ’ output / ’ ;
Wörter zählen als Pig Skript (Quelle:[WCP])
Trotzdem kann man bei erster Betrachtung leicht erkennen, was genau in welcher
Zeile gemacht wird. Die Eingabedatei enthält folgenden Text:
Hallo Welt
Hallo Nutzer
So soll zunächst die Datei eingelesen werden (Zeile 1) und in A gespeichert
werden.
(Hallo Welt)
(Hallo Nutzer)
Anschließend wird pro Zeile eine Liste von Wörtern generiert. Diese Zeile wird
nun aufgespalten, so dass schließlich in C pro Zeile genau ein Wort steht.
5
(Hallo)
(Welt)
(Hallo)
(Nutzer)
Damit ist der Einlesevorgang beendet. In Zeile 4 werden alle vorkommenden
Wörter gruppiert. Ausgabe von D ist ein Tupel, bestehend aus dem Wort und
einer Liste des Wortes.
(Hallo, {(Hallo),(Hallo)}
(Welt,{(Welt)})
(Nutzer,{(Nutzer)})
Die Länge der Listen werden dann in Zeile 5 gezählt und dem Wort zu geordnet.
Die Sortierung im Schritt von E nach F dient nur als Schönheitskorrektur.
(2,Hallo)
(1,Welt)
(1,Nutzer)
Abschließend wird das Ergebnis in einer Ausgabedatei gespeichert.
2.3
Bewertung
In dem vorherigen Beispiel ist der Datenfluss von der Eingabe, über die Verarbeitung und Aggregation hin zur Ausgabe sehr gut zu erkennen. Hier wird der
Vorteil, eine möglichst einfache Programmierung, von Pig ganz besonders deutlich. Zusätzlich zeigt sich eine Abstraktion von MapReduce. Der Nutzer muss
sich in diesem Beispiel keine Gedanken um MapReduce selbst machen. Dies
wird vom Pig Compiler komplett übernommen. Der Pig Compiler zerlegt dabei
das Skript in einzelne MapReduce Jobs, die dann in der richtigen Reihenfolge
ausgeführt das gewünschte Ergebnis liefern. Dies ist ein weiterer Vorteil gegenüber Programmen, die direkt für das Apache Hadoop System entwickelt wurden.
Während man bei diesen nur einen einzelnen MapReduce Schritt pro Programm
hat und nur umständlich eine Folge von mehreren MapReduce Schritten nacheinander schalten kann, wird dies von Apache Pig ganz automatisch erledigt.
Aus dem obigen Beispiel wird so ein Folge von drei MapReduce Schritten.
Der Hauptteil der Arbeit (Zeile 1-5) wird dabei in einem einzelnen Job erledigt.
Das Sortieren und Speichern auf dem Dateisystem wird zu zwei weiteren Jobs
kompiliert. Die genaue Aufteilung der Jobs wird aus Abbildung 1 ersichtlich.
Dies verlangsamt zwar den Ablauf eines Pig Programms, doch wie Abbildung
2 zeigt, sind Pig Programme nur etwa 1,5x so langsam wie selbst geschriebene
Hadoop MapReduce Jobs. Demgegenüber stehen die geringe Größe des Quellcodes und die geringe Dauer an Entwicklungszeit (siehe Abbildung 3). Beides
spricht eher für eine Verwendung von Pig. Das ist wahrscheinlich einer der Gründe, warum bereits 2009 mehr als 50% der Hadoop Jobs, die bei Yahoo ausgeführt
wurden, Pig verwendeten.
6
Abbildung 1: Job Ablaufdiagramm des Beispielprogramms
Abbildung 2: Performancevergleich Hadoop und Pig, (Quelle: [Ols])
7
Abbildung 3: Vergleich Hadoop und Pig in Lines-of-Code und Entwicklungszeit,
(Quelle: [Ols])
3
Apache Hive
Einen Schritt weiter als Yahoo gingen die Entwickler von Facebook, als sie 2008
ihre Entwicklung einer Anfragesprache an ein Hadoop Cluster vorstellten. Ihre Anfragesprache, HiveQL, war nicht mehr als eine reine Datenfluss Sprache
konzipiert, sondern ist kompatibel zum SQL-92 Standard. Dadurch wird in einem weit größerem Maße von der zugrundeliegenden Datenhaltung abstrahiert.
Musste bisher der Nutzer von Hadoop oder auch Pig noch genau wissen, wie die
Daten aussahen und wo welche Datei liegt, so ist dies bei Hive nur noch in geringem Maße nötig. In Hive spielen vor allem Tabellen eine wichtige Rolle. Dies
ist begründet durch die Entwicklungsgeschichte von Hive. Ursprünglich wurden
bei Facebook berechnungsintensive Analysen über Nutzungsverhalten und viele
andere Statistiken hauptsächlich nachts durchgeführt. Dadurch sollte eine hohe
Belastung der Server vermieden werden.
Mit der wachsenden Nutzerzahl und vor allem der stetig größer werdenden
Datenmenge wurde es immer schwieriger, bestimmte Analysen im laufenden Betrieb durchzuführen, ohne die Servicequalität von Facebook zu beeinträchtigen.
Deshalb wurde nach einem System gesucht, das auf einer tabellarisch geordneten Datenmenge basiert. Ziel war es, an das System gestellte Anfragen schneller
beantworten zu können. Mit Hadoop wurde zunächst ein System gefunden, mit
dem große Datenmengen verteilt gespeichert werden können. Anschließend wurde Hive mit der Anfragesprache HiveQL entwickelt.
Wie oben erwähnt ist diese Anfragesprache zum SQL-92 Standard kompatibel. Das hatte zur Folge, dass alle bisherigen Anfragen, die zur Statistik und
Analyse auf den Datenbankserver entwickelt wurden nun bei gleicher Datenhaltung auch im Hadoop Cluster mit Hive wiederverwendet werden konnten.
Außerdem musste nicht für jede Analyse ein eigenes MapReduce Programm in
Java für Hadoop implementiert werden, sondern konnte weiterhin von den Datenanalytikern selbst erledigt werden (Vgl. [TSJ+ 10]). Zu diesem Zweck können
Daten aus dem laufenden Betrieb heraus in das Hive System importiert werden.
Dadurch werden die Analysen unabhängig vom laufenden Betrieb durchführbar
und können auch längere Zeit in Anspruch nehmen als nächtliche CronJobs. Um
Hive nun genauer verstehen zu können, wird im folgenden Kapitel der Aufbau
von Hive näher betrachtet.
8
3.1
Hive Architektur
Abbildung 4: Die Hive Architektur (Quelle: [DZ])
Wie in Abbildung 4 deutlich wird, fungiert HiveQL als zentrale Schnittstelle zwischen der Anfrageschicht, bei der man mittels Webinterface oder JDBC/ODBC Anfrage stellen kann, und der Hadoop Schicht. Diese kann man aufteilen in eine MapReduce und die HDFS Schicht, welche die Daten verteilt
speichert.
HiveQL besitzt einen Parser, Planer, Optimierer und eine Ausführungskomponente, um Anfragen an Hive zu bearbeiten. Zusätzlich hat Hive mehrere
PlugIn Schnittstellen, um den Funktionsumfang von Hive zu erweitern. So können beispielsweise mit Hive auch eigene MapReduce Skripte ausgeführt werden,
ohne dass diese zuerst in Hive neu geschrieben werden müssen. Außerdem ist es
möglich, eigene definierte Funktionen beziehungsweise Aggregationsfunktionen
zu schreiben und diese dann in Hive zu verwenden.
Als eine Art Proxy fungiert die SerDe Komponente, eine Kurzform von „serealization“ und „deserealization“, mit der man auf verschiedene Dateiformate
zugreifen kann. Dadurch wird es unwichtig, ob die Datei in CSV Form oder in
Tabellenform vorliegt. Außerdem kann man auch eigene Speicherformate implementieren, um Hive im eigenen Kontext zu verwenden. Aufgrund von SerDe
sind in Hive alle Daten in Tabellenform verfügbar und können über HiveQL
abgefragt werden.
3.2
HiveQL
Wie bereits erwähnt ist HiveQL zum Standard von SQL aus dem Jahr 1992 kompatibel. Dadurch können Datenanalytiker sehr leicht mit Hive arbeiten, wenn
sie bereits Erfahrung mit SQL haben. Hierzu gehören die Standardanfragen, die
im folgenden Abschnitt näher erläutert werden1 . Anschließend wird mit einem
1 Eine
ausführliche Beschreibung der Hivesyntax findet sich unter [HIV]
9
kleinen Beispiel die Arbeitsweise von Hive näher betrachtet. Bevor man Daten
mit Hive analysieren kann, benötigt man Tabellen mit Daten. Das Erzeugen
einer neuen Tabelle funktioniert mit dem Kommando CREATE:
CREATE TABLE pokes (foo INT, bar STRING);
Damit kennt das System eine Tabelle „pokes“ und deren Spaltendefinition. Die
genaue Speicherung, den Speicherort und die Partitionierung kann man hier
ebenfalls angeben, um bei größeren Tabellen das System effizient zu halten.
Um diese Tabelle mit Daten zu füllen, hat man mehrere Möglichkeiten. Falls
die Daten bereits in einer anderen Tabelle vorliegen oder aus den anderen Tabellen generiert werden können, so arbeitet man mit SELECT und INSERT.
Zusätzlich kann man Textdateien, die bereits tabellarisch formatiert sind, mit
dem Befehl LOAD DATA vom System in eine Tabelle laden.
LOAD DATA LOCAL INPATH ’./examples/files/kv3.txt’
OVERWRITE INTO TABLE pokes;
Anschließend kann man mit den gewohnten SELECT Statements auf die
Tabellen zugreifen. Hierbei ist es möglich, einzelne Spalten zu selektieren, Bedingungen anzugeben, Filter zu verwenden und die Daten zu Gruppieren.
SELECT a.bar, count(*) FROM invites a WHERE a.foo > 0 GROUP BY a.bar;
Besonders hilfreich ist die in SQL Anfragesprache übliche Funktion des Joins.
Während dies bei Hadoop schwer zu realisieren ist, benötigt man hierfür mit
Hive nur eine Select Anfrage.
SELECT pv.pageid, u.age_bkt
FROM page_view p
JOIN user u ON (pv.uhash = u.uhash)
JOIN newuser x on (u.uhash = x.uhash);
Dadurch gewinnt man bei Hive eine sehr große Abstraktion vom eigentlichen
Map Reduce. Dies führt zu einer nützlichen Erweiterung. Im folgenden Beispiel
wird das bereits bekannte Wörterzählen-Beispiel wieder aufgegriffen.
3.3
Beispiel
Zur Verdeutlichung wurde erneut das Beispiel „Wörter zählen“ verwendet. Hierbei ergibt sich jedoch gleich ein Problem. Wie bringt man den Text in eine
tabellarische Form, so dass man Hive verwenden kann?
Text der Eingabedatei:
Hallo Welt
Hallo Nutzer
Eine Lösung ist eine Vorverarbeitung mit einer anderen Programmiersprache,
also beispielsweise mittels Java oder Pig. Anschließend lädt man den Text in
eine vorher erzeugte Hive Tabelle.
Text nach Vorverarbeitung:
Hallo
Welt
10
Hallo
Nutzer
Befehl zum erzeugen der Tabelle:
CREATE TABLE words (word String) STORED AS TEXTFILE;
Befehl zum Laden des Textes:
LOAD DATA LOCAL INPATH ’words.txt’ INTO TABLE words;
Nun hat man die gewünschten Daten in der Tabelle. Das eigentliche HiveQL
Statement zum Zählen der Wörter ist nun nur noch ein einfaches Select Statement mit Gruppierung nach dem Wort und einer Count Aggregationsfunktion.
Selbstverständlich kann man das Ergebnis nun auch noch absteigend nach der
Anzahl der Wörter sortieren.
SELECT word , COUNT(word) AS count
FROM words
GROUP BY word
ORDER BY count DESC;
Dadurch erhält man das gewünschte Ergebnis, welches in dem obigen Beispiel
nur auf der Konsole zurückgegeben wird. Um das Ergebnis zu speichern kann
man es mit INSERT auch in eine Ergebnistabelle einfügen. Die Abarbeitung dieser Query teilt Hive in zwei aufeinanderfolgende MapReduce Jobs auf. Der erste
Job erzeugt aus der Eingabetabelle eine Tabelle mit gruppierten Wörtern und
der Anzahl des Wortes. Im darauffolgenden MapReduce Job wird das Ergebnis
nach der Anzahl geordnet. Die Erzeugung der MapReduce Jobs übernimmt Hive
komplett.
3.4
Bewertung
Das Beispiel zeigt sehr deutlich die einfache Nutzung von Hive auf, aber auch seine Schwächen sind gut erkennbar. Wenn die Daten nicht in tabellarischer Form
gegeben sind, muss man diese zunächst über Drittprogramme einlesen. Dieser
Nachteil ist jedoch zu vernachlässigen, wenn man bedenkt, in welchem Umfeld
Hive entwickelt wurde und eingesetzt wird. Hive soll rechen- und zeitintensive
Datenbankaufgaben von der Datenbank in ein Hadoop Cluster verlagern, ohne
alles neu programmieren zu müssen. Dadurch sind die Daten meist in tabellarischer Form gegeben.
Demgegenüber steht die einfache Nutzung aufgrund der verwendeten SQL
Anfragesprache. Zusätzlich stellt man dem Benutzer mit Join ein vielseitig einsetzbares Hilfmittel zur Verfügung, welches in Hadoop nicht so einfach verwendbar ist.
Hive ist außerdem sehr leicht durch eigene Funktionen erweiterbar. Sogar
eigene MapReduce Jobs und flexible Speicherung der Dateien sind möglich. Für
viele Probleme wird Hive dadurch zu einem leicht einsetzbaren und flexiblen
System.
11
4
Zusammenfassung und Ausblick
In den vorangegangenen Kapiteln wurden die Programmiersprachen Pig und
Hive individuell beleuchtet. In diesem Abschnitt soll nun ein Vergleich von beiden gezogen werden. Dieser Sachverhalt ist aufgrund der gemeinsamen Entwicklungsgeschichte interessant. Facebook hat Hive entwickelt, weil Pig zum
damaligen Zeitpunkt große Performanceverluste bei der Nutzung hatte.
Ein weiteres Problem von Pig bestand im Trainingsaufwand der Entwickler
für den Einsatz einer neuen Programmiersprache. Mit der Entscheidung eine
Programmiersprache zu entwickeln, deren Syntax auf SQL basiert, konnte man
eine „steile Lernkurve“, wie man dies von anderen Programmiersprachen kennt,
vermeiden. Dieses Problem haben auch die Entwickler bei Hadoop und insbesondere auch bei Pig erkannt. Deshalb soll es in Zukunft auch eine SQL API
für Pig geben. Fraglich bleibt, warum die Ingenieure von Pig sich nicht von Anfang an für eine SQL Anfragemöglichkeit entschieden haben. Andererseits hat
man mit Pig eine viel größere Kontrolle wie und in welcher Reihenfolge die Daten verarbeitet werden. Dadurch wird Pig für manchen Nutzer möglicherweise
einfacher zu bedienen sein. Pipeline-Verarbeitungen kann man in Pig leichter
programmieren, was sich in Hive eher schwierig gestaltet.
Beim Typsystem geht Hive einen restriktiveren Weg als Pig, denn bei Pig
muss der Datentyp nicht unbedingt angegeben werden. Dadurch ist in manchen
Situationen der Import von Fremddaten einfacher als bei Hive. Liegen die Daten
jedoch bereits in Tabellenform vor, so sind die Daten schon strukturiert und
haben auch einen Typ. Dadurch wiegt dieser Nachteil von Hive gegenüber Pig
nicht so schwer.
Sowohl Pig als auch Hive unterstützen einen Aufruf von externen Bibliotheken. Dadurch ist es möglich die Funktionen, zu erweitern und neue Funktionalitäten bereitzustellen. Auch lassen sich fertige MapReduce Jobs aufrufen.
Somit ist eine gewisse Rückwärtskompatibilität garantiert, da alle alten Hadoop
MapReduce Programme wiederverwendet werden können.
Ein weiterer Vorteil von Hive gegenüber Pig ist die Exportschnittstelle über
JDBC/ODBC. Dadurch ist es sehr leicht, die Anfragen an das Hadoop Cluster
von einer anderen Programmiersprache zu stellen. Bei Pig muss man hierfür
einen Umweg über das Dateisystem gehen, was zu einer Verschlechterung der
Performanz führt.
Betrachtet man die Performanz der beiden Systeme, so scheint es, dass Hive
schneller ist als Pig. Doch dies ist nur eine Tendenz. Tatsächlich ist es wohl eher
ein gemischtes Ergebnis. So ist bei manchen Anfragen Hive schneller und bei
anderen Pig. In der Tabelle 1 wurde versucht mit einer Gegenüberstellung die
wichtigsten Gemeinsamkeiten und Unterschiede darzustellen.
Abschließend muss man grundsätzlich je nach Anwendung entscheiden, welche Technik beziehungsweise welches System man einsetzt. Gerade bei großen
Datenmengen und komplizierten Systemen gibt es keinen optimalen Lösungsweg. In dieser Arbeit wurde der Fokus vor allem auf die beiden Sprachen Pig
und Hive gelegt, weil sie zusammen in den weltweit größten Clustern eingesetzt
werden. So betreibt Facebook ein Cluster mit etwa 800 Knoten und Yahoo ein
Cluster mit circa 1000 Knoten. Beide Firmen wollen ihre Cluster stark vergrößern und nebenbei auch beweisen, dass ihre Anfragesprache besser als die der
Konkurrenten ist. Es bleibt also abzuwarten, wie sich dieser Konkurrenzkampf
in Zukunft entwickelt.
12
Kriterium
Funktionalität
Programmierschnittstelle
Eingabedatenstruktur
Eingabe/Ausgabeformat
Datenimportunterstützung
Schnittstellen zur
Weiterverarbeitung
Benutzerschnittstellen
Benutzerfreundlichkeit
Eclipse Plugins
Benutzerdefinierte
Funktionen (UDFs)
Lernkurve
Hive
Pig
Hive API
Pig API
strukturiert
unstrukturiert
Rohdaten/Rohdaten
Rohdaten/Rohdaten
Sqoop - Datenimport
keine Toolunterstützung
ODBC/JDBC
CLI, WebGUI
keine Exportschnittstellen
CLI(Grunt)
nicht vorhanden
nutzbar
PigPen
nutzbar
einfache Einarbeitung
durch Nähe zu SQL
lange
Einarbeitung
in Pig eigene ScriptSprache
Leistungsfähigkeit
Performanz
kein Schwerpunkt
Stabilität (Ausfallsicherheit)
Skalierbarkeit
hohe Stabilität durch
HDFS
hoch skalierbar durch
Hadoop Framework
Allgemeine
Projektinformationen
Installationsaufwand Erweiterung von Hadoop
Dokumentation
Tutorials, Wikis
AnwendungsLogfile Analyse, Realszenarien
Time
Webanalyse,
Data Mining, Data
Warehousing
Zielgruppe
Business Analyst mit
SQL Kenntnissen
Lizenzmodell
Anfragesprache
Metadaten
Schemata
Unterstützung
Plattform
tendenziell langsamer
als Hive
hohe Stabilität durch
HDFS
hoch skalierbar durch
Hadoop Framework
Erweiterung von
doop
Tutorials, Wikis
Logfile Analyse
Ha-
APL v2.0 / GPL
HiveQL (deklarativ)
vorhanden
ja (nicht optional)
Programmierer mit Bedarf für Ad-hoc Abfragen
APL v2.0 / GPL
PigLatin (prozedural)
nicht verhanden
nicht benötigt
Apache Hadoop
Apache Hadoop
Tabelle 1: Vergleich zwischen Hive und Pig [VHP]
13
A
1
Wörter zählen mit Apache Hadoop
package o r g . myorg ;
2
3
4
import j a v a . i o . IOException ;
import j a v a . u t i l . ∗ ;
5
6
7
8
9
10
11
12
13
import o r g . apache . hadoop . f s . Path ;
import o r g . apache . hadoop . c o n f . ∗ ;
import o r g . apache . hadoop . i o . ∗ ;
import o r g . apache . hadoop . mapreduce . ∗ ;
import o r g . apache . hadoop . mapreduce . l i b . i n p u t . F i l e I n p u t F o r m a t ;
import o r g . apache . hadoop . mapreduce . l i b . i n p u t . TextInputFormat ;
import
o r g . apache . hadoop . mapreduce . l i b . output . FileOutputFormat ;
import
o r g . apache . hadoop . mapreduce . l i b . output . TextOutputFormat ;
14
15
p u b l i c c l a s s WordCount {
16
17
18
19
p u b l i c s t a t i c c l a s s Map e x t e n d s Mapper<LongWritable , Text ,
Text , I n t W r i t a b l e > {
p r i v a t e f i n a l s t a t i c I n t W r i t a b l e one = new I n t W r i t a b l e ( 1 ) ;
p r i v a t e Text word = new Text ( ) ;
20
p u b l i c v o i d map( LongWritable key , Text v a l u e , Context
c o n t e x t ) throws IOException , I n t e r r u p t e d E x c e p t i o n {
String l i n e = value . toString () ;
S t r i n g T o k e n i z e r t o k e n i z e r = new S t r i n g T o k e n i z e r ( l i n e ) ;
w h i l e ( t o k e n i z e r . hasMoreTokens ( ) ) {
word . s e t ( t o k e n i z e r . nextToken ( ) ) ;
c o n t e x t . w r i t e ( word , one ) ;
}
}
21
22
23
24
25
26
27
28
29
}
30
31
p u b l i c s t a t i c c l a s s Reduce e x t e n d s Reducer<Text ,
I n t W r i t a b l e , Text , I n t W r i t a b l e > {
32
p u b l i c v o i d r e d u c e ( Text key , I t e r a b l e <I n t W r i t a b l e >
v a l u e s , Context c o n t e x t )
throws IOException , I n t e r r u p t e d E x c e p t i o n {
i n t sum = 0 ;
for ( IntWritable val : values ) {
sum += v a l . g e t ( ) ;
}
c o n t e x t . w r i t e ( key , new I n t W r i t a b l e ( sum ) ) ;
}
33
34
35
36
37
38
39
40
41
}
42
43
44
p u b l i c s t a t i c v o i d main ( S t r i n g [ ] a r g s ) throws E x c e p t i o n {
C o n f i g u r a t i o n c o n f = new C o n f i g u r a t i o n ( ) ;
45
46
Job j o b = new Job ( c o n f , " wordcount " ) ;
14
47
j o b . setOutputKeyClass ( Text . c l a s s ) ;
job . setOutputValueClass ( IntWritable . c l a s s ) ;
48
49
50
j o b . s e t M a p p e r C l a s s (Map . c l a s s ) ;
j o b . s e t R e d u c e r C l a s s ( Reduce . c l a s s ) ;
51
52
53
j o b . s e t I n p u t F o r m a t C l a s s ( TextInputFormat . c l a s s ) ;
j o b . setOutputFormatClass ( TextOutputFormat . c l a s s ) ;
54
55
56
F i l e I n p u t F o r m a t . addInputPath ( job , new Path ( a r g s [ 0 ] ) ) ;
FileOutputFormat . setOutputPath ( job , new Path ( a r g s [ 1 ] ) ) ;
57
58
59
j o b . waitForCompletion ( t r u e ) ;
60
}
61
62
63
}
Die Klasse WordCount
(Quelle:[CLO])
für
einen
15
Apache
Hadoop
MapReduce
Job
Literatur
[CLO]
Website von Cloudera. http://www.cloudera.com/
[DG08]
Dean, Jeffrey ; Ghemawat, Sanjay: MapReduce: simplified data
processing on large clusters. In: Commun. ACM 51 (2008), January,
107–113. http://dx.doi.org/10.1145/1327452.1327492. – DOI
10.1145/1327452.1327492. – ISSN 0001–0782
[DZ]
Dhruba, Borthakur ; Zheng, Shao: Hadoop and Hive Development
at Facebook
[HAD]
Website von Apache Hadoop. http://hadoop.apache.org/
[HIV]
Website von Apache Hive. http://hive.apache.org/
[Ols]
Olston, Christopher: Pig - web-scale data processing
[ORS+ 08] Olston, Christopher ; Reed, Benjamin ; Srivastava, Utkarsh ; Kumar, Ravi ; Tomkins, Andrew: Pig latin: a not-so-foreign language
for data processing. In: Proceedings of the 2008 ACM SIGMOD international conference on Management of data. New York, NY, USA
: ACM, 2008 (SIGMOD ’08). – ISBN 978–1–60558–102–6, 1099–1110
[PIG]
Website von Apache Pig. http://pig.apache.org/
[STL]
Stewart, R.J. ; Trinder, P.W. ; Loidl, H.W.: Comparing High
Level MR Query Languages
[TSJ+ 10] Thusoo, A. ; Sarma, J.S. ; Jain, N. ; Shao, Zheng ; Chakka, P.
; Zhang, Ning ; Antony, S. ; Liu, Hao ; Murthy, R.: Hive - a
petabyte scale data warehouse using Hadoop. In: Data Engineering
(ICDE), 2010 IEEE 26th International Conference on, 2010, S. 996
–1005
[VHP]
Vergleich von Hive und Pig. http://wiki.fh-stralsund.de/
[WCP]
Pig
Beispiel.
http://en.wikipedia.org/wiki/Pig_
(programming_language)
16
Herunterladen