Sprachen, die nach MapReduce kompiliert werden Hauptseminar: Multicore-Programmierung Franz Pius Hübl 10. November 2011 Zusammenfassung Ziel dieser Arbeit ist es, einen Einblick in die Arbeitsweise verschiedener Sprachen, die nach MapReduce kompiliert werden, zu geben. Hierfür wurden zwei Beispielsprachen ausgewählt: Pig und Hive. Anhand eines gemeinsamen Beispiels wird die unterschiedliche Verwendung, sowie Vorund Nachteile der Sprachen betrachtet. ii Inhaltsverzeichnis 1 Einleitung 1.1 Was ist MapReduce? . . . . . . . . . . . . . . . . . . . . . . . . . 1.2 Apache Hadoop . . . . . . . . . . . . . . . . . . . . . . . . . . . . 1 1 1 2 Apache Pig 2.1 Pig Latin . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 2.2 Beispiel . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 2.3 Bewertung . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 2 2 5 6 3 Apache Hive 3.1 Hive Architektur 3.2 HiveQL . . . . . 3.3 Beispiel . . . . . 3.4 Bewertung . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 8 9 9 10 11 4 Zusammenfassung und Ausblick 12 A Wörter zählen mit Apache Hadoop 14 iii 1 Einleitung In dieser Ausarbeitung werden Sprachen betrachtet, die nach MapReduce kompiliert werden. Das sind Sprachen, die zunächst keine Ähnlichkeit zu MapReduce Programmen haben und dann mit einem Compiler in eine Folge von MapReduce Programmen überführt werden. Dadurch abstrahiert man von den Folgen von MapReduce Aufgaben und der Nutzer muss nur noch ein semantisches Programm definieren, ohne die Reihenfolge der MapReduce Schritte beachten zu müssen. Dadurch können auch technisch weniger versierte Mitarbeiter Abfragen erstellen, die sonst mit komplexen MapReduce Programmen realisiert werden müssten. Hierfür gibt es viele Sprachen, von denen zwei, Pig und Hive, in den folgenden Kapiteln näher betrachtet werden. 1.1 Was ist MapReduce? Seit der Erfolgsgeschichte von Google existiert ein Verfahren, das von vielen Personen als das parallele Verfahren für die Verarbeitung großer Datenmengen betrachtet wird, MapReduce (Vgl. [DG08]). MapReduce stammt ursprünglich aus der funktionalen Programmierung und bezeichnet zwei Funktionen, die nacheinander abgearbeitet werden. Zunächst wird mittels einer Map Funktion aus den Eingabedaten „Key-Value“ Paare erzeugt. Diese Paare werden dann nach dem Key sortiert und mit der Reduce Funktion kombiniert. 1.2 Apache Hadoop Hadoop ist ein freies in Java geschriebenes Framework. Es verfügt über ein eigenes verteiltes Dateisystem, HDFS. Hadoop ist dafür gedacht, große Datenmengen auf Computerclustern zu bearbeiten und Berechnungen mit diesen Daten durchzuführen. Seit 2008 wird Hadoop als Apache Top-Level Projekt geführt. Hadoop enthält eine Implementierung des MapReduce Algorithmus (Vgl. [HAD]). 2008 und 2009 gewann das Hadoop Projekt den Terabyte Sort Benchmark Preis als erstes Open-Source Programm. Die Programmiersprachen Pig und Hive nutzen jeweils Hadoop als Plattform und lassen ihre kompilierten MapReduceProgramme auf der Hadoop Plattform ausführen. In den folgenden Kapiteln wird zur Verdeutlichung das Beispiel „Wörter zählen“ verwendet. Um einen Vergleich mit einem in Hadoop implementierten Programm zu haben, wurde im Anhang A das Beispiel als Hadoop Programm implementiert. 1 2 Apache Pig Apache Pig ist eine Plattform, die dem Nutzer eine Highlevel Sprache (Pig Latin) zur Verfügung stellt, um mit einfachen Anweisungen ein Programm für Apache Hadoop zu schreiben. Pig ist dafür gedacht, große Datensätze zu analysieren. Hierfür nutzt Pig einen Compiler, der aus den PigLatin Kommandos Folgen von MapReduce Programmen erzeugt. Diese können dann in dem bereits vorgestellten Framework Apache Hadoop berechnet werden. Für die Entwickler von Pig stehen drei Schlüsselmerkmale im Vordergrund (Vgl. [PIG]): • Einfache Programmierung Das Programm sollte möglichst einfach zu entwickeln sein. Damit wird von dem zugrundeliegenden Map Reduce System abstrahiert. • Optimierungsmöglichkeiten Nicht die Optimierungsmöglichkeiten, sondern die Semantik des Programms stehen im Vordergrund. Die Optimierung wird dann vom System selbst durchgeführt. • Erweiterbarkeit Der Nutzer soll mit sogenannten User Defined Functions (UDF) das Programm erweitern können. Dadurch können einfache PigSkripte komplexe Operationen aufrufen, die beispielsweise als Java Funktion bereits implementiert wurden. Pig ist als eine Datenflusssprache konzipiert und stellt standardmäßig Operationen bereit um Daten zu analysieren und zu verändern. Im nächsten Kapitel wird hierauf genauer eingegangen. Zwar wurde Pig für Apache Hadoop entworfen, doch könnte es mit einem geeigneten Compiler auch für andere MapReduce Frameworks verwendet werden. Im Jahr 2006 wurde Pig von Yahoo entwickelt. Damals war es für Entwickler eine einfache Möglichkeit, schnell Abfragen auf großen Datensätzen zu programmieren, ohne eine MapReducestruktur neu implementieren zu müssen. 2007 wurde das Projekt der Apache Software Foundation unterstellt und seit September 2010 wird Pig als Apache Top-Level Projekt geführt. 2.1 Pig Latin Pig kann über eine interaktive Shell, eingebettet in einer Host-Sprache (beispielsweise Java), oder über ScriptFiles verwendet werden. In all diesen Fällen benötigt man die Pig eigene Sprache Pig Latin. Diese wird im folgenden Abschnitt näher beschrieben (Vgl. [ORS+ 08]). Am Anfang jedes Pig Skriptes steht meist ein Lade Operation. Diese wird mit dem Schlüsselwort LOAD eingeleitet. Anschließend wird die zu ladende Datei mit dem Dateipfad angegeben. Diese Datei befindet sich typischerweise innerhalb des HDFS Dateisystems von Hadoop. Besteht die Datei aus mittels Tabulator separierten Feldern und aus einem Eintrag pro Zeile, so kann man nun ein Schema angeben. Dadurch ist es möglich, den einzelnen Spalten Namen zu geben und Datentypen dafür festzulegen. Im folgenden Beispiel soll die Datei ”file.txt” eingelesen werden und anschließend in File gespeichert werden. Analog zum Befehl LOAD existiert auch der Befehl STORE, der Daten speichern kann. 2 file.txt: Hallo 1 Welt! 2 Pig Skript: File = LOAD ’file.txt’ AS (word:chararray, line:int); Pig kennt viele Schlüsselworte, die meistens schon von anderen Sprachen bekannt sind und deshalb in ihrer Bedeutung leicht erschlossen werden können. Beispielsweise wird JOIN, semantisch wie ein Join in einer SQL Sprache, und FOREACH, semantisch wie eine Foreach-Schleife in Java, verwendet. Durch diese Übertragungen sind viele Kommandos von Pig leicht zu verstehen und bis auf kleine Syntaxabweichungen leicht zu benutzen. Daher fällt dem Programmierer ein Umstieg auf Pig Latin leicht und man gewöhnt sich sehr schnell an die Syntax. A = FOREACH File GENERATE COUNT(word) as count; C = JOIN A BY count FULL, B BY line; Pig Latin arbeitet mit verschiedenen Datentypen. Darunter sind „simple Datentypen“ wie Integer, Long Integer, Float, Double, Chararray (ein Character Array in Unicode UTF-8 Kodierung) und Bytearray. Aus dieses simplen Datentypen können nach Belieben komplexere Datentypen erzeugt werden. Pig verwendet bei komplexen Datentypen Tupel, Bags (eine Liste von Tupeln) und Maps (Key-Value Listen). Tupel: (1, 2, hallo) Bag: {(1,2),(1,3),(2,3)} oder (1,2) (1,3) (2,3) Map: [word#Hallo,line#1] [word#Welt!,line#2] Hinzu kommen zahlreiche Operatoren, die auf den verschiedenen Datenobjekten verwendet werden können, wie beispielsweise boolesche Operatoren (==, <, >, ! =, <=, >=), arithmetische Operatoren (+, −, ∗, /, % ) oder Aggregatfunktionen ( SUM, AVG, MIN, MAX, COUNT). Zusätzlich kann man die einzelnen Datenobjekte mit einem FILTER auswählen. Im folgenden Beispiel enthält die Relation Y drei Spalten f1, f2 und f3, die jeweils vom Typ Integer sind. Die Relation X enthält nach dieser Zeile alle Datenobjekte, die die gesuchte Bedingung erfüllen. Anschließend werden in der Relation X alle Elemente nach dem Wert von f1 gruppiert und in Z gespeichert. 3 X = FILTER Y BY (f1==8) OR (NOT (f2+f3 > f1)); Z = GROUP X BY f1; Reichen die vielen integrierten Funktionen nicht aus, so kann man ganz einfach selbst eine Funktion schreiben und diese dann von Pig aufrufen. Dazu muss zunächst die JAR-Datei mit dem Schlüsselwort REGISTER Pig mitgeteilt werden. Anschließend kann man die Funktionen der JAR-Datei einfach in dem Pig Skript aufrufen. REGISTER myudfs.jar; A = LOAD ’student_data’ AS (name: chararray, age: int, gpa: float); B = FOREACH A GENERATE myudfs.UPPER(name); In der JAR-Datei existiert nun die unten angegebene Klasse UPPER. Die Methode exec() in dieser Klasse implementiert dann die gewünschte Funktion. Im Beispiel oben konvertiert UPPER jeden String in Großschreibung und gibt diesen dann zurück. 1 2 3 4 package myudfs ; import j a v a . i o . IOException ; import o r g . apache . p i g . EvalFunc ; import o r g . apache . p i g . data . Tuple ; 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 p u b l i c c l a s s UPPER e x t e n d s EvalFunc<S t r i n g >{ p u b l i c S t r i n g e x e c ( Tuple i n p u t ) throws IOException { i f ( i n p u t == n u l l | | i n p u t . s i z e ( ) == 0 ) { return null ; } else { try { String s t r = ( String ) input . get (0) ; r e t u r n s t r . toUpperCase ( ) ; } catch ( Exception e ) { throw new IOException ( " Caught e x c e p t i o n p r o c e s s i n g i n p u t row " , e ) ; } } } } Die Klasse UPPER (Quelle:[STL]) So ist es möglich, komplizierte Berechnung in eine Java Klasse auszulagern oder von bereits vorhandenen Bibliotheken zu profitieren. Dadurch verringert sich die Entwicklungszeit mit Pig enorm. Zusätzlich stehen in der Bibliothek von Pig zahlreiche Interfaces und Klassen zur Verfügung, die bei der Entwicklung von benutzerdefinierten Funktionen behilflich sind. Weiterhin ist es möglich einen Mapreduce Job direkt mit Pig aufzurufen. Gibt es also bereits fertige MapReduce Jobs, die mit Java und Apache Hadoop implementiert wurden, so kann dieser Job in Pig direkt aufgerufen und mit Daten versorgt werden. Hierbei kann jede JAR-Datei verwendet werden, die auch mit hadoop jar mymr.jar params lauffähig ist. Hierzu verwendet man das Schlüsselwort MAPREDUCE mit dem gewünschten MapReduce.jar und zusätzlichen Eingabeparametern. Das Laden und Speichern kann von Pig oder 4 dem MapReduce Job selbst übernommen werden. Im folgenden Beispiel wird ein MapReduceJob namens WordCount aufgerufen und mit dem Text aus A befüllt. A = LOAD ’WordcountInput.txt’; B = MAPREDUCE ’wordcount.jar’ STORE A INTO ’inputDir’ LOAD ’outputDir’ AS (word:chararray, count: int) ’org.myorg.WordCount inputDir outputDir’; In der Dokumentation von Pig sind noch viele weitere nützliche Sprachkonstrukte beschrieben, die Pig dem Nutzer bereitstellt (Vgl. [PIG]). Diese aber alle aufzuzählen, würde den Rahmen dieser Arbeit eindeutig übersteigen. Die vorgestellten Konstrukte und Mechanismen genügen jedoch, um ein einfaches Beispiel mit Pig auf einem Hadoop-Cluster auszuprobieren und zu untersuchen. 2.2 Beispiel Um nun die Leistungsfähigkeit im Vergleich mit Apache Hadoop zu untersuchen, wird ein einfaches Beispiel (Wörter zählen) genutzt. Die Arbeitsweise von Pig ist hierbei gut zu erkennen und die Vorteile gegenüber MapReduce mit Hadoop treten deutlich hervor. Der benötigte Java Programmcode für dieses Beispiel findet sich im Anhang A. Insgesamt benötigt dieses Hadoop MapReduce Programm circa 60 Zeilen Programmcode. Der Teil mit der eigentlichen Programmlogik benötigt allerdings nur elf Zeilen. Die restlichen Zeilen sind allein Verwaltungsaufgaben ohne wirkliche Funktionalität, so genannter „glue code“. Im Vergleich dazu wirkt der Programmcode eines Pig Skriptes ziemlich kurz. 1 2 3 4 5 6 7 A = load ’ input / f i l e . txt ’ ; B = f o r e a c h A g e n e r a t e f l a t t e n (TOKENIZE( ( c h a r a r r a y ) $0 ) ) a s word ; C = f i l t e r B by word matches ’ \ \w+ ’; D = group C by word ; E = f o r e a c h D g e n e r a t e COUNT(C) a s count , group a s word ; F = o r d e r E by count d e s c ; s t o r e F i n t o ’ output / ’ ; Wörter zählen als Pig Skript (Quelle:[WCP]) Trotzdem kann man bei erster Betrachtung leicht erkennen, was genau in welcher Zeile gemacht wird. Die Eingabedatei enthält folgenden Text: Hallo Welt Hallo Nutzer So soll zunächst die Datei eingelesen werden (Zeile 1) und in A gespeichert werden. (Hallo Welt) (Hallo Nutzer) Anschließend wird pro Zeile eine Liste von Wörtern generiert. Diese Zeile wird nun aufgespalten, so dass schließlich in C pro Zeile genau ein Wort steht. 5 (Hallo) (Welt) (Hallo) (Nutzer) Damit ist der Einlesevorgang beendet. In Zeile 4 werden alle vorkommenden Wörter gruppiert. Ausgabe von D ist ein Tupel, bestehend aus dem Wort und einer Liste des Wortes. (Hallo, {(Hallo),(Hallo)} (Welt,{(Welt)}) (Nutzer,{(Nutzer)}) Die Länge der Listen werden dann in Zeile 5 gezählt und dem Wort zu geordnet. Die Sortierung im Schritt von E nach F dient nur als Schönheitskorrektur. (2,Hallo) (1,Welt) (1,Nutzer) Abschließend wird das Ergebnis in einer Ausgabedatei gespeichert. 2.3 Bewertung In dem vorherigen Beispiel ist der Datenfluss von der Eingabe, über die Verarbeitung und Aggregation hin zur Ausgabe sehr gut zu erkennen. Hier wird der Vorteil, eine möglichst einfache Programmierung, von Pig ganz besonders deutlich. Zusätzlich zeigt sich eine Abstraktion von MapReduce. Der Nutzer muss sich in diesem Beispiel keine Gedanken um MapReduce selbst machen. Dies wird vom Pig Compiler komplett übernommen. Der Pig Compiler zerlegt dabei das Skript in einzelne MapReduce Jobs, die dann in der richtigen Reihenfolge ausgeführt das gewünschte Ergebnis liefern. Dies ist ein weiterer Vorteil gegenüber Programmen, die direkt für das Apache Hadoop System entwickelt wurden. Während man bei diesen nur einen einzelnen MapReduce Schritt pro Programm hat und nur umständlich eine Folge von mehreren MapReduce Schritten nacheinander schalten kann, wird dies von Apache Pig ganz automatisch erledigt. Aus dem obigen Beispiel wird so ein Folge von drei MapReduce Schritten. Der Hauptteil der Arbeit (Zeile 1-5) wird dabei in einem einzelnen Job erledigt. Das Sortieren und Speichern auf dem Dateisystem wird zu zwei weiteren Jobs kompiliert. Die genaue Aufteilung der Jobs wird aus Abbildung 1 ersichtlich. Dies verlangsamt zwar den Ablauf eines Pig Programms, doch wie Abbildung 2 zeigt, sind Pig Programme nur etwa 1,5x so langsam wie selbst geschriebene Hadoop MapReduce Jobs. Demgegenüber stehen die geringe Größe des Quellcodes und die geringe Dauer an Entwicklungszeit (siehe Abbildung 3). Beides spricht eher für eine Verwendung von Pig. Das ist wahrscheinlich einer der Gründe, warum bereits 2009 mehr als 50% der Hadoop Jobs, die bei Yahoo ausgeführt wurden, Pig verwendeten. 6 Abbildung 1: Job Ablaufdiagramm des Beispielprogramms Abbildung 2: Performancevergleich Hadoop und Pig, (Quelle: [Ols]) 7 Abbildung 3: Vergleich Hadoop und Pig in Lines-of-Code und Entwicklungszeit, (Quelle: [Ols]) 3 Apache Hive Einen Schritt weiter als Yahoo gingen die Entwickler von Facebook, als sie 2008 ihre Entwicklung einer Anfragesprache an ein Hadoop Cluster vorstellten. Ihre Anfragesprache, HiveQL, war nicht mehr als eine reine Datenfluss Sprache konzipiert, sondern ist kompatibel zum SQL-92 Standard. Dadurch wird in einem weit größerem Maße von der zugrundeliegenden Datenhaltung abstrahiert. Musste bisher der Nutzer von Hadoop oder auch Pig noch genau wissen, wie die Daten aussahen und wo welche Datei liegt, so ist dies bei Hive nur noch in geringem Maße nötig. In Hive spielen vor allem Tabellen eine wichtige Rolle. Dies ist begründet durch die Entwicklungsgeschichte von Hive. Ursprünglich wurden bei Facebook berechnungsintensive Analysen über Nutzungsverhalten und viele andere Statistiken hauptsächlich nachts durchgeführt. Dadurch sollte eine hohe Belastung der Server vermieden werden. Mit der wachsenden Nutzerzahl und vor allem der stetig größer werdenden Datenmenge wurde es immer schwieriger, bestimmte Analysen im laufenden Betrieb durchzuführen, ohne die Servicequalität von Facebook zu beeinträchtigen. Deshalb wurde nach einem System gesucht, das auf einer tabellarisch geordneten Datenmenge basiert. Ziel war es, an das System gestellte Anfragen schneller beantworten zu können. Mit Hadoop wurde zunächst ein System gefunden, mit dem große Datenmengen verteilt gespeichert werden können. Anschließend wurde Hive mit der Anfragesprache HiveQL entwickelt. Wie oben erwähnt ist diese Anfragesprache zum SQL-92 Standard kompatibel. Das hatte zur Folge, dass alle bisherigen Anfragen, die zur Statistik und Analyse auf den Datenbankserver entwickelt wurden nun bei gleicher Datenhaltung auch im Hadoop Cluster mit Hive wiederverwendet werden konnten. Außerdem musste nicht für jede Analyse ein eigenes MapReduce Programm in Java für Hadoop implementiert werden, sondern konnte weiterhin von den Datenanalytikern selbst erledigt werden (Vgl. [TSJ+ 10]). Zu diesem Zweck können Daten aus dem laufenden Betrieb heraus in das Hive System importiert werden. Dadurch werden die Analysen unabhängig vom laufenden Betrieb durchführbar und können auch längere Zeit in Anspruch nehmen als nächtliche CronJobs. Um Hive nun genauer verstehen zu können, wird im folgenden Kapitel der Aufbau von Hive näher betrachtet. 8 3.1 Hive Architektur Abbildung 4: Die Hive Architektur (Quelle: [DZ]) Wie in Abbildung 4 deutlich wird, fungiert HiveQL als zentrale Schnittstelle zwischen der Anfrageschicht, bei der man mittels Webinterface oder JDBC/ODBC Anfrage stellen kann, und der Hadoop Schicht. Diese kann man aufteilen in eine MapReduce und die HDFS Schicht, welche die Daten verteilt speichert. HiveQL besitzt einen Parser, Planer, Optimierer und eine Ausführungskomponente, um Anfragen an Hive zu bearbeiten. Zusätzlich hat Hive mehrere PlugIn Schnittstellen, um den Funktionsumfang von Hive zu erweitern. So können beispielsweise mit Hive auch eigene MapReduce Skripte ausgeführt werden, ohne dass diese zuerst in Hive neu geschrieben werden müssen. Außerdem ist es möglich, eigene definierte Funktionen beziehungsweise Aggregationsfunktionen zu schreiben und diese dann in Hive zu verwenden. Als eine Art Proxy fungiert die SerDe Komponente, eine Kurzform von „serealization“ und „deserealization“, mit der man auf verschiedene Dateiformate zugreifen kann. Dadurch wird es unwichtig, ob die Datei in CSV Form oder in Tabellenform vorliegt. Außerdem kann man auch eigene Speicherformate implementieren, um Hive im eigenen Kontext zu verwenden. Aufgrund von SerDe sind in Hive alle Daten in Tabellenform verfügbar und können über HiveQL abgefragt werden. 3.2 HiveQL Wie bereits erwähnt ist HiveQL zum Standard von SQL aus dem Jahr 1992 kompatibel. Dadurch können Datenanalytiker sehr leicht mit Hive arbeiten, wenn sie bereits Erfahrung mit SQL haben. Hierzu gehören die Standardanfragen, die im folgenden Abschnitt näher erläutert werden1 . Anschließend wird mit einem 1 Eine ausführliche Beschreibung der Hivesyntax findet sich unter [HIV] 9 kleinen Beispiel die Arbeitsweise von Hive näher betrachtet. Bevor man Daten mit Hive analysieren kann, benötigt man Tabellen mit Daten. Das Erzeugen einer neuen Tabelle funktioniert mit dem Kommando CREATE: CREATE TABLE pokes (foo INT, bar STRING); Damit kennt das System eine Tabelle „pokes“ und deren Spaltendefinition. Die genaue Speicherung, den Speicherort und die Partitionierung kann man hier ebenfalls angeben, um bei größeren Tabellen das System effizient zu halten. Um diese Tabelle mit Daten zu füllen, hat man mehrere Möglichkeiten. Falls die Daten bereits in einer anderen Tabelle vorliegen oder aus den anderen Tabellen generiert werden können, so arbeitet man mit SELECT und INSERT. Zusätzlich kann man Textdateien, die bereits tabellarisch formatiert sind, mit dem Befehl LOAD DATA vom System in eine Tabelle laden. LOAD DATA LOCAL INPATH ’./examples/files/kv3.txt’ OVERWRITE INTO TABLE pokes; Anschließend kann man mit den gewohnten SELECT Statements auf die Tabellen zugreifen. Hierbei ist es möglich, einzelne Spalten zu selektieren, Bedingungen anzugeben, Filter zu verwenden und die Daten zu Gruppieren. SELECT a.bar, count(*) FROM invites a WHERE a.foo > 0 GROUP BY a.bar; Besonders hilfreich ist die in SQL Anfragesprache übliche Funktion des Joins. Während dies bei Hadoop schwer zu realisieren ist, benötigt man hierfür mit Hive nur eine Select Anfrage. SELECT pv.pageid, u.age_bkt FROM page_view p JOIN user u ON (pv.uhash = u.uhash) JOIN newuser x on (u.uhash = x.uhash); Dadurch gewinnt man bei Hive eine sehr große Abstraktion vom eigentlichen Map Reduce. Dies führt zu einer nützlichen Erweiterung. Im folgenden Beispiel wird das bereits bekannte Wörterzählen-Beispiel wieder aufgegriffen. 3.3 Beispiel Zur Verdeutlichung wurde erneut das Beispiel „Wörter zählen“ verwendet. Hierbei ergibt sich jedoch gleich ein Problem. Wie bringt man den Text in eine tabellarische Form, so dass man Hive verwenden kann? Text der Eingabedatei: Hallo Welt Hallo Nutzer Eine Lösung ist eine Vorverarbeitung mit einer anderen Programmiersprache, also beispielsweise mittels Java oder Pig. Anschließend lädt man den Text in eine vorher erzeugte Hive Tabelle. Text nach Vorverarbeitung: Hallo Welt 10 Hallo Nutzer Befehl zum erzeugen der Tabelle: CREATE TABLE words (word String) STORED AS TEXTFILE; Befehl zum Laden des Textes: LOAD DATA LOCAL INPATH ’words.txt’ INTO TABLE words; Nun hat man die gewünschten Daten in der Tabelle. Das eigentliche HiveQL Statement zum Zählen der Wörter ist nun nur noch ein einfaches Select Statement mit Gruppierung nach dem Wort und einer Count Aggregationsfunktion. Selbstverständlich kann man das Ergebnis nun auch noch absteigend nach der Anzahl der Wörter sortieren. SELECT word , COUNT(word) AS count FROM words GROUP BY word ORDER BY count DESC; Dadurch erhält man das gewünschte Ergebnis, welches in dem obigen Beispiel nur auf der Konsole zurückgegeben wird. Um das Ergebnis zu speichern kann man es mit INSERT auch in eine Ergebnistabelle einfügen. Die Abarbeitung dieser Query teilt Hive in zwei aufeinanderfolgende MapReduce Jobs auf. Der erste Job erzeugt aus der Eingabetabelle eine Tabelle mit gruppierten Wörtern und der Anzahl des Wortes. Im darauffolgenden MapReduce Job wird das Ergebnis nach der Anzahl geordnet. Die Erzeugung der MapReduce Jobs übernimmt Hive komplett. 3.4 Bewertung Das Beispiel zeigt sehr deutlich die einfache Nutzung von Hive auf, aber auch seine Schwächen sind gut erkennbar. Wenn die Daten nicht in tabellarischer Form gegeben sind, muss man diese zunächst über Drittprogramme einlesen. Dieser Nachteil ist jedoch zu vernachlässigen, wenn man bedenkt, in welchem Umfeld Hive entwickelt wurde und eingesetzt wird. Hive soll rechen- und zeitintensive Datenbankaufgaben von der Datenbank in ein Hadoop Cluster verlagern, ohne alles neu programmieren zu müssen. Dadurch sind die Daten meist in tabellarischer Form gegeben. Demgegenüber steht die einfache Nutzung aufgrund der verwendeten SQL Anfragesprache. Zusätzlich stellt man dem Benutzer mit Join ein vielseitig einsetzbares Hilfmittel zur Verfügung, welches in Hadoop nicht so einfach verwendbar ist. Hive ist außerdem sehr leicht durch eigene Funktionen erweiterbar. Sogar eigene MapReduce Jobs und flexible Speicherung der Dateien sind möglich. Für viele Probleme wird Hive dadurch zu einem leicht einsetzbaren und flexiblen System. 11 4 Zusammenfassung und Ausblick In den vorangegangenen Kapiteln wurden die Programmiersprachen Pig und Hive individuell beleuchtet. In diesem Abschnitt soll nun ein Vergleich von beiden gezogen werden. Dieser Sachverhalt ist aufgrund der gemeinsamen Entwicklungsgeschichte interessant. Facebook hat Hive entwickelt, weil Pig zum damaligen Zeitpunkt große Performanceverluste bei der Nutzung hatte. Ein weiteres Problem von Pig bestand im Trainingsaufwand der Entwickler für den Einsatz einer neuen Programmiersprache. Mit der Entscheidung eine Programmiersprache zu entwickeln, deren Syntax auf SQL basiert, konnte man eine „steile Lernkurve“, wie man dies von anderen Programmiersprachen kennt, vermeiden. Dieses Problem haben auch die Entwickler bei Hadoop und insbesondere auch bei Pig erkannt. Deshalb soll es in Zukunft auch eine SQL API für Pig geben. Fraglich bleibt, warum die Ingenieure von Pig sich nicht von Anfang an für eine SQL Anfragemöglichkeit entschieden haben. Andererseits hat man mit Pig eine viel größere Kontrolle wie und in welcher Reihenfolge die Daten verarbeitet werden. Dadurch wird Pig für manchen Nutzer möglicherweise einfacher zu bedienen sein. Pipeline-Verarbeitungen kann man in Pig leichter programmieren, was sich in Hive eher schwierig gestaltet. Beim Typsystem geht Hive einen restriktiveren Weg als Pig, denn bei Pig muss der Datentyp nicht unbedingt angegeben werden. Dadurch ist in manchen Situationen der Import von Fremddaten einfacher als bei Hive. Liegen die Daten jedoch bereits in Tabellenform vor, so sind die Daten schon strukturiert und haben auch einen Typ. Dadurch wiegt dieser Nachteil von Hive gegenüber Pig nicht so schwer. Sowohl Pig als auch Hive unterstützen einen Aufruf von externen Bibliotheken. Dadurch ist es möglich die Funktionen, zu erweitern und neue Funktionalitäten bereitzustellen. Auch lassen sich fertige MapReduce Jobs aufrufen. Somit ist eine gewisse Rückwärtskompatibilität garantiert, da alle alten Hadoop MapReduce Programme wiederverwendet werden können. Ein weiterer Vorteil von Hive gegenüber Pig ist die Exportschnittstelle über JDBC/ODBC. Dadurch ist es sehr leicht, die Anfragen an das Hadoop Cluster von einer anderen Programmiersprache zu stellen. Bei Pig muss man hierfür einen Umweg über das Dateisystem gehen, was zu einer Verschlechterung der Performanz führt. Betrachtet man die Performanz der beiden Systeme, so scheint es, dass Hive schneller ist als Pig. Doch dies ist nur eine Tendenz. Tatsächlich ist es wohl eher ein gemischtes Ergebnis. So ist bei manchen Anfragen Hive schneller und bei anderen Pig. In der Tabelle 1 wurde versucht mit einer Gegenüberstellung die wichtigsten Gemeinsamkeiten und Unterschiede darzustellen. Abschließend muss man grundsätzlich je nach Anwendung entscheiden, welche Technik beziehungsweise welches System man einsetzt. Gerade bei großen Datenmengen und komplizierten Systemen gibt es keinen optimalen Lösungsweg. In dieser Arbeit wurde der Fokus vor allem auf die beiden Sprachen Pig und Hive gelegt, weil sie zusammen in den weltweit größten Clustern eingesetzt werden. So betreibt Facebook ein Cluster mit etwa 800 Knoten und Yahoo ein Cluster mit circa 1000 Knoten. Beide Firmen wollen ihre Cluster stark vergrößern und nebenbei auch beweisen, dass ihre Anfragesprache besser als die der Konkurrenten ist. Es bleibt also abzuwarten, wie sich dieser Konkurrenzkampf in Zukunft entwickelt. 12 Kriterium Funktionalität Programmierschnittstelle Eingabedatenstruktur Eingabe/Ausgabeformat Datenimportunterstützung Schnittstellen zur Weiterverarbeitung Benutzerschnittstellen Benutzerfreundlichkeit Eclipse Plugins Benutzerdefinierte Funktionen (UDFs) Lernkurve Hive Pig Hive API Pig API strukturiert unstrukturiert Rohdaten/Rohdaten Rohdaten/Rohdaten Sqoop - Datenimport keine Toolunterstützung ODBC/JDBC CLI, WebGUI keine Exportschnittstellen CLI(Grunt) nicht vorhanden nutzbar PigPen nutzbar einfache Einarbeitung durch Nähe zu SQL lange Einarbeitung in Pig eigene ScriptSprache Leistungsfähigkeit Performanz kein Schwerpunkt Stabilität (Ausfallsicherheit) Skalierbarkeit hohe Stabilität durch HDFS hoch skalierbar durch Hadoop Framework Allgemeine Projektinformationen Installationsaufwand Erweiterung von Hadoop Dokumentation Tutorials, Wikis AnwendungsLogfile Analyse, Realszenarien Time Webanalyse, Data Mining, Data Warehousing Zielgruppe Business Analyst mit SQL Kenntnissen Lizenzmodell Anfragesprache Metadaten Schemata Unterstützung Plattform tendenziell langsamer als Hive hohe Stabilität durch HDFS hoch skalierbar durch Hadoop Framework Erweiterung von doop Tutorials, Wikis Logfile Analyse Ha- APL v2.0 / GPL HiveQL (deklarativ) vorhanden ja (nicht optional) Programmierer mit Bedarf für Ad-hoc Abfragen APL v2.0 / GPL PigLatin (prozedural) nicht verhanden nicht benötigt Apache Hadoop Apache Hadoop Tabelle 1: Vergleich zwischen Hive und Pig [VHP] 13 A 1 Wörter zählen mit Apache Hadoop package o r g . myorg ; 2 3 4 import j a v a . i o . IOException ; import j a v a . u t i l . ∗ ; 5 6 7 8 9 10 11 12 13 import o r g . apache . hadoop . f s . Path ; import o r g . apache . hadoop . c o n f . ∗ ; import o r g . apache . hadoop . i o . ∗ ; import o r g . apache . hadoop . mapreduce . ∗ ; import o r g . apache . hadoop . mapreduce . l i b . i n p u t . F i l e I n p u t F o r m a t ; import o r g . apache . hadoop . mapreduce . l i b . i n p u t . TextInputFormat ; import o r g . apache . hadoop . mapreduce . l i b . output . FileOutputFormat ; import o r g . apache . hadoop . mapreduce . l i b . output . TextOutputFormat ; 14 15 p u b l i c c l a s s WordCount { 16 17 18 19 p u b l i c s t a t i c c l a s s Map e x t e n d s Mapper<LongWritable , Text , Text , I n t W r i t a b l e > { p r i v a t e f i n a l s t a t i c I n t W r i t a b l e one = new I n t W r i t a b l e ( 1 ) ; p r i v a t e Text word = new Text ( ) ; 20 p u b l i c v o i d map( LongWritable key , Text v a l u e , Context c o n t e x t ) throws IOException , I n t e r r u p t e d E x c e p t i o n { String l i n e = value . toString () ; S t r i n g T o k e n i z e r t o k e n i z e r = new S t r i n g T o k e n i z e r ( l i n e ) ; w h i l e ( t o k e n i z e r . hasMoreTokens ( ) ) { word . s e t ( t o k e n i z e r . nextToken ( ) ) ; c o n t e x t . w r i t e ( word , one ) ; } } 21 22 23 24 25 26 27 28 29 } 30 31 p u b l i c s t a t i c c l a s s Reduce e x t e n d s Reducer<Text , I n t W r i t a b l e , Text , I n t W r i t a b l e > { 32 p u b l i c v o i d r e d u c e ( Text key , I t e r a b l e <I n t W r i t a b l e > v a l u e s , Context c o n t e x t ) throws IOException , I n t e r r u p t e d E x c e p t i o n { i n t sum = 0 ; for ( IntWritable val : values ) { sum += v a l . g e t ( ) ; } c o n t e x t . w r i t e ( key , new I n t W r i t a b l e ( sum ) ) ; } 33 34 35 36 37 38 39 40 41 } 42 43 44 p u b l i c s t a t i c v o i d main ( S t r i n g [ ] a r g s ) throws E x c e p t i o n { C o n f i g u r a t i o n c o n f = new C o n f i g u r a t i o n ( ) ; 45 46 Job j o b = new Job ( c o n f , " wordcount " ) ; 14 47 j o b . setOutputKeyClass ( Text . c l a s s ) ; job . setOutputValueClass ( IntWritable . c l a s s ) ; 48 49 50 j o b . s e t M a p p e r C l a s s (Map . c l a s s ) ; j o b . s e t R e d u c e r C l a s s ( Reduce . c l a s s ) ; 51 52 53 j o b . s e t I n p u t F o r m a t C l a s s ( TextInputFormat . c l a s s ) ; j o b . setOutputFormatClass ( TextOutputFormat . c l a s s ) ; 54 55 56 F i l e I n p u t F o r m a t . addInputPath ( job , new Path ( a r g s [ 0 ] ) ) ; FileOutputFormat . setOutputPath ( job , new Path ( a r g s [ 1 ] ) ) ; 57 58 59 j o b . waitForCompletion ( t r u e ) ; 60 } 61 62 63 } Die Klasse WordCount (Quelle:[CLO]) für einen 15 Apache Hadoop MapReduce Job Literatur [CLO] Website von Cloudera. http://www.cloudera.com/ [DG08] Dean, Jeffrey ; Ghemawat, Sanjay: MapReduce: simplified data processing on large clusters. In: Commun. ACM 51 (2008), January, 107–113. http://dx.doi.org/10.1145/1327452.1327492. – DOI 10.1145/1327452.1327492. – ISSN 0001–0782 [DZ] Dhruba, Borthakur ; Zheng, Shao: Hadoop and Hive Development at Facebook [HAD] Website von Apache Hadoop. http://hadoop.apache.org/ [HIV] Website von Apache Hive. http://hive.apache.org/ [Ols] Olston, Christopher: Pig - web-scale data processing [ORS+ 08] Olston, Christopher ; Reed, Benjamin ; Srivastava, Utkarsh ; Kumar, Ravi ; Tomkins, Andrew: Pig latin: a not-so-foreign language for data processing. In: Proceedings of the 2008 ACM SIGMOD international conference on Management of data. New York, NY, USA : ACM, 2008 (SIGMOD ’08). – ISBN 978–1–60558–102–6, 1099–1110 [PIG] Website von Apache Pig. http://pig.apache.org/ [STL] Stewart, R.J. ; Trinder, P.W. ; Loidl, H.W.: Comparing High Level MR Query Languages [TSJ+ 10] Thusoo, A. ; Sarma, J.S. ; Jain, N. ; Shao, Zheng ; Chakka, P. ; Zhang, Ning ; Antony, S. ; Liu, Hao ; Murthy, R.: Hive - a petabyte scale data warehouse using Hadoop. In: Data Engineering (ICDE), 2010 IEEE 26th International Conference on, 2010, S. 996 –1005 [VHP] Vergleich von Hive und Pig. http://wiki.fh-stralsund.de/ [WCP] Pig Beispiel. http://en.wikipedia.org/wiki/Pig_ (programming_language) 16