MapReduce Jens Dörre Universität Passau Projekt M AP R EDUCE F OUNDATION Funktionale Programmierung Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 1 / 25 Gliederung 1 MapReduce allgemein 2 MapReduce in Haskell Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 2 / 25 Gliederung 1 MapReduce allgemein 2 MapReduce in Haskell Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 3 / 25 Parallele und verteilte Systeme Theorie Nischendasein: keine von-Neumann-Architektur Praxis Standard-Hardware In allen Maßstäben (unsichtbar) vorhanden CPU: Bit, Instruktion, Hardware-Thread, Multicore Verteilt: Cluster, Grid, Cloud Spezialfälle Itanium, Cell, GPUs Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 4 / 25 Parallele und verteilte Systeme: Probleme Niedrige Abstraktionsebene Explizite Parallelität Schwierig zu schreiben: Synchronisationscode Oft geringer Parallelitätsgrad Geringe Portabilität → geringe Skalierbarkeit Lösung: Konzentration auf einfache Problemklasse: massive Datenparallelität Enter MapReduce! Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 5 / 25 MapReduce: Schematischer Überblick Allgemeiner Ablauf einer MapReduce-Berechnung partition1 partition2 output1 Mapper partition3 Reducer output2 Reducer output3 Mapper partition4 Mapper partition5 output4 partition6 Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 6 / 25 MapReduce: Schematischer Überblick (2) Verteilter Ablauf einer MapReduce-Berechnung partition1 partition2 output1 Mapper partition3 Reducer output2 Reducer output3 Mapper partition4 Mapper partition5 output4 partition6 Verteilte Ein- und Ausgabedaten Mapper- und Reducer-Tasks werden verteilt ausgeführt Möglichst lokale Ausführung des Mappers bei seinen Daten Entfernte Kommunikation zwischen jedem Mapper und jedem Reducer Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 7 / 25 MapReduce-Beispiel Google: Erzeugung eines invertierten Indexes (“invertierte Datei”) Eingabe: sehr viele Textdokumente Ausgabe: Index (Wort → − DokumentIDs) Abarbeitung in zwei Phasen 1 “Map” pro Dokument: Erzeuge Liste von Paaren (Wort, DokumentID) 2 “Reduce” pro Wort: Erzeuge Paar (Wort, sortierte Liste aller zugehörigen DokumentIDs) Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 8 / 25 MapReduce Was ist MapReduce? Ein Algorithmenskelett, vergleichbar mit Divide-and-Conquer, dem Tiefensuch-Vorgehen, Taskfarming und vielen anderen. Wie sieht MapReduce aus? Anwendungsspezifisches Eingabeformat Schlüssel/Wert-Paare als Ausgabe Anwender spezifiziert Algorithmus mittels zweier Funktionen “Mapper” erzeugt Zwischenergebnisse (Schlüssel/Wert-Paare) aus jedem Eingabedatum “Reducer”, angewendet auf alle Zwischenergebnisse mit demselben Schlüssel, erzeugt beliebig viele/wenige Endergebnisse Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 9 / 25 MapReduce: Eigenschaften Programmiermodell für massiv verteilte Ausführung (einige Tausend Rechner) Datenparallelität als Basis Durch Google (2004) populär Beispiel-Anwendung: Generierung des Indexes für die Web-Suche Tausende weitere Anwendungen im Einsatz mit einem Durchsatz von Petabytes pro Tag Clustering Häufigste Anfragen Extraktion semantischer Daten Framework mit zwei funktionalen Konzepten: map/concatMap und teilweise fold/scan Imperativer Benutzercode möglich Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 10 / 25 MapReduce: Vorteile Ermöglicht erst das Petascale-Computing Einfaches, aber breit anwendbares Programmiermodell Masse an Daten Notwendige Flexibilität Sequenzielle (=einfache) Sicht auf paralleles und verteiltes System Akzeptanz bei Mainstream-Programmierern Eingebaute Fehlertoleranz in vielen Frameworks Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 11 / 25 Implementierungen von MapReduce Google MapReduce Nicht erhältlich Verteilte C++-Implementierung Apache Hadoop MapReduce Open Source (http://hadoop.apache.org/mapreduce/) Entwickelt bei Yahoo Nutzbar bei Amazon als “Elastic MapReduce”-Service Verteilte Java-Implementierung Durch stdin/stdout Interface auch mit anderen Sprachen verwendbar Nokia Disco Open Source Verteilte Erlang+Python-Implementierung ... Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 12 / 25 Erweiterungen von MapReduce Schlüssel werden sortiert verarbeitet Benutzerdefinierte Ein- und Ausgabeformate Benutzerdefinierte Splitter zur Partitionierung der Daten Combiner: zusätzlicher, Reduce-ähnlicher Schritt direkt im Anschluss an Mapper Fehlerhafte Daten werden mehrfach zu verarbeiten versucht, dann ignoriert Verschiedene Debugging- und Reporting-Möglichkeiten Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 13 / 25 Phasen bei der verteilten Abarbeitung Sequenzielle Sicht auf ein einzelnes Datum original map phase in split1 map split2 sortA combine original reduce phase send Jens Dörre (Uni Passau) sortB reduce MapReduce out Funktionale Programmierung 14 / 25 Gliederung 1 MapReduce allgemein 2 MapReduce in Haskell Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 15 / 25 Modellierung mit Haskell Vereinfachungen 1 lokaler Thread statt verteilter Cluster Daher keine Partitionierung für verschiedene Rechenknoten Keine Ein-/Ausgabe Daher Berechnung im vorhandenen Hauptspeicher Sortier-Erweiterung ist vorhanden Combiner, Splitter, . . . sind nicht vorhanden Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 16 / 25 Das Framework in Haskell Typsignatur: mapReduce :: => -> -> -> forall k1 k2 v1 v2 v3. (Ord k2) (k1 -> v1 -> [(k2,v2)]) (k2 -> [v2] -> [v3]) [(k1,v1)] [(k2,v3)] ------ Needed for grouping and sorting The *’mapper’* function The *’reducer’* function An input key-value mapping An output key-value mapping Funktion höherer Ordnung Zwei benutzerdefinierte Funktionen als Parameter Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 17 / 25 Die Phasen in Haskell Typsignatur: mapReduce :: => -> -> -> forall k1 k2 v1 v2 v3. (Ord k2) (k1 -> v1 -> [(k2,v2)]) (k2 -> [v2] -> [v3]) [(k1,v1)] [(k2,v3)] ------ Needed for grouping and sorting The *’mapper’* function The *’reducer’* function An input key-value mapping An output key-value mapping 3 Hauptphasen in dieser Implementierung Map, Group/Shuffle, Reduce mapReduce mapper reducer = reducePerKey -- C. Apply *’reducer’* to each group . groupByKey -- B. Group intermediate data per key . mapPerKey -- A. Apply *’mapper’* to each key/value pair Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 18 / 25 Mehr Details mapReduce mapper reducer = reducePerKey -- C. Apply *’reducer’* to each group . groupByKey -- B. Group intermediate data per key . mapPerKey -- A. Apply *’mapper’* to each key/value pair where reducePerKey :: [(k2,[v2])] -> [(k2,v3)] reducePerKey = concatWithKey -- 2. Concatenate per-key lists . mapWithKey (uncurry reducer) -- 1. Apply *’reducer’* per key groupByKey :: [(k2,v2)] -> [(k2,[v2])] groupByKey = unConcatWithKey -- 3. Store key only once per group . groupBy fstEq -- 2. Group on same key . sortBy fstCmp -- 1. Sort on keys (*not* on values) mapPerKey :: [(k1,v1)] -> [(k2,v2)] mapPerKey = concat -- 2. Concatenate per-key lists . map (uncurry mapper) -- 1. Map *’mapper’* over list of pairs Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 19 / 25 Beispiel-Code Invertierter Index (Anfangsbeispiel): module InvertedIndex where import MapReduce import Data.List (sort) invertedIndex :: (Ord a) => [(a, String)] -> [(String, [a])] invertedIndex = mapReduce mapper reducer where mapper key = (map (flip (,) key) . words) -- each word with its docID reducer = const (wrap . sort) -- sort each result list wrap x = [x] simpleInput :: [(String, String)] simpleInput = ins "doc2" "appreciate the unfold" $ ins "doc1" "fold the fold" $ [] where ins k v = ((k,v) : ) Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 20 / 25 Zusammenfassung Parallelität und Verteiltes Rechnen schwierig MapReduce-Framework ermöglicht Konzentration auf Anwendung MapReduce erweitert Ansätze wie SETI um Group und Reduce → breitere Anwendbarkeit Mit Wissen aus Funktionalprogrammierung: keine Magie dahinter Algorithmen müssen für MapReduce stark angepasst werden Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 21 / 25 Es folgen (optionale und alternative) Folien mit noch mehr Details der Haskell-Implementierung von MapReduce. Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 22 / 25 Alle Details reducePerKey :: [(k2,[v2])] -> [(k2,v3)] reducePerKey = concatWithKey -- 2. Concatenate per-key lists . mapWithKey (uncurry reducer) -- 1. Apply *’reducer’* per key where mapWithKey f = let g (k,v) = (k, f (k,v)) in map g concatWithKey k_vs_s = [(k,v) | (k,vs) <- k_vs_s, v <- vs] groupByKey :: [(k2,v2)] -> [(k2,[v2])] groupByKey = unConcatWithKey -- 3. Store key only once per group . groupBy fstEq -- 2. Group on same key . sortBy fstCmp -- 1. Sort on keys (*not* on values) where fstEq (k,_v) (k’,_v’) = k==k’ fstCmp (k,_v) (k’,_v’) = k ‘compare‘ k’ unConcatWithKey kv_ss = [(k, vs) | kv_s <- kv_ss, let k = fst $ head kv_s, let vs = map snd kv_s] mapPerKey :: [(k1,v1)] -> [(k2,v2)] mapPerKey = concat -- 2. Concatenate per-key lists . map (uncurry mapper) -- 1. Map *’mapper’* over list of pairs Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 23 / 25 Mit fehlenden Details Allgemein: {-# LANGUAGE ScopedTypeVariables #-} module MapReduce (mapReduce) where import Data.List(groupBy, sortBy) Für reducePerKey und groupByKey: reducePerKey :: [(k2,[v2])] -> [(k2,v3)] reducePerKey = concatWithKey -- 2. Concatenate per-key lists . mapWithKey (uncurry reducer) -- 1. Apply *’reducer’* per key where mapWithKey f = let g (k,v) = (k, f (k,v)) in map g concatWithKey k_vs_s = [(k,v) | (k,vs) <- k_vs_s, v <- vs] groupByKey :: [(k2,v2)] -> [(k2,[v2])] groupByKey = unConcatWithKey -- 3. Store key only once per group . groupBy fstEq -- 2. Group on same key . sortBy fstCmp -- 1. Sort on keys (*not* on values) where fstEq (k,_v) (k’,_v’) = k==k’ fstCmp (k,_v) (k’,_v’) = k ‘compare‘ k’ unConcatWithKey kv_ss = [(k, vs) | kv_s <- kv_ss, let k = fst $ head kv_s, let vs = map snd kv_s] Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 24 / 25 Alle Hilfsdefinitionen Allgemein: {-# LANGUAGE ScopedTypeVariables #-} module MapReduce (mapReduce) where import Data.List(groupBy, sortBy) Für reducePerKey: mapWithKey f = let g (k,v) = (k, f (k,v)) in map g concatWithKey k_vs_s = [(k,v) | (k,vs) <- k_vs_s, v <- vs] Für groupByKey: fstEq (k,_v) (k’,_v’) = k==k’ fstCmp (k,_v) (k’,_v’) = k ‘compare‘ k’ unConcatWithKey kv_ss = [(k, vs) | kv_s <- kv_ss, let k = fst $ head kv_s, let vs = map snd kv_s] Jens Dörre (Uni Passau) MapReduce Funktionale Programmierung 25 / 25