Cloud Data Management Kapitel 7: Large-Scale Datenanalyse Dr. Anika Groß Wintersemester 2016 Universität Leipzig Institut für Informatik http://dbs.uni-leipzig.de 1 Einführung • Hadoop (und darauf aufbauende Frameworks) – – – – Einfache Parallelisierung von Berechnungen in Cluster-Umgebungen Fehlertoleranz Frei verfügbar Große Community • Viele Optimierungsansätze aus Forschung ( letztes Kapitel) – Design-Schwächen • SPOF, begrenzte horizontale Skalierbarkeit ( Hadoop 2.x.x) • Nicht optimal für alle Anwendungsfälle – Iterative Anwendungen • z.B. maschinelle Lernverfahren ( Spark) – Echtzeit-Anwendungen • Ad-hoc Aggregation großer Datenmengen ( Dremel) 2 Inhaltsverzeichnis • Apache Spark – Parallele In-memory Datenanalyse in Cloud-Umgebungen • Google Dremel – Hierarchisches, spaltenorientiertes Echtzeit-Anfragesystem zur Aggregation verteilter, geschachtelter Datensätze 3 Apache Spark: Motivation • MapReduce ist ungeeignet für iterative und interaktive Anwendungen – Materialisierung von Map-Ergebnissen im lokalen FS – Materialisierung und Replikation von Reduce-Ergebnissen im HDFS Quelle: [Spark2] • Ansatz: Caching im Hauptspeicher – Eine Größenordnung schneller als Externspeicherzugriff – 95% aller Anfragen an Facebooks Hive Cluster könnten komplett aus HS beantwortet werden [Spark] Quelle: [Spark2] 4 Apache Spark: Übersicht • Ursprünglich Forschungsprojekt der UC Berkely zur parallelen Analyse großer Datenmengen in Cluster-Umgebungen [Spark] – Apache Top Level-Projekt (seit 02-2014) – http://spark.apache.org • Auf Datensätze eines Datasets können verschiedene Funktionen (höherer Ordnung) angewendet werden Erzeugen neues Dataset – map, flatMap, filter, groupByKey, reduceByKey, join, cogroup, mapValues, partition, … • In-memory Caching von Datasets für effiziente Multi-pass-Operationen – Iterative Algorithmen (Machine Learning, PageRank, K-means, …) – Interaktive Datenanalyse (z.B. SQL) • Fehlertoleranz – Keine Replikation gecacheter Daten sondern Neuberechnung verlorener Partitionen 5 Spark Überblick + SparkR (R on Spark) Bildquelle: http://spark.apache.org • APIs: Scala, Java, Python, R (Lambda-Ausdrücke) • Run Spark: Standalone Mode / on YARN / on Mesos 6 Apache Spark: Resilient Distributed Datasets • Verteilte Kollektion von Datensätzen – Read-only – Generiert durch Anwendung bestimmter Transformationen auf • Basisdatenquelle (HDFS) oder • Existierendem RDD • Nutzer kann RDDs im Hauptspeicher der Worker-Nodes cachen – Erlaubt Wiederverwendung in späteren Operationen – Ausschreiben auf Sekundärspeicher bei “HS-Mangel” • Lineage-Information – RDD kennt seine “Vater-RDDs” sowie die Berechnungsvorschrift, mit welcher es aus diesen abgeleitet wurde – “Verlorene Partitionen” können bei Bedarf neuberechnet werden 7 Apache Spark: Beispiel (1) • In Scala: val sc = new SparkContext( “spark://...”, “MyJob”, home, jars) Resilient distributed datasets (RDDs) val file = sc.textFile(“hdfs://...”) val errors = file.filter(_.contains(“ERROR”)) errors.cache() errors.count() Action Quelle: [Spark3] 8 Apache Spark: Beispiel (2) • Zeitstempel (=vierte Spalte) der Log-Einträge die mit “ERROR” starten und “HDFS” enthalten 9 Apache Spark: Transformationen & Aktionen Quelle: [Spark] Beispiele: pets = sc.parallelize([(cat, 1), (dog, 1), (cat, 2)]) => [(cat,1), (dog,1), (cat,2)] //verteilt pets.sortByKey().collect() => [(cat,1), (cat,2), (dog,1)] pets.groupByKey().collect() => [(cat,[1,2]), (dog,[1])] pets.reduceByKey(f(x,y)=x+y).collect(); => [(cat,3), (dog,1)] pets.mapValues(f(x)=x+1).collect() 10 => [(cat,2), (dog,2), (cat,3)] Apache Spark: K-means Clustering • Analog zu Beispiel aus Kapitel 4 S.24/25 – Lokale Ausführung 11 Apache Spark: K-means Clustering (2) 12 Apache Spark: Architektur http://spark.apache.org/docs/latest/cluster-overview.html • BlockManager – “Write-once” Key-Value Store pro Worker – Caching von RDDs – Bereitstellen eines “Shuffle-Services” – Verwalten eines StorageLevels pro Block • Externspeicher, RAM 13 • Auslagern gecacheter Daten auf Externspeicher bei HS-Mangel Quelle: [Spark3] Apache Spark: Scheduling • DAGScheduler: – Kind-Partition hängt von einer konst. Anzahl von Partitionen der Vater-RDDs ab • Pipelining mehrerer Transformationen in 1 Task – Shuffle-Operationen markieren Stage-Grenzen • Parallele Ausführung der Tasks eines Stages • Keine Neuberechnung von Stages, deren Ergebnis bereits im Cache vorliegt (Stage 1) • TaskScheduler – Berücksichtigen Datenlokalität (Cache, HDFS Blöcke) – Task-Failure Neuberechnung auf anderen Knoten solange abhängige Partitionen verfügbar sind sonst: parallele RDD-Recovery 14 Apache Spark: RDD Recovery • Lineage-Information eines RDDs – – – – Menge von Partitionen (z.B. HDFS Blöcke, “Reduce-Tasks”, …) Menge von Abhängigkeiten zu “Vater-RDDs” Funktion um eine Partition ausgehend von “Vater-RDD” zu berechnen Bevorzugte Knoten zur Berechnung einer Partition (HDFS block location, Knoten welcher Partition eines RDDs cached, “shuffle on each parent”,…) – Information über Partitionierung • Fehlertoleranz für “Shuffle-Transformationen” – Join, groupByKey, reduceByKey, … – Pufferung der Zwischenergebnisse im Haupt- bzw. Externspeicher der “Erzeuger” 15 Quelle: [Spark2] Apache Spark: Evaluation [Spark] Logistic Regression (1Mrd Punkte, 100GB) K-Means (1Mrd. Punkte, 100GB, 10 Cluster) 10016Knoten Inhaltsverzeichnis • Apache Spark – Parallele In-memory Datenanalyse in Cloud-Umgebungen • Google Dremel – Hierarchisches, spaltenorientiertes Echtzeit-Anfragesystem zur Aggregation verteilter, geschachtelter Datensätze 17 Google Dremel • Skalierbares System für verteilte Ausführung von Leseanfragen auf riesigen Mengen geschachtelter Daten – Effizientere Bearbeitung im Vgl. zur Ausführung einer Menge von MapReduce Jobs • Building Blocks – Spaltenbasierte Speicherung geschachtelter Datensätze – SQL-ähnliche Anfragesprache – Multi-level execution trees • Open Source-Implementierung: Apache Drill • Quellen für die folgenden Folien: [Dremel1], [Dremel2] 18 Google Dremel : Datenmodell • Attribute können – – – – Werte sein oder ein bis mehrere Unterattribute haben Pflichtattribute sein [1,1] required optional sein [0,1] optional wiederholt auftreten (Reihenfolge relevant) [0,*] repeated record-oriented A column-oriented B B C:c1 D:d1 D:d2 C:c2 D:d3 D:d4 A E:e1 E:e2 E:e3 vs. A.B.C=c1 A.B.C=c2 A.B.D=d1 A.B.D=d2 A.B.D=d3 A.B.D=d4 A.E=e1 A.E=e2 A.E=e3 - … 19 Bei Projektion Lesen weniger Daten Bessere (De)kompressionseigenschaften Google Dremel : Spaltenbasierte Zerlegung • Sequentielle Speicherung aller Werte eines Attributes • Zusätzliche Information (r,d) für jeden Wert um Datensätze rekonstruieren zu können r2 r1 Repetition level r: “At what repeated field in the field's path the value has repeated” Anzahl repeated fields im Pfadpräfix, der mit Vorgänger übereinstimmt (inkl. Element, das Record identifiziert) r1.Name1.Language1.Code: “en-us” (r=0) r1.Name1.Language2.Code: “en” (r=2) r1.Name2 (r=1) r1.Name3.Language1.Code: “en-gb” (r=1) r2.Name1 (r=0) Definition level d: “How many fields in paths that could be undefined are actually present” Anzahl optionaler und repeated fields im Pfad (ohne Element, das Record identifiziert): r1.Links.Forward2: 40 (d=2) 20 NULL: wenn d kleiner als max. mögliche Anzahl optionaler und repeated fields im Pfad Google Dremel : Rekonstruktion • Notwendig für Interoperabilität mit satzorientierten Tools (z.B. MapReduce) – Gegeben: Teilmenge der benötigten Attribute – Gesucht: Ausgangsdatensätze mit originaler Schachtelung und Reihenfolge (nicht benötigte Attribute fallen weg) • Endlicher Automat (Finite State Machine) – Zustand entspricht einem Field Reader für entsprechendes Attribut – Bei Übergang in einen Zustand liest der Field Reader den nächsten Wert und fügt diesen zum aktuellem Datensatz hinzu – Anschließend wird r des darauffolgenden Wertes gelesen • Entspricht Zustandsübergang 21 Google Dremel : Rekonstruktion (2) • Wird nur eine Teilmenge der Attribute benötigt, kann ein einfacherer Automat konstruiert/verwendet werden 22 Google Dremel : SQL-ähnliche Anfragesprache • Optimiert für select-project-aggregate auf einer Tabelle – Single scan – Intra-Record und Inter-Record Aggregation 23 Google Dremel: Anfrageausführung • Serving Tree – Hierarchische Anfrageausführung – Jeder Knoten schreibt Anfrage in Menge von Teilanfragen um und aggregiert die resultierenden Teilergebnisse – Blattknoten kommunizieren mit Storage Layer – Fehlertoleranz durch Mehrfachvergabe einer Teilanfrage 24 Google Dremel: Evaluierung • SELECT SUM(CountWords(field)) / COUNT(*) FROM T1 – T1: 85 Mrd. Datensätze / 87 TB – Zugriff nur auf ein Attribut – MR Overhead execution time (sec) on 3000 nodes • Starten des/der Job/s • Task Scheduling • Lesen vollständiger Datensätze (87 TB) (0.5 TB) • “Tiefe” der Serving Tree-Topologie – Q2: SELECT country, SUM(item.amount) FROM T2 GROUP BY country – Q3: SELECT domain, SUM(item.amount) FROM T2 WHERE domain CONTAINS ’.net’ GROUP BY domain – T2: 24 Mrd. Datensätze/ 13TB 25 (60GB, 100s of records) (180GB, 1.1 Mio records) Zusammenfassung • Apache Spark – Parallele, fehlertolerante Datenanalyse mit In-memory Caching von Datensätzen – “Ausführungsumgebung” für Erweiterungen • • • • • Spark SQL Graph-Analyse - GraphX Machine Learning - MLlib Analyse auf kontinuierlichen Datenströmen: Spark Streaming R on Spark - SparkR • Google Dremel – Spaltenorientierte Speicherung geschachtelter Daten – Interaktive Analyse von read-only Daten • Scan & Aggregate (auf Teilmenge der Spalten) • Hierarchische Anfragebearbeitung: Multi-level aggregation tree 26 Quellen & Literatur • • • • • [Spark] Zaharia et. al.: Resilient Distributed Datasets: A Fault-Tolerant Abstraction for InMemory Cluster Computing. NSDI 2012 [Spark2]: http://www.cs.berkeley.edu/~rxin/talks/2013-05-14_BDAS_Oracle.pdf [Spark3]: http://spark.incubator.apache.org/talks/dev-meetup-dec-2012.pptx [Dremel1] Melnik et. al.: Dremel: Interactive Analysis of Web-Scale Datasets. Commun. ACM 2011 [Dremel2] http://net.pku.edu.cn/vc/read/Dremel.ppt 27