Architektur von Datenbanksystemen II
MapReduce: Recap
PROGRAMMERS MUST SPECIFY:
map (k, v) → list(<k’, v’>)
reduce (k’, list(v’)) → <k’’, v’’>
 All values with the same key are reduced together
OPTIONALLY, ALSO:
partition (k’, number of partitions) → partition for k’
 Often a simple hash of the key, e.g., hash(k’) mod n
 Divides up key space for parallel reduce operations
combine (k’, v’) → <k’, v’>*
 Mini-reducers that run in memory after the map phase
 Used as an optimization to reduce network traffic
THE EXECUTION FRAMEWORK HANDLES EVERYTHING ELSE…
2
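To make the map/combine/reduce signatures above concrete, here is a minimal word-count sketch against Hadoop's newer Java API (org.apache.hadoop.mapreduce); the class names and the whitespace tokenization are choices made for this example, not part of the slide.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

    // map(k, v) -> list(<k', v'>): emit (word, 1) for every token in the line
    public static class TokenMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            for (String token : line.toString().split("\\s+")) {
                if (token.isEmpty()) continue;
                word.set(token);
                context.write(word, ONE);
            }
        }
    }

    // reduce(k', list(v')) -> <k'', v''>: sum the partial counts for one word
    public static class SumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable c : counts) sum += c.get();
            context.write(word, new IntWritable(sum));
        }
    }
}

In the driver, job.setCombinerClass(SumReducer.class) registers the same class as the mini-reducer (valid here because summation is associative and commutative), and the default HashPartitioner already implements hash(k') mod numReduceTasks.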
Example

[Figure: input pairs (k1,v1) … (k6,v6) flow through mappers that emit intermediate pairs such as (a,1), (b,2), (c,3); combiners pre-aggregate locally (e.g., c:3 and c:6 become c:9); partitioners assign keys to reducers. Shuffle and sort: aggregate values by key, yielding a → [1,5], b → [2,7], c → [2,9,8]. Three reducers produce the outputs (r1,s1), (r2,s2), (r3,s3).]
3
"Everything Else"
 Scheduling: assigns workers to map and reduce tasks
 "Data distribution": moves processes to data
 Synchronization: gathers, sorts, and shuffles intermediate data
 Errors and faults: detects worker failures and restarts
 All algorithms must be expressed in m, r, c, p
 The programmer has no control over:
- where mappers and reducers run
- when a mapper or reducer begins or finishes
- which input a particular mapper is processing
- which intermediate key a particular reducer is processing
4
Design Pattern: Secondary Sorting
 Values arrive at the reducer in arbitrary order
 Naive approach: buffer the values in memory, then sort
 Why is this a bad idea? (the values for one key may not fit into memory)
 e.g. k -> (v1, r), (v3, r), (v4, r), (v8, r)
 "Value-to-key conversion" design pattern: form a composite intermediate key, (k, v1)
 Let the execution framework do the sorting
 Preserve state across multiple key-value pairs in the reducer to handle processing
 Anything else we need to do? (see the sketch below)
6
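A minimal sketch of the value-to-key pattern, assuming the natural key and the secondary (value) component are both strings; the composite key carries the value component so that the framework sorts by it, while a custom partitioner and grouping comparator (only sketched in the comments, as hypothetical helper classes) make sure all composites with the same natural key still meet in one reduce call.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Composite key (naturalKey, secondary): the secondary part exists only so
// that the shuffle sorts by it; partitioning and grouping ignore it.
public class CompositeKey implements WritableComparable<CompositeKey> {
    public String naturalKey = "";
    public String secondary = "";

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(naturalKey);
        out.writeUTF(secondary);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        naturalKey = in.readUTF();
        secondary = in.readUTF();
    }

    // Sort by natural key first, then by the embedded value component.
    @Override
    public int compareTo(CompositeKey other) {
        int c = naturalKey.compareTo(other.naturalKey);
        return c != 0 ? c : secondary.compareTo(other.secondary);
    }

    @Override
    public int hashCode() { return naturalKey.hashCode(); }

    @Override
    public boolean equals(Object o) {
        return o instanceof CompositeKey
                && ((CompositeKey) o).naturalKey.equals(naturalKey)
                && ((CompositeKey) o).secondary.equals(secondary);
    }
}
// In the driver (not shown): partition by naturalKey only and register a
// grouping comparator that also compares naturalKey only, e.g.
//   job.setPartitionerClass(NaturalKeyPartitioner.class);        // hypothetical
//   job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
// so that one reduce() call sees all values of a key, already sorted by 'secondary'.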
Relational Algebra
 Projection
 Selection
 Cartesian product
 Set union
 Set difference
 Rename
 Join (⋈)
 Group by… aggregation
 …
7
Projection
[Figure: projection maps tuples R1–R5 of a relation onto the same tuples restricted to a subset of their attributes]
8
Projection in MapReduce
EASY!
 Map over tuples, emit new tuples with appropriate attributes
 Reduce: take tuples that appear many times and emit only one version
(duplicate elimination)
• For each tuple t in R: Map(t, t) -> (t', t'), where t' is t restricted to the projected attributes
• Reduce(t', [t', …, t']) -> (t', t'), i.e., emit each distinct projected tuple once
BASICALLY LIMITED BY HDFS STREAMING SPEEDS
 Speed of encoding/decoding tuples becomes important
 Relational databases take advantage of compression
 Semistructured data? No problem!
9
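A hedged sketch of projection plus duplicate elimination. Tab-separated input tuples and keeping columns 0 and 2 as the projected attributes are assumptions made for this example.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class Projection {

    // Map: cut each tuple down to the projected attributes and emit it as the key.
    public static class ProjectMapper
            extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable offset, Text tuple, Context context)
                throws IOException, InterruptedException {
            String[] attrs = tuple.toString().split("\t");
            String projected = attrs[0] + "\t" + attrs[2];  // attribute list assumed
            context.write(new Text(projected), NullWritable.get());
        }
    }

    // Reduce: identical projected tuples arrive together; emit each one once
    // (duplicate elimination).
    public static class DedupReducer
            extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text tuple, Iterable<NullWritable> ignored, Context context)
                throws IOException, InterruptedException {
            context.write(tuple, NullWritable.get());
        }
    }
}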
Selection
[Figure: selection returns the subset of R's tuples that satisfy the selection predicate]
10
Selection in MapReduce
EASY!
 Map over tuples, emit only tuples that meet criteria
 No reducers, unless for regrouping or resorting tuples (reducers are the
identity function)
 Alternatively: perform in reducer, after some other processing
BUT EXPENSIVE: THE ENTIRE DATASET HAS TO BE SCANNED
 Better approaches?
11
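A minimal map-only selection sketch; the predicate (column 1 parsed as an integer and compared against a threshold) is an assumption made purely for illustration.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Map-only job: emit a tuple only if it satisfies the selection predicate.
public class SelectionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private static final int THRESHOLD = 10;  // assumed predicate constant

    @Override
    protected void map(LongWritable offset, Text tuple, Context context)
            throws IOException, InterruptedException {
        String[] attrs = tuple.toString().split("\t");
        if (Integer.parseInt(attrs[1]) > THRESHOLD) {
            context.write(tuple, NullWritable.get());
        }
    }
}
// Driver (not shown): job.setNumReduceTasks(0) keeps this a pure map-side scan.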
Set Operations
SIMILAR IDEAS:
 Each map outputs the pair (t, t).
 For union, the reducer outputs t once; for intersection, only when the reducer sees (t, [t, t])
 For set difference (R - S)?
 Map function: for a tuple t in R, produce the key-value pair (t, R); for a tuple t in S, produce the key-value pair (t, S)
 Reduce function: for each key t, do the following:
- 1. If the associated value list is [R], produce (t, t)
- 2. If the associated value list is anything else, which can only be [R, S], [S, R], or [S], produce (t, NULL)
12
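A sketch of the set-difference scheme above (R - S). Instead of routing on file names, two mapper classes tag the tuples of R and S; and instead of emitting (t, NULL) for case 2, this sketch simply emits nothing, which is a deviation from the slide made for convenience.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class SetDifference {

    // Tag every tuple of R with the marker "R".
    public static class RMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text tuple, Context context)
                throws IOException, InterruptedException {
            context.write(tuple, new Text("R"));
        }
    }

    // Tag every tuple of S with the marker "S".
    public static class SMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text tuple, Context context)
                throws IOException, InterruptedException {
            context.write(tuple, new Text("S"));
        }
    }

    // Emit t only if its value list contains nothing but "R" markers, i.e. t is in R - S.
    public static class DifferenceReducer
            extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text tuple, Iterable<Text> sources, Context context)
                throws IOException, InterruptedException {
            boolean inS = false;
            for (Text src : sources) {
                if (src.toString().equals("S")) inS = true;
            }
            if (!inS) {
                context.write(tuple, NullWritable.get());
            }
        }
    }
}
// Driver (not shown): MultipleInputs.addInputPath(...) wires RMapper to R's
// files and SMapper to S's files, so each tuple is tagged with its source.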
Group by… Aggregation
EXAMPLE: WHAT IS THE AVERAGE TIME SPENT PER URL?
IN SQL:
 SELECT url, AVG(time) FROM visits GROUP BY url
IN MAPREDUCE:
 Map over tuples, emit time, keyed by url
 Framework automatically groups values by keys
 Compute average in reducer
 Optimize with combiners
13
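A sketch of the average-time-per-URL job. Note that the reducer cannot simply be reused as the combiner, because an average of averages is not the overall average; emitting (sum, count) pairs, as below, keeps the combiner correct. The field positions of url and time in the visits records are assumptions.

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageTimePerUrl {

    // Map: emit (url, "time,1") so that sums and counts can be pre-aggregated.
    public static class VisitMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text visit, Context context)
                throws IOException, InterruptedException {
            String[] f = visit.toString().split("\t");   // assumed layout: url \t time \t ...
            context.write(new Text(f[0]), new Text(f[1] + ",1"));
        }
    }

    // Combiner: collapse local (sum, count) pairs per url.
    public static class PartialSumCombiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text url, Iterable<Text> pairs, Context context)
                throws IOException, InterruptedException {
            double sum = 0; long count = 0;
            for (Text p : pairs) {
                String[] sc = p.toString().split(",");
                sum += Double.parseDouble(sc[0]);
                count += Long.parseLong(sc[1]);
            }
            context.write(url, new Text(sum + "," + count));
        }
    }

    // Reduce: the final division sum / count yields the average time per url.
    public static class AverageReducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text url, Iterable<Text> pairs, Context context)
                throws IOException, InterruptedException {
            double sum = 0; long count = 0;
            for (Text p : pairs) {
                String[] sc = p.toString().split(",");
                sum += Double.parseDouble(sc[0]);
                count += Long.parseLong(sc[1]);
            }
            context.write(url, new DoubleWritable(sum / count));
        }
    }
}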
Relational Joins
[Figure: tuples R1–R4 of R are matched with tuples S1–S4 of S that share the same join key, e.g. R1 ⋈ S2, R2 ⋈ S4, R3 ⋈ S1, R4 ⋈ S3]
14
Types of Relationships
Many-to-Many
One-to-Many
One-to-One
15
Join Algorithms in MapReduce
 Reduce-side join
 Map-side join
 In-memory join
- Striped variant
- memcached variant

Reduce-side join, basic idea: group by join key
 Map over both sets of tuples
 Emit each tuple as the value, with the join key as the intermediate key
 The execution framework brings together tuples sharing the same key; perform the actual join in the reducer
 Similar to a "sort-merge join" in database terminology
 Cases to consider: 1-to-1 joins; 1-to-many and many-to-many joins (see the sketch below and the following slides)
16
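A compact reduce-side join sketch that tags each tuple with its source relation and buffers both sides per key in the reducer; this is fine for 1-to-1 and 1-to-many joins, while for many-to-many joins the buffered side can become large, which is exactly what the value-to-key conversion on the following slides avoids. File layout and the join-key position are assumptions.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceSideJoin {

    // One mapper per relation: emit (joinKey, taggedTuple).
    public static class RMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text tuple, Context context)
                throws IOException, InterruptedException {
            String[] attrs = tuple.toString().split("\t");
            context.write(new Text(attrs[0]), new Text("R\t" + tuple));
        }
    }

    public static class SMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text tuple, Context context)
                throws IOException, InterruptedException {
            String[] attrs = tuple.toString().split("\t");
            context.write(new Text(attrs[0]), new Text("S\t" + tuple));
        }
    }

    // Reduce: for one join key, collect the R and S tuples and cross them.
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> tagged, Context context)
                throws IOException, InterruptedException {
            List<String> rTuples = new ArrayList<String>();
            List<String> sTuples = new ArrayList<String>();
            for (Text t : tagged) {
                String v = t.toString();
                if (v.startsWith("R\t")) rTuples.add(v.substring(2));
                else sTuples.add(v.substring(2));
            }
            for (String r : rTuples)
                for (String s : sTuples)
                    context.write(key, new Text(r + "\t" + s));
        }
    }
}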
Reduce-side Join: 1-to-1

[Figure: in the map phase, R1, R4, S2, and S3 are each emitted with their join key as the intermediate key; in the reduce phase, the two tuples sharing a key arrive in the same group, in arbitrary order (R1 before S2, but S3 before R4).]

Note: there is no guarantee whether the R tuple or the S tuple comes first
17
Reduce-side Join: 1-to-many

[Figure: in the map phase, R1 and the matching tuples S2, S3, S9 are emitted under the same join key; in the reduce phase they arrive in one group and R1 is joined with each S tuple.]
18
Reduce-side Join: Value-to-Key Conversion

In the reducer…

[Figure: with the composite key, the reducer sees R1 first (new key encountered: hold in memory), then the matching S2, S3, S9, each crossed with the held record; then R4 (new key encountered: hold in memory), followed by S3 and S7, again crossed with the held record.]
19
Reduce-side Join: Many-to-Many

In the reducer…

[Figure: all R records for a key (R1, R5, R8) arrive first and are held in memory; the following S records (S2, S3, S9) are each crossed with the held R records.]
20
Reduce-side Join - Example
21
Map-side Join: Basic Idea

Assume the two datasets are sorted by the join key:

[Figure: R and S, each sorted by join key, are scanned in parallel and matching tuples are paired.]

A sequential scan through both datasets joins them
(called a "merge join" in database terminology)
22
Map-side Join: Parallel Scans
 If datasets are sorted by join key, join can be accomplished by a scan over both datasets
 Partition and sort both datasets in the same manner
 A map-side join between large inputs works by performing the join before the data reaches the map function.
 For this to work, though, the inputs to each map must be partitioned and sorted in a particular way.
 Each input data set must be divided into the same number of partitions, and it must be sorted by the same key (the join
key) in each source.
 All the records for a particular key must reside in the same partition.
 This may sound like a strict requirement (and it is), but it actually fits the description of the output of a MapReduce job.
23
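To make the sequential-scan idea concrete, here is a plain-Java merge join over two lists that are already sorted by join key; it stands in for what each map task does over its co-partitioned, co-sorted splits (the toy data are invented for the example).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeJoinSketch {

    // Merge join of two inputs sorted by join key; each element is {key, payload}.
    static List<String> mergeJoin(List<String[]> r, List<String[]> s) {
        List<String> out = new ArrayList<String>();
        int i = 0, j = 0;
        while (i < r.size() && j < s.size()) {
            int cmp = r.get(i)[0].compareTo(s.get(j)[0]);
            if (cmp < 0) i++;                // advance the side with the smaller key
            else if (cmp > 0) j++;
            else {
                // equal keys: cross the current R tuple with the run of matching S tuples
                int k = j;
                while (k < s.size() && s.get(k)[0].equals(r.get(i)[0])) {
                    out.add(r.get(i)[0] + ": " + r.get(i)[1] + " ⋈ " + s.get(k)[1]);
                    k++;
                }
                i++;                          // keep j so the next R tuple can match the same run
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> r = Arrays.asList(
                new String[]{"1", "R1"}, new String[]{"2", "R2"}, new String[]{"4", "R4"});
        List<String[]> s = Arrays.asList(
                new String[]{"1", "S1"}, new String[]{"2", "S2"}, new String[]{"3", "S3"});
        System.out.println(mergeJoin(r, s));   // [1: R1 ⋈ S1, 2: R2 ⋈ S2]
    }
}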
In-Memory Join
BASIC IDEA: LOAD ONE DATASET INTO MEMORY, STREAM OVER OTHER DATASET
 Works if R << S and R fits into memory
 Called a “hash join” in database terminology
MAPREDUCE IMPLEMENTATION
 Distribute R to all nodes
 Map over S, each mapper loads R in memory, hashed by join key
 For every tuple in S, look up join key in R
 No reducers, unless for regrouping or resorting tuples
24
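A hedged sketch of the in-memory (broadcast hash) join: R is shipped to every node, loaded into a hash map in setup(), and probed for every S tuple in map(). Distributing R is typically done via Hadoop's distributed cache, whose exact API differs between versions, so only the mapper side is shown; the local file name and the join-key positions are assumptions.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Map-only hash join: the small relation R lives in memory, the large S is streamed.
public class InMemoryJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    private final Map<String, String> rByKey = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException {
        // "R.tsv" stands for the local copy that the distributed cache placed in
        // the task's working directory (file name assumed for the sketch).
        BufferedReader in = new BufferedReader(new FileReader("R.tsv"));
        String line;
        while ((line = in.readLine()) != null) {
            String[] attrs = line.split("\t");
            rByKey.put(attrs[0], line);          // hashed by join key
        }
        in.close();
    }

    @Override
    protected void map(LongWritable offset, Text sTuple, Context context)
            throws IOException, InterruptedException {
        String[] attrs = sTuple.toString().split("\t");
        String rTuple = rByKey.get(attrs[0]);    // probe R with S's join key
        if (rTuple != null) {
            context.write(new Text(attrs[0]), new Text(rTuple + "\t" + sTuple));
        }
    }
}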
In-Memory Join: Variants

STRIPED VARIANT:
 R too big to fit into memory?
 Divide R into R1, R2, R3, … such that each Rn fits into memory
 Perform the in-memory join for each n: Rn ⋈ S
 Take the union of all join results

MEMCACHED JOIN:
 Load R into memcached (http://memcached.org)
 Replace the in-memory hash lookup with a memcached lookup
25
Which join to use?
IN-MEMORY JOIN > MAP-SIDE JOIN > REDUCE-SIDE JOIN
 Why?
LIMITATIONS OF EACH?
 In-memory join: memory
 Map-side join: sort order and partitioning
 Reduce-side join: none; it is general purpose
26
Criticism of MapReduce
 MapReduce repeats mistakes that have already been made
 "Brute-force approach", no indexing or the like
 Performance unclear
 No "schema" for the data
 Constructs not abstract enough
 No adequate query language

                      Parallel DBMS          MapReduce
Schema support        X                      -
Indexes               X                      -
Programming model     declarative (SQL)      description of an algorithm (C/C++, Java, …)
Optimization          X                      -
Flexibility           -                      X
Fault tolerance       -                      X
28
Benchmark
 Hadoop 0.19.0, Java 1.6
 Andrew Pavlo, Erik Paulson, Alexander Rasin, Daniel J. Abadi, David J. DeWitt, Samuel Madden, Michael Stonebraker: A Comparison of Approaches to Large-Scale Data Analysis. SIGMOD, June 29 to July 2, 2009, Providence, Rhode Island, USA.
 DBMS-X: parallel shared-nothing, row-oriented database
- hash-partitioned, indexes created, compression enabled
 Vertica: parallel shared-nothing, column-oriented database (version 2.6)
- compression enabled, no secondary indexes
 Default settings (plus a hint that only one query is executed at a time)
 Workload: load, selection, aggregation, and join
29
Benchmark Environment
 Documents: 600,000 distinct documents per node
 155 million UserVisits tuples (20 GB/node)
 18 million Rankings tuples (1 GB/node)
30
Loading the Data

[Figure: load time per system, including reorganization time]
31
Selection
 Find the pageURLs in the Rankings table (1 GB/node) whose pageRank exceeds a given threshold
 SELECT pageURL, pageRank
FROM Rankings WHERE pageRank > x

[Figure: processing time per system, plus the time to merge the output files into a single result file]

 x = 10 results in roughly 36,000 rows per node
32
Aggregation
 Compute the total ad revenue (adRevenue) for each sourceIP in the UserVisits table (20 GB/node)
 SELECT sourceIP, SUM(adRevenue)
FROM UserVisits GROUP BY sourceIP
 The query produces 2.5 million result tuples
33
Join
 Join the Rankings table with UserVisits and compute the total revenue and the average pageRank
 Sort in descending order and return the tuple with the highest revenue
 SELECT INTO Temp sourceIP,
AVG(pageRank) AS avgPageRank,
SUM(adRevenue) AS totalRevenue
FROM Rankings AS R, UserVisits AS UV
WHERE R.pageURL = UV.destURL
AND UV.visitDate BETWEEN Date('2011-01-15') AND Date('2011-01-22')
GROUP BY UV.sourceIP
 SELECT sourceIP, totalRevenue, avgPageRank FROM Temp
ORDER BY totalRevenue DESC LIMIT 1
34
Concluding Comparison
 Preprocessing speeds up execution
- e.g., Vertica and DBMS-X have an index on the pageRank column
 MapReduce is well suited for so-called on-demand computations (one-off processing), DBMSs for recurring data analyses
35
Intuition for Parallelization Contracts

 Map and reduce are second-order functions
- Call first-order functions (user code)
- Provide first-order functions with subsets of the input data
 Define dependencies between the records that must be obeyed when splitting them into subsets
- Required partition properties
 Map
- All records are independently processable
 Reduce
- Records with identical key must be processed together

[Figure: an input set of key-value pairs is split into independent subsets]
37
Contracts beyond Map and Reduce

 Cross
- Two inputs
- Each combination of records from the two inputs is built and is independently processable
 Match
- Two inputs; each combination of records with equal key from the two inputs is built
- Each pair is independently processable
 CoGroup
- Multiple inputs
- Pairs with identical key are grouped for each input
- Groups of all inputs with identical key are processed together
38
Parallelization Contracts (PACTs)
 that defines properties on the input and output data of its associated first order function
 Specifies dependencies between records (What must be processed together?)
 Generalization of map/reduce
 Logically: abstraction a (set of) communication patterns
 Generic properties preserved or produced by the user code (key property, sort order, …)
 Relevant to parallelization of succeeding functions
Apache Flink
39
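As a rough illustration of how these contracts surface in Apache Flink, here is a hedged sketch against Flink's batch DataSet API (a legacy API in recent Flink versions); keys are selected by tuple-field position, and the toy tuples and the CountPerKey function are invented for the example.

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PactSketch {

    // CoGroup contract: all records with the same key from both inputs are
    // handed to one call of the user function.
    public static class CountPerKey implements
            CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String> {
        @Override
        public void coGroup(Iterable<Tuple2<Integer, String>> rGroup,
                            Iterable<Tuple2<Integer, String>> sGroup,
                            Collector<String> out) {
            int nr = 0, ns = 0;
            for (Tuple2<Integer, String> r : rGroup) nr++;
            for (Tuple2<Integer, String> s : sGroup) ns++;
            out.collect(nr + " R record(s) and " + ns + " S record(s) share one key");
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> r = env.fromElements(
                Tuple2.of(1, "R1"), Tuple2.of(2, "R2"));
        DataSet<Tuple2<Integer, String>> s = env.fromElements(
                Tuple2.of(1, "S1"), Tuple2.of(3, "S3"));

        // Cross contract: build every combination of records from the two inputs.
        r.cross(s).print();

        // Match contract (join in Flink): every pair of records with equal key.
        r.join(s).where(0).equalTo(0).print();

        // CoGroup contract: group both inputs by key, process the groups together.
        r.coGroup(s).where(0).equalTo(0).with(new CountPerKey()).print();
    }
}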
Example (SQL)
40
Example (Clustering)
41
Q3
43
Task
 Think about how a join can be realized in the MapReduce paradigm and in its Hadoop implementation
 It's OK to use two jobs and concatenate them
 What other join implementations for a two-way join can you think of?
 Is it possible to do a three-way join with only one MapReduce job?
44