Architecture of Database Systems II (Architektur von Datenbanksystemen II)

MapReduce: Recap
PROGRAMMERS MUST SPECIFY:
- map (k, v) → list(<k’, v’>)
- reduce (k’, list(v’)) → <k’’, v’’>
  All values with the same key are reduced together
OPTIONALLY, ALSO:
- partition (k’, number of partitions) → partition for k’
  Often a simple hash of the key, e.g., hash(k’) mod n
  Divides up key space for parallel reduce operations
- combine (k’, v’) → <k’, v’>*
  Mini-reducers that run in memory after the map phase
  Used as an optimization to reduce network traffic
THE EXECUTION FRAMEWORK HANDLES EVERYTHING ELSE…

Example
[Figure: example data flow. Four mappers read the input pairs (k1, v1) to (k6, v6) and emit (a, 1), (b, 2); (c, 3), (c, 6); (a, 5), (c, 2); (b, 7), (c, 8). A combiner merges (c, 3) and (c, 6) into (c, 9), and partitioners assign keys to reducers. Shuffle and sort aggregates values by key: a → [1, 5], b → [2, 7], c → [2, 9, 8]. Three reducers emit (r1, s1), (r2, s2), (r3, s3).]

"Everything Else"
- Scheduling: assigns workers to map and reduce tasks
- "Data distribution": moves processes to data
- Synchronization: gathers, sorts, and shuffles intermediate data
- Errors and faults: detects worker failures and restarts
All algorithms must be expressed in m, r, c, p (map, reduce, combine, partition)
Not under the programmer's control:
- Where mappers and reducers run
- When a mapper or reducer begins or finishes
- Which input a particular mapper is processing
- Which intermediate key a particular reducer is processing

Design Pattern: Secondary Sorting
- Values are arbitrarily ordered
- Buffer values in memory, then sort. Why is this a bad idea?
- e.g. k → (v1, r), (v3, r), (v4, r), (v8, r)
- "Value-to-key conversion" design pattern: form composite intermediate key, (k, v1)
  - Let execution framework do the sorting
  - Preserve state across multiple key-value pairs to handle processing
  - Anything else we need to do?

Relational Algebra
- Projection
- Selection
- Cartesian product
- Set union
- Set difference
- Rename
- Join (⋈)
- Group by… aggregation
- …

Projection
[Figure: tuples R1 to R5 before and after projection]

Projection in MapReduce
EASY!
- Map over tuples, emit new tuples with appropriate attributes
- Reduce: take tuples that appear many times and emit only one version (duplicate elimination)
  - Tuple t in R: Map(t, t) → (t’, t’)
  - Reduce(t’, [t’, …, t’]) → (t’, t’)
BASICALLY LIMITED BY HDFS STREAMING SPEEDS
- Speed of encoding/decoding tuples becomes important
- Relational databases take advantage of compression
- Semistructured data? No problem!

Selection
[Figure: selection over tuples R1 to R5; R1 and R3 are selected]

Selection in MapReduce
EASY!
- Map over tuples, emit only tuples that meet criteria
- No reducers, unless for regrouping or resorting tuples (reducers are the identity function)
- Alternatively: perform in reducer, after some other processing
BUT VERY EXPENSIVE!!! HAS TO SCAN THE DATABASE
- Better approaches?

Set Operations
SIMILAR IDEAS: each map outputs the tuple pair (t, t).
- Union: the reducer outputs each tuple once
- Intersection: the reducer outputs a tuple only when it receives (t, [t, t])
- Set difference?
  - Map function: for a tuple t in R, produce the key-value pair (t, R), and for a tuple t in S, produce the key-value pair (t, S).
  - Reduce function: for each key t, do the following.
    1. If the associated value list is [R], then produce (t, t).
    2. If the associated value list is anything else, which could only be [R, S], [S, R], or [S], produce (t, NULL).

Group by… Aggregation
EXAMPLE: WHAT IS THE AVERAGE TIME SPENT PER URL?
IN SQL:
    SELECT url, AVG(time) FROM visits GROUP BY url
IN MAPREDUCE:
- Map over tuples, emit time, keyed by url
- Framework automatically groups values by keys
- Compute average in reducer
- Optimize with combiners

Relational Joins
[Figure: relations R (R1 to R4) and S (S1 to S4); the join pairs R1 with S2, R2 with S4, R3 with S1, and R4 with S3]

Types of Relationships
- Many-to-Many
- One-to-Many
- One-to-One

Join Algorithms in MapReduce
- Reduce-side Join
- Map-side Join
- In-Memory Join
  - Striped variant
  - memcached variant

Reduce-side Join
- Basic idea: group by join key
  - Map over both sets of tuples
  - Emit tuple as value with join key as the intermediate key
  - Execution framework brings together tuples sharing the same key
  - Perform actual join in reducer
- Similar to a "sort-merge join" in database terminology
- Cases:
  - 1-to-1 joins
  - 1-to-many and many-to-many joins

Reduce-side Join: 1-to-1
[Figure: Map emits R1, R4, S2, S3 as values under their join keys; Reduce receives the value lists (R1, S2) and (S3, R4)]
Note: there is no guarantee whether R or S comes first

Reduce-side Join: 1-to-many
[Figure: Map emits R1, S2, S3, S9 as values under their join keys; Reduce receives the value list (R1, S2, S3, …)]

Reduce-side Join: V-to-K Conversion
In reducer…
[Figure: the reducer sees the keys in order. R1: new key encountered, hold in memory. S2, S3, S9: cross with records from the other set. R4: new key encountered, hold in memory. S3, S7: cross with records from the other set.]

Reduce-side Join: Many-to-Many
In reducer…
[Figure: R1, R5, R8: hold in memory. S2, S3, S9: cross with records from the other set.]

Reduce-side Join: Example
[Figure not preserved]

Map-side Join: Basic Idea
Assume two datasets are sorted by the join key:
[Figure: R in the order R1, R2, R4, R3 next to S in the order S2, S4, S3, S1]
A sequential scan through both datasets performs the join (called a "merge join" in database terminology)

Map-side Join: Parallel Scans
- If datasets are sorted by join key, the join can be accomplished by a scan over both datasets
- Partition and sort both datasets in the same manner
A map-side join between large inputs works by performing the join before the data reaches the map function.
For this to work, though, the inputs to each map must be partitioned and sorted in a particular way. Each input data set must be divided into the same number of partitions, and it must be sorted by the same key (the join key) in each source. All the records for a particular key must reside in the same partition. This may sound like a strict requirement (and it is), but it actually fits the description of the output of a MapReduce job.

In-Memory Join
BASIC IDEA: LOAD ONE DATASET INTO MEMORY, STREAM OVER OTHER DATASET
- Works if R << S and R fits into memory
- Called a "hash join" in database terminology
MAPREDUCE IMPLEMENTATION
- Distribute R to all nodes
- Map over S, each mapper loads R in memory, hashed by join key
- For every tuple in S, look up join key in R
- No reducers, unless for regrouping or resorting tuples

In-Memory Join: Variants
STRIPED VARIANT:
- R too big to fit into memory?
- Divide R into R1, R2, R3, … such that each Rn fits into memory
- Perform in-memory join: for all n, Rn ⋈ S
- Take the union of all join results
MEMCACHED JOIN:
- Load R into memcached (http://memcached.org)
- Replace in-memory hash lookup with memcached lookup

Which join to use?
IN-MEMORY JOIN > MAP-SIDE JOIN > REDUCE-SIDE JOIN
- Why?
LIMITATIONS OF EACH?
- In-memory join: memory
- Map-side join: sort order and partitioning
- Reduce-side join: general purpose

Criticism of MapReduce
- MapReduce repeats mistakes that have already been made
- "Brute-force approach", no indexing or the like
- Performance unclear
- No "schema" for data
- Constructs not abstract enough
- No adequate query language

                     Parallel DBMS       MapReduce
Schema support       X                   -
Indexes              X                   -
Programming model    Declarative (SQL)   Description of an algorithm (C/C++, Java, …)
Optimization         X                   -
Flexibility          -                   X
Fault tolerance      -                   X

Benchmark
- Hadoop 0.19.0, Java 1.6
Andrew Pavlo, Erik Paulson, Alexander Rasin, Daniel J. Abadi, David J. DeWitt, Samuel Madden, Michael Stonebraker, A comparison of approaches to large-scale data analysis.
SIGMOD, June 29-July 02, 2009, Providence, Rhode Island, USA.
- DBMS-X: parallel shared-nothing, row-based database
  - Hash-partitioned, indexes created, and compression enabled
- Vertica: parallel shared-nothing, column-based database (version 2.6)
  - Compression enabled, no secondary indexes
  - Default settings (plus a hint that only one query is executed at a time)
- Tasks: load, selection, aggregation, and join

Benchmark: Environment
- Documents: 600,000 distinct documents on each node
- 155 million UserVisits tuples (20 GB/node)
- 18 million Rankings tuples (1 GB/node)

Data Loading
[Figure: chart with the series "reorganization" and "load time"]

Selection
Find the pageURLs in the Rankings table (1 GB/node) whose pageRank exceeds a given threshold
    SELECT pageURL, pageRank
    FROM Rankings
    WHERE pageRank > x
[Figure: chart with the series "processing time" and "time to merge the output files into one result file"]
x = 10 results in approx. 36,000 rows per node

Aggregation
Compute the total advertising revenue (adRevenue) for each sourceIP in the UserVisits table (20 GB/node).
    SELECT sourceIP, SUM(adRevenue)
    FROM UserVisits
    GROUP BY sourceIP
The query produces 2.5 million result tuples

Join
- Join the Rankings table with UserVisits and compute the total revenue and the average pageRank
- Sort in descending order and return the tuple with the highest revenue
    SELECT INTO Temp sourceIP, AVG(pageRank) AS avgPageRank, SUM(adRevenue) AS totalRevenue
    FROM Rankings AS R, UserVisits AS UV
    WHERE R.pageURL = UV.destURL
      AND UV.visitDate BETWEEN Date('2011-01-15') AND Date('2011-01-22')
    GROUP BY UV.sourceIP

    SELECT sourceIP, totalRevenue, avgPageRank
    FROM Temp
    ORDER BY totalRevenue DESC LIMIT 1

Final Comparison
Preprocessing speeds up execution, e.g.
Vertica and DBMS-X have an index on the pageRank column
MapReduce is well suited to so-called on-demand computations (one-off processing), a DBMS to recurring data analyses

Intuition for Parallelization Contracts
- Map and reduce are second-order functions
  - Call first-order functions (user code)
  - Provide first-order functions with subsets of the input data
- Define dependencies between the records that must be obeyed when splitting them into subsets
  - Required partition properties
- Map
  - All records are independently processable
- Reduce
  - Records with identical key must be processed together
[Figure: an input set of key/value records split into independent subsets]

Contracts beyond Map and Reduce
- Cross
  - Two inputs
  - Each combination of records from the two inputs is built and is independently processable
- Match
  - Two inputs, each combination of records with equal key from the two inputs is built
  - Each pair is independently processable
- CoGroup
  - Multiple inputs
  - Pairs with identical key are grouped for each input
  - Groups of all inputs with identical key are processed together

Parallelization Contracts (PACTs)
- A PACT is a second-order function that defines properties on the input and output data of its associated first-order function
- Input contract
  - Specifies dependencies between records (What must be processed together?)
  - Generalization of map/reduce
  - Logically: abstraction of a (set of) communication patterns
- Output contract
  - Generic properties preserved or produced by the user code (key property, sort order, …)
  - Relevant to parallelization of succeeding functions
- Apache Flink

Example (SQL)
[Figure not preserved]

Example (Clustering)
[Figure not preserved]

Q3
[Figure not preserved]

Task
- Think about how a join can be realized in the Map/Reduce paradigm, and in its Hadoop implementation
  - It's OK to use two jobs and concatenate them
- What other join implementations for a two-way join could you think of?
- Is it possible to do a three-way join with only one Map/Reduce job?
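
As a starting point for the task, the reduce-side join from the join slides can be sketched in plain Python. This is a local simulation under stated assumptions, not Hadoop code: the function names (map_phase, shuffle, reduce_phase, reduce_side_join) and the sample relations are hypothetical, and the shuffle is an in-memory dictionary standing in for the framework's distributed sort.

```python
from collections import defaultdict
from itertools import product


def map_phase(relation_name, tuples, key_index):
    """Emit (join_key, (relation_name, tuple)) for every input tuple."""
    for t in tuples:
        yield t[key_index], (relation_name, t)


def shuffle(pairs):
    """Group values by key and sort the keys (stand-in for the framework)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(sorted(groups.items()))


def reduce_phase(key, values):
    """Split the value list by relation tag and cross the two sides."""
    # There is no guarantee whether R or S values arrive first,
    # so both sides are buffered before they are combined.
    r_side = [t for tag, t in values if tag == "R"]
    s_side = [t for tag, t in values if tag == "S"]
    for r, s in product(r_side, s_side):
        yield key, r, s


def reduce_side_join(r_tuples, s_tuples, r_key=0, s_key=0):
    """Run map, shuffle, and reduce over both relations."""
    pairs = list(map_phase("R", r_tuples, r_key))
    pairs += list(map_phase("S", s_tuples, s_key))
    result = []
    for key, values in shuffle(pairs).items():
        result.extend(reduce_phase(key, values))
    return result


# Hypothetical sample data: a 1-to-many join on the URL.
rankings = [("url1", 10), ("url2", 3)]
visits = [("url1", "1.1.1.1"), ("url1", "2.2.2.2"), ("url3", "3.3.3.3")]
joined = reduce_side_join(rankings, visits)
for row in joined:
    print(row)
```

The reducer in this sketch buffers both sides of a key in memory. The value-to-key conversion pattern avoids that for one side: with a composite key, the R tuple arrives first and only it has to be held while the S tuples stream past.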