- Fachgebiet Datenbanken und Informationssysteme

Werbung
Fakultät für Elektrotechnik und Informatik
Institut für Praktische Informatik
Fachgebiet Datenbanken und Informationssysteme
Parallelisierung von Data Mining Algorithmen
Masterarbeit
im Studiengang Informatik
Oliver Pabst
Matrikelnummer: 2297090
Prüfer: Prof. Dr. Udo Lipeck
Zweitprüfer: Dr. Hans Hermann Brüggemann
Betreuer: Prof. Dr. Udo Lipeck
13. Mai 2014
Zusammenfassung
In der heutigen Gesellschaft fallen in vielen Bereichen (Onlinehandel, Web-Mining für
Suchmaschinen, soziale Netzwerke, etc.) immer größere Datenmengen an, in denen potentiell nützliches Wissen enthalten sein kann. Diese bewegen sich in vielen Fällen bereits
im Bereich von Petabytes. Traditionelle sequentielle Algorithmen zur Assoziations- und
Clusteranalyse stoßen hier schon lange an ihre Grenzen, da eine effiziente Analyse der
anfallenden Daten mit diesen Verfahren nicht möglich ist. Um diesem Problem beizukommen ist eine Parallelisierung der Problemlösung unumgänglich, wenn keine grundsätzlich
effizienten Algorithmen bekannt sind.
Im ersten Teil dieser Arbeit wird eine Auswahl paralleler Data Mining-Algorithmen vorgestellt, die aus dem Bereich der Assoziations- und Clusteranalyse stammen. Grundlage
für diese Verfahren sind Parallelrechnerarchitekturen mit gemeinsam-globalem Speicher
oder Kommunikation über Nachrichten für Informationsaustausch sowie das relativ neue
MapReduce-Paradigma.
Der zweite Teil der Arbeit beginnt zunächst mit der Vorstellung der Implementierung
von vier parallelen Algorithmen, jeweils zwei aus dem Bereich der Assoziations- und
Clusteranalyse. Der Fokus der Implementierungen und der nachfolgenden Versuche liegt
hierbei auf dem MapReduce-Paradigma. In den Versuchen soll die Tauglichkeit für große
Datenmengen, soweit es im Rahmen dieser Arbeit und der vorhandenen Möglichkeiten
machbar ist, untersucht werden sowie der Einfluss der Steuerparameter auf die Algorithmen betrachtet werden.
Inhaltsverzeichnis
1 Einleitung
7
1.1
Motivation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
7
1.2
Überblick . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
8
2 Grundlagen
9
2.1
Knowledge Discovery in Databases . . . . . . . . . . . . . . . . . . . . .
9
2.2
Assoziationsanalyse . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
12
2.2.1
Der Apriori-Algorithmus . . . . . . . . . . . . . . . . . . . . . . .
13
2.2.2
Frequent Pattern Growth
. . . . . . . . . . . . . . . . . . . . . .
15
Clusteranalyse . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
22
2.3.1
Hierarchisches Clustering . . . . . . . . . . . . . . . . . . . . . . .
22
2.3.2
Partitionierendes Clustering . . . . . . . . . . . . . . . . . . . . .
24
2.4
Die UNION-Find-Struktur . . . . . . . . . . . . . . . . . . . . . . . . . .
30
2.5
Parallelrechner . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
32
2.6
MapReduce . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
33
2.6.1
Programmiermodell . . . . . . . . . . . . . . . . . . . . . . . . . .
33
2.6.2
Der Map-Reduce Prozess . . . . . . . . . . . . . . . . . . . . . . .
35
2.6.3
Hadoop . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
36
2.6.4
Ein- und Ausgabeformate . . . . . . . . . . . . . . . . . . . . . .
41
2.3
3 Parallele Algorithmen
3.1
43
Algorithmen zur Assoziationsanalyse . . . . . . . . . . . . . . . . . . . .
43
3.1.1
Partition-Algorithmus . . . . . . . . . . . . . . . . . . . . . . . .
43
3.1.2
Laufzeit . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
47
5
3.2
3.1.3
MapReduce-Partition . . . . . . . . . . . . . . . . . . . . . . . . .
48
3.1.4
MapReduce Frequent-Pattern-Growth (PFP) . . . . . . . . . . . .
50
3.1.5
Weitere Algorithmen . . . . . . . . . . . . . . . . . . . . . . . . .
55
Algorithmen zur Clusteranalyse . . . . . . . . . . . . . . . . . . . . . . .
56
3.2.1
Parallel Partially Overlapping Partitioning (pPOP) . . . . . . . .
56
3.2.2
Parallel Disjoint-Set DBSCAN (PDSDBSCAN) . . . . . . . . . .
60
3.2.3
MapReduce-DBSCAN . . . . . . . . . . . . . . . . . . . . . . . .
65
3.2.4
Shared-memory SLINK (SHRINK) . . . . . . . . . . . . . . . . .
79
3.2.5
MapReduce k-Means Clustering . . . . . . . . . . . . . . . . . . .
85
3.2.6
Weitere Algorithmen . . . . . . . . . . . . . . . . . . . . . . . . .
88
4 Implementierung
90
4.1
MapReduce-Partition . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
90
4.2
MapReduce-DBSCAN . . . . . . . . . . . . . . . . . . . . . . . . . . . .
93
4.2.1
97
MapReduce-kMeans und Parallel FP-Growth . . . . . . . . . . . .
5 Versuche
5.1
5.2
99
Assoziationsanalyse . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 100
5.1.1
Experimentelle Untersuchung der Korrektheit des Parallel FPGrowth-Algorithmus . . . . . . . . . . . . . . . . . . . . . . . . . 101
5.1.2
Untersuchung der Performance des Partition-Algorithmus . . . . . 104
5.1.3
Untersuchung der Performance des Parallel FP-Growth-Algorithmus105
5.1.4
Performancevergleich zwischen MR-Partitition und PFP-Growth . 108
Clusteranalyse . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 109
5.2.1
Auswirkungen der Blockgröße und Knotenanzahl auf die Laufzeit
am Beispiel des MR-kMeans-Algorithmus . . . . . . . . . . . . . . 110
5.2.2
Einfluss der Parameterwahl und Knotenzahl auf den MR-DBSCANAlgorithmus . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 110
5.2.3
MapReduce-kMeans und MapReduce-DBSCAN im Vergleich . . . 117
6 Fazit
121
6
Kapitel 1
Einleitung
1.1
Motivation
Das Erfassen von Daten in den verschiedensten Lebensbereichen und Umfeldern findet in der heutigen Zeit in immer größer werdenden Ausmaß statt. Um aus diesen gesammelten Daten potentiell nützliche Informationen zu extrahieren reichen traditionelle
sequentielle Data Mining-Verfahren schon lange nicht mehr aus, da beispielsweise in
hoch-frequentierten Bereichen des Internets, etwa Suchmaschinen oder Onlinehandel,
Datenmengen anfallen, die bereits den Petabyte-Bereich erreicht haben.
Um diese Datenmengen effizient analyisieren zu können wurden, im Wesentlichen im
vergangenen Jahrzehnt, parallele Data Mining-Verfahren entwickelt. Diese basieren zum
Großteil auf Parallelrechner-Architekturen, die als Grundlage einen gemeinsamen globalen Speicher oder Nachrichten für den Informationsaustausch verwenden. In den letzten
Jahren ist im verstärkten Ausmaß das MapReduce-Paradigma als Grundlage neuer Verfahren hervorgetreten. Dieses bezeichnet sowohl ein Programmiermodell, nach dem Anwendungen entwickelt werden, als auch eine Software-Infrastruktur für massiv-parallele
Berechnungen in großen verteilten Systemen.
In dieser Arbeit wird die Parallelisierung von Data Mining betrachtet. Zuerst wird eine Auswahl verschiedener Algorithmen aus den Bereichen der Assoziations- und Clusteranalyse vorgestellt, welche auf verschiedenen Parallelisierungs-Paradigmen basieren.
Anhand dieser werden verschiedene Ansätze vorgestellt, mit denen eine Parallelisierung,
zum Teil bekannter sequentieller Algorithmen, erreicht wird. Mit einer experimentellen
Untersuchung wird für eine Auswahl von Algorithmen analysiert, ob sich diese für die
Parallelisierung und somit Verarbeitung großer Datenmengen eignen. Der Fokus für die
Versuche liegt hier auf dem MapReduce-Paradigma.
7
1.2
Überblick
In Kapitel 2 werden zunächst die relevanten Grundlagen der Arbeit vorgestellt. Es soll
ein Überblick geliefert werden, wie die Entdeckung von Wissen aus Daten funktioniert.
Daran anschließend werden die für diese Arbeit relevanten Teilbereiche, die Assoziationsund Clusteranalyse mit den bekanntesten Verfahren vorgestellt. Weiterhin werden die
für die nachfolgenden Kapitel relevanten Themen Parallelrechner sowie MapReduce präsentiert. Außerdem wird mit der Union-Find-Struktur eine Datenstruktur vorgestellt,
die von einigen der vorgestellten parallelen Algorithmen verwendet wird.
Eine Auswahl paralleler Data Mining-Algorithmen wird in Kapitel 3 vorgestellt. Es werden Algorithmen aus den Bereichen Assoziations- und Clusteranalyse vorgestellt, die
zum einen auf verschiedenen Parallelisierungsparadigmen, wie etwa MapReduce oder
Shared-Memory, basieren. Zum anderen basieren diese auf vielfältigen Ansätzen, um
zum Teil populäre sequentielle Algorithmen zu parallelisieren.
Kapitel 4 präsentiert die Implementierungen der Algorithmen, die im Rahmen dieser Arbeit für Versuche auf Grundlage von Hadoop entwickelt worden sind. Bei diesen handelt
es sich um den MapReduce-Partition- sowie den MapReduce-DBSCAN-Algorithmus,
welche in Kapitel 3 vorgestellt werden. Außerdem werden oberflächlich die Implementierungen der beiden anderen untersuchten Algorithmen, MapReduce-kMeans und Parallel
FP-Growth, analysiert, die ebenfalls für die Versuche verwendet werden; diese stammen
aus dem Mahout-Projekt der Apache Software Foundation, welches skalierbare Algorithmen für maschinelles Lernen bereitstellt.
Die Tauglichkeit der MapReduce-Implementierungen wird in Kapitel 5 betrachtet. Hierbei steht unter anderen die Skalierbarkeit der Verfahren im Vordergrund und ob diese mit
verhältnismäßig großen Datenmengen umgehen können. Weiterhin wird untersucht, wie
sich die Laufzeit der Algorithmen bei Variation der Steuerparameter verhält sowie jeweils
ein Vergleich der Algorithmen aus dem jeweiligen Gebiet der Versuche vorgenommen.
In Kapitel 6 werden abschließend die wichtigsten Punkte der Arbeit zusammengefasst.
Zusätzlich wird ein abschließendes Fazit gezogen.
8
Kapitel 2
Grundlagen
In diesem Kapitel werden die beiden Bestandteile des Prozesses der Knowledge Discovery in Datenbanken vorgestellt, die in dieser Arbeit betrachtet werden. Hierbei handelt
es sich um die Data Mining-Techniken der Assoziationsanalyse sowie der Clusteranalyse. Der Fokus liegt auf diesen beiden Verfahren, da diese es ermöglichen, Muster in
unbekannten Daten zu erkennen. Diese sind die Grundlage um Regeln zu entwickeln,
mit denen Verfahren wie die Klassifikation oder die Generalisierung angewendet werden. Für diese beiden Analysearten werden grundlegende sequentielle Algorithmen präsentiert. Zusätzlich wird die weniger bekannte Union-Find-Struktur erläutert, die im
späteren Verlauf in einigen Implementierungen paralleler Clusteringverfahren zum Einsatz kommt, sowie das MapReduce-Paradigma, welches für einige Implementierungen
paralleler Algorithmen, die in dieser Arbeit vorgestellt werden, verwendet wird.
2.1
Knowledge Discovery in Databases
Ester und Sander definieren den Begriff der Knowledge Discovery in Databases (KDD)
[ES00] als einen Prozess, der, automatisch oder semi-automatisch, Wissen aus Datenbanken gewinnt. Dieses muss jedoch im statistischen Sinne gültig sein, darf bisher nicht
bekannt sein und soll nach Möglichkeit Nutzen bringen. Der iterative Prozess, der in Abbildung 2.1 dargestellt ist, besteht aus fünf Schritten, die im Folgenden kurz vorgestellt
werden.
Fokussierung
Das Ziel der Fokussierung ist es, zunächst ein Verständnis über die Anwendung zu gewinnen und in Erfahrung zu bringen, welches Anwendungswissen bereits vorhanden ist.
9
Gesamter
Datenbestand
Fokussieren
Relevanter
Datenbestand
Vorverarbeitung
Vorverarbeitete
Daten
Evaluation
Entdecktes
Wissen
Transformation
Transformierte
Daten
Data Mining
Erkannte Muster
Abbildung 2.1: Der KDD-Prozess
Hieraus wird das zu erreichende Ziel des KDD-Prozesses abgeleitet, insbesondere unter dem Gesichtspunkt, dass das neu generierte Wissen der Anwendung Nutzen bringen
muss und nicht bereits bekannt sein darf. Weiterhin gilt es festzulegen, welche Quellen herangezogen werden sollen für die Suche und wie diese verfügbar gemacht werden
können. Ein weiterer wichtiger Punkt ist, in welcher Form die zu verwendenden Daten
vorgehalten werden sollen. In vielen Fällen geschieht dies mit Hilfe eines Datenbanksystems, da dies beispielsweise im nächsten Schritt, der Vorverarbeitung, eine einfache
Selektion der gewünschten Daten aus dem für die Anwendung relevanten Datenbestand
erlaubt.
Vorverarbeitung
Im zweiten Schritt, der Vorverarbeitung, geht es um die Integration der für relevant
erachteten Werte, welche gegebenenfalls in eine konsistente Form zu bringen und zu
vervollständigen sind. Für Daten aus unterschiedlichen Quellen gilt, dass für diese beispielsweise verschiedene Einheiten (z.B. angloamerikanisches Maßsystem ↔ metrisches
Maßsystem) oder verschiedene Namen für dasselbe Objektattribut verwendet wurden.
Zusätzlich können Inkonsistenzen durch Schreibfehler auftreten oder fehlerhafte Messungen zu Rauschen führen, also zu einem Zufallsmuster, das wiederum die tatsächlich
vorhandenen Muster überlagern kann.
Transformation
Die vorverarbeiteten Daten sind zunächst in eine Form zu bringen, die für die zur Anwendung kommenden Data Mining-Verfahren geeignet sind. In den meisten Fällen bedeutet
dies, dass nur ausgewählte Attribute der vorverarbeiten Daten verwendet werden (Selektion), die für die Aufgabe relevant sind, oder dass Attributwertbereiche reduziert werden
10
(Diskretisierung), etwa die Reduktion eines Attributs ”Alter” von einem numerischen
Wert auf einen kategorischen Wert (Kind|Erwachsener).
Data Mining
Der vierte Schritt ist die Anwendung von effizienten Algorithmen auf die vorverarbeiteten Daten, um gültige Muster in dem Datenbestand zu finden. Hierfür ist es zunächst
notwendig, die Aufgaben zu identifizieren, die im Data Mining-Schritt zu bewältigen ist.
Die wichtigsten Aufgaben sind:
• Clusteranalyse: Die Aufgabe der Clusteranalyse ist die Gruppierung der Datenbankobjekte in Gruppen (Cluster), so dass Objekte desselben Clusters in ihren
Attributen möglichst ähnlich, Objekte verschiedener Objekte jedoch möglichst unähnlich sind. Aufgrund der Bedeutung im Rahmen dieser Arbeit wird die Clusteranalyse in Abschnitt 2.3 auf Seite 22 ausführlicher beschrieben.
• Klassifikation: Gegeben ist eine Menge von Trainingsobjekten mit Attributwerten,
für die eine Klassenzuordnung vorhanden ist. Ziel ist das Anlernen einer Funktion z.B. basierend auf Entscheidungsbäumen oder Regeln, die eine Klassifikation
zukünftiger Objekte anhand ihrer Attributausprägungen durchführt.
• Assoziationsanalyse: Aus einer gegebenen Datenbank mit Transaktionen (Itemsets) sollen Regeln der Form IF A AND B THEN C “gefunden werden, die
”
häufig auftretende und starke Zusammenhänge unter den vorkommenden Items
beschreiben. Da diese Data Mining-Aufgabe, ebenso wie die Clusteranalyse, für
diese Arbeit von zentraler Bedeutung ist, erfolgt eine tiefergehende Beschreibung
der Assoziationsanalyse in Abschnitt 2.2 auf Seite 12.
• Generalisierung: Das Ziel ist das Erreichen einer möglichst kompakten Beschreibung einer Datenmenge, indem Attributwerte generalisiert und die Anzahl von
Datensätzen reduziert wird.
Sobald die Aufgabe identifiziert ist kann basierend auf den Anwendungszielen und der
Beschaffenheit der Daten ein angemessener Algorithmus ausgewählt und ausgeführt werden.
Evaluation
Zum Abschluss des KDD-Prozesses werden vom ausführenden System die Ergebnisse,
in diesem Fall gefundene Muster, einem Experten der Anwendungsdomäne in geeigneter
Weise dargestellt. Dieser entscheidet darüber, ob die gesetzten Ziele erreicht worden sind,
11
oder ob eine zusätzliche Iteration des KDD-Prozesses durchzuführen ist. Hierbei ist festzuhalten, dass in einem weiteren Iterationsschritt nicht der vollständige Prozess abermals
auszuführen ist; es kann auch beispielsweise nur das Data Mining mit einer veränderten
Parameterwahl ausgeführt werden, wenn die zugrunde liegenden vorverarbeiteten Daten
von ausreichender Qualität sind. Damit auch zukünftige Prozesse neues Wissen generieren, ist es notwendig, gefundenes Wissen in das bestehende System einzupflegen, damit
eine wiederholte Durchführung des KDD-Prozesses auch weiterhin unbekanntes Wissen
liefert.
2.2
Assoziationsanalyse
Assoziationsregeln, die aus Verfahren der Assoziationsanalyse gewonnen werden, werden
auch als Warenkorbanalyse bezeichnet. Warenkörbe sind hier analog zu Kundeneinkäufen zu betrachten. Ein Warenkorb ist hierbei zusammengesetzt aus Objekten (Items),
die zusammen gekauft werden, unabhängig davon ob es sich um materielle (Artikel im
Supermarkt) oder immaterielle (Dienstleistungen, Informationen) Objekte handelt. Ein
anschauliches Beispiel für eine Datenbank von Transaktionen ist die Datensammlung,
die sich aus den Scanvorgängen an den Kassen eines Supermarkts ergibt. Eine Transaktion besteht in diesem Fall aus Informationen zu allen Artikeln (z.B. Barcodes), die
ein Kunde zu einem Zeitpunkt gekauft hat. Die Grundlage für eine tiefer gehende Analyse von Transaktionsdatenbanken ist das Bestimmen von Objektmengen, so genannten
Itemsets, die häufig in dieser Datenbank auftreten frequent Itemsets, wobei die Itemsets
auch als k-Itemsets bezeichnet werden; k bezeichnet hier die Anzahl der Elemente des
Itemsets.
Hierfür wird zunächst der Support s(X) für ein Itemset X berechnet, welcher den
Anteil von Transaktionen darstellt, die X enthalten. Von einem häufigen Itemset X
spricht man nun, wenn der Support s(X) größer gleich einem festgelegten Schwellenwert
minimumSupport ist, also s(X) ≥ minimumSupport. Für ein Itemset
{Chips, Brezeln, Bier}
wäre der Support beispielsweise
s({Chips, Brezeln, Bier}) =
|{Chips, Brezeln, Bier}|
,
|Transaktionsdatenbank|
also in wie vielen Transaktionen der Datenbank das Itemset vorhanden ist.
Auf Grundlage der häufigen Itemsets können Assoziationsregeln gesucht werden, welche
Zusammenhänge in den Transaktionen darstellen. Ein Beispiel hierfür wäre die Regel
s({Chips, Brezeln ⇒ Bier}), welche besagt, dass das Vorhandensein von Chips und Brezeln in einem Warenkorb auf das gleichzeitige Vorhandensein von Bier schließen lässt.
12
Es ist jedoch wichtig, auch die Qualität der Regeln zu bewerten. Ein Kriterium dafür
ist die Konfidenz. Diese bezeichnet die relative Häufigkeit, in der die Regel korrekt ist,
und bedeutet im Falle der Regel {Chips, Brezeln ⇒ Bier}, in wie vielen Regeln, die
{Chips, Brezeln} enthalten, zusätzlich auch {Bier} enthalten ist (Gleichung 2.1).
confidence({Chips, Brezeln ⇒ Bier}) =
s{Chips, Brezeln, Bier}
s{Chips, Brezeln}
(2.1)
Ein weiteres Kriterium ist der Lift, mit dessen Hilfe sich berechnen lässt, ob und wie weit
die Konfidenz für eine Regel den Erwartungswert übertrifft (Gleichung 2.2); der Lift ist
folglich ein Indikator für die generelle Bedeutung einer Regel.
lift({Chips, Brezeln ⇒ Bier}) =
s({Chips, Brezeln} ∪ {Bier})
s({Chips, Brezeln}) · s({Bier})
(2.2)
Der Fokus ist hinsichtlich großer Datenmengen auf den Aufgabenbereich der Bestimmung
häufiger Itemsets (Frequent-Itemset-Mining) gerichtet, da für diese Aufgabe zunächst
alle möglichen Itemsets, die sich aus dem Datenbestand ergeben, zu bestimmen sind.
Sei nun I die Menge aller Items, die in einer Transaktionsdatenbank mindestens einmal
auftreten. Dann ist die Menge aller möglichen Itemsets die Potenzmenge von I ohne die
leere Menge und somit |I| = 2n − 1.
Deshalb betrachtet diese Arbeit nur das Frequent-Itemset-Mining, für welches im Folgenden die beiden bekannten Algorithmen Apriori und FP-Growth vorgestellt werden.
2.2.1
Der Apriori-Algorithmus
Der Apriori-Algorithmus [AS94] ist ein Verfahren der Assoziationsanalyse, um häufige,
bzw. interessante Itemsets aus Transaktionsdatenbanken zu berechnen. Die Grundlage
dieses Algorithmus ist das Apriori-Prinzip, welches besagt, dass wenn ein Itemset häufig
ist, so sind auch dessen Teilmengen häufig. Zusätzlich gilt für häufige Itemsets eine AntiMonotonie (Gleichung 2.3), aus welcher gefolgert werden kann, dass der Support eines
Itemsets nicht größer sein kann als der größte Support der Teilmengen.
∀X, Y : (X ⊆ Y ) =⇒ s(X) ≥ s(Y )
(2.3)
Der Apriori-Algorithmus 2.1 zählt in der ersten Iteration zunächst alle häufigen 1Itemsets durch einen Scan der Transaktionsdatenbank. Die nachfolgenden Iterationen
(Zeile 2-8) bestehen jeweils aus zwei Phasen. In der ersten wird aus den im vorherigen
Durchlauf berechneten häufigen (k−1)-Itemsets mit dem Apriori-Gen-Algorithmus 2.2
13
die Kandidatenmenge der häufigen k-Itemsets berechnet (Zeile 5). In der zweiten Phase (Zeile 5-8) wird der Support für die Kandidaten in Ck gezählt. Itemsets der Länge k
mit einem Support größer gleich dem minimalen Support minSup werden zum Abschluss
der Menge der häufigen k-Itemsets hinzugefügt (Zeile 9). Am Ende gibt der Algorithmus
schließlich die Vereinigung aller häufigen Itemsets zurück (Zeile 10).
Algorithmus 2.1 Apriori-Algorithmus ([AS94] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
input: T - list of transactions
input: minSup - minimal support required for frequent itemsets
output: set of found frequent itemsets
procedure Apriori(T , minSup)
L1 = {frequent 1-itemsets};
for (k = 2; Lk−1 6= ∅; k + +) do
Ck = Apriori-Gen(Lk−1 );
for all transactions t ∈ T do
// calculate itemsets contained in transaction t
Ct = Subset(Ck , t);
for all candidates c ∈ Ct do
c.count++;
Lk = {c ∈ Ck |c.count ≥ minSup}
S
return k Lk
Der Algorithmus zur Erzeugung neuer, größerer Itemsets (Algorithmus 2.2) benutzt die
Menge Lk−1 , die Menge aller häufigen (k−1)-Itemsets und konstruiert daraus eine Obermenge von Itemsets der Länge k. In den Zeilen 2 bis 5 werden diese erzeugt, indem
zunächst zwei häufige (k − 1)-Itemsets p und q gewählt werden, die auf den Position
1, . . . , k−2 identisch sind und sich in der k−1-ten Position unterscheiden, das (k−1)-te
Element von p jedoch kleiner als das von q ist. Die Positionen 1, . . . , k −2 werden nun
mit den zwischen p und q gleichen Elementen besetzt, die (k −1)-te Position mit dem
(k−1)-ten Element aus p und das k-te Element mit dem aus q.
Im Anschluss an die Erzeugung der Itemsets ist abschließend die Apriori-Eigenschaft
zu prüfen, also ob die Kandidaten häufige (k-1)-Itemsets enthalten. Dadurch lässt sich
bereits die Menge der Itemsets, deren Support zu berechnen ist, reduzieren. Hierfür
werden alle Kandidaten durchlaufen (Zeile 7), von denen wiederum alle (k-1)-Teilmengen
durchlaufen werden. Wenn nun eine dieser Teilmengen nicht häufig ist, also nicht in Lk−1
enthalten ist, so folgt aus der Apriori-Eigenschaft, dass das k-Itemset nicht häufig sein
kann.
14
Algorithmus 2.2 Apriori-Gen - Erzeugung neuer Itemsets ([AS94] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
input: Lk−1 - frequent itemsets of length k−1
output: Ck - frequent itemsets of length k
procedure Apriori-Gen(Lk−1 )
// generating new itemsets
insert into Ck
select p.item1 , p.item2 , . . . , p.itemk−1 , q.itemk−1
from Lk−1 p, Lk−1 q
where p.item1 = q.item1 , . . . , p.itemk−1 = q.itemk2 , p.itemk−1 < q.itemk−1 ;
// prune candidates containing (k − 1)-subsets that are not in Lk−1
for all itemsets c ∈ Ck do
for all (k−1)-subsets s of c do
if (s ∈
/ Lk−1 ) then
delete c from Ck
return Ck
Laufzeit
Für den ersten Schritt, die Bestimmung der frequent 1-Itemsets, ist eine Laufzeit von
O(|T | · w) erforderlich, wobei w die durchschnittliche Transaktionslänge ist. Für die Erzeugung von Kandidaten werden Itemsets der Länge k −1 paarweise verglichen, wobei
hierfür maximal k − 2 Vergleiche pro Paar benötigt werden. Maximal müssen hier alle gefundenen Itemsets
betrachtet werden, so dass hier für die
Pwder vorherigen Iteration
2
Laufzeit maximal O( k=2 (k − 2) · |Lk−1 | ) beträgt. Für das Pruning ist für jeden generierten Kandidaten zu prüfen,
P ob alle (k −1) Teilmengen ebenfalls häufig sind, so dass
Bei der Zählung des Supportes
sich eine Laufzeit von O( w
k=2 k · (k −1) · |Ck |) ergibt.
|t|
ergeben sich für jede Transaktion mit
|t| k Itemsets. Insgesamt ist die LaufP Länge
w
zeit für die Supportzählung O(|T | · k · k αk ), wobei αk die Kosten zur Aktualisierung
eines Itemset-Kandidaten sind. Für einen sehr niedrigen Support ergibt sich insgesamt
aufgrund der Anzahl potentieller Itemsets eine exponentielle Laufzeit.
Nachteile
Bei einem niedrig angesetzten minimalen Support-Schwellwert kann es beim AprioriAlgorithmus passieren, dass es eine Vielzahl häufiger und langer Itemsets geben kann.
Dies führt dazu, dass der Apriori-Algorithmus sehr teuer hinsichtlich des Speicherbedarfs
wird, da eine große Anzahl von Kandidaten-Itemsets zu handhaben ist.
Beispielsweise führt eine Menge
von 104 häufigen 1-Itemsets bereits zu mehr als 107 2
10000
Itemset-Kandidaten ( 2
= 4, 9995 · 107 ). Zusätzlich ist für diese Kandidatenmenge
auch der Support zu bestimmen. Ein weiterer Punkt ist, dass für die Entdeckung ei-
15
nes häufigen Itemsets {a1 , . . . , a100 } der Länge 100 der Erzeugung von 2100 − 1 ≈ 1030
Kandidaten notwendig ist, was in Folge der Kandidatengenerierung nicht zu vermeiden
ist.
2.2.2
Frequent Pattern Growth
Der Frequent-Pattern-Growth(FP-Growth)-Algorithmus [HPY00] versucht die Nachteile
des Apriori-Algorithmus zu umgehen, in dem die Transaktionsdatenbank in Form einer
Baum-Struktur dargestellt wird, aus welcher die häufigen Itemsets gewonnen werden
können. Der Ansatz, eine Baum-Struktur zur Kodierung der Transaktionen zu verwenden, ermöglicht, dass die Menge der häufigen Itemsets berechnet werden kann, ohne
dass, wie im Apriori-Algorithmus, iterativ Kandidatenmengen berechnet und bewertet
werden müssen. Dieses wird durch drei verschiedene Ansätze erreicht.
Erstens wird eine erweiterte Präfix-Baum-Struktur und quantitative Informationen der
Itemsets in der Baum-Struktur gespeichert. Zusätzlich werden in einem FP-Baum nur
Knoten gespeichert, die in der Menge der häufigen 1-Itemsets vorkommen. Schließlich
werden die Knoten des Baumes so angeordnet, dass häufig vorkommende Items eine
größere Chance haben, sich Knoten mit anderen Pfaden zu teilen.
Der zweite Ansatz ist eine Methode zur Erzeugung wachsender Itemsets aus einem FPBaum, die mit häufigen 1-Itemsets beginnt (Ausgangs Suffix-Itemset) und im weiteren
Verlauf nur eine bedingte Itemset-Basis betrachtet, d.h. die Teildatenbank der Menge von
häufigen Itemsets, die zusammen mit dem Suffix-Pattern auftreten. Basierend auf dem
Suffix-Itemset und der daraus resultierenden bedingten Itemset-Basis wird ein bedingter
FP-Baum erzeugt, auf welchem die Itemset-Erzeugung rekursiv aufgeführt wird. Das
Wachstum der Itemsets geschieht durch das Aneinanderhängen der Suffix-Patterns mit
neuen Suffix-Patterns aus dem bedingten FP-Baum. Die Vollständigkeit dieser Methode
des Pattern-Wachstums wird garantiert, da häufige Itemsets einer Transaktion immer in
einem zugehörigen Pfad des FP-Baums codiert ist.
Als drittes wird als Suchtechnik zur Generierung häufiger Itemsets ein Divide-andConquer-Ansatz verwendet. Dieser reduziert die Größe der bedingten Itemset-Basis, da
das Problem des Findens langer, häufiger Itemsets in die Suche nach kurzen Itemsets
und das Aneinanderhängen von Suffixen umgewandelt wird.
Frequent-Pattern Baum
Ein Frequent-Pattern-Baum (Abbildung 2.2) ist eine Baumstruktur, die als Wurzel einen
als null“ bezeichneten Knoten besitzt, der als Attribut die Anzahl der im Baum gespei”
cherten Transaktionen festhält. An der Wurzel wird eine Menge von Präfix-Teilbäumen
16
der Elemente eingehängt. Die Struktur verfügt zusätzlich über eine Header-Tabelle, in
der die im Baum enthaltenen Items sowie Zeiger zum ersten Vorkommen des Items im
Baum enthalten sind.
null:5
Item
A
B
C
D
E
F
A:2
Header
C:1
F:1
B:2
D:1
E:1
D:1
C:1
F:1
F:1
Abbildung 2.2: Beispiel eines Frequent-Pattern Baums
Jeder Knoten besitzt drei Attribute: das erste ist der Knotenname, der das Item angibt,
welches der Knoten repräsentiert. Das zweite ist die Anzahl der Transaktionen, die durch
den Teilpfad, in dem der Knoten liegt, bis zu diesem Knoten repräsentiert werden. Das
dritte Element schließlich ist ein Zeiger, der auf den nächsten Knoten im Baum mit
demselben Namen zeigt. Die Header-Tabelle schließlich besitzt zwei Spalten; in der ersten
Spalte steht der Name des Items, die zweite enthält einen Zeiger zum ersten Vorkommen
des Elements im Baum. Dieses Konstrukt ermöglicht ausgehend von der Header-Tabelle
das einfache Finden jedes Vorkommens eines Items in einem FP-Baum durch Verfolgen
von Zeigern.
Algorithmus 2.3 Aufbau eines FP-Baums (basierend auf [HPY00] erarbeitet)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
input: tdb - database of transactions, ξ: minimum support threshold
output: T - a fp-tree
procedure ConstructFpTree(tdb, ξ)
define list F[]
. list of frequent 1-itemsets
for each transaction t in tdb do
for each item ai in t do
F [ai ] + +
sort F in descending order relating to item count
create root of FP-Tree T , labeled as null“
”
for each transaction t ∈ tdb do
order and filter frequent items in t according to F into f
InsertTree(f, T )
return T
Um den FP-Baum aufbauen zu können (Algorithmus 2.3), wird zunächst ein Scan der
17
Transaktionsdatenbank durchgeführt und eine Liste der häufigen Itemsets erstellt, welche
absteigend nach der Vorkommenshäufigkeit sortiert wird. Anschließend wird die Transaktionsdatenbank nochmals vollständig durchlaufen. Jede Transaktion wird nun hinsichtlich der Liste der häufigen Itemsets sortiert und gefiltert, so dass die Transaktion
nur noch häufige Items enthält und diese nach der Häufigkeit des Vorkommens sortiert
ist; diese wird mit der Funktion InsertTree in den Baum eingefügt.
Das Einfügen eines Itemsets in den FP-Baum (InsertTree, Algorithmus 2.4) beginnt
an der Wurzel des Baumes T . Hier wird überprüft, ob T einen Kindknoten n mit dem
Namen des ersten Items e aus t hat (Zeile 2). Wenn dies der Fall ist, wird im Kindknoten
n der Zähler für das Element um eins inkrementiert.
Algorithmus 2.4 Einfügen eines Itemsets in einen FP-Baum (basierend auf [HPY00]
erarbeitet)
input: t - itemset, T - a fp-tree
procedure InsertTree(t, T )
let e be the first item of t
if T.hasChild(n)and n.itemName == e.itemName then
increase n’s item count by 1
else
create new node n with name e and count 1
set n’s parent to T
set n’s node link to nodes with same item name using the node link structure
9:
remove item e from the itemset t
10:
if t is not empty then
11:
InsertTree(t, n)
1:
2:
3:
4:
5:
6:
7:
8:
Falls T jedoch kein Kindknoten mit dem Namen e besitzt, so wird ein neuer Knoten
n mit einen Itemzählwert von 1 und T als Vorgänger erzeugt. Zusätzlich wird n in die
node link Struktur des Baumes eingebunden. Im Anschluss an beide Fälle wird das Item
e aus t entfernt und erneut die Funktion InsertTree aufgerufen mit den verbliebenen
Rest von t sowie n als Wurzel, sofern t nicht leer ist.
Ein Beispiel für den Aufbau eines FP-Baumes ist in Abbildung 2.3 zu sehen; eingefügt
wird in diesen die Transaktionsmenge {T1 = {A, B}, T2 = {A, C}, T3 = {B, C}}.
Zu Beginn des Einfügevorgangs besteht der Baum nur aus der Wurzel null:0“. Wenn nun
”
also T1 eingefügt wird, muss ein neuer Knoten A“ mit einem Zähler von 1 als Kind von
”
der Wurzel erzeugt werden. Zusätzlich wird ein Zeiger in der Header-Tabelle angelegt,
der auf A“ zeigt. Das Einfügen von B“ beginnt mit Knoten A“ als Startknoten. Da
”
”
”
auch hier kein entsprechendes Kind vorhanden ist, wird analog zum ersten Item ein neuer
Knoten B“ mit einem Zählerwert von 1 angelegt und ein Zeiger von der Header-Tabelle
”
auf den Knoten erzeugt. Das Einfügen von T2 beginnt analog zu T1 , jedoch ist unterhalb
der Wurzel von T bereits ein Kindknoten mit dem Namen A“ vorhanden, so dass von
”
18
null:1
Items
Header
A:1
A
B
B:1
null:2
Items
Header
A
A:2
B
C
B:1
C:1
null:3
Items
Header
A
A:2
B
B:1
C
B:1
C:1
C:1
Abbildung 2.3: Konstruktion eines FP-Baumes
diesem lediglich der Zählerwert um eins inkrementiert wird. Um das Item C“ unterhalb
”
von A einzufügen, ist jedoch wieder ein neuer Knoten C“ mit einem Zählerwert von
”
eins anzulegen und zusätzlich wird ein Link von der Header-Tabelle auf C“ angelegt.
”
Das Einfügen von T3 erfolgt weitestgehend analog, jedoch müssen zusätzliche Zeiger von
den bereits vorhandenen Pfadenden der Headertabelle auf die neuen Knoten angelegt
werden.
Die Komplexität für das Einfügen eines Itemsets in einen FP-Baum beträgt, grob abgeschätzt, O(|t|), wobei t die Länge des Itemsets ist. Sei A nun die Menge aller in der
Transaktionsdatenbank vorkommen Items mit der Anzahl ihrer Vorkommen. Dann beträgt die Komplexität für den Aufbau eines FP-Baumes grob O((|tdb| ∗ |t|) + O(A ∗
log(A)) + O(|tdb| ∗ (|t| + |f |)) unter der Annahme, dass A in O(n ∗ log(n)) sortiert
werden kann.
19
Erzeugung von häufigen Itemsets aus einem FP-Baum
Das Erzeugen von häufigen Itemsets aus einem FP-Baum (FPGrowth, Algorithmus
2.5) lässt sich in zwei Fälle unterteilen. Im ersten Fall (Zeile 2-6), wenn der bedingte FPBaum T nur noch einen einzigen Pfad p enthält, so werden die verbleibenden Knoten
β des Pfades p in allen möglichen Kombinationen mit dem bisherigen Itemset-Fragment
α kombiniert und erhalten als Support das Minimum aus β. Ausgegeben werden die
so erzeugten Itemsets, wenn der Support von β den minimalen Support ξ erreicht oder
überschreitet. Im anderen Fall (Zeile 7-14) ist nicht nur ein einzelner Pfad in T verbleibend, sondern ein ganzer Baum. Zunächst wird jedes im Baum vorhandene Item ai
mit dem bis hierher entstandenen Itemset-Fragment α zu einen Itemset β verbunden,
welches den Support von ai erhält und ausgegeben, wenn der Support des neuen Itemsets den minimalen notwendigen Support ξ erreicht. Hierfür werden alle Pfade von der
Wurzel T ’s ausgehend betrachtet, die mit dem Item ai enden. Die Items, aus denen die
Pfade bestehen, bilden die bedingte Itemset-Basis (conditional pattern base). Auf dieser
Grundlage wird dann der bedingte FP-Baum (conditional frequent pattern tree) von Tβ
für das Itemset-Frament β aufgebaut (ConstructFPTree, Algorithmus 2.3), der auf
β endende Transaktionen enthält.
Algorithmus 2.5 Frequent-Pattern Growth ([HPY00] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
input: T - a fp tree
input: α - itemset fragment
input: ξ - minimum support
output: β - itemsets
procedure FPGrowth(T , α, ξ)
if T contains a single path p then
for each combination β of nodes in path p do
generate pattern β ∪ α with suppport = min{s(β)}
if β.support > ξ then
Output β
else
for each ai in header-table of T do
generate pattern β = ai ∪ α with sup = ai .sup
if β.sup > ξ then
Output β
construct ai ’s cond. pattern base and then β’s cond. FP-Tree Tβ
if Tβ 6= ∅ then
call FPGrowth(Tβ , β, ξ)
Eine Abschätzung der Laufzeit des Algorithmus nach unten ist nicht möglich. Dieses
liegt an der Tatsache, dass sich in keinem Rekursionsschritt vorhersagen lässt, wie groß
die Anzahl und Größe der resultierenden Teilprobleme ist. Eine Abschätzung nach oben
ist jedoch möglich. Angenommen wird hierfür der Worst-Case, in welchem jede Trans-
20
aktion aus Items besteht, die in keiner anderen Transaktion auftreten. In diese Fall ist
der FP-Baum mindestens so groß wie die Transaktionsdatenbank. Für die Suche nach
frequent Itemsets muss nun im schlimmsten Fall jeder Zweig des Baumes abgearbeitet
werden, so dass die Laufzeit für eine Transaktionsdatenbank T mit einer durchschnittlichen Transaktionslänge maximal O(|T | · w) beträgt.
Beispiel Ein einfaches Beispiel für die Erzeugung von Itemsets aus einem FP-Baum
ist in Abbildung 2.4 zu sehen. In (1) ist zunächst der initiale FP-Baum dargestellt, in
welchem Itemsets gesucht werden sollen, die auf c enden. Der Teilbaum in (2) ist der
resultierende Teilbaum, der lediglich Transaktionen enthält, die auf c enden.
null:3
A:2
(1)
B:1
B:1
C:1
C:1
null:3
(2)
A:2
B:1
C:1
C:1
null:3
null:3
(3)
A:2
B:1
Abbildung 2.4: Beispiel für die Generierung von Itemsets aus einem FP-Baum
In (3) sind schließlich die bedingten Bäume zu sehen, die aus (2) für jedes häufige Itemset
in T gebildet werden. Die Abarbeitung wird rekursiv fortgesetzt, so dass schlussendlich
die beiden Itemsets {AC} und {BC} als Ergebnis geliefert werden.
21
Nachteile
Der FP-Growth Algorithmus ist hinsichtlich seiner Fähigkeit, große Datenmengen verarbeiten, ebenso eingeschränkt wie der Apriori-Algorithmus. Zwar entfällt der Aufwand
für Erzeugung und Verwaltung der Itemset-Kandidaten, trotzdem können bei großen
Transaktionsdatenbanken mit vielen Kandidaten die FP-Bäume eine Größe erreichen,
die nicht mehr im Hauptspeicher handhabbar ist. Zusätzlich spielt der minimale Support eine entscheidende Rolle. Je größer der minimale Support ist, desto kleiner fallen
die FP-Bäume und die Menge der gefundenen Itemsets aus. Für die Warenkorbanalyse
mag dies ein gangbarer Weg sein, bei Internet-Suchmaschinen etwa können jedoch auch
Itemsets mit geringem Support von Interesse sein.
2.3
Clusteranalyse
Die Clusteranalyse wird mit dem Ziel durchgeführt, Daten automatisch bzw. semiautomatisch in Kategorien, Gruppen(Cluster) oder Klassen einzuordnen. Das Ziel ist
hierbei, dass Objekte, die sich in demselben Cluster befinden, hinsichtlich der betrachteten Attribute möglichst ähnlich zueinander sind, Objekte aus verschiedenen Clustern
jedoch möglichst unähnlich zueinander sind. Allerdings ist es zuvor notwendig, den Begriff der Ähnlichkeit, der Aufgabenstellung entsprechend, geeignet zu modellieren. Im
Falle numerischer Attributwerte kann für die Berechnung der Ähnlichkeit beispielsweise
die euklidische Distanz (Gleichung 2.4) verwendet werden, wenn für die Attributwerte xi
der Datensätze x = (x1 , . . . , xd ) Kategorien verwendet werden, so ergibt sich die Distanz
aus der Anzahl der verschiedenen Komponenten zwischen unterschiedlichen Datensätzen
(Gleichung 2.5).
p
(x1 − y1 )2 + . . . + (xn − yn )2
(2.4)
(
0, wenn(xi = yi )
δ(xi , yi ) mit δ(xi , yi ) =
1, wenn(xi 6= yi )
(2.5)
dist(x, y) =
dist(x, y) =
d
X
i=1
Im Folgenden werden drei bekannte sequentielle Algorithmen für die hierarchische und
partitionierende Clusteranalyse vorgestellt und hinsichtlich ihrer Eignung unter Berücksichtung aktueller Anforderungen betrachtet.
22
2.3.1
Hierarchisches Clustering
Im agglomerativen hierarchischen Clustering wird zunächst jeder Datenpunkt als ein
Cluster aufgefasst. Anschließend werden diese Cluster Schritt für Schritt vereinigt, bis am
Ende ein einzelner Cluster übrig bleibt (Algorithmus 2.6). Ein hierarchisches Clustering
wird oftmals durch ein Dendrogram dargestellt (Abbildung 2.5).
4
5
3
2
1
1
2
3
4
5
Abbildung 2.5: Hierarchisches Clustering - Beispiel
Die Vereinigung der Cluster erfolgt über die Betrachtung der Distanz zwischen den
Clustern; in jeder Iteration werden die Cluster vereinigt, die die geringste Distanz, bzw.
die größte Ähnlichkeit, zueinander besitzen.
Algorithmus 2.6 Naiver Algorithmus für agglomeratives hierarchisches Clustering
([TSK05] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
input: set of points D
output: dendrogram d representing the clustering
procedure AgglomerativeHierarchical-Clustering(D: Set of points)
compute proximity matrix P from point-set D
initialize dendrogram d
let each data point ∈ D be a cluster
repeat
merge closest pair of clusters
adjust d to represent this merging
update P to reflect changed proximity between new and old clusters
until only one cluster left
return dendrogram d
Für die Berechnung der Distanz bzw. der Ähnlichkeit zwischen den Clustern gibt es
verschiedene Ansätze, unter anderem:
23
• Min: Verwendung der Distanz zwischen den nächsten Punkten zweier verschiedener Cluster (single link)
• Max: Verwendung der Distanz zwischen den am weitesten voneinander entfernten
Punkten zweier verschiedener Cluster (complete link)
• Group-Average: Distanz zweier verschiedener Cluster zueinander ist der Durchschnitt der paarweisen
Distanz zwischen den Punkten der Cluster c1 und c2 :
P
dist(c1 , c2 ) :
pi ∈c1 ,pj ∈c2
dist(pi ,pj )
|c1 |∗|c2 |
Die Laufzeit des naiven Algorithmus aus 2.6 liegt bei Nutzung einer Distanzmatrix und
n−1 Vereinigungen bei O(n3 ). Ein optimaler Algorithmus mit einer Laufzeit von O(n2 )
wird in Kapitel 3.2.4 vorgestellt.
2.3.2
Partitionierendes Clustering
Beim partitionierenden Clustering wird eine Aufteilung der Daten berechnet, so dass
jeder Punkt genau einem oder keinem Cluster zugeordnet ist. Zwei der bekanntesten
Ansätze werden im Folgenden vorgestellt.
k-Means
Durch den k-Means-Algorithmus (siehe Algorithmus 2.7) wird aus einer Menge von
Punkten eine Partitionierung in k disjunkte Gruppen berechnet. Hierfür werden zunächst k initiale Schwerpunkte, die nicht Teil der zu partitionierenden Punktemenge
sein brauchen, bestimmt und jeder Punkt dem ihm nächsten Schwerpunkt zugewiesen.
Algorithmus 2.7 k-Means Clustering ([TSK05] angepasst)
1:
2:
3:
4:
5:
6:
7:
input: D - dataset to be clustered, K - initial centroids
output: K - final centroids
procedure KMeansClustering(D, K)
assign each point to its nearest centroid k ∈ K
repeat
compute new centroids based on groups of points assigned to former centroids
assign each point to its nearest centroid k ∈ K
until centroids have not moved or centroid movement < δ
return K
Mittels dieser Zuordnungen werden nun neue Schwerpunkte berechnet und die Punkte
wieder dem nächsten Schwerpunkt zugeordnet. Dieser Vorgang wird so lange iteriert,
24
bis sich die Schwerpunkte nicht mehr ändern oder die Änderungen einen festzulegenden
Schwellwert unterschreiten. Die Berechnung der Schwerpunkte ist im k-Means Algorithmus die Summe der quadrierten Abweichungen (SSE) von den Clusterschwerpunkten;
das Ziel des Algorithmus ist die Minimierung dieser Abweichung.
SSE =
K X
X
dist2 (mi , x)
(2.6)
i=1 x∈Ci
Formal ist dies ist in der Gleichung 2.6 dargestellt. Für jeden Datenpunkt ist der Fehler
die Distanz zum nächstliegenden Schwerpunkt. x ist hierbei ein Datenpunkt des Clusters
Ci , mi ist der Schwerpunkt von Ci .
Die Berechnung der Schwerpunkte erfolgt, nachdem die Summe der quadrierten Abweichung berechnet wurde, nach Gleichung 2.7.
mi =
1 X
x
|Ci | x inC
(2.7)
i
Die aufwändigste Berechnung in k-Means ist die Bestimmung der Distanzen der Datenpunkte zu den Clusterschwerpunkten. Hierbei ist zu beachten, dass diese unabhängig
voneinander sind, also parallel berechnet werden können. Jedoch werden nach jeder Iteration die Clusterschwerpunkte angepasst; die Iterationen müssen folglich sequentiell
berechnet werden.
Abbildung 2.6: beispielhafte Darstellung des k-Means Algorithmus
Ein Beispiel ist in Abbildung 2.6 gezeigt; die Schwerpunkte sind hier als Kreise, die Datenpunkte als Quadrate dargestellt und die Farbe der Quadrate symbolisiert die Zuordnung zu Schwerpunkten. Im ersten Bild sind farblich die initialen Schwerpunkte hervorgehoben. Im zweiten Bild sind die Datenpunkte hinsichtlich der aktuellen Schwerpunkte
(blass gefärbt) dargestellt, während Schwerpunkte in kräftigem Grün und Rot die neu
25
berechneten abgebildet sind. Das dritte Bild stellt schließlich die zweite Iteration dar,
in der erneut die Zuordnungen aktualisiert und auf Basis dieser die Schwerpunkte neu
berechnet wurden.
Ausschlaggebend für die Komplexität des Algorithmus ist an erster Stelle die Anzahl der
clusternden Punkte n. Aber auch die Anzahl der Cluster K, die Anzahl der Punktattribute d sowie die Anzahl der Iterationen I ist zu berücksichtigen, wobei die Anzahl der
Cluster üblicherweise kleiner als die Anzahl der Punkte ist und nur wenige Iterationen
benötigt werden. Daraus ergibt sich, dass die Zeitkomplexität O(n ∗ K ∗ I ∗ d) ist.
Nachteile Ein Problem des k-Means-Algorithmus ist, dass er ohne Eingreifen von
außen keine optimale Lösung berechnet; die Qualität des Ergebnisses hängt stark von
Anzahl und Lage der gewählten Schwerpunkte ab, die zudem vorgegeben werden müssen.
Ein weiterer wichtiger Punkt ist, dass k-Means die Bildung kugelförmiger Cluster anstrebt. Dies liegt an der Minimierung der Abstände zwischen Datenpunkten und ClusterSchwerpunkten.
DBSCAN
Der DBSCAN-Algorithmus [EKSX96] ist ein dichtebasierter Algorithmus, der ein partitioniertes Clustering berechnet. Zu den wichtigsten Eigenschaften des Algorithmus zählt,
dass die Anzahl der Cluster nicht vorab festgelegt werden muss, da der Algorithmus
Cluster selbstständig erkennen kann. Weiterhin kann der DBSCAN-Algorithmus Cluster
von beliebiger Form erkennen und unterliegt somit auch hier keiner Limitierung wie der
k-Means Algorithmus.
p
ε
Abbildung 2.7: -Nachbarschaft
Ein zentraler Aspekt des Algorithmus ist die -Nachbarschaft (Abbildung 2.7). Als Nachbarschaft eines Punktes p werden alle Punkte bezeichnet, die sich innerhalb des Umkreises mit dem Radius befinden. Auf Grundlage der -Nachbarschaft lassen sich nun die
26
drei verschiedenen Klassen von Punkten definieren, die im DBSCAN-Algorithmus auftreten:
• Kernpunkte, die sich dadurch auszeichnen, dass die Anzahl von Punkten in ihrer
Nachbarschaft einen bestimmten Schwellwert überschreitet; diese werden als dicht
bezeichnet.
• Randpunkte, die selber nicht dicht sind, sich jedoch innerhalb der Nachbarschaft
eines Kernpunkts befinden. Diese werden auch dichte-erreichbar genannt
• Rauschpunkte, die weder dicht noch dichte-erreichbar sind
Grundlage des Algorithmus ist das Prinzip der Dichte-Verbundenheit. Dieses besagt,
dass zwei Punkte dichte-erreichbar sind, wenn sie durch eine Kette von Kernpunkten
erreichbar sind. Die Punkte, die durch diese dichte-erreichbaren Punkte verbunden sind,
bilden eine dichte-verbundene Menge, die auch als Cluster bezeichnet wird.
Der DBSCAN-Algorithmus (Algorithmus 2.8) startet mit einer Menge von zu clusternden Datenpunkten, einem Wert für , der die Größe der Nachbarschaft eines Punktes
festlegt, sowie einem Wert minP ts, der definiert, ab welcher Nachbarschaftsgröße ein
Punkt ein Kernpunkt ist. Zu Beginn sind zusätzlich alle Punkte als unklassifiziert
markiert. Der Algorithmus durchläuft die Punktmenge vollständig (Zeile 3-7), überprüft
jedoch nur für noch nicht klassifizierte Punkte, ob von diesen ausgehend ein Cluster gebildet werden kann. Die Beschränkung auf unklassifizierte Punkte erfolgt, da durch das
Prinzip der Dichte-Verbundenheit Cluster während ihrer Entdeckung bis zu ihrer maximalen Ausdehnung erweitert werden. Die Beschränkung verhindert folglich eine erneute
Betrachtung von bereits zugeordneten Punkten zum Zwecke der Suche und Erkennung
von Clustern.
Algorithmus 2.8 Der DBSCAN-Algorithmus ([EKSX96] angepasst)
1:
2:
3:
4:
5:
6:
7:
input: points - dataset to be clustered
input: ε - size of the neighborhood
input: minPts - points required for a core point
procedure DBSCAN(points, ε, minPts)
clusterId = 0
for i from 1 to points.size do
point = points.get(i);
if point.clusterId = UNCLASSIFIED then
if ExpandCluster(points, point, clusterId, ε, minPts) then
clusterId = clusterId + 1
Die Clustererweiterung (Algorithmus 2.9) startet mit der gesamten zu clusternden Punktmenge, dem zur Suche und Erweiterung gewählten Startpunkt, dem aktuellen Cluster-
27
Identifikator und den bereits aus Algorithmus 2.8 bekannten Größen und minP ts als
Eingabe.
Algorithmus 2.9 DBSCAN - Clustererweiterung ([EKSX96] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
input: points - dataset to be clustered
input: point - starting point for cluster expansion
input: clusterId - identificator of the current cluster
input: ε - size of the neighborhood
input: minPts - points required for a core point
output: boolean - indicates, whether cluster was found
procedure ExpandCluster(points, point, clusterId, ε, minPts)
seeds = points.regionQuery(point, eps)
if seeds.size < minPts then
points.changeClusterId(point, NOISE )
return FALSE
else
points.changeClusterIds(seeds, clusterId )
seeds.delete(point)
while seeds 6= ∅ do
currentPoint = seeds.first()
result = points.regionQuery(currentPoint, ε)
if result.size ≥ minPts then
for i from 1 to result.size do
resultP oint = result.get(i)
if resultP oint.clusterId IN (UNCLASSIFIED, NOISE ) then
if resultP oint.clusterId == UNCLASSIFIED then
seeds.append(resultPoint)
points.changeClusterId(resultPoint, clusterId )
seeds.delete(currentP oint);
return TRUE;
Als erstes wird für den Startpunkt die Nachbarschaft im Umkreis, der durch definiert
ist, abgefragt (Zeile 3-5) und in der Variable seeds gespeichert; diese wird später zur
Erweiterung des Clusters verwendet. Wenn die Größe der Nachbarschaft größer gleich
minP ts ist, so handelt es sich bei dem Punkt um einen Kernpunkt. Ist die Nachbarschaft
jedoch kleiner als minP ts, so wird der Punkt vorläufig als Rauschen klassifiziert; diese
Zuordnung ist jedoch noch nicht endgültig, da der Punkt noch von einem Kernpunkt
dichte-erreichbar sein kann. Falls es sich bei dem Punkt um einen Kernpunkt handelt,
so werden zuerst alle Punkte der Nachbarschaft mit dem aktuellen Cluster-Identifikator
versehen und der aktuelle Punkt gelöscht (Zeile 7-8). In den Zeilen 9 bis 19 findet nun die
eigentliche Erweiterung des Clusters statt. Aus der Menge seeds wird der erste Punkt
entnommen und die Nachbarschaft hinsichtlich abgefragt und in der Variable result
gespeichert. Wenn die Kardinalität von result größer gleich minP ts ist, so ist auch der
28
aktuelle Punkt ein Kernpunkt und wird zur Erweiterung des Clusters genutzt. Hierfür
wird die vollständige Nachbarschaft durchlaufen (Zeile 13 - 19) und diesen die aktuelle
Cluster-ID zugewiesen, wenn es sich um unklassifizierte oder Rausch-Punkte handelt
(Zeile 18). Unklassifizierte Punkte werden zusätzlich der Menge seeds hinzugefügt und
bei der Erweiterung des Clusters genutzt.
Der DBSCAN-Algorithmus (Algorithmus 2.8) durchläuft in seiner äußeren Schleife jeden
Punkt genau einmal. Die Suche nach Clustern mittels der Funktion ExpandCluster
wird jedoch nur für noch nicht-klassifizierte Punkte ausgeführt. Wenn eine Clustererweiterung (Algorithmus 2.9) initiiert wird, wird zunächst die Nachbarschaft des Ausgangspunktes mit einer räumlichen Suche bestimmt. Die Punkte im Ergebnis dieser Abfrage
werden zusammen mit unklassifizierten Punkten aus weiteren Nachbarschaftsanfragen
für weitere Suchen verwendet. Geeignete Indexstrukturen wie etwa ein R-Baum ermöglichen eine Nachbarschaftssuche in einer Laufzeit von O(log(n)), während die Laufzeit
ohne Indexunterstützung O(n2 ) beträgt. Folglich ergibt sich bei der Nutzung einer räumlichen Indexstruktur näherungsweise eine Laufzeit im Bereich von O(n ∗ log(n)) für den
DBSCAN-Algorithmus.
Beispiel
Eine beispielhafte Szene zur Beschreibung des DBSCAN-Algorithmus ist in Abbildung
2.8 abgebildet. Die Kreise stellen hier die Nachbarschaft der Punkte p und q dar, welche
einen Radius von besitzen. Weiterhin sei eine Kardinalität der Punktnachbarschaft von
5 notwendiges Kriterium, damit ein Punkt ein Kernpunkt ist.
p
ε
q
r
s
Abbildung 2.8: DBSCAN - Beispiel
Der Punkt p ist, gemäß vorheriger Definition, ein Kernpunkt, da er über eine genü-
29
gend große Nachbarschaft verfügt. Innerhalb dessen Nachbarschaft liegt der Punkt q,
der somit dichte-erreichbar von p ist. Zusätzlich ist q ein Kernpunkt, da die Kardinalität der Nachbarschaft von q ≥ 5 ist; q gehört folglich dem gleichen Cluster an wie
p. Für den Punkt r gilt, dass er von q dichte-erreichbar ist. Jedoch ist die Kardinalität
der Nachbarschaft von r < 5, somit ist r ein Randpunkt. Der Punkt s besitzt eine
Nachbarschafts-Kardinalität von s < 5 und ist somit kein Kernpunkt. Zusätzlich ist s
auch von keinem Kernpunkt erreichbar; s ist somit Rauschen bzw. ein Rauschpunkt.
Nachteile
Der gravierendste Nachteil des DBSCAN-Algorithmus tritt zu Tage, wenn der Datensatz
Gebiete mit sehr stark voneinander abweichenden Punktdichten enthält. Der Algorithmus ist bei derartigen Datensätzen nicht in der Lage, ein brauchbares Clustering zu
erzeugen, da es keine Kombination der Parameter und minP ts gibt, die für alle vorhandenen Cluster passt.
2.4
Die UNION-Find-Struktur
Bestimmte Probleme, wie beispielsweise hierarchisches agglomeratives Clustering, haben
zu Beginn eine Menge von Objekten, die jedes für sich eine eigene Menge bilden. Im weiteren Verlauf werden diese Mengen in einer bestimmten Reihenfolge zusammengefügt,
um eine gröbere Partitionierung zu bilden, die jedoch immer vollständig und disjunkt
bleibt. Zusätzlich finden Abfragen statt, um die Menge in Erfahrung zu bringen, die
ein bestimmtes Objekt enthält. Diese Anforderungen lassen sich durch die Operationen
Union und Find auf Mengen realisieren. Die Union-Operation führt hierbei die Vereinigung zweier Mengen durch, während die Find-Operation für eine gegebenes Element
die enthaltende Menge liefert.
ADT DisjointSet
Der abstrakte Datentyp DisjointSet (bzw. Union-Find-Struktur oder auch Merge-FindStruktur) besteht aus einer Menge von Teilmengen, hier als Komponenten bezeichnet und
den folgenden Operationen:
• Union(A, B) bestimmt die Vereinigung der Teilmengen [A] und [B] und definiert
als Namen ein repräsentatives Element, dass in der neu entstandenen Menge enthalten sein muss
• Find(x) liefert den Namen der Komponente zurück, in der x enthalten ist
30
• Initial erzeugt eine Partitionierung, die aus disjunkten ein-elementigen Mengen
besteht.
Ein einfache Implementierung auf Array-Basis findet sich in Algorithmus 2.10 unter der
Bezeichnung Merge-Find-Set(MFSET), basierend auf [AHU83]. Die Operation Initial
erzeugt ein neues MFSET, in der sich jedes Element in einer eigenen Partition bzw. Menge befindet. Die Menge, in der ein Element x enthalten ist, liefert die Find-Operation,
indem einfach der Wert für x im MFSET zurückgegeben wird. Und auch die Vereinigung einer Menge A mit einer Menge B ist in der array-basierten Implementierung des
MFSETs durch die Operation Merge sehr einfach; das Array, welches die Komponenten der Struktur enthält, wird von Anfang bis Ende durchlaufen und jedes Feld, das der
Menge B zugewiesen ist wird der Menge A zugewiesen.
Algorithmus 2.10 ADT DisjointSet über {1, . . . , n}, Array-Implementierung ([AHU83]
angepasst)
output: returns new MFSET
procedure Initial
C := Array[1, . . . , n] of [1, . . . , n]
for x := 1 to n do
C[x] := x
5:
return C
1:
2:
3:
4:
input: x - integer in the range {1, . . . , n}
input: C - an existing MFSET
output: integer of the class, that includes x
6: procedure Find(x, C)
7:
return C[x]
8:
9:
10:
11:
12:
input: A, B - integers in the range {1, . . . , n}, that represent a class
input: C - an existing MFSET
output: MFSET with merged classes A and B
procedure Merge(A, B; C)
for x = 1 to C.length do
if C[x] = B then
C[x] = A
return C
Die Laufzeitkomplexität für die Initialisierung eines MFSET beträgt O(n), die Abfrage
eines Wertes für eine Komponente x beträgt O(1). Die Implementierung zur Vereinigung
zweier Klassen benötigt eine Laufzeit von O(n), da für die Suche nach einer Klasse B das
vollständige MFSET durchlaufen werden muss. Wenn nun Komponente für Komponente
das MFSET mit der Vereinigungs-Operation zu einer einzigen Klasse vereinigt wird, so
31
werden maximal n − 1 Operationen ausgeführt, so dass die Laufzeit insgesamt O(n2 )
beträgt.
Eine deutliche Verbesserung der Laufzeit ist zu erreichen, wenn das MFSET als Baum
und nicht als Array implementiert wird. Diese ermöglicht für die Suche nach der Klasse
von Komponenten eine Laufzeit von O(log(n)), für die Vereinigung von zwei Klassen
sogar eine konstante Laufzeit von O(1).
2.5
Parallelrechner
Parallelrechner sind Computer, in denen Rechenoperationen nebenläufig ausgeführt werden können. Die Parallelität basiert auf der Grundlage, dass Probleme in kleinere Probleme zerlegt und parallel zeitgleich ausgeführt werden können. Parallelrechner lassen
sich grundsätzlich in der Ebene, auf der die Parallelisierung stattfindet, unterteilen. So
kann ein einzelner Computer bereits mehrere Rechenelemente beeinhalten, wobei dieser
als Multi-Cores im Falle mehrerer Kerne in einem Prozessor oder Multi-Prozessor im
Falle mehrerer Prozessoren bezeichnet wird; jedoch arbeiten diese Rechenelemente nicht
zwingend alle an der gleichen Aufgabe sondern werden teilweise beispielsweise durch
Aufgaben des Betriebssystems ausgelastet. Rechencluster und Rechengrids zeichnen sich
dadurch aus, dass sie ein Verbund von Rechnern sind, die gemeinsam an der gleichen
Aufgabe arbeiten.
Zugleich ist die Entwicklung von Software für Parallelrechner aufwändiger als für sequentielle Rechner, da Probleme durch gleichzeitigen Zugriff auf Datenstrukturen oder
Race-Conditions auftreten können, die zu nicht vorhersehbarem Programmverhalten führen können. Dieses führt dazu, dass Kommunikation und Synchronisierung zwischen den
parallel laufenden Tasks stattfinden muss; dieses ist gleichzeitig einer der Faktoren, der
maßgeblich für die Performance von paralleler Software verantwortlich ist, da nur ohne Kommunikation und Synchronisierung eine wirklich parallele Ausführung von Tasks
erfolgt.
Dieser Sachverhalt lässt sich anschaulich durch das Amdahl’sche Gesetz verdeutlichen.
Ein paralleles Programm besteht aus zwei wesentlichen Komponenten, einem sequentiellen Anteil (fs ) und einem parallelen Anteil (fp ). Diese sind normiert und bewegen
sich im Bereich 0 ≤ fs ≤ 1 und folglich fp = 1 − fs . Für einen Parallelrechner mit n
Prozessoren ergibt sich nun eine Beschleunigung (Speed-Up) von:
sp(n) ≤
1
fp
n
+ fs
≤
1
fs
Wenn die Anzahl der Prozessoren gegen unendlich geht, geht folglich die Ausführungszeit des parallelen Anteils gegen null, so dass für diesen Fall die Ausführungszeit des
parallelen Programms nach unten durch den sequentiellen Programmanteil beschränkt
32
ist. Folglich ist während des Entwurfs und der Implementierung auf eine möglichst starke
Entkopplung der Teilaufgaben zu achten, um Synchronisation und Kommunikation, und
somit den sequentiellen Anteil, möglichst klein zu halten.
Ein weiteres wichtiges Unterscheidungsmerkmal für Parallelrechner ist die Speicherarchitektur. Diese können einerseits einen gemeinsamen globalen Speicher besitzen (SharedMemory-Architektur), der über einen gemeinsamen Adressraum adressiert wird; die
Kommunikation zwischen Prozessoren erfolgt in diesem Fall implizit über Bereiche im
gemeinsamen Speicher. Andererseits kann jeder Prozessor über einen eigenen lokalen
Speicher mit eigenem Adressraum verfügen, auf den andere Prozessoren keinen Zugriff
haben; die Kommunikation zwischen Prozessoren erfolgt in diesem Fall explizit über
Nachrichten, was als Message-Passing-Architektur bezeichnet wird.
2.6
MapReduce
MapReduce[DG04] bezeichnet sowohl ein Programmiermodell als auch eine Implementierung für die Verarbeitung und Generierung großer Datenmengen. Kern des Programmiermodells sind zwei Phasen, die M ap- und die Reduce-Phase, welche inspiriert sind durch
die gleichnamigen Funktionen aus der funktionalen Programmierung. Programme, die
nach dem MapReduce-Programmiermodell entwickelt wurden, lassen sich automatisch
durch MapReduce-Implementierungen, wie beispielsweise Hadoop, auf einem Cluster mit
einer Vielzahl von Rechenknoten ausführen, ohne dass Erfahrung für die Entwicklung
paralleler Software oder verteilter Systeme notwendig ist. Ein weiterer wichtiger Faktor ist, dass die Ausführung in eine Vielzahl von Tasks aufgeteilt wird, die unabhängig
voneinander ausgeführt werden. Der Vorteil dieser Lösung ist, dass selbst der Ausfall
von Teilen des Rechenclusters nicht einen Neustart der Berechnung bedingt; Tasks, die
nach Ablauf einer bestimmten Frist nicht erledigt worden sind, werden einfach abermals
zur Bearbeitung freigegeben und von verbleibenden Teilen des Rechenverbunds durchgeführt. Im Folgenden wird nun zunächst das hinter MapReduce stehende Programmiermodell erläutert, anschließend wird der hinter einem MapReduce-Programm steckende
Prozess vorgestellt. Hadoop, eine MapReduce-Implementierung, wird in Abschnitt 2.6.3
präsentiert.
2.6.1
Programmiermodell
In der Map-Phase (2.8) wird die Map-Funktion parallel auf jedes Schlüssel-Wert-Paar
(k1 , v1 ) der Eingabe angewendet. Als Ergebnis liefert die Map-Funktion für ein eingegebenes Paar eine Liste von Schlüssel-Wert-Paaren. Im Anschluss werden vom MapReduceFramework alle Schlüssel-Wert-Paare mit gleichem Schlüssel aus allen Listen eingesam-
33
melt und unter Beachtung des Schlüssels gruppiert.
M ap(hk1 , v1 i) = list(hk2 , v2 i)
(2.8)
In der Reduce-Phase (2.9) wird die Reduce-Funktion parallel für jeden Schlüssel und die
dazugehörige Liste von Werten ausgeführt und die Liste in reduzierter Form, beispielsweise durch Aufsummierung der Werte, als Schlüssel-Wert-Paar zurückgegeben.
Reduce(hk2 , list(v2 )i) = hk2 , v3 i
(2.9)
MapReduce - Beispiel
Das kurze Beispiel in Quelltext 2.1 stellt ein MapReduce-Programm dar, welches für
ein übergebenes Dokument für jedes Wort die Anzahl der Vorkommen innerhalb des
Dokumentes zählt und ausgibt.
1
2
3
4
5
map ( String key , String value ):
# key : document name
# value : document contents
for each substring w in value :
Output <w ,1 >;
6
7
8
9
10
11
12
13
reduce ( String key , Iterator values ):
# key : a word
# value : a list of numbers
int result = 0;
for each v in values :
result += v ;
Output <key , result >;
Quelltext 2.1: MapReduce - Wortvorkommen in einem Dokument zählen [DG04]
Die Aufgabe der Map-Funktion (Zeile 1) ist in diesem Beispiel sehr einfach. Sie durchläuft
alle Wörter des übergebenen Dokuments (Zeile 4) und gibt für jedes gefundene Wort ein
Schlüssel-Wert-Paar der Form hWort, 1i aus. Die Reduce-Funktion summiert die Anzahl
der Vorkommen (Liste von ”values”) eines Wortes (”key”) auf und gibt diese zurück.
Eine beispielhafte Ausführung des Beispiels in Quelltext 2.1 ist in Abbildung 2.9 zu
sehen.
2.6.2
Der Map-Reduce Prozess
Zu Beginn des Prozesses wird zunächst eine Aufteilung der zu verarbeitenden Daten
in sogenannte Shards vorgenommen. Anschließend wird der Prozess mit der Map-Phase
34
Die Sonne scheint und der
Himmel ist blau. Es scheint
ein schöner Tag zu werden,
wenn die Sonne weiter
scheint.
Map
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
die , 1>
sonne , 1>
scheint , 1>
und , 1>
der , 1>
himmel , 1>
ist , 1>
blau , 1>
es , 1>
scheint , 1>
ein , 1>
schöner , 1>
tag , 1>
zu , 1>
werden , 1>
wenn , 1>
die , 1>
sonne , 1>
weiter , 1>
scheint , 1>
Reduce
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
blau , 1>
der , 1>
die , 2>
ein , 1>
es , 1>
himmel , 1>
ist , 1>
scheint , 3>
schöner , 1>
sonne , 2>
tag , 1>
und , 1>
weiter , 1>
wenn , 1>
werden , 1>
zu , 1>
Abbildung 2.9: MapReduce-Beispiel
gestartet, in dem für jeden Shard ein Map-Task erstellt wird, die unabhängig voneinander
ausgeführt werden.
Map
Reduce
Shard 1
Shard 2
Map
Ergebnis
Shard 3
Reduce
Map
Abbildung 2.10: Der MapReduce-Prozess
Wenn ein Map-Task gestartet wird, liest dieser den Teil der Eingabedaten, die ihm
zugeordnet wurden. Aus diesem parst er die Schlüssel-Wert-Paare und leitet diese an die
Map-Funktion weiter. Diese bearbeitet die Eingabe-Paare gemäß ihrer Definition und
gibt sie wieder als Schlüssel-Wert-Paare aus.
Die Ausgabedaten der Map-Tasks werden nun in der Reduce-Phase an Reduce-Tasks
verteilt, was als ”Shuffling” bezeichnet wird. Die Verteilung erfolgt über eine Partitionierungsfunktion, welche die Ergebnisse der Map-Tasks auf verschiedene disjunkte Partitionen verteilt, wobei für jede Partition ein Reduce-Task erzeugt wird. Die Partitio-
35
nierungsfunktion lässt sich beispielsweise einfach mittels einer Hash-Funktion umsetzen
(z.B. hash(key) mod R mit R = Anzahl der Reducer).
Nun durchläuft der Reduce-Tasks die Schlüssel-Wert-Paare der ihm zugewiesenen Partition und sortiert diese nach dem Schlüssel, so dass die Werte nach diesen gruppiert
sind. Die derart gruppierten Paare werden nun gruppenweise an die Reduce-Funktion
übergeben, von dieser bearbeitet, und in Form eines einzelnen Schlüssel-Wert-Paares
ausgegeben.
Mit der Combine-Phase steht zusätzlich eine optionale Funktionalität bereit, die gegebenenfalls Kommunikationskosten reduzieren kann. Funktional ist diese ähnlich wie die
Reduce-Phase, wird jedoch lokal auf den Knoten nach Abschluss der Map-Phase ausgeführt. Der Sinn dieser Phase ergibt sich aus der Betrachtung des Beispiels im Quelltext 2.1, dem Wortzählen. Jeder Knoten berechnet aus dem zugewiesenen Textfragment
Schlüssel-Wert-Paare der Form h0 gefundenes Wort0 ,0 10 i. So gibt es beispielsweise eine
Häufung von Wörtern, etwa ’und’, ’der’ oder ’das’. Eine Reduktion der Kommunikationskosten lässt sich erreichen, indem diese Paare bereits lokal auf den Knoten zusammengefasst werden, so dass beispielsweise nicht 50 Paare h0 das0 ,0 10 i ausgegeben werden,
sondern nur h0 das0 ,0 500 i.
2.6.3
Hadoop
Hadoop ist eine MapReduce-Implementierung, die unter dem Dach der Apache Software Foundation entwickelt wird. In der Version 2.2.0, die in dieser Arbeit genutzt wird,
besteht Hadoop aus drei wesentlichen Komponenten: Hadoop Distributed Filesystem,
Hadoop YARN und Hadoop MapReduce, wobei Hadoop MapReduce das Programmiermodell darstellt, welches in allgemeiner Form bereits in Abschnitt 2.6.1 auf Seite 33 vorgestellt wurde. Alle Komponenten wurden vor dem Hintergrund entworfen, dass Ausfälle
von einzelnen Rechnern oder ganzen Racks in großen verteilten Systemen an der Tagesordnung sind; die Erkennung und Behandlung von Ressourcen-Ausfällen ist aus diesem
Grund direkt in Hadoop implementiert worden.
Hadoop Filesystem
Das Hadoop Filesystem (HDFS) ist ein Dateisystem, welches entwickelt wurde, um verschiedene Anforderungen zu erfüllen. Zum einen soll das Datensystem in der Lage sein,
sehr große Dateien mit einer Größe von mehreren hundert Terabytes zu speichern. Zum
anderen soll der Dateizugriff in Form von Streams möglich sein. Hintergrund hierfür ist,
dass für die Analyse Daten einmalig geschrieben, aber vielfach gelesen werden. In jeder
durchgeführten Datenanalyse werden die gesamten zu analysierenden Daten oder große
Teile davon gelesen, so dass der ausschlaggebende Performance-Faktor für eine Leseope-
36
ration die Gesamtzeit für das Lesen der gesamten Daten ist, nicht der Zugriff auf einzelne
Teile einer Datei. Außerdem soll das Dateisystem auf normaler Standard-Hardware arbeiten, die im Gegensatz zu wesentlich teurerer hoch-zuverlässiger Hardware, bei einer
großen Anzahl von Knoten in einem verteilten System eine hohe Ausfallwahrscheinlichkeit für einzelne Knoten hat. Im Falle eines Komponentenausfalls innerhalb des verteilten
Systems soll das Dateisystem ohne Unterbrechungen für den Anwender weiter funktionieren; dies bedeutet, dass das Dateisystem trotz eines Ausfalls von Knoten weiter verfügbar
bleibt und ein Ausfall zu keinem Datenverlust führt.
HDFS benutzt als grundlegende Aufteilung des Speichers, wie andere Dateisysteme,
Blöcke von Speicher. Jedoch ist die Blockgröße mit 64 Megabyte oder mehr bedeutend größer, als dies bei Dateisystemem für Standard-Massenspeicher, wie beispielsweise
NTFS oder EXT4, mit 4 Kilobyte. Dateien werden in Stücke entsprechend der Blockgröße aufgeteilt und unabhängig voneinander gespeichert. Im Unterschied zu normalen
Dateisystemen belegt ein Block, der nicht vollständig gefüllt ist, jedoch nicht den kompletten Speicher des Blockes im zugrunde liegenden Speichers sondern nur den tatsächlich
vom Block verwendeten Speicher.
Die Abstraktion des Speichers in Blöcke bringt einige Vorteile mit sich. Zum einen ist
es möglich, dass Dateien größer sein können als ein einzelner Massenspeicher der lokalen
Knoten. Außerdem vereinfacht die Verwendung von Blöcken die Replikation, um die
Verfügbarkeit und Fehlertoleranz zu verbessern. Um mit nicht verfügbaren Blöcken in
Folge von Hardware-Fehlern oder dem Ausfall von Knoten umzugehen, werden Blöcke
auf eine vom Administrator festzulegende Anzahl von Knoten repliziert, so dass diese
gegebenenfalls von einem anderen Knoten gelesen werden kann.
Das verteilte Dateisystem wird aus zwei verschiedenen Typen von Knoten aufgebaut,
einem Namenode (Master) und mehreren Datanodes (Worker). Der Namenode verwaltet das Dateisystem in Form eines Dateisystem-Baumes und Metadaten für alle Dateien
und Verzeichnisse des Dateisystems. Diese werden lokal auf dem Namenode persistent
gespeichert. Zusätzlich verfügt der Namenode hinsichtlich der Blöcke, aus denen eine
Datei besteht, über die Information, auf welchen Datanodes diese physikalisch gespeichert sind. Diese wird jedoch nicht persistent gespeichert sondern während des Starts
des Dateisystems aus den übermittelten Informationen der Datanodes rekonstruiert.
Der Zugriff auf das Dateisystem durch Nutzer erfolgt über einen Client, der nach Aufruf
durch den Nutzer mit dem Namenode und den Datanodes kommuniziert. Die Schnittstelle für den Dateizugriff auf das verteilte Dateisystem orientiert sich stark am Portable
Operating System Interface(POSIX), so dass der Nutzer kein Wissen über die Funktionsweise des Namenodes oder der Datanodes benötigt.
Wie zuvor bereits angedeutet sind die Datanodes für den Großteil der Arbeit des verteilten Dateisystems verantwortlich. Auf Anweisung von Clients oder des Namenodes
speichern und Empfangen sie Datei-Blöcke und liefern periodisch eine Liste der von
37
ihnen gespeicherten Blöcke an den Namenode.
Aus dem Aufbau des Dateisystems ist ersichtlich, dass der Namenode eine zentrale Komponente des verteilten Dateisystems und ein Single-Point-of-Failure ist. Tatsächlich kann
das Dateisystem ohne den Namenode nicht benutzt werden und als schwerwiegender
Nachteil kommt zusätzlich hinzu, dass ein Ausfall des Namenodes zum Verlust aller
Dateien des Dateisystems führt, da nur der Namenode über die Informationen zur Rekonstruktion der Dateien aus den Blöcken verfügt.
YARN
Das Yet-Another-Resource-Negotiator (YARN)-Framework ist die zentrale Komponente von Hadoop, welche zum einen die Ressourcen in Form von Rechenknoten verwaltet und zum anderen für das Scheduling der von Benutzern gestarteten MapReduceAnwendungen verantwortlich ist.
YARN erfüllt diese zwei wesentliche Aufgaben durch zwei verschiedene Dienste sind. Einerseits werden die Berechnungs-Ressourcen des Clusters durch den Resource Manager
verwaltet, andererseits steuert und überwacht YARN den Lebenszyklus von Anwendungen (z.B. Verfolgung des Ausführungsstatus von Tasks, fehlgeschlagenen Tasks neu
starten), die auf dem Cluster ausgeführt werden, wobei jedes MapReduce-Programm
einen eigenen Application-Master besitzt.
Die Grundidee ist, dass der Application-Master mit dem Resource-Manager die Nutzung
von Resourcen des Clusters in Form von Containern mit festgelegten Speicherlimits aushandelt und die Prozesse der MapReduce-Anwendung in diesen Containern ausführt. Die
Überwachung der Container erfolgt von den Node Managern, die auf jedem Clusterknoten laufen und dafür Sorge tragen, dass die Anwendungen sich an die durch die Container
vorgegebenen Ressourcenbeschränkungen halten.
Für die Ausführung einer MapReduce-Anwendung mit dem YARN-Framework sind die
folgenden Komponenten notwendig:
• Ein Client, der die auszuführende MapReduce-Anwendung übermittelt
• Der YARN-Resource-Manager, welcher die Ressourcenzuweisung des Clusters koordiniert
• Die YARN-Node-Manager, die Container für Tasks auf Clusterknoten starten und
überwachen
• Der Application-Master der MapReduce-Anwendung, der die Ausführung von Tasks
der Anwendung überwacht. Der Application-Master und die Tasks werden in Con-
38
tainern ausgeführt, die vom Resource-Manager zugeteilt werden und durch die
Node-Manager verwaltet werden.
• Das verteilte HDFS-Dateisystem, welches genutzt wird, um Dateien der Tasks
zwischen den Komponenten auszutauschen.
MapReduce
program
1: run job
2: get new application
Job
Resource Manager
4: submit applications
client JVM
client node
resource manager node
5a: start container
8: allocate resources
Node Manager
3: copy job
resources
5b: launch
6: initialize
job
Node Manager
9a: start
container
MRAppMaster
9b: launch
task JVM
node manager node
YarnChild
7: retrieve
input splits
Shared
FileSystem
(e.g. HDFS)
10: retrieve job resources
11: run
MapTask
or
ReduceTask
node manager node
Abbildung 2.11: Ablauf der Ausführung einer MapReduce-Anwendung mit dem YARNFramework [Whi12]
Der Ablauf einer MapReduce-Anwendung ist in Abbildung 2.11 dargestellt und beginnt,
indem die Anwendung, auch Job genannt, auf einem Client gestartet wird (1).
Für diesen Job wird vom Resource Manager ein Identifikator angefordert (2), um diesen
und die zugehörigen Daten im System zuordnen zu können.
Der Client berechnet nun die Input-Splits, bzw. die Aufteilung der Eingabedaten, und
kopiert diese Information zusammen mit der Anwendungskonfiguration und der Anwendung, welche in Form eines JAR-Archivs an Hadoop übergeben wird, in das verteilte
Dateisystem (3).
39
Die Anwendung wird nun zur Ausführung an den Resource Manager übergeben (4).
Der Resource-Manager liefert die Anfrage zur Ausführung einer Anwendung an einen
Job-Scheduler weiter, der einen Container anfordert, in welchem der Resource-Manager
eine Instanz des Application Masters unter Verwaltung des Node Managers startet (5a
und 5b).
Der Application Master startet die Ausführung der Anwendung, in dem zunächst eine
Reihe von Variablen zur Verfolgung des Fortschritts der Anwendung initialisiert werden,
da im Verlauf der Anwendung Fortschrittsmeldungen von Tasks und Meldungen über
abgeschlossene Tasks beim Application Master eingehen (6).
Im Anschluss wird die vom Client berechnete Aufteilung der Eingabedaten vom Application-Master geladen und ein Map-Task für jeden Teil der Eingabe erstellt; zusätzlich
wird eine festgelegte Anzahl von Reduce-Tasks erzeugt (7).
Nun fordert der Application Master vom Resource Manager Ressourcen in Form von
Containern für die Ausführung der Map- und Reduce-Tasks an (8). Die Anforderungen
enthalten zusätzlich Information hinsichtlich der Lokalität von zu Tasks gehörenden Daten, insbesondere Informationen, auf welchen Knoten und in welchen Racks der Teil der
Eingabe liegt. Diese Informationen können vom Scheduler für die Entscheidung genutzt
werden, um Tasks im Idealfall auf einem Knoten auszuführen, auf dem die Daten bereits
lokal vorhanden sind, mindestens jedoch auf einem Knoten im selben Rack. Mit den
Anfragen sind zusätzlich Anforderungen von Speicher verbunden, so dass für einzelne
Tasks gezielte Mengen von Speicher für die Container angefordert wird.
Sobald einem Task durch den Resource Manager ein Container zugewiesen wurde, startet
der Application Master den Container durch Benachrichtigung des Node Managers (9a
und 9b).
Der Task wird durch eine Java-Anwendung in einer Java Virtual Machine (JVM) ausgeführt. Jedoch müssen zuvor die Daten geladen werden, die für die Ausführung des Tasks
erforderlich sind, unter anderem die Konfiguration des Tasks und das JAR-Archiv, welches die MapReduce-Anwendung enthält (10).
Schließlich wird die der Map- oder Reduce-Task ausgeführt (11).
Fehlerbehandlung
Zusätzlich ist für MapReduce-Programme, die unter Nutzung des YARN-Frameworks
ausgeführt werden, der Fehlerfall zu betrachten. Dies betrifft Tasks, den Application
Master, Node Manager und den Resource Manager.
Falls ein laufender Tasks fehlschlägt, wird diese Information an den Application Master
40
übermittelt und der Ausführungsversuch des Tasks als fehlgeschlagen markiert. Ebenso werden Tasks als fehlgeschlagen markiert, die sich nicht innerhalb eines bestimmten
Zeitintervalls beim Application Master melden. Die fehlgeschlagenen Tasks werden vom
Application Master erneut zur Ausführung vergeben und ein erneuter Ausführungsversuch unternommen, wobei die Anzahl der Versuche variabel ist.
Ebenso wie einzelne Tasks kann auch für Ausführung einer kompletten MapReduceAnwendung mehrere Versuche unternommen werden. Ein Application Master sendet
periodisch Lebenszeichen (Heartbeats) an den Resource Manager, so dass der Resource
Manager einen Fehler eines Application Masters an ausbleibenden Lebenszeichen erkennen, und eine neue Instanz starten kann. Hierbei ist es möglich, den Status eines
fehlgeschlagenen Application Masters wiederherzustellen; bereits erfolgreich ausgeführte
Tasks müssen so nicht nochmals ausgeführt werden.
Beim Ausfall eines Node Managers hört dieser auf, Lebenszeichen an den Resource Manager zu senden. Dieser kann ihn daraufhin aus dem Pool verfügbarer Knoten entfernen.
Tasks oder Anwendungen, die auf diesem Knoten laufen, können gemäß der Vorgehensweisen von fehlgeschlagenen Tasks oder Fehlern des Applications Masters wiederhergestellt werden.
Am schwerwiegendsten ist der Ausfall des Resource Managers, da ohne diesen weder
Anwendungen noch Tasks gestartet werden können. Jedoch wurde dieser so entworfen,
dass periodisch der Zustand persistent gespeichert wird und eine neue Instanz des Resource Managers daraus wiederhergestellt werden kann. Der Zustand umfasst dabei die
Node Manager und laufende Anwendungen.
2.6.4
Ein- und Ausgabeformate
Ein weiterer Aspekt sind die Ein- und Ausgabeformate der Daten, die von Hadoop
verarbeitet und generiert werden. Die Eingabeformate lassen sich grob in zwei Kategorien
aufteilen: Dateieingaben und Datenbankeingaben. Die Dateieingaben wiederum lassen
sich unterteilen in einfache Textdateien und Binärdateien.
Textdateien sind unstrukturierte Eingabedaten die auf verschiedene Arten verarbeitet
werden können. Zwei Beispiele hierfür sind das TextInputFormat und das KeyValueInputFormat. In ersterem entspricht jede Zeile einem Datensatz, wobei der Schlüssel der
Byteoffset des Zeilenbeginns in der Eingabedatei ist und der Wert die gelesene Zeile.
Zweiteres sind zum Beispiel CSV-Dateien mit zwei Spalten pro Zeile, so dass die erste
Spalte der Schlüssel und die zweite der Wert ist.
Binärdateien, sogennante SequenceFiles, sind Dateien, in denen Schlüssel-Wert-Paare
strukturiert gespeichert werden. Strukturiert bedeutet in diesem Fall, dass zwischen jedem Schlüssel-Wert-Paar eine Marke existiert, so dass diese sauber voneinander getrennt
41
werden können. Diese haben den zusätzlichen Vorteil, dass sie komprimiert werden können und serialisierbare Datentypen nutzen können. Hadoop nutzt für die Serialisierung
ein eigenes Format, sogenannte Writables. Diese sind von zentraler Bedeutung für Hadoop, da diese oft bei der Ein- und Ausgabe von Tasks als Datentypen für die SchlüsselWert-Paare verwendet werden und somit im verteilten Dateisystem gespeichert und aus
diesem gelesen werden. Die beiden wichtigsten Vorteile von diesen sind, dass sie ein kompaktes Speichern und durch geringen Overhead schnelles Lesen und Schreiben erlauben.
Datenbankeingaben erlauben das Einlesen von Daten aus Datenbanken über JDBCTreiber. Dieses hat jedoch den Nachteil, dass es kein Sharding, also eine Aufteilung der
Eingabe, erlaubt, welches im verteilten Dateisystem durch die Speicherung in Blöcken
implizit geschieht.
Die Ausgabeformate von Hadoop entsprechen den Eingabeformaten, so dass diese nicht
nochmals aufgeführt werden.
42
Kapitel 3
Parallele Algorithmen
In den letzten Jahren führt eine immer stärkere Nutzung der Informationstechnik dazu,
dass immer größere Mengen von Daten anfallen. Sequentielle Data Mining-Algorithmen
sind aus verschiedenen Gründen nicht mehr in der Lage, diese Datenmengen zu bewältigen. Zum Einen steigt die Berechnungszeit in inakzeptable Höhen, zum Anderen sind
die Datenmengen derart gewaltig, so dass diese von einzelnen Computern nicht mehr zu
handhaben sind. Aus diesem Grund wurden verschiedene Verfahren aus dem Bereich der
Assoziations- und Clusteranalyse vorgeschlagen, um diese Herausforderungen zu bewältigen und eine effiziente Analyse der Daten zur ermöglichen. Von diesen wird in diesem
Kapitel eine Auswahl von Verfahren vorgestellt, die auf verschiedenen Parallelisierungsansätzen basieren.
Auf eine Betrachtung der Laufzeitkomplexität der MapReduce-Algorithmen wird in dieser Arbeit verzichtet, da sich noch kein Modell zur Beschreibung von dieser etabliert
hat.
3.1
3.1.1
Algorithmen zur Assoziationsanalyse
Partition-Algorithmus
Der Partition-Algorithmus [SON95](Algorithmus 3.1) benötigt für das Berechnen von
frequent Itemsets in Abhängigkeit eines minimalen Supports ξ in einer Menge von Transaktionen zwei vollständige Suchläufe auf diesem Datenbestand.
In der ersten Phase werden alle potentiellen frequent Itemsets durch eine Suche auf
dem gesamten Datenbestand bestimmt. Hierfür werden die Daten in nicht überlappende
Partitionen aufgeteilt. Auf diesen Partitionen wird unabhängig voneinander die Menge
43
der lokalen frequent Itemsets berechnet. Die Mengen der generierten lokalen frequent
Itemsets werden anschließend zu einer Menge zusammengeführt, der Kandidatenmenge
für potentielle globale frequent Itemsets.
Zu Beginn der zweiten Phase wird zunächst für jedes Itemset der globalen Kandidatenmenge ein Zähler erzeugt. Nun wird der tatsächliche Support der Kandidaten auf der
gesamten Datenbank berechnet, indem die Vorkommen der Kandidaten in jeder Partition gezählt und die Zähler entsprechend erhöht werden. Anschließend lässt sich durch
einen einfachen Vergleich mit dem minimalen Support die Menge der frequent Itemsets
berechnen.
Im Gegensatz zum Apriori-Algorithmus, der global auf der vollständigen Transaktionsdatenbank arbeitet, besitzt der Partition-Algorithmus durch die Partitionierung auch
eine lokale Komponente. Es findet eine Unterscheidung zwischen lokalen und globalen
frequent Itemsets statt. Die verschiedenen auftretenden Itemset-Menge sind in Tabelle
3.1 dargestellt. Zusätzlich sind einige Begriffe zu definieren, die für den PartitionAlgorithmus von Bedeutung sind. Im Kontext dieses Verfahrens ist eine Partition p ∈
D ein Ausschnitt aus der gesamten Menge von Transaktionen, wobei diese als nichtüberlappend definiert sind. Zwei Partitionen sind nicht überlappend, wenn
T sie paarweise disjunkt hinsichtlich der enthaltenen Transaktionen sind, formal: pi pj = ∅ |
∀i, j : i 6= j. Für diese wird nun ebenfalls ein lokaler minimaler Support definiert: dieser bezeichnet den Anteil der Transaktionen in der Partition, die ein Itemset benötigt,
um als lokales frequent Itemset bezeichnet zu werden. Festgelegt ist der Wert des lokalen
minimalen Supports auf ξ/|D|; ein Itemset ist also ein potentielles globales frequent Itemset, wenn dieses in mindestens einer Partition lokal häufig ist. Itemsets, die gegen den
lokalen Support auf Häufigkeit getestet werden, werden als lokale Kandidaten-Itemsets
bezeichnet.
Ckp
Lpk
Lp
CkG
CG
LG
k
Menge
Menge
Menge
Menge
Menge
Menge
der lokalen Kandidaten der k-Itemsets in Partition p
der lokalen frequent k-Itemsets in Partition p
aller lokalen frequent Itemsets in Partition p
aller Kandidaten der globalen k-Itemsets
aller globalen Itemsets-Kandidaten
aller globalen frequent k-Itemsets
Tabelle 3.1: Übersicht und Notation der verschiedenen Itemset-Mengen
Globaler Support, globale frequent Itemsets und globale Itemset-Kandidaten sind wie die
vorher genannten lokalen Varianten definiert, sie beziehen sich jedoch nicht auf eine
Partition p ∈ D sondern auf die vollständige Menge von Transaktionen D.
44
Algorithmus 3.1 Partition-Algorithmus ([SON95] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
input: D - database of transactions
input: n - number of partitions
input: ξ - minimum support
output: LG - list of global frequent itemsets
procedure Partition(D, n, ξ)
P = Partition Database(D)
τ = ξ/n
for i = 1 to n in parallel do
Read In Partition(pi ∈ P )
Li = Gen Large Itemsets(pi , τ )
for (i = 2;SLji 6= ∅, i = 1, 2, . . . , n; i + +) do
CiG = j=1,2,...,n Lji
for i = 1 to n in parallel do
Read In Partition(pi ∈ P )
Gen Count(C G , pi )
LG = {c ∈ C G |c.count ≥ ξ}
return frequent itemsets LG
. τ - lokaler minimaler Support
. Erzeugung der Kandidaten
. Merge der lokalen Kandidaten
. Berechnung des globalen Supports
Ablauf
Zu Beginn des Algorithmus wird die Transaktionsdatenbank D in n logische Partitionen
aufgeteilt. In der ersten Phase wird in n Iterationen parallel mit dem Algorithmus 3.2
für die jeweilige Partition pi als Eingabe die Menge Li der lokalen frequent Itemsets der
Längen 1, . . . , l berechnet mit Li = Li1 ∪ Li2 ∪ . . . ∪ Lil .
Anschließend werden sequentiell die berechneten Kandidaten-Mengen lokaler frequent
Itemsets aller n Partition zu einer großen Menge vereinigt, der Kandidatenmenge für
globale frequent Itemsets.
In der zweiten Phase wird nun parallel in jeder Partition der Support jedes Kandidaten mit der Funktion Gen Count gezählt und anschließend aufsummiert. Kandidaten,
deren Support über dem globalen minimalen Support ξ liegen werden in die Menge
LG , der Menge der globalen frequent Itemsets, eingefügt und am Ende des Algorithmus
zurückgegeben.
Erzeugung lokaler frequent Itemsets
Dier Erzeugung lokaler frequent Itemsets mit Algorithmus 3.2 beginnt mit einer Partition der Transaktionsmenge und dem lokalen minimalen Support τ . Zunächst wird aus
der Transaktionsmenge die Menge aller lokalen frequent 1-Itemsets berechnet und jedes
45
dieser Itemsets die Liste der Transaktionen angehängt, in denen das Itemset enthalten
ist. Nun beginnt die Erzeugung der lokalen frequent Itemsets mit der Länge k aus Kandidaten der Länge k−1 (Zeilen 3-11). Hierfür wird in einer inneren und äußeren Schleife
jeweils über die Kandidaten mit der Länge k−1 iteriert. Wenn zwei Kandidaten l1 und
l2 nun bis zur k − 1-ten Stelle identisch sind und sich an dieser unterscheiden, so dass
l1 [k−1] < l2 [k−1], so wird ein neues Itemset c generiert, welches bis zur k − 2-ten Stelle
identisch ist und dann l1 [k−1] und anschließend l2 [k−1] angehängt.
Algorithmus 3.2 Generierung großer Itemsets im Partition-Algorithmus ([SON95]
angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
input: p - database partition
input: τ - local minimum support
output: Lp - list of local frequent itemsets in p
procedure Gen Large Itemsets(p, τ )
Lp1 ={large 1-itemsets along with their tidlists}
for (k = 2; Lpk 6= ∅; k + +) do
for all itemsets l1 ∈ Lpk−1 do
for all itemsets l2 ∈ Lpk−1 do
if l1 [1] = l2 [1]∧l1 [2] = l2 [2]∧. . .∧l1 [k−2] = l2 [k−2]∧l1 [k−1] < l2 [k−1]
then
c = l1 [1] · l1 [2] · · · l1 [k − 1] · l2 [k − 1]
if Prune(c) == false then
c.tidlist = l1 .tidlist∩l2 .tidlist
if |c.tidlist| / |p| ≥ τ then
Lpk = Lpk ∪ {c}
return Lp
Für diesen Kandidaten ist erst zu testen, ob dieser auf Grund des Apriori-Prinzips (Abschnitt 2.2.1) verworfen werden kann. Dieses passiert mit Algorithmus 3.3, indem für alle
k − 1-Teilmengen getest wird, ob diese in Kandidaten-Menge lokaler frequent Itemsets
der Länge k −1, Lk−1 enthalten ist. Wenn dies der Fall ist, wird die Schnittmenge der
Transaktionen der Itemsets l1 und l2 , aus denen der Kandidat c enstanden ist, berechnet
und an c angehängt.
Wenn nun die Anzahl der Transaktionen, in den c enthalten ist, durch die Transaktionszahl der Partition p größer ist als der lokale minimale Support τ , so wird c zu der Menge
der lokalen frequent Itemsets der Länge k hinzugefügt. Dieser Vorgang wird solange fortgesetzt, bis in einer Iteration keine neuen Itemsets mehr generiert werden. Anschließend
wird die Liste Lp mit allen lokalen frequent Itemsets zurückgegeben.
46
Algorithmus 3.3 Der Pruning-Schritt im Partition-Algorithmus ([SON95] angepasst)
1:
2:
3:
4:
5:
6:
input: c - k-itemset
output: boolean that indicates, whether c can be pruned
procedure Prune(c: k-itemset)
for all (k − 1)-subsets s of c do
if s ∈
/ Lk−1 then
return true
else
return false
Erzeugung der globalen frequent Itemsets
Das Zählen der Vorkommen aller Itemsets c aus C G in einer Partition p beginnt zunächst,
in dem für jedes Vorkommende 1-Itemset eine Liste erzeugt wird, in welchen Transaktionen dieses vorkommt. Die Menge von Transaktion, die c enthält, ergibt sich, indem
Schritt für Schritt aus den Items von c der Schnitt der Transaktionslisten berechnet wird.
Dieses wird für alle c ∈ C G durchgeführt und abschließend C G zurückgegeben.
Algorithmus 3.4 Berechnung des Supports der Kandidaten in einer Partition ([SON95]
überarbeitet)
1:
2:
3:
4:
5:
6:
input: C G - list of all itemset candidates
input: p - partition
output: count of all c ∈ C G in p
procedure Gen Count(c, p)
for all 1-itemsets do
generate the tidlist for partition p
for all c ∈ C G do
T
T T
c.count = c[1].tidlist c[2].tidlist . . . c[k].tidlist
return C G
3.1.2
Laufzeit
Die Laufzeit des Partition-Algorithmus setzt sich im wesentlichen aus zwei Teilen zusammen, der Erzeugung von lokalen frequent-Itemset-Kandidatenmengen und der Bestimmung des Supports der globalen Kandidatenmenge.
Als erstes ist für die Bestimmung der frequent 1-Itemsets eine Laufzeit von O(|p| · w)
nötig, wobei w die durchschnittliche Transaktionslänge bezeichnet. Die Erzeugung der lokalen Kandidatenmengen erfolgt ähnlich zum Apriori-Algorithmus aus Abschnitt 2.2.1.
Für die Erzeugung eines neuen Kandidaten sind maximal k − 2 Vergleichsoperationen
47
auszuführen. Diese ist für alle Itemset-Paare der vorherigen Iteration durchzuführen.
Für das Pruning der Kandidaten sind, wie im Apriori-Algorithmus, alle k −1 Teilmengen hinsichtlich der Häufigkeit des Vorkommens zu überprüfen; zusätzlich sind hierfür
ebenfalls k − 1 Zugriffe auf eine Datenstruktur zur Speicherung der frequent Itemset
Kandidaten nötig, welche im Falle eines HashSets in O(1) erfolgen. Die Vereinigung der
Listen mit den Transaktions-Identifikatoren schließlich erfordert den einmaligen Durchlauf beider Listen, sofern
sind. Insgesamt erfolgt die Kandidatenerzeugung
Pdiese sortiert
p
2
also in O(|p| · w) + O( w
|L
|
·
((k
− 2) + k · (k − 1) + (|l1 | + |l2 |))). Wie für den
k−1
k=2
Apriori-Algorithmus gilt auch hier, dass die Laufzeit für einen sehr niedrigen Support
n
exponentiell sein; die Laufzeit-Komplexität liegt also bei O( 2p )
Für die Berechnung der Vorkommen der Kandidatenmenge ist zunächst die Bestimmung
der Listen mit den Transaktions-Identifikatoren für die 1-Itemsets erforderlich. Wie in
der im Schritt zur Kandidatengenerierung benötigt dies eine Laufzeit von O(|p| · w).
Für die Zählung des globalen Supports werden nun alle Kandidaten durchlaufen und die
Schnittmengen der Listen mit den Transaktions-Identifikatoren gebildet. Diese kann in
O(|C G | · ((k − 1) · (2 · l))) ausgeführt werden, wobei l die durchschnittliche Länge der
Listen ist.
Analog zum Apriori-Algorithmus ergibt sich für einen sehr niedrigen notwendigen Support eine exponentielle Laufzeit, so dass die Partition-Algorithmus im schlechtesten Fall
n
O( 2p ) beträgt.
3.1.3
MapReduce-Partition
Der Partition-Algorithmus aus dem vorherigen Abschnitt lässt sich auf relativ einfache
Weise auf das MapReduce-Paradigma übertragen [RU11]. Hierfür werden zwei aufeinander folgende MapReduce-Phasen verwendet. In der ersten Phase lesen die Mapper die
zugewiesenen Eingabepartitionen ein und generieren auf diesen lokale frequent Itemsets,
welche ausgegeben werden. Die Reducer der ersten Phase lesen diese ein und fassen
diese zu der Kandidatenmenge für potentielle globale frequent Itemsets zusammen. In
der zweiten Phase berechnen die Mapper für die zugewiesene Partition, wie oft jedes
Kandidaten-Itemset in dieser vorkommt. Das Itemset wird zusammen mit dem Vorkommen als Schlüssel-Wert-Paar ausgegeben. Die Reducer summieren nun die für jedes
Itemset die Vorkommen auf und geben das Itemset aus, wenn es den minimalen Support überschreitet. Der folgende Pseudocode zur Beschreibung der beiden Phasen des
Partition-Algorithmus ist auf Grundlage der Ausführungen von [RU11] und mit Referenzierung der bereits vorhanden Pseudocodes aus Abschnitt 3.1.1 erstellt worden.
48
Phase 1: Generierung der Kandidaten
Die Map-Funktion liest zunächst die ihr zugewiesene Partition, bzw. die darin enthaltenen Transaktionen, und berechnet die lokalen frequent Itemsets dieser Teilmenge nach
Algorithmus 3.2.
Algorithmus 3.5 Kandidatengenerierung
input: key: null, value: list of transactions
output: key: local frequent itemset, value - null
1: procedure Mapper(key, value)
2:
L ← Gen Large Itemsets(value)
3:
for each itemset in L do
4:
Output(h itemset, null i)
input: key - local frequent itemset, value - null
output: key - itemset, value - null
5: procedure Reducer(key,value)
6:
Output(hkey, nulli)
Die so berechneten Itemsets werden als Schlüssel-Wert-Paare der Form hitemset, nulli
ausgegeben. Die Funktionalität der Reducer ist im ersten Durchlauf extrem einfach gehalten; sie geben auftretende Itemsets unverändert als Schlüssel-Wert-Paar der Form h
Itemset, null i aus. Die Menge von Itemsets, die nach dem Ende der Reduce-Phase vorliegt, ist die Menge der Kandidaten-Itemsets, für die getestet werden muss, ob sie auch
global häufig sind.
Phase 2: Berechnung des globalen Supports
In der Map-Phase des zweiten Durchlaufes wird nun für jede Partition die Anzahl der
Vorkommen für jedes Kandidaten-Itemset berechnet. Die Berechnung erfolgt hierfür nach
Algorithmus 3.4. Für jedes Itemset wird das Vorkommen als Paar hItemset, Anzahl der
Vorkommeni ausgegeben.
In der Reduce-Phase werden nun von den Reducern für jedes Kandidaten-Itemset die
Vorkommen aufsummiert und der Support berechnet. Itemsets, deren Support den minimal notwendigen Support erreichen oder überschreiten sind globale frequent Itemsets
und werden als Schlüssel-Wert-Paar hItemset, counti ausgegeben.
49
Algorithmus 3.6 Berechnung des globalen Supports
input: key - null, value - list of transactions
input: candidates: list of frequent itemset candidates
output: key - itemset, value - itemset count
1: procedure Mapper(key, value)
2:
Gen Count(candidates, value)
3:
for each c ∈ candidates do
4:
Output (hc, c.counti)
5:
6:
7:
8:
9:
10:
input: key - itemset i , value - list of itemset i’s counts
output: key - itemset, value - itemset count
procedure Reducer(key, value)
count ← 0
for each v in value do
count ← count + v
if count ≥ minSupp then
Output(hitemset, counti)
3.1.4
MapReduce Frequent-Pattern-Growth (PFP)
Der MapReduce FP-Growth Algorithmus (PFP) [LWZ+ 08] ist eine parallelisierte Variante des FP-Growth Algorithmus (Abschnitt 2.2.2) für verteile Computercluster unter
Nutzung des MapReduce-Frameworks. Die Grundidee des Algorithmus ist eine Partitionierung der Transaktionsdatenbank in unabhängige Teile, so dass parallel ohne äußere
Abhängigkeiten mit dem FP-Growth Algorithmus häufige Itemsets gesucht werden können.
Der Ablauf des Algorithmus (Abbildung 3.1) gliedert sich in fünf Phasen. In der ersten
Phase wird die Datenbank in Teile zerlegt, die auf P verschiedenen Knoten in einem
verteilten Dateisystem gespeichert werden. Diese Zerlegung und Verteilung der Daten
wird als sharding bezeichnet, jedes einzelne Teil als Shard.
Die Supportwerte aller Items aus der Datenbank werden in der zweiten Phase berechnet. Hierfür wird ein MapReduce-Durchlauf ausgeführt (Algorithmus 3.7), in welchem
jeder Mapper einen Shard als Eingabe erhält und für jedes gefundene Item ai in einer
Transaktion Ti ein Schlüssel-Wert-Paar der Form hItem, 1i ausgibt. Die Reducer fassen nun die Menge der Werte für jedes Vorkommen von ai zusammen und geben ein
Schlüssel-Wert-Paar der Form hnull, ai + counti aus. In diesem Schritt wird außerdem
das Vokabular I der Datenbank bestimmt, welches für große Datensätze oftmals nicht
bekannt ist. Das Ergebnis, die häufigen 1-Itemsets und ihr Support, wird in einer Liste,
der F-Liste (Frequent Itemset List) gespeichert.
In der dritten Phase werden die |I| Items aus der F-Liste gleichmäßig in Q Gruppen
50
Eingabedaten
Map
CPU
CPU
CPU
1 & 2: Sharding
und paralleles
Zählen
Reduce
CPU
CPU
CPU
Liste häufiger 1-Itemsets
CPU
3: Items
gruppieren
Gruppenliste
Neue integrierte Daten
Map
CPU
CPU
CPU
QR
QR
QR
Gruppe 1
Gruppe 2
Gruppe Q
CPU
CPU
CPU
Reduce
4: paralleles
und selbstanpassendes
FP-Growth
Zwischenergebnis
Map
CPU
CPU
CPU
5.
Aggregierung
Reduce
CPU
CPU
CPU
Ergebnis
Abbildung 3.1: Ablaufdiagramm des PFP-Algorithmus (nach [LWZ+ 08])
51
Algorithmus 3.7 Berechnung des Supports aller 1-Itemsets ([LWZ+ 08] angepasst)
input: key - null, value - transaction Ti
output: key - item ai , value - 1
1: procedure Mapper(key, value)
2:
for each item ai in Ti do
3:
Output (hai ,0 10 i)
4:
5:
6:
7:
8:
input: key - an item ai , value - set of values corresponding to key ai
output: key - null, value - item with corresponding occurence count
procedure Reducer(key, value)
count ← 0;
for each 0 10 in value do
count ← count + 1
Output(h null, ai + counti)
verteilt, wobei die Liste der Gruppen als G-Liste (Group List) bezeichnet wird und
jede Gruppe einen eindeutigen Identifikator erhält. Aufgrund der geringen Größe und
Zeitkomplexität kann diese Phase auf einem einzelnen Knoten ausgeführt werden.
Das Kernstück des Algorithmus, das Generieren von häufigen Itemsets, erfolgt in der
vierten Phase mit Algorithmus 3.8. Diese benötigt einen MapReduce-Durchlauf, wobei
in der Map-Phase Transaktionen für gruppen-abhängige Partitionen erzeugt werden und
in der Reduce-Phase der FP-Growth Algorithmus auf den zuvor generierten gruppenabhängigen Transaktions-Partitionen ausgeführt wird.
Beim Start eines Mappers wird zunächst die Gruppen-Liste geladen, welche in Form einer
Hash-Tabelle organisiert wird (Zeile 2-3). Hierdurch wird jedes Item ai auf eine Gruppe
gid abgebildet. Als Eingabe erhalten die Mapper einen Shard der Eingabedaten, wobei
das Eingabe-Paar für die Map-Funktion von der Form hnull, Ti i ist und somit nur eine
einzige Transaktion als Wert enthält. Für jede gelesene Transaktion wird jedes Item aj
durch die Gruppen-ID substituiert, welche sich aus der Anwendung des Items auf die
Hash-Tabelle H ergibt (Zeile 6). Für jede nun vorkommende Gruppen-ID gid wird das
am weitesten rechts stehende Vorkommen bestimmt, hier j und entsprechende Paare der
Form h gid, {Ti [1] . . . Ti [L]} i ausgegeben. Jede Transaktion wird folglich Element für
Element, von rechts beginnend, verkürzt und anhand der Gruppenzugehörigkeit des am
weitesten rechts stehenden Element einer Gruppe zugeordnet. Wichtig hierbei ist jedoch,
dass die ursprüngliche Transaktion bzw. die aus der Verkürzung resultierenden Teilmengen jeder Gruppe höchstens einmal zugeordnet werden. Dieses wird gewährleistet durch
das Entfernen der Paare h HashNum, ai i aus der Hash-Tabelle H (Zeile 8). Wenn alle
Map-Tasks fertiggestellt wurden, werden von der MapReduce-Infrastruktur die von den
Mappern ausgegebenen Paare gesammelt und die Werte nach dem Schlüssel gruppiert.
Die gruppen-abhängigen Transaktion werden an die Reducer übergeben in Form von
Paaren hgid, DB gid i, wobei DB gid eine Menge von gruppen-abhängigen Transaktionen
52
Algorithmus 3.8 Suche von frequent Itemsets mit dem FP-Growth-Algorithmus
([LWZ+ 08] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
input: key - null, value - transaction Ti
output: key - hash-value, value - itemset
procedure Mapper(key, value)
load G-List
generate Hash Table H from G-List
a[] ← Split(Ti )
for j = |Ti | − 1 to 0 do
HashN um ← getHashNum(H, a[j])
if HashNum 6= N ull then
delete all pairs whose hash value is HashNum in H
output (hHashN um, a[0] + a[1] + . . . + a[j]i)
input: key - group-id, value - transaction-list corresponding to group-id
output: key - null, value - itemset with support value
procedure Reducer(key, value)
load G-List
nowGroup ← G − Listgid
LocalF P tree ← clear
LocalF P tree = ConstructFpTree(DBgid , ξ)
for each ai in nowGroup do
create a max heap HP with size K
TopKFPGrowth(LocalFPtree, ai , HP )
for each vi in HP do
call Output(hnull, vi + supp(vi )i)
mit der selben Gruppen-ID gid sind. Diese Menge wird als gruppen-abhängiger Shard,
bzw. Partition, bezeichnet.
In jedem Task erzeugt der Reducer einen FP-Baum (Zeile 14) und generiert mit einen
leicht abgewandelten FP-Growth Algorithmus (siehe Algorithmus 2.5 auf Seite 20 für das
Original) Itemsets (Zeile 15-19). Der Unterschied zum normalen FP-Growth Algorithmus
liegt in der Tatsache, dass für ein Itemset ai nur die K Itemsets mit dem größten Support
geliefert werden welche vom Algorithmus direkt in einem Maximum-Heap der Größe K
verwaltet werden. Diese werden anschließend als Schlüssel-Wert-Paare hnull, v+supp(v)i,
also ohne Schlüssel und als Wert das Itemset mit dem zugehörigen Support, ausgegeben.
Eine Aggregierung der Ergebnisse aus Algorithmus 3.8 erfolgt nun in der fünften Phase
mit Algorithmus 3.9. Für jedes Item ai werden die K Itemsets mit dem größten Support
ausgegeben. Als Eingabe erhalten die Mapper in diesem Schritt Schlüssel-Wert-Paare der
Form hnull, v + supp(v)i, also einem Itemset mit dem zugehörigen Support; aus diesem
wird für jedes aj ∈ v ein Paar der Form haj , v + supp(v)i ausgegeben.
53
Algorithmus 3.9 Aggregierung der Ergebnisse ([LWZ+ 08] angepasst)
input: key - null, value - itemset v with corresponing support count
output: key - item, value - itemset v with corresponding support count
1: procedure Mapper(key, value)
2:
for all item ai in v do
3:
Call Output(hai , v + supp(v)i)
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
input: key - item, value - set of itemsets v with corresponding support counts
output: key - null, value - item ai and top-k supported itemsets and support counts
procedure Reducer(key, value)
define and clear a size K max heap HP ;
if |HP | < K then
insert v + supp(v) into HP
else
if supp(HP [0].v) < supp(v) then
delete top element in HP
insert v + supp(v) into HP
for each v ∈ HP do
Call Output(h null, ai + vi)
Die Reduce-Tasks verarbeiten als Eingabe nun Schlüssel-Wert-Paare haj , set(v+supp(v))i
als Eingabe, wobei set(v + supp(v)) eine Menge von Transaktionen mit dem zugehörigen Support bezeichnet, welche das Item aj enthalten. Diese werden direkt in einen
Maximum-Heap HP, der nach dem Support-Wert sortiert ist, eingefügt, solange dieser
weniger als K Elemente enthält. Wenn der Heap bereits K Elemente enthält können
zwei Fälle auftreten. Wenn der Support des einzufügenden Itemsets v größer als der des
Itemsets mit dem kleinsten Support-Wert des Heaps ist, so wird dieses aus dem Heap
entfernt und v eingefügt. Ansonsten wird v verworfen. Wenn der Reducer alle Werte
gelesen hat werden zum Abschluss alle Itemsets v des Heaps als Schlüssel-Wert-Paare
hnull, ai + vi ausgegeben.
Abschließend ist festzuhalten, dass aus der Beschreibung des Algorithmus nicht hervorgeht, ob die Ergebnisse des Verfahrens exakt oder lediglich approximativ sind. Kernpunkt
hierfür ist die Aufteilung in gruppenabhängige Shards. Zunächst erfolgt eine Aufteilung
der auftretenden Items in Q Gruppen, wobei eine Gruppe mehrere Items enthalten kann.
Anschließend werden die Transaktionen sukzessive gekürzt und nach dem Hashwert des
letzten Transaktions-Items einem Shard, der einer Gruppe zugeordnet ist, zugewiesen.
Jedoch ist es nicht möglich, dass in einer Partition mehr als eine Teilmenge der Ausgangstransaktion auftritt.
Durch das Auftreten von Teilmengen einer Transaktion in verschiedenen gruppenabhängigen Shards ist deshalb anzunehmen, dass der Algorithmus nur approximative Ergeb-
54
nisse liefert. In Kapitel 5 wird deshalb experimentell versucht, in einem Vergleich mit
dem MapReduce-Partition-Algorithmus eine Antwort auf diese Frage zu finden.
3.1.5
Weitere Algorithmen
Der PARMA-Algorithmus (Parallel Randomized Algorithm for Approximate Association
Rule Mining in MapReduce)[RDFU12] ist ein paralleler Algorithmus auf Grundlage des
MapReduce-Paradigmas, der durch ein stochastisches Verfahren eine Annäherung der
Menge von frequent Itemsets berechnet. Hierfür nimmt jeder Rechenknoten eine Stichprobe der Transaktionsdatenbank. Die Knoten berechnen unabhängig voneinander die
frequent Itemsets der Stichprobe, die abschließend zu einem einzigen Ergebnis gefiltert
und aggregiert werden. Die Qualität der Näherung ist über Parameter für Genauigkeit
und Fehlerwahrscheinlichkeit der Stichprobe kontrollierbar. Diese Parameter bestimmen
letzlich auch den Umfang der Stichprobe.
Ein weiteres paralleles Verfahren für Multi-Core-Rechner ist der P-Mine-Algorithmus
(Parallel Itemset mining on large datasets) [BCCG13], welcher als grundlegende Datenstruktur einen HY-Baum[BCC10] verwendet. Bei diesem handelt es sich, analog zum
FP-Baum des FP-Growth-Algorithmus um einen Präfix-Baum, der Transaktionen nach
Häufigkeit sortiert repräsentiert. Ein zentraler Aspekt ist, dass für einen Baumdurchlauf die physischen Speicherorte der Knoten über Zeiger festgehalten werden. Außerdem
existiert ein sogenannter Item-Index, der es zum einen ermöglicht, alle Pfade, die ein
bestimmtes Item enthalten zu ermitteln, zum anderen enthält er alle Items des HYBaumes zusammen mit ihrem Support sowie Zeiger auf den physikalischen Speicherort.
Die Suche nach frequent Itemsets erfolgt, analog zum FP-Growth-Algorithmus, durch
die Generierung bedingter Teilbäume und der rekursiven Abarbeitung.
Der MLFPT-Algorithmus (Multiple local frequent pattern trees)[ZEHL01] ist paralleles Verfahren zur Assoziationsanalyse, welches auf den Grundlagen des FP-GrowthAlgorithmus basiert. Der Algorithmus benötigt zwei Suchläufe auf der Transaktionsdatenbank und besteht aus zwei Phasen. In der ersten Phase werden parallele FP-Bäume
erzeugt. In dieser werden in einem Suchlauf parallel die frequent 1-Itemsets bestimmt,
nach Häufigkeit sortiert und in einer Header-Tabelle gespeichert. In einem zweiten Durchlauf werden parallel die lokalen FP-Bäume aufgebaut. Jeder Prozessor baut aus der zugewiesenen Menge von Transaktionen einen eigenen FP-Baum auf, verwirft Items, die
nicht in der Header-Tabelle vorhanden sind und sortiert die Transaktionen gemäß der
Häufigkeit der Items. Jedoch ist die Header-Tabelle im MLFPT-Algorithmus global und
die Zeiger verbinden so die Items verschiedener FP-Bäume. Für die Suche nach frequent
Itemsets wird der Support aller Items durch die Anzahl der Prozessoren geteilt und so
der durchschnittliche Support A gebildet. Die Items werden über den Support-Wert auf
die Prozessoren verteilt, so dass der kumulierte Support jedes Prozessors etwa A beträgt.
Die Prozessoren bilden nun auf Basis ihrer zugewiesenen Items bedingte FP-Bäume auf
55
Grundlage der lokalen FP-Bäume, die zuvor berechnet worden sind. In diesen wird dann
parallel nach frequent Itemsets gesucht.
3.2
3.2.1
Algorithmen zur Clusteranalyse
Parallel Partially Overlapping Partitioning (pPOP)
Partial Overlapping Partitioning (POP)
Minimaler Abstand
Der POP-Algorithmus[DL01] nutzt die 90-10-Regel aus, um die Speicheranforderungen und die Laufzeit zu reduzieren. Diese ergibt sich aus Beobachtungen des Verlaufs
hierarchischer Clusterings und besagt, dass in den ersten 90% der Iterationen des Algorithmus Cluster vereinigt werden, deren Entfernung weniger als 10% der maximalen
Verschmelzungs-Distanz von allen Cluster-Vereinigungen beträgt (Abbildung 3.2).
Anzahl der Schritte
Abbildung 3.2: Die 90-10-Regel im hierarchischen Clustering
Basierend auf dieser Annahme wird nun der zu clusternde Datensatz in p Zellen unterteilt, die sich in den Randbereichen überlappen. Dieser Überlappungsbereich wird als
δ-Region bezeichnet, wobei δ die Breite dieses Bereiches bezeichnet (Abbildung 3.3), und
gehört immer zu zwei Partitionen.
Hierarchische Clusteringverfahren, wie etwa der SLINK-Algorithmus [Sib73], haben im
besten Fall eine Komplexität von O(n2 ). Weiterhin lässt sich aus der 90-10-Regel schließen, dass zu Beginn des Algorithmus Paare mit sehr kleinen Distanzen vereinigt werden,
so dass diese lokal begrenzt sind. Im Umkehrschluss bedeutet dies, dass durch die, mindestens quadratische, Komplexität die Verschmelzungsoperationen auf den unteren Ebenen
des Dendrograms sehr teuer sind. Eine Aufteilung der zu clusternden Daten in Partitionen ermöglicht somit eine signifikante Reduktion der Laufzeit, da die zugehörigen
56
Distanzmatrizen einen geringeren Umfang besitzen.
δ
δ
δ
δ
δ
δ
δ
δ
δ
δ
δ
δ
Abbildung 3.3: POP-Algorithmus - Überlappende Partitionen
Die Grundidee des POP-Algorithmus (Algorithmus 3.10) ist die Unterteilung des Datensatzes in p Partitionen nach einem zu bestimmenden Attribut; jede der Zellen besitzt
mit den angrenzenden Nachbarn einen Überlappungsbereich, der als δ-Region bezeichnet
wird (Abbildung 3.3).
Während jeder Iteration wird nun in jeder Zelle nach dem Cluster-Paar mit dem geringsten Abstand gesucht, aus diesen wiederum das Paar mit insgesamt geringsten Distanz
ausgewählt wird; wenn diese Distanz kleiner als δ ist, so werden die beiden Cluster
verbunden und die Prioritätswarteschlangen der betroffenen Zelle aktualisiert. Wenn einer der beiden Clusterschwerpunkte oder der Schwerpunkt des vereinigten Clusters in
der δ-Region liegt, so wird zusätzlich die Warteschlange der anderen betroffenen Zelle
aktualisiert. Diese erste Phase wird solange fortgesetzt, bis die Distanz zwischen ClusterPaaren δ überschreitet.
In der zweiten Phase wird abschließend ein klassischer Algorithmus für hierarchisches
Clustering auf den verbleibenden Clustern angewendet, um das Dendrogram zu vervollständigen.
57
Algorithmus 3.10 Der POP-Algorithmus ([DL01] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
input: d - dataset to be clustered
input: p - number of partitions, δ - width of cell overlap
output: resulting dendrogram
procedure POP
choose attribute to partition the data
divide the dataset d into p partitions with margin δ
init priority queues P for each cell
while merging distance < δ do
init priority queue clusterPairs
for each cell c do
find closest cluster pair p(C1 , C2 ) ∈ c
clusterPairs.insert(p)
closestPair = clusterPairs.deque()
Cnew = merge(closestPair )
update corresponding priority queue P containing p
if C1 ∈ p ∨ C2 ∈ p ∨ Cnew in δ-region then
update priority queue P of affected cell
if any of the pair merged cluster in δ-region, update P of affected cell
remove duplicates from the δ-regions
cluster the remaining k’ clusters
return dendrogram
Parallelisierung des POP-Algorithmus
Eine mögliche Parallelisierung ergibt sich direkt, wenn die Struktur des POP-Verfahrens
(Abschnitt 3.2.1 Algorithmus 3.10) betrachtet wird. In diesem wird der Datenraum in
disjunkte Partitionen unterteilt und in jedem Schritt werden sequentiell in diesen die
nächsten Cluster-Paare gesucht. Zusätzlich ist bei einer Verschmelzung die Distanzmatrix der betroffenen Partition zu aktualisieren, wobei Distanzen zwischen Clusterpaaren
bestimmt werden müssen. Der pPOP -Algorithmus [DPS07] setzt an diesen Punkten an
und parallelisiert die Berechnung der nächsten Paare, sowohl bei der Suche nach Verschmelzungskandidaten als auch bei der Aktualisierung der Distanzmatrizen; die charakteristischen zwei Phasen des POP-Algorithmus, der Aufbau lokaler Dendrogramme
und das anschließende Verschmelzen zu einem globalen Dendrogram, bleiben im pPOPAlgorithmus erhalten.
In der ersten Phase werden die Daten in überlappende Zellen partitioniert, wobei der
Überlappungsbereich die Breite δ besitzt. Nun wird parallel in jeder Partition das Clusterpaar mit der kleinsten Inter-Cluster-Distanz gesucht. Das Paar mit der insgesamt
kleinsten Distanz dist < δ wird von einem festgelegten Prozessor gewählt und anschließend vereinigt. Bei der Vereinigung sind nur Cluster und folglich Distanzen derselben
58
Zelle betroffen; es muss nur die Distanzmatrix einer Zelle aktualisiert werden. Wenn
sich das nächste Paar jedoch innerhalb einer δ-Überlappungszone, dem Randbereich der
Zelle, befindet, müssen zusätzlich zur Zelle, die die vereinigten Cluster enthält, auch die
Distanzen zwischen Clustern der an den δ-Breich angrenzenden Zellen aktualisiert werden. Die erste Phase wird beendet, sobald die Distanz zwischen den nächsten Paaren δ
überschreitet.
In der zweiten Phase werden die verbliebenen Cluster durch Verwendung eines sequentiellen Algorithmus für hierarchisches agglomeratives Clustering vereinigt.
Ablauf
Die hier vorgestellte Variante des pPOP-Algorithmus (Algorithmus 3.11) verwendet für
die Implementierung der Distanzmatrix Prioritätswarteschlangen. Diese werden zu Beginn parallel initialisiert. Die zu berechnenden Zellen werden den verfügbaren Prozessoren so zugeordnet, dass jeder Prozessor die Prioritätswarteschlangen für durchschnittlich
c/p Zellen berechnet. Jeder Prozessor erhält so eine eigene Menge kleinster Inter-ClusterDistanzen für die Zellen, die von ihm berechnet werden. Aus dieser wählt er die kleinste
aus und liefert die Zellennummer, in der dieses Paar zu finden ist. Folglich ergibt sich
hieraus, dass es nach der parallel Bestimmung der kleinsten Distanz (Zeile 6-7) p kleinste Distanzen von Clusterpaaren und zugehörige Zellennummern gibt. Ein Prozessor,
der quasi als Master fungiert, wählt hieraus nun das insgesamt kleinste Paar aus und
verschmilzt dieses zu einem neuen Cluster (Zeile 8-10). Für den neuen Cluster ist nun
eine neue Prioritätswarteschlange anzulegen, in welcher die Distanzen zu allen anderen
Clustern der Zelle gespeichert sind. Auch diese Berechnung kann parallelisiert werden,
indem jeder Prozessor einen Teil der Distanzen berechnet.
Nach der Vereinigung des Clusterpaares sind im Anschluss auch die Warteschlangen der
anderen Cluster in der Zelle zu aktualisieren, was ebenfalls parallel erfolgen kann, in
(Zeile 11dem jeder Prozessor einen Teil der Warteschlangen aktualisiert, also Cclus(C)
p
12). Für den Fall, dass einer der vereinigten Cluster oder der neu gebildete Cluster im
δ-Randbereich der Partition lagen, so sind in den hiervon betroffenen Zellen ebenfalls
die Warteschlagen zu aktualisieren (Zeile 13-17). Sobald die kleinste gefundene Distanz
zwischen Clusterpaaren mindestens die Größe δ erreicht hat endet der parallele Teil des
Algorithmus; die bis hierhin gebildeten lokalen Dendrogramme werden durch die Anwendung eines sequentiellen hierarchischen Clustering-Algorithmus durch die Vereinigung
der verbliebenen Cluster zu einem globalen Dendrogram verschmolzen.
Die Laufzeitkomplexität des pPOP-Algorithmus setzt sich aus vier Teilen zusammen. Für
N2
),
die Initialisierung der Prioritätswarteschlagen ergibt sich eine Komplexität von O( c∗p
wobei p die Anzahl der Prozessoren und N die Anzahl der Datenpunkte ist. Die anschließende WHILE-Schleife wird maximal N − 1-mal durchlaufen, hat also eine Laufzeit von
O(N ). Die Komplexität für die Suche nach den beiden Punkten mit der geringsten
59
Algorithmus 3.11 pPOP-Algorithmus ([DPS07] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
Ccell(p) - number of cells assigned to processor p
Cclus(C) - chunk of clusters in cell C
input: ds: dataset, c: number of cells, δ: width of cell overlap
output: dendrogram of a clustered dataset ds
procedure P-POP(ds, c, δ)
partition ds in c cells
for each processor P in parallel do
create priority queues for Ccell(P )
while overall min dist < δ do
for each processor P in parallel do
determine closest pair (CL1 , CL2 ) for each cell C in Ccell(p) in parallel
on designated processor P̄
find overall closest pair (CL1 , CL2 ) and cell C that holds it
merge CL1 and CL2
for each processor P in parallel do
update the priority queues of Cclus(C)
on designated processor P̄
determine if neighbor cells are affected by the merged clusters
for each processor P in parallel do
for each affected cell C do
update priority queues of Cclus(C)
merge resuming clusters with sequential hierarchical clustering algorithm
return complete dendrogram
Distanz ist O( np ). n bezeichnet hier die Anzahl von Clustern, die noch in den Prioritätswarteschlangen vorhanden sind. Für den vierten Schritt, das Aktualisieren der von der
n∗log( n )
Vereinigung betroffenen Prioritätswarteschlagen, beträgt die Komplexität O( p c ),
da eine einzelne Prioritätswarteschlange nc Cluster enthält. Insgesamt ergibt sich somit
eine Laufzeit von O(
3.2.2
N 2 ∗log( N
)
c
).
p
Parallel Disjoint-Set DBSCAN (PDSDBSCAN)
Um eine Erweiterung des ursprünglichen DBSCAN-Algorithmus [EKSX96] handelt es
sich bei dem PDSDBSCAN-Algorithmus[PPA+ 12], der eine Parallelisierung durch die
Verwendung der Union-Find-Datenstruktur erreicht. Zunächst wird die sequentielle Variante, der Disjoint-Set DBSCAN -Algorithmus vorstellt. Mit diesem als Grundlage wird
sodann eine parallele Implementierung vorgestellt.
60
Der Disjoint-Set-DBSCAN-Algorithmus
Im ersten Schritt des Algorithmus 3.12 wird die Punktmenge X derart aufgeteilt, dass
jeder Punkt in einer eigenen Menge enthalten ist. Zusätzlich werden für die jeweiligen
Mengen die Zeiger für die Vorgänger auf die eigene Menge gesetzt (Zeile 2-3). Anschließend wird die gesamte Punktemenge durchlaufen; für jeden Punkt wird zunächst dessen
Nachbarschaft in Abhängigkeit vom Punkt x und der Nachbarschaftsgröße eps bestimmt
und in der Menge N gespeichert. Wenn die Anzahl der Nachbarn größer als die minimale
Nachbarschaftsgröße ist, wird x als Kernpunkt markiert. Dann erfolgt eine zusätzliche
Betrachtung der Punkte x0 , die in der Nachbarschaft von x enthalten sind. Für jeden
dieser Punkte wird geprüft, ob x0 ein Kernpunkt ist oder ob er noch keinem Cluster
zugeordnet ist. Wenn der Punkt ein Kernpunkt ist werden die Mengen, die x und x0
enthalten, vereinigt. Falls x0 noch keinem Cluster zugeordnet ist, wird wie im vorherigen
Schritt die Mengen, die x und x0 enthalten vereinigt. Analog zum DBSCAN-Algorithmus
ergibt sich durch Nutzung einer räumlichen Indexstruktur eine Laufzeitkomplexität von
O(n ∗ log(n)) wenn eine DisjointSet-Implementierung verwendet wird, die eine Klassenvereinigung in O(1) ermöglicht.
Algorithmus 3.12 DSDBSCAN-Algorithmus ([PPA+ 12] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
input: X - set of points
input: eps - neighborhood size
input: minpts - number of points required for classification as core point
procedure DSDBSCAN(X, eps, minpts)
for each point x ∈ X do
p(x) ← x
for each point x ∈ X do
N ← GetNeighbors(x, eps)
if |N | ≥ minpts then
mark x as core point
for each point x0 ∈ N do
if x0 is a core point then
Union(x, x0 )
else if x0 is not yet member of any cluster then
mark x0 as member of a cluster
Union(x, x0 )
Ein Beispiel für die einzelnen Phasen des DSDBSCAN-Algorithmus ist in Abbildung
3.4 dargestellt. In diesem sind drei Punkte innerhalb der Punktnachbarschaft für eine
Klassifikation als Kernpunkt notwendig. Im ersten Teil der Abbildung ist die räumliche
Verteilung der Punkte des Beispiels abgebildet; die Größe der ε-Nachbarschaft ist durch
gestrichelte Kreise dargestellt. Das DisjointSet nach seiner Initialisierung mit den Beispielpunkten ist im zweiten Teil zu sehen. Eine Nachbarschaftssuche, ausgehend von den
61
Punkten 4, 5, 6 und 7 ist in dritten Teil zu sehen; das Ergebnis dieser spiegelt sich in den
gebildeten Bäumen wieder. Das finale Clustering schließlich ist im vierten Teil der Abbildung abgebildet. Cluster sind hier durch blaue Bäume mit blauen Knoten, Rauschen
durch Bäume mit roten Knoten hervorgehoben.
1
5
9
2
7
8
4
1
6
3
2
1
2
3
4
5
6
7
8
9
3
1
3
5
6
8
4
2
7
9
4
5
1
2
3
6
7
8
4
9
Abbildung 3.4: Beispielhafte Abbildung der Phasen des DSDBSCAN-Algorithmus (nach
[PPA+ 12])
Der Algorithmus (PDSDBSCAN)
Kernpunkt der parallelen Version des DSDBSCAN-Algorithmus ist die Union-FindDatenstruktur (Abschnitt 2.4). Hierbei wird zuerst der Datenraum partitioniert, sodass die Partitionen unabhängig von einander auf den lokalen Datenpunkten die lokalen
Cluster, respektive Bäume, berechnen. Sobald die Bäume für die jeweiligen Partitionen
62
berechnet sind, werden diese verschmolzen, um aus den lokalen Clustern die globalen zu
erzeugen.
Zugrunde liegt zunächst die Annahme, dass der Algorithmus auf einem Rechner mit
gemeinsamen Speicher und p Threads ausgeführt wird. Der Datenraum der Punktmenge
soll also in p Partitionen, {X1 , X2 , . . . , Xp }, wobei jedem Thread eine Partition zugewiesen wird. Für jeden Thread t bezeichne Yt eine Menge von Punkt-Paaren (x, x0 ), so dass
x ∈ Xt und x0 ∈
/ Xt .
Algorithmus 3.13 PDSDBSCAN-Algorithmus ([PPA+ 12] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
input: X - set of points
input: eps - neighborhood size
input: minpts - number of points required for classification as core point
procedure PDSDBSCAN-S(X, eps, minpts)
for t = 1 to p in parallel do
for each point x ∈ Xt do
p(x) ← x
Yt ← ∅
for each point x ∈ Xt do
N ← GetNeighbors(x, eps)
if |N | ≥ minpts then
mark x as a core point
for each point x0 ∈ N do
if x0 ∈ Xt then
if x0 is a core point then
Union(x, x0 )
else if x0 ∈
/ any cluster then
mark x0 as member of a cluster
Union(x, x0 )
else
Yt ← Yt ∪ {(x, x0 )}
for t = 1 to p in parallel do
for each (x, x0 ) ∈ Yt do
if x0 is a core point then
UnionUsingLock(x, x0 )
else if x0 ∈
/ any cluster then
mark x0 as member of a cluster
UnionUsingLock(x, x0 )
Der Algorithmus kann in zwei wesentliche Teile, einen lokalen (Zeile 2-18) und einen
globalen (Zeile 19-25), aufgeteilt werden. Die Berechnungen innerhalb des lokalen Teils
sind identisch zu denen aus Algorithmus 3.12, jedoch werden nicht Datenpunkte aus der
ganzen Punktmenge X sondern nur aus der Partition Xt des Threads verwendet. Zu
63
beachten ist hierbei jedoch, dass in der Abfrage der Nachbarschaft eines Punktes x ∈ Xt
auch Punkte x0 ∈
/ Xt enthalten sein können. In diesem Fall wird das Verschmelzen
der Bäume, die x und x0 enthalten, verzögert und in der Merge-Phase gelöst. Hierfür
wird das Paar (x, x0 ) zu der Menge Yt hinzugefügt. Die einzigen Fälle, in denen die
Threads auf nicht-lokale Punktedaten, also Punkte außerhalb der eigenen Partition,
zugreifen, sind die Lesezugriffe, wenn die Nachbarschaft eines Punktes abgefragt wird.
Es ist hierbei festzuhalten, dass in diesem ersten lokalen Schritt keine Kommunikation
zwischen den Threads notwendig ist, ja sogar nicht erwünscht ist, damit die lokale Phase
vollständig unabhängig voneinander ausgeführt werden kann, es also keine Verluste durch
Kommunikationsverzögerungen gibt. Folglich werden hier lokale Union-Find-Strukturen
verwendet.
Algorithmus 3.14 Vereinigung von Union-Find-Strukturen mit Sperre ([PPA+ 12] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
input: points x and x0 to be merged
procedure UnionUsingLock(x, x0 )
while p(x) 6= p(x0 ) do
if p(x) < p(x0 ) then
if x = p(x) then
Lock(p(x))
if x = p(x) then
p(x) ← p(x0 )
Unlock(p(x))
x = p(x)
else
if x0 = p(x0 ) then
Lock(p(x0 ))
if x0 = p(x0 ) then
p(x0 ) ← p(x)
Unlock(p(x0 ))
x0 = p(x0 )
Im zweiten Schritt, der Vereinigung der Bäume (Algorithmus 3.14), werden die in Yt
enthaltenen Punktpaare durchlaufen. Für jedes Paar (x, x0 ) ∈ Yt wird überprüft, ob x0
ein Kernpunkt ist oder noch nicht zu einem Cluster hinzugefügt wurde. Wenn dies der
Fall ist, werden die Bäume, die x und x0 enthalten, mit der Union-Operation vereinigt.
Implizit sorgt dieser Schritt dafür, dass x und x0 schließlich dem gleichen Cluster angehören. Während des Vereinigungsvorgangs kann es passieren, dass die Vorgänger-Zeiger
von Punkten geändert werden, die einem anderen Berechnungsthread zugeordnet sind.
Es ist also notwendig, Vorkehrungen für den konkurrierenden Zugriff auf die Punkte zu
treffen.
Der Grundgedanke bei den Sperren ist, dass jeder Punkt eine eigene Sperre besitzt,
64
was aufgrund des gemeinsamen Speichers notwendig ist. Nehmen wir an, dass wir zwei
Wurzeln r1 und r2 haben. Nehmen wir weiter an, dass wir den Vorgängerzeiger von r1
auf r2 umsetzen wollen. Um nun eine Union-Operation zum Verschmelzen auszuführen
ist das erfolgreiche Anfordern einer Sperre auf r1 erforderlich. Nun kann es passieren,
dass der Thread, der die Sperre anfordert, warten muss, da ein anderer Thread bereits
die Sperre des Punkts gesetzt hat. Nun gibt es zwei Möglichkeiten:
1. Wenn r1 weiterhin eine Wurzel ist wird der Vorgängerzeiger von r1 auf r2 gesetzt
2. Falls ein anderer Thread den Vorgängerzeiger geändert hebt der Thread die Sperre
auf und setzt die Ausführung von der momentanen Position fort
), da die BerechnunDie Laufzeit für den ersten Teil des Algorithmus liegt bei O( n∗log(n)
p
gen identisch mit dem DSDBSCAN-Algorithmus sind und völlig unabhängig voneinander
ausgeführt werden können. Für den zweiten Teil, die Vereinigung der lokalen Clusterings,
hängt die Laufzeit maßgeblich von der Größe der Menge Yt ab, wobei für jedes Element
aus Yt eine Vereinigung mit Algorithmus 3.14 ausgeführt wird. Da die diese solange ausgeführt wird, bis beide Bäume auf die gleiche Wurzel zeigen, beträgt die Laufzeit maximal
O(log(n)). Patwary et al. verweisen in [PPA+ 12] darauf, dass in der Praxis nur in sehr
wenigen Fällen Sperren andere ausführende Threads blockieren, da sich die Punktpaare
in Yt oft im gleichen Baum befinden, so dass die Laufzeitkomplexität für die Vereinigung
). Folglich kann die Laufzeit des gesamten PDSDBSCAN-Algorithmus grob
O( Yt ∗log(n)
p
) abgeschätzt werden.
mit O( (n+Yt )∗(log(n))
p
3.2.3
MapReduce-DBSCAN
Massiv angestiegene Datenmengen haben zu verschiedenen Vorschlägen wie etwa PDBSCAN [XJK99] oder PDSDBSCAN [PPA+ 12] geführt, um den bekannten DBSCANAlgorithmus (Abschnitt 2.3.2) zu parallelisieren. Diese haben jedoch als Nachteil, dass
die gesamte Parallelisierung des Verfahrens durch die Entwickler in Form von Nachrichtenaustausch oder gemeinsam genutztem Speicher behandelt werden muss. Eine Alternative zu klassischen Ansätzen ist das MapReduce-Framework (Abschnitt 2.5), welches die
Details der Parallelisierung vor den Entwicklern verborgen hält. [HTL+ 14] präsentieren
eine Variante des DBSCAN-Algorithmus, die das MapReduce-Paradigma zu Parallelisierungszwecken nutzt.
Der Ablauf des Algorithmus ist in Abbildung 3.5 abgebildet.
Im ersten Schritt, der Datenpartitionierung, wird ein zunächst ein Profil der zu clusternden Daten erstellt. Es wird zuerst die Anzahl der Punkte in den Rohdaten und
die räumliche Verteilung ermittelt. Basierend hierauf wird ein rechteck-basiertes Partitionierungsprofil des Datenraumes erstellt. Dieses ist die Grundlage für den nächsten
65
1. Datenpartitionierung
3. Globale Vereinigung
Profil der
Aufteilungsbereiche
1.1: Zellen-basierte
statistische Analyse
1.2: Partitionierung
Mengen der
Mergekandidaten
Statistisches Profil
3.1: Erzeuge MergeAbbildung
Punktdaten
globales
Abbildungsprofil
Partionierungsprofil
2:
lokales Clustering
Rohdaten
Metadaten
3.2:
Umbenennung der
Daten
Daten mit
globalen
Cluster IDs
Abbildung 3.5: Ablauf des MR-DBSCAN-Algorithmus [HTL+ 14]
Schritt, das lokale Clustering. In diesem wird der eigentliche Clustering-Algorithmus
für jeden Teilraum, der aus dem Partitionierungsprofil hervorgeht, ausgeführt. Hierfür wird eine Abwandlung des DBSCAN-Algorithmus verwendet, der an einigen Stellen
an die Einschränkungen einer MapReduce-Implementierung, die fehlende Kommunikationsmöglichkeit zwischen parallelen Berechnung, angepasst wurde. Im dritten und letzten
Schritt, der globalen Vereinigung, werden dann die Teilergebnisse der lokalen Clusterings
mittels eines Profils der Aufteilungsbereiche zu einem globalen Clustering aggregiert.
Vorbereitung der Partitionierung
Eine wichtige Aufgabe fällt der Vorverarbeitung zu, da in dieser das Partitionierungsschema für den Datensatz festgelegt wird. Dieses hat aufgrund der ungleichmäßigen
Verteilung von Objekten in Datensätzen zentralen Einfluss auf die Lastverteilung, da
nur durch eine gleichmäßige Partitionierung, die an die Datenverteilung angepasst ist,
ein optimaler Durchsatz erreicht werden kann.
Um ein angepasstes Partitionierungsschema zu erstellen besteht eine mögliche Lösung
darin, einen räumlichen Index, wie etwa einen R-Baum oder kD-Baum, aufzubauen.
Jedoch gestaltet sich dieses wegen des großen Umfangs der Datensätze schwierig, da
hierarchische Indexstrukturen bei umfangreichen Datenmengen selber einen erheblichen
Platzbedarf besitzen; somit kann also bereits die räumliche Indexierung nicht mehr effizient handhabbare Dimensionen erreichen.
[HTL+ 14] schlagen als Grundlage eine binäre Partitionierung des Raumes (BSP: binary space partitioning), um eine effiziente Aufteilung des Datenraumes in nicht überlappende Rechtecke zu erreichen und daraus Partitionen zu erzeugen. Auf Grundlage
vordefinierter Regeln wird durch BSP rekursiv eine Aufteilung des Raumes in zwei kleinere Teilräume vorgenommen. Eine weit verbreitete Regel ist die möglichst gleichförmige
66
Aufteilung von Punkten (ESP: even split partitioning). Eine andere Regel zielt auf eine Minimierung der Anzahl von Punkten innerhalb des Randbereiches von Partitionen
ab (RBP: reduced-boundary partitioning). Dies ist deshalb erstrebenswert, da die Partitionen überlappende Ränder mit einer Breite von ε, dem Radius der Punktnachbarschaft im zwei-dimensionalen Fall, haben, die auf mehrere Partitionen kopiert werden
und in der globalen Clustervereinigung beachtet werden müssen. Diese Regel kann also
die Kommunikationskosten reduzieren, wenn eine geringe Anzahl von Punkten in mehreren Partitionen vorkommt und somit weniger Kandidaten während der Berechnung der
Clustervereinigung betrachtet werden müssen.
Jedoch haben die Ansätze Nachteile. So fußt der ESP-Ansatz auf der Annahme, dass
die Laufzeit des DBSCAN-Algorithmus proportional zur Anzahl der Punkte ist; faktisch
hängt diese jedoch maßgeblich von der Datenverteilung ab. Der RBP-Ansatz hingegen
bewertet die Kommunikationskosten des lokalen Clusterings und der globalen Clustervereinigung als am teuersten, was jedoch nur für kleine Datenmengen und eine nicht
parallel durchgeführte Clustervereinigung stimmt.
Um diesen Nachteilen Herr zu werden schlagen [HTL+ 14] eine kosten-abhängige Partitionierung (CBP: cost-based partitioning) vor (Algorithmus 3.15). Vor Beginn der Partitionierung wird SU , das minimal einschließende Rechteck, das alle Punkte des Datensatzes
umschließt, in ein Raster mit n×n Zellen mit n = 1/2 unterteilt. Für jede der Zellen des
Rasters wird die Dichte, also die Anzahl der in der Zelle enthaltenen Punkte, berechnet.
Algorithmus 3.15 Kostenbasierte räumliche Aufteilung ([HTL+ 14] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
input: SU - mbr, that contains all points
input: nPointsOfCells - array containing number of points for each cell
input: maxCost - maximum cost allowed for a cell
S
output: set of non-overlapping rectangles satisfying SU = Si
procedure CostBasedPartitioning(SU , nPointsOfCells, maxCost)
taskQueue.add(SU )
partitions = ∅
while taskQueue is not empty do
S = taskQueue.remove()
if EstimateCost(S, nPointOfCells) > maxCost then
S1 , S2 = CostBasedBinarySplit(S, nPointsOfCells)
taskQueue.add(S1 , S2 )
else
partitions.add(S)
return partitions
Der Algorithmus (3.15) startet, indem einer abzuarbeitenden Warteschlange taskQueue
das minimal-einschließende Rechteck des gesamten Datensatzes übergeben wird. Die
Warteschlange wird nun so lange abgearbeitet, bis keine Rechtecke mehr in ihr enthalten
sind. Zu Beginn der Schleife (Zeile 5) wird ein Element der Warteschlange entnommen
67
und mit Hilfe der Funktion EstimateCost (Algorithmus 3.17) die Kosten der Zelle
berechnet. Wenn diese über einen festen Wert maxCost liegen, wird das Rechteck S
mit der Funktion CostBasedBinarySplit (Algorithmus 3.16) in zwei Teil-Rechtecke
S1 , S2 aufgeteilt und der Aufgabenwarteschlange hinzugefügt (Zeile 6-8). Sind die Kosten
kleiner, so wird das Rechteck einer Liste von Partitionen hinzufügt (Zeile 9-10). Diese
Liste wird nach Abbruch der Schleife zurückgegeben und stellt die Partitionierung des
Datensatzes in Form von nicht überlappenden Rechtecken dar, welche den Datenraum
vollständig abdeckt.
Die Zerteilung eines Rechtecks durch die Funktion CostBasedBinarySplit (Algorithmus 3.16) erfolgt durch vertikale und horizontale Teilungslinien, die entlang der Zellengrenzen verlaufen. In den Zeilen 5 bis 12 durchläuft der Algorithmus alle Teilungslinien
und wählt die beiden aus der Teilung resultierenden Rechtecke aus, die die geringste
Kostendifferenz haben.
Algorithmus 3.16 Kostenbasierte binäre Raumaufteilung ([HTL+ 14] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
input: S - rectangle to be split
input: nPointsOfCells - array cotaining number of points for each cell
output: S1 , S2 - split result of input rectangle S
function CostBasedBinarySplit(S, nP ointsOf Cells)
splitLineCandidates = vertical and horizontal lines that split S in two subrectangles
minCostDiff = ∞
(S1 , S2 ) = (null, null)
for each splitLine in splitLineCandidates do
(S10 , S20 ) = sub-rectangles corresponding to splitLine
S10 .cost = EstimateCost(S10 , nPointsOfCells)
S20 .cost = EstimateCost(S20 , nPointsOfCells)
costDif f = |S10 .cost − S20 .cost|
if costDiff < minCostDiff then
minCostDiff = costDiff
(S1 , S2 ) = (S10 , S20 )
return (S1 , S2 )
Ein Beispiel für eine solche Unterteilung ist in Abbildung 3.6 abgebildet. Auf der linken
Seite ist die Punkt-, auf der rechten die Kostenverteilung über die Zellen dargestellt. Die
Kandidaten für vertikale und horizontale Teilungslinien, von denen es in diesem Beispiel
insgesamt acht gibt, sind gepunktet gezeichnet und stellen gleichzeitig die Ränder der
zugrunde liegenden Zellen im Inneren des umschließenden Rechtecks dar. Die gewählte
Trennungslinie, die das Rechteck in zwei Teilrechtecke unterteilt, so dass die Kostendifferenz zwischen den Partitionen minimal ist, ist fett gezeichnet und verläuft entlang
der Trennlinie zwischen der ersten und zweiten Spalte. Für einen maxCost-Wert von
104 wäre die Unterteilung in Abbildung 3.6 bereits eine gültige finale Partitionierung;
68
für eine maxCost-Wert von 100 wäre eine weitere Unterteilung der rechten Partition
durchzuführen.
10.8
99
10.8
27.6
10.8
10.8
10.8
10.8
10.8
Abbildung 3.6: Beispiel für kostenbasierte Partitionierung [HTL+ 14]
Die Kostenabschätzung erfolgt mit der Funktion EstimateCost (Algorithmus 3.17). Sie
erhält als Eingabe ein Rechteck sowie ein Array, welches Informationen zu der Punktdichte für jede Zelle enthält. Die Gesamtkosten für ein Rechteck setzen sich zusammen
aus den Kosten für jede Zelle, die das Rechteck überdeckt.
Algorithmus 3.17 Kostenabschätzung für ein Rechteck ([HTL+ 14] angepasst)
input: S - rectangle, that represents a partition
input: nP ointsOf Cells - array containing number of points in each cell
output: cost - cost of given partition scheme
function EstimateCost(S, nP ointsOf Cells)
cost = 0
for each cell in S do
nP oints = number of points in cell, gathered with help of numP ointsOf Cell
costsOf Cell = cost computation using nP oints by Eq. 3.1
cost = cost + costsOf Cell
10:
return cost
1:
2:
3:
4:
5:
6:
7:
8:
9:
Die formale Berechnung der Kosten für ein Zelle erfolgt mit den Formeln in 3.1. Die
erste Formel W (S) besagt, dass sich die Kosten eines Rechtecks S zusammensetzen
aus den Kosten für alle Zellen ci , die von S überdeckt werden. Die Kosten der einzelnen Zellen wiederum setzen sich zusammen aus der Anzahl der in der Zelle enthaltenen
Punkte N ci und einer Abschätzung der Speicherzugriffskosten DA(N ci ), die durch Nachbarschaftsanfragen im DBSCAN-Algorithmus zu erwarten sind. Wenn die Zugriffskosten
der Einfachheit halber auf 1 gesetzt werden, so sind die Kosten einer Zelle die Anzahl der
69
darin enthaltenen Punkte. Eine detailliertere Beschreibung der Kosten ist in [HTL+ 14]
ausgeführt.
W (S) =
X
W (ci ) mit S =
[
ci
(3.1)
W (ci ) = N ci ∗ DA(N ci )
Die Laufzeit der Partitionierung setzt sich aus drei wesentlichen Teilen zusammen, der
aufrufenden Funktion, der Funktion zur binären Rechteck-Teilung und der Kostenschätzung. Die Laufzeit für die Kostenabschätzung, vorausgesetzt, dass einfach nur die Summe der in einem Raster enthaltenen Punkte gebildet wird, beträgt im schlechtesten Fall
O(n)2 , wenn ohne Optimierung das ganze Raster durchsucht werden muss. Jedoch ist
es möglich, aus den Koordinaten des Rechtecks die überdeckten Zellen des Rasters zu
berechnen, so dass in diesem Fall lediglich eine Laufzeit von O(n) benötigt wird. Für
binäre Rechteckteilung müssen jeweils für jede horizontale und vertikale Teilungslinie
die Kosten für die resultierenden Rechtecke geschätzt werden, so dass eine Laufzeit
von O(4 · n2 ) für die optimierte Fassung der Kostenschätzung benötigt wird. Wenn für
die binäre Rechteckaufteilung eine Laufzeit von O(n2 ) angenommen wird, so lässt sich
die rekursive Funktion zur Erzeugung der kostenbasierten Partitionierung mit Hilfe des
Master-Theorems auf eine Laufzeit von Θ(n2 ) abschätzen.
Partitionierung der Daten
Ziel der Partitionierung ist eine Aufteilung des Datensatzes, damit die Clustering-Aufgabe auf mehrere Berechnungen verteilt wird, die unabhängig voneinander durchgeführt
werden können, also keinen Zugriff auf Punkte außerhalb der Partition benötigen. Ein
einfaches Beispiel für eine Partitionierung ist in Abbildung 3.7 dargestellt. Das minimal
einschließende Rechteck des Datenraumes SU wurde hier bereits in zwei Rechtecke S1
und S2 unterteilt. Am Beispiel der Nachbarschaft des Punktes px wird deutlich, dass,
wenn eine Partition P1 nur die Punkte aus S1 enthält, nicht feststellbar ist, ob px ein
Kernpunkt ist, da die Nachbarschaft von px eventuell Punkte aus S2 enthält. Folglich
ist es notwendig, Punkte aus benachbarten Rechtecken zu einer Partition hinzuzufügen,
um für jeden Punkt einer Partition die Klasse bestimmen zu können.
Zu diesem Zweck wird ein Rechteck Si um ε nach außen erweitert, welches als äußeres
Rechteck von Si (OR - outer rectangle) bezeichnet wird und einen äußeren Rand (OM
- outer margin) besitzt. Analog wird ein weiteres Rechteck um ε nach innen verkleinert,
welches als inneres Rechteck von Si (IR - inner rectangle) mit einem inneren Rand
(IM - inner margin) bezeichnet wird. Punkte, die innerhalb von IR liegen, zeichnen
sich dadurch aus, dass sie für die Vereinigung der lokalen Cluster nicht mehr betrachtet
werden müssen, da diese außerhalb der Überlappungsbereiche angrenzender Partitionen
liegen.
70
OR(S1)
OR(S2)
S1
S2
IR(S2)
IR(S1)
px
ε
ε
Abbildung 3.7: Beispiel einer Partitionierung mit ε-Randbereichen [HTL+ 14]
Insgesamt bildet die Menge aller Rechtecke eine minimal vollständige Partitionierungsmenge (MCPS: minimum complete partition set). Für eine Punktmenge DB und ein
diese einschließendes Rechteck SU gilt, dass P = {P1 , P2 , . . . , Pm } ein MCPS von DB
ist (Abbildung 3.8), wenn für die Rechtecke S1 , S2 , . . . , Sm der Durchschnitt Si ∩ Sj =
∅ für i 6= j und ∪m
i=1 Si = SU ist, wobei Pi = OR(Si ) ist.
Für die Implementierung der Partitionierung ist zuerst ein MapReduce-Job auszuführen, der die Statistiken der Datenverteilung erfasst. In der Map-Funktion wird das minimal einschließende Rechteck des Datensatzes SU in kleine quadratische Zellen mit der
Kantenlänge 2 aufgeteilt. In der Shuffle-Phase des MapReduce-Frameworks werden die
Punkte nun nach den Partitionen, in denen sie enthalten sind, sortiert. Die ReduceFunktion generiert hieraus ein Profil, welches die Anzahl von Punkten in jeder Zelle
enthält.
Abschließend wird ein MCPS berechnet, welches mit den Algorithmen aus den geschätzten Berechnungskosten für die Datenverteilung abgeleitet wird. Das Ergebnis wird als
Datei (file-partition-index) ausgegeben und enthält die unteren linken und oberen rechten Ecken der Rechtecke des MCPS und bildet so einen Partitionierungsindex, welcher
für die Zuweisungen der Punkte zu Partitionen in Form von Identifikatoren im nächsten
Schritt benötigt wird.
71
S4
S0
S5
S3
S1
S2
Abbildung 3.8: Beispiel einer minimal vollständigen Partitionierungsmenge [HTL+ 14]
Lokales Clustering
Für das Clustering der Partitionen wird eine leicht abgewandelte Version des DBSCANAlgorithmus (Seite 27) verwendet, in der eine Modifikationen in der Suche und Erweiterung von Clustern vorgenommen wird (Algorithmus 3.18). Es wird für jeden Punkt
zusätzlich zum Cluster-Identifikator ein Flag eingeführt, welches explizit die Klassifikation in Kern-, Rand- und Rauschpunkte enthält und eine spätere Unterscheidung der
Punkte ermöglicht.
Die erste Änderung findet sich in Zeile 4, in welcher der Rauschpunkt keinen Clusteridentifikator zugewiesen bekommt, der ihn implizit als Rauschpunkt identifiziert, sondern
explizit ein Flag gesetzt wird, dass den Punkt explizit als Rauschpunkt ausweist. Eine
weitere kleine Änderung findet sich in Zeile 8, in der für einen Punkt mit einer ausreichend großen Nachbarschaft das Flag ’CORE’ gesetzt wird, um ihn als Kernpunkt
zu markieren. Die größte Änderung findet sich jedoch in der Clusterweiterung in den
Zeilen 10 bis 22. So wird zum einen bei Punkten, die über die Nachbarschaftabfrage
erreicht wurden, explizit überprüft, ob diese noch nicht besucht worden sind (’UNCLASSIFIED’) oder bereits als Rauschpunkte (’NOISE’) markiert wurden. Zum anderen wird
für erreichte unklassifizierte Punkte, die nun ihrerseits eine Abfrage starten, ein Flag
’CORE’ gesetzt (Zeile 16), wenn die Größe der Nachbarschaft den Schwellenwert überschreitet, und ein Flag ’BORDER’ gesetzt, wenn dies nicht der Fall ist (Zeile 19). Die
Laufzeitkomplexität des modifizierten DBSCAN-Algorithmus liegt, wie beim normalen
DBSCAN-Algorithmus bei O(n · log(n)).
72
Algorithmus 3.18 Modified DBSCAN - Clustererweiterung ([HTL+ 14] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
input: points - dataset to be clustered
input: point - starting point for cluster expansion
input: clusterId - identificator of the current cluster
input: ε - size of the neighborhood
input: minPts - points required for a core point
output: boolean - indicates, whether cluster was found
procedure ExpandCluster(points, point, clusterId, ε, minP ts)
seeds = points.regionQuery(point, eps)
if seeds.size < minPts then
point.setFlag(NOISE )
return FALSE
else
points.changeClusterIds(seeds, clusterId )
points.setFlag(CORE )
seeds.delete(point)
while seeds 6= empty do
currentP oint = seeds.first()
if currentP oint.clusterId == UNCLASSIFIED then
currentP oint.setClusterId(clusterId )
result = points.regionQuery(currentPoint, ε)
if result.size ≥ minPts then
currentP oint.flag == CORE
seeds.append(resultPoint)
else
currentP oint.flag == BORDER
else if currentP oint.flag == NOISE then
currentP oint.setClusterId(clusterId )
currentP oint.setFlag(BORDER)
seeds.delete(currentP oint)
return TRUE;
Das eigentliche Clustering findet in einem eigenen MapReduce-Auftrag statt. Zuerst
wird mittels der Map-Funktion (Algorithmus 3.19) der Datenbestand auf verschiedene
Partitionen verteilt.
Dieses geschieht, indem die eingegebenen Schlüssel-Wert-Paare, die keinen Schlüssel und
als Wert den Punkt enthalten, mit dem Partitions-Identifikator versehen und ausgegeben
werden.
Der Partitionierungsindex partitionIndex wird zu Beginn der Ausführung des Map-Tasks
aus den Ausgabedaten des ersten Schrittes, der Datenpartitionierung, erzeugt. Für jeden
73
Algorithmus 3.19 Lokales Clustering([HTL+ 14] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
input: - hkey : null, value : pointi
output: key - partition id, value - point object
procedure Map(key, value)
partitionIndex = readFromFile(partition-index-file)
partitionIDs = getPartitionIDsForPoint(partitionIndex, point)
for each partitionId in partitionIDs do
Output (h partitionId, point i)
input: key - partitionId, value - all points in partition P(partitionId)
input: context - minP ts: points required for core points, ε: neighborhood size
procedure Reduce(key, value)
rT ree = createRTree(value)
S = rectangle of partitionId
for each unvisited p inside of S do
cluster points with algorithm 3.18 using rT ree
for each p in P do
OutputToFile(hp, (p.clusterId, p.f lag)i, file-results-partitionID)
for each p in P do
if p.f lag == CORE and p inside IM (S) then
OutputToFile (hp, (p.clusterId, p.f lag)i, file-AP-partitionID)
else if p.f lag != N OISE and p inside OM (S) then
OutputToFile (hp, (p.clusterId, p.f lag)i, file-BP-partitionID)
Punkt point kann über den Index partitionIndex mit der Funktion getPartititionIDsForPoint aus den Koordinaten der Partitionierungsrechteckee bestimmt werden, in welchem
Rechteck und somit in welcher Partitionen dieser enthalten ist. Daraus wird für jede enthaltene Partition ein Schlüssel-Wert-Paar hpartitionId, pointi erzeugt und ausgegeben.
Nach Abschluss der Map-Phase werden die Schlüssel-Wert-Paare dann vom MapReduceFramework gemischt und nach den Partitions-Identifikatoren gruppiert. Punkte mit dem
gleichen Identifikator werden an die Reduce-Funktion übergeben, in welcher das lokale
Clustering durchgeführt wird.
Das Clustering erfolgt in der Reduce-Phase (Algorithmus 3.19) mit dem modifizierten
DBSCAN-Algorithmus (3.18). Für eine effiziente Unterstützung der Nachbarschaftsanfragen verwendet der Algorithmus einen R-Baum als räumlichen Index, der alle Punkte
der Partition enthält.
Die Ergebnisse des Clustering einer Partition werden in drei verschiedene Ausgabedateien geschrieben. Das Gesamtergebnisse, alle Punkte innerhalb von S, werden eine Datei ’file-results-partitionID’ geschrieben. Kernpunkte, die innerhalb des inneren Randes
(S − IR(S)) liegen, werden in eine Datei ’file-AP-partitionID’ geschrieben, Kern- und
74
Randpunkte im äußeren Rand (OR(S) − S) in eine Datei ’file-BP-partitionID’. Lediglich diese Bereiche, die von anderen angrenzenden Partitionen überdeckt werden oder
andere angrenzende Partitionen überdecken, sind zu betrachten, wenn die Vereinigung
der lokalen Cluster durchgeführt wird. Außerdem werden die Partitions-Identifikatoren
in die Cluster-Identifikatoren eingearbeitet, so dass alle lokalen Cluster global eindeutige
Identifikatoren besitzen.
Globale Clustervereinigung
Um das Clustering zu vervollständigen ist eine Vereingung der lokalen Clusterings in ein
globales Clustering notwendig. Grob lässt sich dieser Schritt aufteilen in die Erzeugung
einer Vereinigungs-Abbildung und das Umbenennen der Daten. Für die VereinigungsAbbildung ist es zunächst notwendig, alle Partitionspaare zu finden, die eine gemeinsame
Schnittmenge besitzen. Ziel ist es, für zwei Partitionen P1 und P2 alle Paare lokaler Cluster C1 ⊂ P1 und C2 ⊂ P2 zu finden, so dass C1 und C2 dem selben globalen Cluster
angehören. Anschließend werden die globalen Cluster berechnet und eine Abbildung bestimmt, die die lokalen Cluster auf globale Cluster abbildet. Hierbei gilt es festzuhalten,
dass ein Cluster C1 mit einem Cluster C2 vereinigt werden kann, wenn es einen Punkt
p ∈ C1 ∩ C2 gibt, wobei p ein Kernpunkt in C1 oder C2 ist. p wird in diesem Kontext
auch als Vereinigungs-Punkt von C1 oder C2 bezeichnet. Auf dieser Grundlage ist es
einfach, alle Vereinigungspunkte der Partitionen P1 und P2 zu bestimmen, zumal zum
Abschluss des lokalen Clusterings bereits alle Punkte innerhalb der Randbereiche gesondert als Kandidaten für die Vereinigung von Clustern ausgegeben werden. Es ist lediglich
zu überprüfen, dass einer der Punkte ein Kernpunkt in P1 oder P2 ist. Diese Kandidaten
lassen sich relativ einfach über den Join (AP1 ∩ BP2 ) ∪ (AP2 ∩ BP1 ) bestimmen.
Weiterhin gilt, dass die Vereinigung lokaler Cluster zwischen zwei überlappenden Partitionen unabhängig von allen anderen Partitionen ist und somit parallel ausgeführt
werden kann. Sobald alle Paare zu vereinigender lokaler Cluster bekannt sind, ist es
einfach, mit einem graph-basierten Algorithmus die globalen Cluster zu berechnen.
Diese Abbildungen lassen sich durch einen MapReduce-Job berechnen der aus der MapFunktion in Algorithmus 3.20 und der Reduce-Funktion in Algorithmus 3.20 besteht. In
der Map-Funktion (3.20) für zwei Partitionen A und B werden zunächst die jeweiligen
Listen mit Kernpunkten im inneren Rand und Kern- und Randpunkten des äußeren
Randes geladen. Mit diesen wird über den bereits zuvor beschrieben Join in Zeile 6 die
Menge der zu vereinigenden Cluster-Paare berechnet. Die ausgegebenen Schlüssel-WertPaare enthalten keinen Schlüssel, damit diese zum Aufbau der globalen VereinigungsAbbildung nur an einen Reducer übergeben werden.
Eine beispielhafte Situation ist in Abbildung 3.9 dargestellt, anhand derer der Aufbau
eines Vereinigungs-Mappings verdeutlicht werden soll. Nachdem für jede Partition der
innere Rand (S − OR(S)) und der äußere Rand (OR(S) − S) eingelesen wurde, gilt es
75
Algorithmus 3.20 Erzeugung einer Vereinigungs-Abbildung ([HTL+ 14] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
input: key - partition id, value - partition id
output: key - null, value - pair of cluster ids
procedure Map(key, value)
AP1 = read file-AP-key
BP1 = read file-BP-key
AP2 = read file-AP-value
BP2 = read file-BP-value
for each p ∈ (AP1 ∩ BP2 ) ∪ (AP2 ∩ BP1 ) do
A.clusterId = p.clusterId in partition A
B.clusterId = p.clusterId in partition B
Output (hnull, (A.clusterId, B.clusterId)i)
input: key - null, value - pair of cluster ids
output: key - local cluster id, value - global cluster id
procedure Reduce(key, value)
initialize empty graph G
for each item in values do
if vertex item.A.clusterId is not in G then
G.addVertex(item.A.clusterId)
if vertex item.B.clusterId is not in G then
G.addVertex(item.B.clusterId)
if edge (item.A.clusterId, item.B.clusterId) is not in G then
G.addEdge((item.A.clusterId, item.B.clusterId))
for each connected component C in G do
globalClusterId = next unique global cluster id
for each vertex localClusterId in C do
Output hkey=localClusterId, value=globalClusterId i
for each localClusterId not in G do
globalClusterId = next unique global cluster id
Output (hlocalClusterId, globalClusterId i)
die Schnittmenge (AP1 ∩ BP2 ) ∪ (AP2 ∩ BP1 ) zu berechnen. Hierbei ist (AP1 ∩ BP2 ) =
{p1 , p2 , p3 , p4 } und (AP2 ∩ BP1 ) = {p5 , p6 , p7 }, so dass insgesamt (AP1 ∩ BP2 ) ∪ (AP2 ∩
BP1 ) = {p1 , p2 , p3 , p4 } ∪ {p5 , p6 , p7 } ist. Für jeden Punkt diese Menge wird nun ein
Schlüssel-Wert-Paar ausgegeben, welches als Wert das Paar der Clusteridentifikatoren
enthält, dem der Punkt in der jeweiligen Partition angehört. Für die Punkte p1 . . . p7
ergeben sich so die folgenden Paare:
• h null, (S1 1, S2 1) i
• h null, (S1 1, S2 1) i
76
• h null, (S1 2, S2 1) i
• h null, (S1 2, S2 1) i
• h null, (S2 1, S1 2) i
• h null, (S2 1, S1 2) i
• h null, (S2 1, S1 1) i
OR(S1)
OR(S2)
S1_1
S1
S2
S1_2
IR(S2)
IR(S1)
p1
ε
ε
S2_1
minPts=3
p7
p2
ε
p6
p3
p5
p4
Abbildung 3.9: Beispiel eines Vereinigungs-Mappings
Der Reducer (3.20) baut aus diesen Abbildungen einen ungerichteten Graph auf. Jeder
Cluster-Identifikator in den Schlüssel-Wert-Paaren wird durch einen Knoten im Graph
repräsentiert (Zeile 4-7), jedes übergebene Paar von Cluster-Identifikatoren wird durch
eine Kante im Graph dargestellt (Zeile 8-9). Aus dem Graph lassen sich schließlich die
globalen Cluster einfach ableiten, da jeder zusammenhängende Teilgraph einem globalen
Cluster entspricht. Für jeden zusammenhängenden Teilgraphen werden Schlüssel-WertPaare ausgegeben, die die Identifikatoren der lokalen Cluster auf einen globalen Identifikator abbilden (Zeile 10-13). Außerdem werden auch für jeden alleinstehenden Cluster
ein Schlüssel-Wert-Paar ausgegeben, welches den lokalen Cluster-Identifikator auf einen
globalen Identifikator abbildet (Zeile 14-16).
Aus den zuvor generierten Schlüssel-Wert-Paaren wird nun nach Algorithmus 3.20 ein
ungerichteter Graph aufgebaut, der in Abbildung 3.10 dargestellt ist. Folglich gehören
alle Punkte aus beiden Partitionen zum selben Cluster, da die jeweiligen lokalen Cluster
eine zusammenhängende Komponente im Graphen bilden.
77
S1_1
S2_1
S1_2
Abbildung 3.10: Graph-Repräsentation des Vereinigungs-Mappings
Algorithmus 3.21 Umbenennung der Daten ([HTL+ 14] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
input: key - point object, value - cluster id and classfication flag
input: mergeMapping - stored in a hash map
output: key - point object, value - cluster id and classification flag
procedure Map(key, value)
globalClusterId = mergeMapping[p.clusterId]
p.clusterId = globalClusterId
Output (hp, (p.clusterId, p.flag)i)
input: key - point p, value: set of (clusterId, flag)-pairs
output: key - point p, value: (clusterId, flag)-pair
procedure Reduce(key, value)
clusterId = null
f lag = N OISE
for each pair in value do
if clusterId is null then
clusterId = pair.p.clusterId
if pair.p.flag == BORDER then
f lag = BORDER
else if pair.p.flag == CORE then
f lag == CORE
break
Output (hp, (clusterId, flag)i)
Um das finale Ergebnis zu erhalten müssen die lokalen Cluster-Identifikatoren aller Punkte substituiert werden und der Typ dieser bestimmt werden. Auch dies wird durch einen
MapReduce-Job gelöst. Die Eingabe für den Job besteht aus den Dateien ’file-resultspartitionId’, welche die vollständigen Ergebnisse der jeweiligen lokalen Clusterings enthält. In der Map-Phase (Algorithmus 3.21) werden die lokalen Cluster-Identifikatoren mit
Hilfe der Vereinigungs-Abbildung ersetzt (Zeile 2-3) und der Punkt zusammen mit dem
78
finalen Cluster-Identifikator und dem Typ, der im lokalen Clustering bestimmt wurde,
ausgegeben.
Für Punkte in den Randbereichen gilt hierbei, dass diese an mehreren lokalen Clusterings
beteiligt gewesen sein und somit verschiedene Typen besitzen können; das Ergebnis der
Map-Phase kann also Paare mit gleichem Schlüssel jedoch verschiedenen Werten liefern.
Durch die Shuffle-Phase werden die Paare nach dem Schlüssel gruppiert, so dass sich
Duplikate leicht identifizieren lassen. Für diese ist im Detail der Typ zu bestimmen. Die
Typisierung folgt einer einfachen Regel: der Typ eines Punktes wird durch die signifikanteste gefundene Klassifikation definiert. Diese ist absteigend geordnet nach ’CORE’,
’BORDER’ und ’NOISE’. Punkte ohne Duplikate werden direkt ausgegeben. Die so produzierte Menge von Schlüssel-Wert-Paaren repräsentiert nun das finale Ergebnis und
somit das globale Clustering.
3.2.4
Shared-memory SLINK (SHRINK)
Der SHRINK-Algorithmus [HPA+ 12] ist ein paralleler Algorithmus zur Berechnung eines
hierarchischen Clusterings, der für Systeme mit gemeinsamen Speicher entworfen wurde.
Die Grundidee des Algorithmus besteht in einer Aufteilung des eigentlichen Problems,
der Erzeugung eines globalen hierarchischen Clusterings, in überlappende Teilprobleme
annähernd gleicher Größe, die schließlich zu einer Gesamtlösung vereinigt werden. Hierfür
wird eine Partitionierung des Datenraumes vorgenommen und auf den Partitionen mit
einer modifizierten Version des SLINK-Algorithmus [Sib73] das jeweilige hierarchische
Clustering in Form eines Dendrogramms bestimmt. Die Gesamtlösung wird abschließend
aus den überlappenden Teillösungen kombiniert, indem die Dendrogramme der lokalen
Clusterings zu einem globalen Dendrogramm vereinigt werden.
Algorithmus 3.22 Ablauf des SHRINK-Algorithmus ([HPA+ 12] überarbeitet)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
input: D - dataset to be clustered, p - number of partitions
output: complete dendrogram of dataset D
procedure Shrink(D, p)
partition D into p subsets S = {S1 , S2 , . . . , Sp }
let M be an empty set
for all (Si , Sj ) ∈ S|i 6= j in parallel do
dendrogram d = Mod-Slink(Si ∪ Sj )
add dendrogram d to set M
repeat
merge dendrograms in M pairwise as shown in 3.11 with the function Merge
until M contains only one dendrogram in parallel
return remaining dendrogram in M
Zunächst werden in Algorithmus 3.22 die Teilmengen S des Datensatzes gebildet. Diese
79
entstehen durch die Aufteilung des Datenbestandes in p Teile. Jedes Paar (Si , Sj ) ∈ S
für i 6= j wird einem separaten Thread zugewiesen, der für die Punktmenge ein hierarchisches Clustering berechnet (Zeile 5-7). Für das Clustering wird eine modifizierte
Variante des SLINK-Algorithmus verwendet. Diese unterscheidet sich von dem Original
in drei Punkten. Als erstes werden die für Distanzberechnungen verwendeten Punkte
mitverfolgt und gespeichert. Dann werden bei gleichen Distanzen die Identifikatoren der
Datenpunkte verglichen und der kleinere Identifikator gewählt. Als Drittes und Letztes
werden die Dendrogramme repräsentiert durch eine Folge von Vereinigungen von nächsten Clusterpaaren. In Zeile 8-10 werden die Dendrogramme der Partitionen-Paare d
aus der Liste M in ein einziges Dendrogramm zusammengeführt, das die Originallösung
darstellt. Die Kombination erfolgt, indem über die Clustervereinigungen in aufsteigender
Distanz iteriert wird und die Vereinigungen übernommen werden, die Datenpunkte in
verschiedenen Clustern vereinigen; Vereinigungen hingegen, die Punkte zusammenführen, welche bereits demselben Cluster angehören, werden verworfen. Die Reihenfolge, in
der die Teilprobleme zusammengeführt werden, ist hierbei nicht von Bedeutung.
d1
d2
d3
d1
d4
d5
d3
d6
d5
d1
d1
Abbildung 3.11: Beispiel für den Ablauf der Vereinigung sechs lokaler Dendrogramme
zu einem globalen Dendrogramm (nach [HPA+ 12])
Die Berechnung der Dendrogramme erfolgt über den Modified-SLINK-Algorithmus (Algorithmus 3.23). Zu Beginn werden drei Arrays erzeugt, welche das Endergebnis des Clusterings sowie Zwischenergebnisse speichern. Die Arrays result und M enthalten jeweils
3-Tupel der Form (P unkt, P unkt, Distanz), wobei ein 3-Tupel eine Clustervereinigung
repräsentiert; das Array result wird für jeden Punkt mit Einträgen (−, −, ∞), das Array
M mit Einträgen (−, −, −) initialisiert. Das Array clusterId enthält für einen Punkt die
Information, zu welchem welchem Cluster er gehört; repräsentiert wird ein Cluster durch
den Punkt mit der kleinsten ID, so dass Punkte eines Clusters schlussendlich auf diesen
zeigen. Initialisiert wird dieses so, dass jeder Punkt sich selber als Vereinigungspartner
besitzt (Zeile 4-6). Der Algorithmus durchläuft nun die ganze Punktmenge (Zeile 7-20),
wobei die Position des Punktes in der Punktliste gleichzeitig seine globale ID darstellt.
80
Für jeden Punkt i ∈ D wird die Distanz zu jedem anderen Punkt j in der Liste M in der
Form (D[i], D[j], d(D[i], D[j])) gespeichert (Zeile 8-9). d(D[j], D[i]) repräsentiert hierbei
implizit über die Distanz die Höhe, in der die Vereinigung im Dendrogramm passiert.
Es erfolgen jetzt zwei Durchläufe über alle Punkte j ∈ D\{i} (Zeilen 10-17 und 18-20).
Zuerst wird überprüft, ob die Distanz der bisherigen Vereinigung der Punkte i und j
in result[j] kleiner ist als die zuvor berechnete in M [j]. Wenn jedoch die Distanz der
Vereinigung von i und j größer ist als die berechnete Vereinigung des Vorgängers von
j, so wird dessen Zwischenergebnis M [j] auf das bisherige Ergebnis result[j] gesetzt
(Zeile 12-13). Anschließend wird das Ergebnis für j in result[j] auf M [j] gesetzt und
der Vorgänger von j auf i gesetzt (Zeile 14-15). Wenn weiterhin die Vereinigung von i
und j in M [j] eine geringere Distanz als der Vorgänger von j besitzt, so wird das Ergebnis von M [clusterId [j]] auf M [j] gesetzt. In einer weiteren Schleife (Zeile 18-20) wird
nun getestet, ob die Distanz der Clustervereinigung von Punkt j größer ist als die des
Vorgänger von j. Ist dieses der Fall, so wird der Vorgänger von j auf i gesetzt. Zuletzt
werden die Vereinigungen der Cluster, die in result gespeichert sind, nach aufsteigender
Distanz sortiert zurückgegeben (Zeile 21-22).
Algorithmus 3.23 Modified SLINK ([HPA+ 12] überarbeitet
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
input: D - dataset of points
output: result: list of cluster merges sorted by increasing height
procedure Mod-Slink(D)
initialize array clusterId [i] with i for each i ∈ D
initialize array result with (−, −, ∞) for each i ∈ D
initialize array M with (−, −, −) for each i ∈ D
for i ← 1 to |D| − 1 do
for j ← 0 to i − 1 do
M [j] ← (D[j], D[i], d(D[j], D[i])), at distance d(D[j], D[i])
for j ← 0 to i − 1 do
if result[j] has a higher distance/id than M [j] then
if result[j] has a lower distance/id than M [clusterId [j]] then
M [clusterId [j]] ← result[j]
result[j] ← M [j]
clusterId[j] ← i
else if M [j] has a lower distance/id than M [clusterId[j]] then
M [clusterId[j]] ← M [j]
for j ← 0 to i − 1 do
if result[j] has a higher distance/id than result[clusterId[j]] then
clusterId[j] ← i
Sort the merges of result by increasing distance
return result
Der Algorithmus zur Vereinigung zweier Dendrogramme (Algorithmus 3.24) nutzt im
Kern eine Union-Find Struktur, um die Cluster-Zugehörigkeit für jeden Datenpunkt zu
81
verwalten. Zu Beginn werden zwei Datenstrukturen initialisiert, eine Liste M3 , in welcher die Folge der Clustervereinigungen gespeichert wird und eine Union-Find-Struktur,
die die Clusterzuordnungen der Datenpunkte festhält. Für die Union-Find-Struktur wird
zudem angenommen, dass eine Vereinigung nur erfolgreich ist, wenn die beiden zu vereinigenden Punkte in verschiedenen Clustern liegen. Das Verfahren beginnt, indem aus den
Vereinigungslisten der Cluster M1 und M2 die Vereinigungen mit der kleinsten Distanz
zusammen mit den vereinigten Punkten ausgewählt werden. Zuvor ist zu überprüfen, ob
Duplikate vorhanden sind, Vereinigungen also, die in beiden Vereinigungslisten enthalten sind. Wenn Duplikate vorhanden sind, ist eines der beiden Vorkommen zu entfernen.
Wenn die Distanz h1 ∈ m1 nun kleiner als h2 ∈ m2 ist, so wird m1 zu M3 hinzugefügt,
vorausgesetzt, dass die Cluster u1 , v1 ∈ m1 in der Union-Find-Struktur verschiedenen
Clustern zugeordnet sind; m1 wird anschließend auf die Vereinigung mit der nächst
kleinsten Distanz gesetzt. Das Vorgehen ist analog für den Fall, dass h1 keine geringere
Höhe als h2 aufweist. Das Vorgehen wird solange fortgesetzt, wie M1 und M2 verbleibende Vereinigungen haben. Sobald nur noch eine Liste Vereinigungen enthält werden diese
zu M3 hinzugefügt, sofern diese in verschiedenen Clustern enthalten sind. Abschließend
wird als Ergebnis die neue Liste der Vereinigungen M3 zurückgegeben.
Algorithmus 3.24 Vereinigung zweier Dendrogramme ([HPA+ 12] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
input: M1 , M2 - dendrograms
output: M3 - merged dendrogram
procedure Merge((M1 , M2 ))
Initialize M3 to be empty
Initialize Union-Find data structure with each unique point from M1 , M2
Let m1 = (u1 , v1 , h1 ) be the data points and distance of the lowest merge in M1
Let m2 = (u2 , v2 , h2 ) be the data points and distance of the lowest merge in M2
while M1 and M2 have merges remaining do
if h1 < h2 then
if union(u1 , v1 ) succeeds then
Add m1 to M3
Let m1 = (u1 , v1 , h1 ) be the next lowest merge in M1
else
if union(u2 , v2 ) succeeds then
Add m2 to M3
Let m2 = (u2 , v2 , h2 ) be the next lowest merge in M2
Add remaining merges (u, v, h) of M1 or M2 to M3 as long as union(u, v) succeeds
return M3
Die Laufzeit des SHRINK-Algorithmus setzt sich aus drei wesentlichen Teilen zusammen,
der Partitionierung, dem Clustering und der Vereinigung. Hendrix et al. schätzen die
Laufzeit für die Partitonierung mit O( √np ) ab, wobei p die Anzahl der Prozessoren, bzw.
Threads ist. Die Laufzeit für das Clustering mit dem ModSLINK-Algorithmus beträgt
O(n20 ) mit n0 als Anzahl der Punkte pro Partition. In diesem wird eine äußere Schleife
82
(Zeilen 7-20) n0 -mal durchlaufen, die beiden inneren Schleifen (Zeilen 8-9 sowie 10-20)
werden ebenso maximal n0 -mal und benötigen jeweils nur konstante Zeit. Das Sortieren
der Vereinigungen nach der Distanz wird in O(n0 ∗ log(n0 )) ausgeführt. Insgesamt kann
2
das Clustering somit in O( np ) ausgeführt werden. Die Vereinigung zweier Dendrogramme
wird mit O(n ∗ α(n)) abgeschätzt, wobei α die inverse Ackermann-Funktion ist. Da die
Vereinigung der Dendrogramme in den Zeile 7-9 jeweils paarweise in Form eines binären
Baumes erfolgt, benötigt diese eine Laufzeit von O(log(p)), so dass sich für die gesamte
Dendrogrammvereinigung eine Laufzeit von O(n ∗ log(p)) ergibt. Insgesamt beträgt die
2
Laufzeitkomplexität somit nach [HPA+ 12] O( np + √np + n ∗ α(n) ∗ log(p). Diese lässt
n
Prozessoren verwendet
sich sogar auf O(n ∗ α(n) ∗ log(p)) reduzieren, indem p ∼ log(n)
werden.
Beispiel
Um die Funktionsweise des SHRINK-Algorithmus zu verdeutlichen ist in Abbildung 3.12
ein Beispiel für ein solches abgebildet. Erzeugt werden soll das hierarchische Clustering
eines Datensatzes, bestehend aus vier Punkten, der in drei Partitionen aufgeteilt ist.
Im oberen Teil ist zunächst der Datenraum mit den Datenpunkten und Partitionierungen
auf der linken Seite abgebildet, auf der rechten Seite die Resultate der drei möglichen
Clusterings von paarweise disjunkten Partitionen in Form einer Liste. Wie bereits zuvor
in der Beschreibung des Algorithmus erwähnt, ist die Reihenfolge, in der Teilergebnisse
der paarweisen Clusterings zusammengefügt werden, irrelevant. In diesem Beispiel wird
zunächst das Ergebnis A mit B vereinigt, anschließend {A, B} mit C. Für die erste
Vereinigung werden die Vereinigungs-Listen aus A (M1 ) und B (M2 ) verwendet und
zusätzlich eine Union-Find-Struktur für die Datenpunkte, die in M1 und M2 enthalten
sind, initialisiert. Algorithmus 3.24 folgend, werden m1 und m2 mit den Vereinigungen
initialisiert, die in den jeweiligen Teilergebnissen die kleinste Distanz besitzen. Da m1 <
m2 wird die Vereinigung (2, 3, 0.5) gewählt und zu M3 hinzugefügt. Folglich wird auch
die Union-Find-Struktur aktualisiert, um zu repräsentieren, dass die Punkte 2 und 3 dem
gleichen Cluster angehören. m1 wird nun auf die Vereinigung mit der nächst größeren
Distanz gesetzt. Auch im zweitem Schritt ist m1 < m2 und es wird m1 zu M3 hinzugefügt
und die Union-Find-Struktur aktualisiert. Da M1 nun keine Vereinigungen mehr enthält
werden nun die verbleibenden aus M2 zu M3 hinzugefügt, sofern sich die Punkte, die
vereinigt werden, in verschiedenen Clustern befinden. In der zweiten Vereinigung werden
nun die Vereinigungs-Listen aus dem vorherigen Schritt {A, B} sowie des Ergebnisses C
aus dem Clustering der Partitionen D2 ∪ D3 zusammengeführt, um das finale Clustering
zu erhalten. Auch hier werden m1 und m2 mit den Vereinigungen mit kleinster Distanz
aus M1 und M2 initialisiert. Hier ist sofort zu sehen, dass die erste Vereinigung in M1
und M2 identisch ist; eine der beiden ist zu eliminieren. In diesem Beispiel wird die
Vereinigung aus M2 entfernt. Die Vereinigungen werden nun nach dem bekannten Muster
zu M3 hinzugefügt. Nachdem die Vereinigung (1, {2, 3}, 1.1) zu M3 hinzugefügt wurde,
83
D1
A = MOD-SLINK(D1 ᴜ D2): [(2, 3, 0.5), (1, {2,3}, 1.1)]
1
D2
2
3
B = MOD-SLINK(D1 ᴜ D 3): [(1, 4, 2.1)]
D3
C = MOD-SLINK(D2 ᴜ D 3): [(2, 3, 0.5), ({2,3}, 4, 1)]
4
MERGE(A,B)
M1 = [(2, 3, 0.5), (1, {2, 3}, 1.1)]
M2 = [(1, 4, 2.1)]
M3 = [(2, 3, 0.5)]
1
2
3
4
1
2
2
4
M3 = [(2, 3, 0.5), (1, {2, 3}, 1.1)]
1
2
3
4
1
1
1
4
M3 = [(2, 3, 0.5), (1, {2, 3}, 1.1), (1, 4, 2.1)]
1
2
3
4
1
1
1
1
m1(u1, v1, h1) = (2, 3, 0.5)
m2(u2, v2, h2) = (1, 4, 2.1)
m1(u1, v1, h1) = (1, {2,3}, 1.1)
m2(u2, v2, h2) = (1, 4, 2.1)
da A leer, Rest von B zu M
hinzufügen
MERGE({A,B}, C) M1 = [(2, 3, 0.5), (1, {2, 3}, 1.1), (1, 4, 2.1)]
M2 = [(2, 3, 0.5), ({2, 3},4, 1)]
M3 = [(2, 3, 0.5)]
1
2
3
4
1
2
2
4
M3 = [(2, 3, 0.5), ({2, 3}, 4, 1)]
1
2
3
4
1
2
2
2
M3 = [(2, 3, 0.5), ({2, 3}, 4, 1), (1, {2, 3}, 1.1)]
1
2
3
4
1
1
1
1
m1(u1, v1, h1) = (2, 3, 0.5)
m2(u2, v2, h2) = ({2, 3}, 4, 1)
m1(u1, v1, h1) = (1, {2, 3}, 1.1)
m2(u2, v2, h2) = ({2, 3}, 4, 1)
m1(u1, v1, h1) = (1, {2, 3}, 1.1)
m2(u2, v2, h2) = (1, 4, 2.1)
1
2
3
4
Abbildung 3.12: SHRINK-Algorithmus - Beispiel
lässt aus der mitgeführten Union-Find-Struktur ablesen, dass alle Datenpunkte dem
selben Cluster angehören. Die verbleibende Vereinigung, (1, 4, 2.1) wird abschließend
84
nicht mehr zu M3 hinzugefügt, da die Punkte 1 und 4 bereits dem gleichen Cluster
angehören.
M3 enthält nun das finale Clustering des Datensatzes, dargestellt in Form einer Abfolge
von Vereinigungen, welches am unteren Rand von Abbildung 3.12 als Dendrogramm
dargestellt ist.
3.2.5
MapReduce k-Means Clustering
Auch der sequentielle k-Means-Algorithmus (Abschnitt 2.3.2) ist nicht in der Lage, große
Datenmengen effizient zu clustern, da der Hauptspeicher zu klein ist und alleine das
Einlesen der Daten zu lange dauern würde. [ZMH09] stellen eine Implementierung des
k-Means-Algorithmus für das MapReduce-Framework vor, der mit diesen Anforderungen
umgehen kann. Das zugrundeliegende Prinzip des Algorithmus ist, dass die Zuweisung
von Punkten zum nächsten Clusterschwerpunkt unabhängig von der Berechnung eines
neuen Schwerpunktes durchgeführt wird. Zusätzlich kann die Zuweisung zum nächsten
Clusterschwerpunkt parallel ausgeführt werden, da unter den Datenpunkten keine Abhängigkeiten vorhanden sind; jeder Punkt kann individuell dem nächsten Cluster zugewiesen werden. Eine Parallelisierung der Berechnung der neuen Schwerpunkte kann
ebenso erreicht werden, indem jeder Clusterschwerpunkt unabhängig von den anderen
berechnet wird. Erreichen lässt sich dies, indem die Sortier-Fähigkeit des MapReduceFrameworks ausgenutzt und Schlüssel-Wert-Paare derart sortiert werden, dass diese nach
ihrem Schlüssel gruppiert werden. Das iterative Vorgehen des k-Means-Algorithmus wird
in MapReduce durch mehrere Durchläufe des Prozesses erreicht, eine wiederholte Ausführung der Map-, Combine- und Reduce-Phase, wobei als Eingabe immer die SchlüsselWert-Paare und Cluster-Schwerpunkte der vorherigen Iteration genutzt werden.
Ablauf
Der Ablauf der MapReduce-Implementierung ist in Abbildung 3.13 dargestellt. Wenn
ein Task für einen Datensatz und eine Menge von vorgegebenen Centroiden gestartet
wird erfolgt zuerst eine Unterteilung des Datensatzes in Teilstücke. Diese werden von
den Mappern zusammen mit den Centroiden als Eingabe verwendet, um darauf parallel
für jeden Punkt des Datensatzes zu berechnen, welchem Clusterschwerpunkt dieser am
nächsten ist.
Diese Zuordnung wird als Schlüssel-Wert-Paar ausgegeben und auf demselben Rechenknoten durch die Combine-Funktion lokal zusammengefasst, quasi die lokalen Clusterschwerpunkte jeder Datenpartition berechnet. Im Reduce-Schritt werden diese Zwischenergebnisse schließlich für jeden Cluster mit der Reduce-Funktion zusammengefasst,
um die globalen Clusterschwerpunkte zu erhalten. Analog zum klassichen k-Means-
85
Cluster
Centroids
Map
Combine
Reduce
Shard 1
Shard 2
Map
Combine
Result
Shard 3
Reduce
Map
Combine
Abbildung 3.13: MapReduce k-Means - Datenfluss
Algorithmus können diese neuen Centroide für einen erneuten MapReduce-Durchlauf
verwendet werden.
Map-Funktion
Die Map-Funktion in Algorithmus 3.25 berechnet für jeden Punkt, welches der nächste
Cluster-Schwerpunkt ist und ordnet den Punkt diesem zu.
Algorithmus 3.25 MapReduce-k-Means: Map-Phase ([ZMH09] angepasst)
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
input: key - null, value - point object
output: key - cluster id, value - point object
procedure Map(hkey, valuei)
minDistance = Double.MAX VALUE
index = −1
for i = 0 to centroids.length do
distance = computeDistance(value, centroids[i])
if distance < minDistance then
minDistance = distance
index = i
Output hindex, valuei
Für ein eingegebenes Schlüssel-Wert-Paar hnull, pointi, bei dem der Schlüssel nicht be-
86
trachtet wird und der Wert die Repräsentation eines Punktes ist, wird über eine Schleife
die Distanz zu jedem Cluster-Schwerpunkt berechnet und der nächste in der Variable
index festgehalten (Zeile 4-8). Die so berechnete Cluster-Zuordnung wird nun ausgegeben in Form eines Schlüssel-Wert-Paares hclusterID, pointi (Zeile 9-11).
Kombinations-Funktion
Die Combine-Funktion (Algorithmus 3.26) führt bereits lokal eine Reduzierung der Schlüssel-Wert-Paare durch, äquivalent zur Reduce-Funktion. Die lokal-generierten Paare werden nach den Cluster-Identifikatoren gruppiert und zusammengefasst. Hierfür wird zum
einen ein Array mit so vielen Feldern verwendet wie die Dimensionalität der Daten ist
und ein Zähler für die Anzahl der Punkte.
Algorithmus 3.26 MapReduce-k-Means: Combine-Phase ([ZMH09] angepasst)
input: key - cluster id, value - set of points
output: key - cluster id, value - number of cluster points and centroid coordinates
procedure Combine(hkey, valuei)
init values[dimCount]
init pointCount = 0
while value.hasNext() do
v = value.next()
for i = 0 to v.dimCount do
values[i]+ = v[i]
10:
pointCount + +
11:
for j = 0 to values.length do
values[i]
12:
values[i] = pointCount
1:
2:
3:
4:
5:
6:
7:
8:
9:
13:
Output hkey, pointCount + values.toString()i
Für einen Cluster-Identifikator werden jetzt in den jeweiligen Dimensionen die Koordinaten der Punkte aufsummiert (Zeile 6-7) und über einen Zähler mitgeführt, wieviele
Punkte aufsummiert worden sind. Abschließend wird jede Dimensionssumme durch die
Anzahl der Punkte geteilt um den lokalen Clusterschwerpunkt zu erhalten (Zeile 9-10).
Das Ergebnis wird als Paar h clusterID, |Punkte| + localCentroid i ausgegeben.
Reduce-Funktion
Am Ende des MapReduce-Durchlaufs werden dann durch die Reduce-Funktion (Algorithmus 3.27) die Zwischenergebnisse, die durch die Combine-Funktion bereits lokal
reduziert wurden, global zusammengefasst. Als Eingabe erhält die Funktion das Paar
87
hclusterID, set(|points| + localCentroid )i, die, analog zur Combine-Funktion (Algorithmus 3.26) reduziert werden.
Algorithmus 3.27 MapReduce-k-Means: Reduce-Phase ([ZMH09] angepasst)
input: key - cluster id, value - set of partial centroid calculations
output: key - cluster id, value - centroid coordinates
procedure Procedure(hkey, valuei)
init values[dimCount]
init pointCount = 0
while value.hasNext() do
v = value.next()
for i = 0 to v.dimCount do
values[i]+ = v[i]
10:
pointCount+ = v.pointCount
11:
for j = 0 to values.length do
values[i]
12:
values[i] = pointCount
1:
2:
3:
4:
5:
6:
7:
8:
9:
13:
Output hkey, values.toString()i
Jedoch werden nicht Punkte, sondern lokale Cluster-Schwerpunkte koordinatenweise aufsummiert (Zeile 4-8) und anschließend durch die Gesamtzahl der Punkte geteilt (Zeile
9-10). Die Ausgabe der Funktion ist nun das Schlüssel-Wert-Paar h clusterID, centroidCoordinates i, also der neue Schwerpunkt des bearbeiteten Clusters, der für die nächste
Iteration verwendet wird.
3.2.6
Weitere Algorithmen
Ein weiteres Verfahren zur Berechnung eines approximativen hierarchischen Clusterings auf Grundlage des MapReduce-Paradigmas ist der PARABLE(PArallel RAndompartition Based hierarchicaL clustEring)-Algorithmus [WD11], der in zwei Phasen gegliedert ist. In der ersten Phase wird eine zufällige Partitionierung mit homogener Partitionsgröße des Datensatzes vorgenommen. Auf diesen Partitionen wird parallel mit
einen sequentiellen Verfahren ein hierarchisches Clustering berechnet, wobei das Clustering durch ein Dendrogramm repräsentiert wird. In der zweiten Phase wird sequentiell
eine Vereinigung der berechneten Dendrogramme zu einem Gesamtergebnis durchgeführt. Diese werden zunächst sequentiell in einem rekursiven Verfahren an einem VorlageDendrogramm ausgerichtet. Anschließend wird das Vorlage-Dendrogramm in Teilbäume
zerlegt und in den anderen Dendrogrammen nach Teilbäumen gesucht, die an denen der
Vorlage ausgerichtet sind und mit dem Clusteridentifikator der Vorlage versehen.
Der PDBSCAN-Algorithmus[XJK99] ist eine weitere Variante des DBSCAN-Algorithmus,
welches als Master-Slave-Architektur umgesetzt ist. Zentrale Komponente dieses Ver-
88
fahrens ist die Verwendung eines verteilten R-Baums als räumliche Indexstruktur. Das
Verfahren ist in drei Phasen gegliedert. In der ersten Phase wird eine Aufteilung der Eingabedaten in mehrere Partitionen vorgenommen, welche an die verfügbaren Rechenknoten (Slaves) verteilt werden. In der zweiten Phase führen die Rechenknoten parallel das
Clustering auf der zugewiesenen Partition mit einer modifizierten Variante des DBSCANAlgorithmus durch. Dieser ist dahingehend verändert, dass er in Randbereichen, die über
die Partition herausragen, über den verteilten räumlichen Index im Punkte aus angrenzenden Partitionen abfragen kann und so Punkt-Paare ermittelt, die für eine Vereinigung
von Clustern betrachtet werden müssen. Diese wird in der dritten Phase unter Nutzung
der Punkte-Paare aus den lokalen Clusterings vom Master ausgeführt.
89
Kapitel 4
Implementierung
In diesem Kapitel werden die Implementierungen der MapReduce-Varianten des Partition-Algorithmus sowie des DBSCAN-Algorithmus vorgestellt. Diese sind in der Programmiersprache Java geschrieben und nutzen als Grundlage das MapReduce-Framework
der MapReduce-Implementierung Hadoop. In den Versuchen werden zusätzlich der MapReduce-kMeans- sowie Parallel-FP-Growth-Algorithmus untersucht. Für diese werden
die Implementierungen des Apache-Projekts Mahout1 genutzt, welches unter anderen
Verfahren aus dem Bereich Clustering und Klassifikation für maschinelles Lernen bereitstellt. Diese können unter anderen auf der Hadoop-Laufzeitumgebung ausgeführt werden
können.
4.1
MapReduce-Partition
Die Implementierung des Algorithmus hält sich an die Beschreibung aus Abschnitt 3.1.3.
Die Implementierung setzt sich aus drei MapReduce-Jobs zusammen, die sequentiell ausgeführt werden (Abbildung 4.1). Es ist festzuhalten, dass die Anzahl von Transaktionen,
in denen frequent Itemsets gesucht werden sollen, vorab nicht zwangsläufig bekannt ist,
insbesondere vor dem Hintergrund, dass sehr große Datenmengen eventuell auf viele Dateien verteilt sind. Aus diesem Grund wird zunächst ein MapReduce-Job ausgeführt, der
die gesamten Eingabedaten durchläuft und ausschließlich die Menge der Transaktionen
zählt.
Im Anschluss beginnt die Berechnung der lokalen frequent Itemsets. Diese wird in einem
seperaten Job ausgeführt, wobei die Eingabedaten in disjunkte Teile aufgeteilt werden
und für jeden Teil der Eingabe ein Mapper die lokalen frequent Itemsets der Partition
berechnet. Diese werden von einem Reducer in eine Menge zusammengefasst und als
1
http://mahout.apache.org/
90
Daten
Transaktionen
(Eingabe)
MapReduceJobs
Datenfluss
Prozessfluss
Zählung der
Transaktionen
Generierung der
Kandidaten
Berechnung des
globalen Supports
Globale frequent
Itemsets
(Ausgabe)
Kandidaten für
global-häufige
Itemsets
Abbildung 4.1: Ablauf der MapReduce-Implementierung des Partition-Algorithmus
Datei im verteilten Dateisystem zwischengespeichert.
Anschließend werden in einem dritten Job die Kandidaten für globale frequent Itemsets
gegen die gesamte Menge von Transaktionen getestet. Hierfür liest jeder Mapper die
Kandidatenmenge ein und einen Teil der Menge von Transaktionen. In diesem Anteil
von Transaktionen wird nun für jeden Kandidaten berechnet, wie oft dieser vorkommt
und dieses als Schlüssel-Wert-Paar ausgegeben. Der Reducer fasst in diesem Schritt die
Vorkommen für jeden Kandidaten zusammen und berechnet, ob der Anteil von notwendigen Vorkommen für jeden Kandidaten erreicht wurde. Kandidaten, die den minimalen
Support erreichen, werden ausgegeben und bilden so die Menge der frequent Itemsets.
Berechnung der Transaktions-Anzahl
Die Berechnung des Umfangs der Transaktionsdaten erfolgt, indem jeder Mapper die
Anzahl der gelesenen Zeilen mitzählt und die Summe ausgibt. Ein Reducer sammelt die
von den Mappern gelieferten Werte ein, addiert diese, und bildet so die Gesamtsumme der gelesenen Zeilen, respektive Transaktionen. Die Anzahl der Transaktionen wird
abschließend im verteilten Dateisystem gespeichert.
91
Berechnung der lokalen frequent Itemsets
Ein Mapper beginnt mit dem Einlesen des zugewiesenen Teils der Transaktionsmenge.
Während des Einlesens werden die Transaktionen direkt in Listen von TransaktionsIdentifikatoren (TID-Liste) zerlegt, so dass für jedes Element der Transaktionsmenge
festgehalten wird, in welcher Transaktion es vorkommt. Die TID-Liste wird als HashMap
gespeichert, welches als Schlüssel das Element und als Wert die Liste enthält. Die initiale Menge von lokalen frequent 1-Itemsets lässt sich direkt aus der TID-Liste generieren,
indem alle Elemente als 1-Itemsets übernommen werden, deren Vorkommen in Relation
zu der Anzahl der Transaktionen des Mappers den minimalen lokalen Support überschreiten. Mit diesen Transaktionen geschehen nun mehrere Dinge. Zum einen werden
diese direkt als Schlüssel-Wert-Paare ausgegeben. Zum anderen bilden diese Itemsets die
Grundlage, auf der größere Itemsets gebildet werden. Sie werden also zusätzlich in einer
Array-List gespeichert. Die Itemsets werden durch Objekt repräsnetiert, welches sowohl
das Itemset als auch die TID-Liste in Form eines Arrays enthält. Zur Vereinfachung des
Prunings von Itemset-Kandidaten wird zusätzlich die String-Repräsentation des Arrays
in einem Hash-Set gespeichert, so dass die Prüfung der häufigen Itemset-Teilmengen
durch eine indizierte Suche optimiert wird.
Die Kandidatengenerierung erfolgt analog zu Algorithmus 3.2. Zusätzlich werden immer nur die Itemsets der vorherigen Iteration vorgehalten, um den Speicherbedarf zu
reduzieren.
Das Pruning eines Itemset-Kandidaten gestaltet sich durch die Speicherung der StringRepräsentation aller bisher generierten lokal-häufigen Itemsets relativ einfach. Für einen
Kandidaten werden alle Teilmengen berechnet und in dem HashSet gesucht. Sobald
eine Teilmenge nicht in dem HashSet enthalten ist, kann der Kandidat direkt verworfen
werden.
Die Implementierung des Reduce-Tasks ist sehr einfach gehalten, da dieser lediglich die
von den Mappern generierten Kandidaten als Eingabe erhält und daraus die Menge der
lokalen frequent Itemset-Kandidaten liefert. Diese werden anschließend zur Zwischenspeicherung in das verteilte Dateisystem geschrieben.
Berechnung der globalen frequent Itemsets
Das initiale Vorgehen der Mapper in der dritten Phase ist identisch zu dem der zweiten
Phase. Zunächst wird aus den Transaktionen der zugewiesenen Eingabe eine TID-Liste
für jedes Item erstellt, die in Form eines HashSets als Schlüssel das Element und als Wert
in Form einer Array-List die Vorkommen in Transaktionen enthält. Zusätzlich wird die
Menge der lokalen frequent Itemsets eingelesen. Nun wird für jedes Kandidaten-Itemset
der Support berechnet. Dies geschieht, indem das Itemset Item für Item durchlaufen wird
92
und die Schnittmenge der TID-Listen für alle Elemente des Itemsets sukzessive berechnet
wird, so dass am Ende für das Itemset die Menge der Transaktionen übrig bleibt, die alle
Elemente des Itemsets enthalten. Die Ausgabe erfolgt nun als Schlüssel-Wert-Paar, wobei
als Schlüssel das Itemset und als Wert die Anzahl der lokalen Vorkommen ausgegeben
werden.
Die Reducer fassen abschließend die Ergebnisse der Mapper gemäß Algorithmus 3.6
zusammen. Ausgegeben werden schlussendlich die globalen frequent Itemsets zusammen
mit der Anzahl ihrer Vorkommen.
4.2
MapReduce-DBSCAN
Die Implementierung des MR-DBSCAN-Algorithmus folgt der Verfahrensbeschreibung
in Abschnitt 3.2.3 auf Seite 65. Als zusätzliche Phase ist hier jedoch die Bestimmung eines minimal-einschließenden Rechtecks der zu clusternden Punktmenge eingeführt (Abbildung 4.2). Dieses ist notwendig, da die geometrischen Ausmaße der zu clusternden
Punktmenge nicht zwingend bekannt ist. Die Erzeugung einer adaptiven Partitionierung
ist jedoch auf die Bekanntheit der Datensatzausmaße angewiesen.
Bestimmung eines MERs
Die Bestimmung des minimal-einschließenden Rechtecks (MER) für die Eingabedaten
erfolgt auf triviale Weise, indem die Mapper den zugewiesenen Teil der Eingabe punktweise durchlaufen und die minimalen und maximalen x- und y-Koordinaten festhalten.
Diese wird von jedem Mapper jeweils als Schlüssel-Wert-Paar h null, (x, y) i ausgegeben
werden. Ein Reducer verarbeitet diese Zwischenergebnisse nun und bestimmt daraus die
linke untere und rechte obere Ecke des einschließenden Rechtecks.
Berechnung einer Partitionierung
Die Mapper berechnen in der Initialisierung zunächst unabhängig voneinander aus dem
MER ein Raster, indem nacheinander in positiver x- und y-Richtung mit Schrittweite ε2
vom Mittelpunkt des minimal einschließenden Rechtecks ausgehend gelaufen wird, bis
die erreichte Koordinate der jeweiligen Dimension echt größer als die maximal x-, bzw.
y-Koordinate, sind. Die gleiche Zellenzahl kommt auch in negativer Richtung hinzu, so
dass die Größe des Rasters [2∗X][2∗Y ] beträgt, wenn X und Y die Anzahl der gezählten
Zellen in der jeweiligen Dimension sei. Im Anschluss berechnet jeder Mapper für jeden
2
Die Schrittweite beträgt hier nicht, im Gegensatz zur Beschreibung in 3.2.3, um das Raster nicht
zu groß werden zu lassen.
93
Daten
Rohdaten (Eingabe)
MapReduceJobs
Datenfluss
MER der
Eingabedaten
berechnen
Prozessfluss
Koordinaten des
MER
Datenpartitionierung
Angrenzende
Partitionenpaare
Partitionierungsprofil
Lokales Clustering
Ergebnisse der
lokalen Clusterings
Menge der lokalen
Clusteridentifikatoren
Erzeuge MergeAbbildung
Merge-Abbildung
Umbenennung
der Daten
Daten mit globalen
Cluster IDs
(Ausgabe)
Abbildung 4.2: Ablauf der Implementierung des MR-DBSCAN-Algorithmus
94
Punkt der zugewiesenen Eingabe, in welcher Rasterzelle ein Punkt enthalten ist, und
inkrementiert einen Zähler für diese Zelle. Nachdem die Eingabe vollständig gelesen und
bearbeitet ist, gibt der Mapper für jede Rasterzelle die Anzahl der enthaltenen Punkte
in der Form h ID Rasterzelle, Punktzahl i aus.
Ein Reducer sammelt nun die Schlüssel-Wert-Paare der Mapper ein und erzeugt aus
diesen analog zu den Mappern ein Raster für den Datensatz, in welchem die Ergebnisse
der Mapper zusammengefasst werden und so die Punktverteilung für den ganzen Datensatz repräsentiert wird. Auf Grundlage des Rasters wird eine binäre Unterteilung des
minimal-einschließenden Rechtecks, welches das gesamte Raster einschließt, vorgenommen. Hierfür wird eine Rechtecke enthaltende Queue so lange abgearbeitet, bis diese leer
ist. Initialisiert wird diese mit dem MER des Rasters. Aus der Queue wird nun das erste
Rechteck entnommen und mit Hilfe des Rasters berechnet, wie viele Punkte es enthält.
Dieses geschieht, in dem über die Koordinaten beider Objekte die vom Rechteck überdeckten Zellen berechnet und die darin enthaltenen Punktsummen aufsummiert werden.
Wenn diese Punktsumme nun die vorgegebenen Kosten überschreitet, so ist das Rechteck in zwei neue Rechtecke zu unterteilen. Hierfür werden parallel zu der x- und y-Achse
verlaufende Unterteilungslinien aus den Rechteck-Koordinaten berechnet, die parallel im
Abstand ε zueinander achsparallel verlaufen. Für jede dieser Linien wird die Kostendifferenz wie zuvor durch die Bestimmung der überdeckten Rasterzellen für die zwei sich aus
der Unterteilung ergebenden Rechtecke berechnet und die Linie gewählt, die zu einer
minimalen Kostendifferenz führt; die resultierenden Rechtecke werden zurückgeliefert
und in die Queue für eine eventuelle weitere Unterteilung eingefügt. Ein Rechteck, dass
die maximalen Kosten für eine Partition nicht überschreitet, stellt eine gültige Partition dar und wird zwischengespeichert. Sobald die Queue keine Rechtecke mehr enthält
ist die Partitionierung abgeschlossen und die Partitionsliste wird im verteilten Dateisystem gespeichert. Zusätzlich wird für jede Partition der berechneten Partitionierung die
Menge der direkt angrenzenden Partitionen bestimmt und diese als Schlüssel-Wert-Paar
ausgegeben.
Lokales Clustering
Die Mapper lesen zu Beginn zuerst die Partitionsliste ein. Für jeden Punkt der Eingabe
wird gegen die Partitionsliste über die Koordinaten der Partition und des Punktes die
Partition bestimmt, in der der Punkt enthalten ist, und der Partitions-Identifikator mit
dem Punkt als Schlüssel-Wert-Paar ausgegeben.
Für die Reduce-Phase werden nun so viele Reduce-Tasks erzeugt, wie es Partitionen
gibt; dieses soll gewährleisten, dass das Clustering jeder Partition in einem eigenen Task
stattfindet. Ein Reducer liest zunächst die ihm zugeordneten Punkte ein und speichert
diese als Clusterobjekte, so dass einem Punkt zusätzlich ein Klassifikationsflag und ein
Cluster-Identifikator zugeordnet ist. Sobald die Eingabe vollständig gelesen ist wird aus
der Punktmenge ein R-Baum aufgebaut, wofür die Implementierung Sort-Tile-Recursive
95
R-Tree(STRtree) aus der Java Topology Suite(JTS) verwendet wird. Auf diesem wird
das Clustering gemäß des modifizierten DBSCAN-Algorithmus (siehe 3.18 auf Seite 73)
durchgeführt. Mit den Ergebnissen des Clusterings passieren nun drei Dinge. Zum einen
werden die geclusterten Punkte als Schlüssel-Wert-Paare der Form h Punkt, (lokale
Cluster-ID, Punkt-Klassifikation) i ausgegeben; der lokale Clusteridentifikator setzt sich
zusammen aus einem Identifikator für die Partition und dem numerischen Wert, der sich
aus dem Clustering-Prozess ergibt. Zusätzlich schreibt jeder Reducer die Menge aller lokale vorkommenden Clusteridentifikatoren in das Dateisystem. Außerdem wird für jede
Partition die Menge der Punkte innerhalb der inneren und äußeren Randbereiche (siehe
Abbildung 3.7 auf Seite 71) bestimmt und in das verteilte Dateisystem geschrieben.
Berechnung einer Vereinigungsabbildung
Die Mapper erhalten als Eingabe einen Teil der Schlüssel-Wert-Paare mit angrenzenden
Partitionspaaren, die in der Partitionserzeugung generiert worden sind. Für jedes Paar
wird aus dem verteilten Dateisystem für jede Partition die Liste der Kernpunkte im
inneren Randbereich und die Liste der Rand- und Kernpunkte im äußeren Randbereich
der Partition gelesen. Die Berechnung des Durchschnitts von jeweils zwei Listen erfolgt,
indem eine der beiden zu vergleichenden Clusterobjekt-Listen in eine HashMap umgewandelt wird, die als Schlüssel eine String-Repräsentation des Punktes und als Wert
das Clusterobjekt besitzt. Anschließend wird die Liste durchlaufen und für jeden Punkt
gegen die HashMap getestet, ob die String-Repräsentation in der Map enthalten ist;
ist dies der Fall werden beide Clusterobjekte zu einer Ergebnisliste von ClusterobjektPaaren hinzugefügt. Abschließend wird die Liste durchlaufen und Schlüssel-Wert-Paare
für jedes Punktpaar ausgegeben.
Ein Reducer sammelt nun die Paare der Mapper ein und generiert aus diesen zunächst einen ungerichteten Graphen, für den die Implementierung des Java Universal Network/Graph(JUNG)-Frameworks verwendet wird. Hierfür wird für jedes gelesene
Identifikatoren-Paar jeweils ein Knoten sowie eine die beiden Knoten verbindende Kante generiert und in den Graphen eingefügt; der Bezeichner der Knoten ist hierbei der
lokale Clusteridentifikator. Sobald alle Identifikatoren-Paare gelesen und in den Graph
eingefügt worden sind, werden die Zusammenhangskomponenten des Graphen berechnet;
für die Berechnung wird ein WeakComponentClusterer des JUNG-Frameworks genutzt,
der eine Menge von berechneten Zusammenhangskomponenten zurückliefert, wobei jede
Zusammenhangskomponente wiederum aus einer Menge von Knoten der Komponente besteht. Es wird nun eine HashMap für die Vereinigungsabbildung erstellt, die als
Schlüssel den lokalen und als Wert den globale Clusteridentifikator enthält. Zunächst
wird ein Zähler für die globalen Clusteridentifikatoren mit eins initialisiert. Nun wird jede Zusammenhangskomponente durchlaufen und für jeden Knoten ein Eintrag mit dem
lokalen und dem globalen Clusteridentifikator zu der Map hinzugefügt; nach jeder Komponente wird der Zähler um eins inkrementiert. Anschließend wird nun ebenfalls aus
96
dem Dateisystem für jede Partition die Liste lokaler Clusteridentifikatoren gelesen, aus
denen bereits im Graph vorhandene Identifikatoren entfernt werden. Auch hier wird für
jeden lokalen Clusteridentifikator ein Eintrag mit dem lokalen und dem globalen Clusteridentifikator in die Map eingefügt und anschließend der Zähler inkrementiert; Punkte,
die bereits lokal als Rauschen markiert worden sind (Partitions-Identifikator + ”0”), erhalten global den Identifikator ”0” als Rausch-Marker. Abschließend wird die Map mit
der Abbildung serialisiert und im verteilten Dateisystem gespeichert.
Umbenennung der Ergebnisse des lokalen Clusterings
Die Mapper lesen in der Initialisierung die Vereinigungsabbildung, die im vorherigen
Schritt erzeugt wurde. Diese wurde als HashMap angelegt, damit die Suche nach einem
globalen Cluster-Identifikator ausgehend von einem lokalen Cluster-Identifikator über
einen Index beschleunigt werden kann. Anschließend erhalten sie einen Teil der Ausgabe, die im lokalen Clusterings erzeugt wurde. Mit Hilfe der HashMap wird der globale
Identifikator des Clusterobjekts gesucht und im Clusterobjekt aktualisiert. Schließlich
wird das modifizierte Schlüssel-Wert-Paar ausgegeben.
Die Reducer führen nun die finale Bestimmung der Punktklassifikation für Punkte aus
den Überlappungsbereichen der Partitionen durch. Die implementierte Vorgehensweise
orientiert sich direkt an 3.21 auf Seite 78 und wird deshalb nicht weiter ausgeführt.
4.2.1
MapReduce-kMeans und Parallel FP-Growth
In diesem Abschnitt sollen die Implementierungen der beiden Algorithmen, die aus dem
Apache Mahout Projekt stammen, kurz analysiert werden. Es soll betrachtet werden,
ob und, wenn ja, welche Unterschiede es zwischen den vorgestellten Beschreibungen in
3.2.5 und 3.1.3 gibt.
MapReduce-kMeans
Die Implementierung des MapReduce-kMeans-Algorithmus vom Mahout-Projekt entspricht nicht der Beschreibung in [ZMH09], die in Abschnitt 3.2.5 vorgestellt wurde.
Im Gegensatz zu der gelieferten Beschreibung besteht die Implementierung lediglich aus
einem einzigen Map-Task pro Iteration. In diesem Map-Task werden die eingegeben
Punkte mittels eines Cluster-Klassifikators dem nächstgelegenen Cluster-Zentroiden zugeordnet. Die im Laufe der Berechnung aktualisierten Zentroide werden am Ende jeder
Iteration ausgegeben und als Eingabe-Zentroide der nächsten Iteration genutzt.
97
Parallel FP-Growth
Die Implementierung des Parallel FP-Growth-Algorithmus folgt der Verfahrensbechreibung von [LWZ+ 08], die in Abschnitt 3.1.3 vorgestellt wurde. Die Implementierung ist
ebenfalls in drei Phasen aufgeteilt für die Zählung der frequent 1-Itemsets, die Suche nach
frequent Itemsets in gruppenabhängigen Shards und die Aggregierung der Teilergebnisse. Es wird ebenfalls die zentrale Datenstruktur zur Speicherung der frequent Itemsets,
ein Maximum-Heap mit einer begrenzten Anzahl von Elementen, verwendet. Eine oberflächliche Betrachtung des Quelltextes der Implementierung hat keine Auffälligkeiten zu
Tage gefördert.
98
Kapitel 5
Versuche
In diesem Kapitel werden vier verschiedene Verfahren auf MapReduce-Basis, jeweils
zwei aus dem Bereich der Assoziations- und Clusteranalyse, untersucht, die in Kapitel
3 vorgestellt wurden. Für die Assoziationsanalyse werden hier die Algorithmen MRPartition sowie Parallel FP-Growth, für die Clusteranalyse die Algorithmen MR-kMeans
sowie MR-DBSCAN untersucht.
Für die Algorithmen zur Assoziationsanalyse wird hierbei zum einen experimentell die
Korrektheit des Parallel FP-Growth-Algorithmus untersucht, zum anderen wird für beide
Algorithmen die Tauglichkeit am Beispiel von Transaktionsdatenbanken verschiedener
Größe untersucht.
Für die Algorithmen der Clusteranalyse wird am Beispiel des MR-kMeans-Algorithmus
die Auswirkung der Blockgröße der Eingabedaten auf die Laufzeit des Algorithmus betrachtet. Für den MR-DBSCAN-Algorithmus wird experimentell die Auswirkung der
Parameterwahl auf die Laufzeit untersucht, insbesondere im Hinsicht auf die sequentiellen Anteile des Algorithmus. Für beide Verfahren wird zusätzlich der Speed-Up betrachtet. Schließlich werden beide Verfahren in einem Versuch mit großen Testdaten direkt
gegenübergestellt und verglichen.
Für die Versuche wurde ein Hadoop-Cluster auf drei Rechnern aus dem Rechnerpool des
Fachgebiets Datenbanken und Informationssysteme aufgesetzt. Hierbei handelt es sich
um zwei Rechner mit Intel Core i3-540 2-Kern-Prozessoren mit 3,07 GHz, 4 virtuellen
Kernen und 4 GB Arbeitsspeicher (Typ-1) sowie einen Rechner mit einem Intel Core
i7-2600 4-Kern Prozessor mit 8 virtuellen Kernen und 16 GB Arbeitsspeicher (Typ-2).
Vom ersten Typ werden dem Hadoop-Cluster jeweils 2 virtuelle Prozessorkerne und 3
GB Arbeitsspeicher, vom zweiten Typ 4 virtuelle Kerne sowie 6 GB Arbeitsspeicher zu
Verfügung gestellt.
Durch diese Ressourcenzuweisung lässt sich durch sukzessives Hinzufügen von Knoten
99
vom Typ 1 und schließlich Typ 2 zum Cluster grob eine Verdopplung der Leistung mit
jedem hinzugefügten Knoten darstellen:
• Konfiguration 1 (1 Knoten): Typ-1
• Konfiguration 2 (2 Knoten): Typ-1 + Typ-1
• Konfiguration 3 (3 Knoten): Typ-1 + Typ-1 + Typ-2
Zusätzlich ist der Hadoop-Cluster so konfiguriert, dass der Nodemanager jedes Knotens
maximal 3 GB Speicher zur Verfügung stellt. Das verteilte Dateisystem wird aus den
lokalen Festplatten der Knoten aufgebaut und besitzt eine standardmäßige Blockgröße
von 128MB und einem Replikationsfaktor von 3, so dass im Betrieb mit 3 Knoten jeder
Block auf jedem Knoten vorliegt.
Auf jedem Knoten des Clusters laufen folglich als Dienste ein Namenode für die Bereitstellung von Speicher für das verteilte Dateisystem sowie ein Nodemanager, der die
Rechenressourcen des Knotens verwaltet. Die zentralen Komponenten, der ResourceManager für die Verwaltung der Rechenressourcen des Clusters und der Namenode für
die Metadatenverwaltung des Clusters, laufen auf dem Typ-2 Knoten. Sofern nicht anders angegeben beträgt die Blockgröße für die Dateien im verteilten Dateisystem 64 MB.
Zusätzlich beträgt die Größe für einen Container 1 GB, wovon 768 MB von der JVM,
die im Container ausgeführt wird, für den Heap-Speicher verwendet werden können.
5.1
Assoziationsanalyse
In den Versuchen mit dem MapReduce-Partition und Parallel FP-Growth-Algorithmus
sollen verschiedene Aspekte untersucht werden. Zum Abschluss der Vorstellung des Parallel FP-Growth-Algorithmus in Abschnitt 3.1.3 blieb die Frage offen, ob dieser eine
exakte oder ungefähre Lösung liefert. Dies soll in einem ersten Experiment untersucht
werden. In zwei weiteren Versuchen soll unabhängig voneinander die Performance beider
Algorithmen untersucht werden. Dies geschieht durch Tests der Implementierungen in
unterschiedlichen Konfigurationen. Zum Abschluss sollen schließlich beide in einem Test
einem Vergleich hinsichtlich der Tauglichkeit antreten.
Für die Versuche mit den Verfahren der Assoziationsanalyse werden die folgenden Testdatensätze von Frequent Itemset Mining Implementations(FIMI)1 verwendet, bei denen
es sich sowohl maschinell erzeugte als erfasste Realwelt-Daten handelt:
• Synthetische Testdaten:
1
http://fimi.ua.ac.be/
100
– T10I4D100K.dat (3,9 MB) - 100.000 Transaktion, 1000 verschiedene Items,
durchschnittliche Transaktionslänge: 10, durchschnittliche Länge der häufigsten Itemsets: 4
– T40I10D100K.dat (15 MB) - 100.000 Transaktionen, 1000 verschiedene Items,
durchschnittliche Transaktionslänge: 40, durchschnittliche Länge der häufigsten Itemsets: 10
• Realwelt-Testdaten:
– accidents.dat2 (34 MB) - Offizielle Unfallstatistiken des National Institute of
Statistics (NIS) der Region Flandern (Belgien), 340.183 Transaktionen, 572
verschiedene Items, durchschnittlich Transaktionslänge: 54 Items
– webdocs.dat3 (1,4 GB) - Sammlung von 1,7 Million HTML-Dokumenten aus
dem Internet, 1.692.082 Transaktionen, 5.267.656 verschiedene Items, maximale Transaktionslänge: 71.412 Items
Bei diesen handelt es sich um einfache Textdateien, in denen jede Zeile einer Transaktion
entspricht.
5.1.1
Experimentelle Untersuchung der Korrektheit des Parallel FP-Growth-Algorithmus
Am Ende der Beschreibung des Parallel FP-Growth-Algorithmus (Abschnitt 3.1.3, Seite
49) ist die Frage offen geblieben, ob dieser ein exaktes oder approximatives Ergebnis
liefert. Dies soll anhand eines kleinen Beispiels untersucht werden. Verwendet wird hierfür
die Transaktionsdatenbank in Tabelle 5.1, welche aus dem Artikel über den AprioriAlgorithmus4 aus der englischen Wikipedia stammt.
Itemsets
{1, 2, 3, 4}
{1, 2, 4}
{1, 2}
{2, 3, 4}
{2, 3}
{3, 4}
{2, 4}
Tabelle 5.1: Beispiel-Transaktionsdatenbank
2
http://fimi.ua.ac.be/data/accidents.pdf
http://fimi.ua.ac.be/data/webdocs.pdf
4
http://en.wikipedia.org/wiki/Apriori algorithm
3
101
Damit ein Itemset häufig ist wird verlangt, dass dieses in mindestens drei Transaktionen
auftritt. Folglich ergeben sich als frequent 1-Itemsets die vier Itemsets aus Tabelle 5.2
mit der entsprechenden Anzahl von Vorkommen.
Itemset
{1}
{2}
{3}
{4}
Anzahl
3
6
4
5
Tabelle 5.2: Häufige 1-Itemsets
Es sind somit alle vier vorkommenden 1-Itemsets der Transaktionsdatenbank häufig.
Die sich daraus ergebenden Kombinationen von 2-Itemsets sind mit ihrem Support in
Tabelle 5.3 dargestellt.
Itemset
{1, 2}
{1, 3}
{1, 4}
{2, 3}
{2, 4}
{3, 4}
Support
3
1
2
3
4
3
Tabelle 5.3: Häufige 2-Itemsets
Von diesen sechs 2-Itemsets sind jedoch nur vier häufig. Das einzige sich daraus ergebende 3-Itemset, welches nicht aufgrund nicht-häufiger Teilmengen verworfen wird, ist in
Tabelle 5.4 abgebildet. Da es jedoch nur einen Support von zwei hat wird es verworfen.
Itemset
{2, 3, 4}
Support
2
Tabelle 5.4: 3-Itemset Kandidaten
Auf dieser Transaktionsdatenbank werden nun der MapReduce-Partition- sowie der
Parallel-FP-Growth-Algorithmus ausgeführt. Hierbei zeigt sich, dass der MR-PartitionAlgorithmus die gleichen häufigen Itemsets der Länge eins und zwei liefert, die in Tabelle
5.2 und 5.3 aufgeführt sind. Die Ausführung des Parallel-FP-Growth liefert keine Ausgabe; die Ursache hierfür ließ sich nicht klären.
Aus diesem Grund wird eine theoretische Betrachtung des Algorithmus mit der Transaktionsdatenbank (Tabelle 5.1) als Eingabe vorgenommen. Es wird zunächst festgelegt,
dass der Algorithmus vier Gruppen verwendet; alle Items werden folglich durch die HashFunktion verschiedenen Gruppen zugewiesen.
102
Zunächst wird die Menge aller häufigen 1-Itemsets bestimmt und diese nach ihrem Support absteigend sortiert. Diese sind bereits in Tabelle 5.2 aufgelistet und ergeben, absteigend nach der Anzahl der Vorkommen sortiert die Liste {2, 4, 3, 1}.
Transaktion
{2, 4, 3, 1}
{2, 4, 1}
{2, 1}
{2, 4, 3}
{2, 3}
{4, 3}
{2, 4}
G-1
{2, 4, 3, 1}
{2, 4, 1}
{2, 1}
G-2
{2}
{2}
{2}
{2}
{2}
G-3
{2, 4, 3}
G-4
{2, 4}
{2, 4}
{2, 4, 3}
{2, 3}
{4, 3}
{2, 4}
{2}
{4}
{2, 4}
Tabelle 5.5: Gruppenabhängige Shards
.
Mit dieser Liste wird nun die Transaktionsdatenbank nach Algorithmus 3.8 in vier gruppenabhängige Shards transformiert transformiert; diese ist in Tabelle 5.5 dargestellt. Ein
Berechnen der frequent Itemsets für jeden gruppenabhängigen Shard von Hand liefert
exakt das gleiche Ergebnis, welches beispielsweise eine Berechnung mit Hilfe des Apriorioder FP-Growth-Algorithmus auf der vollständigen Transaktionsdatenbank ergibt.
Dies zeigt, dass der Parallel-FP-Growth zumindestens für dieses kleine Beispiel ein exaktes Ergebniss liefert. Es werden alle vorhandenen frequent Itemsets mit dem exakten
Support-Wert gefunden.
Ein zweiter Versuch wird mit dem synthetischen Testdatensatz T10I4D100K.dat durchgeführt. Hierfür wird ein relativer Support von 0.0001, was einem absoluten Support von
10 Transaktionen bei 100.000 Transaktionen in der Transaktionsdatenbank entspricht,
verwendet. Zusätzlich soll der PFP-Algorithmus die 100 häufigsten Itemsets für jedes
Item ausgegeben und die vom Parallel FP-Growth vorgegebene standardmäßige Anzahl
von 50 Gruppen verwenden.
Partition
411.365 Itemsets
Parallel FP-Growth
362 Itemsets
Tabelle 5.6: Anzahl gefundener Itemsets
Bei den Anzahl der gefundenen Itemsets für diesen Versuch (Tabelle 5.6) ist zunächst
erkennbar, dass sich die Anzahl der gefundenen Itemsets stark unterscheidet. Die Menge
der vom PFP-Algorithmus erhöht sich auch nicht, wenn man die Anzahl der auszugebenden frequent Itemsets für ein Item massiv erhöht, beispielsweise auf 10.000. Eine
Stichprobenartige Prüfung zeigte, dass die gefundenen frequent Itemsets des Parallel FPGrowth-Algorithmus auch in den Ergebnissen des MapReduce-Partition-Algorithmus
auftreten. Jedoch stimmten die berechneten Supportwerte beider Ergebnisse nicht überein.
103
Hinsichtlich der Anzahl gefundener Itemsets ist die Vermutung naheliegend, dass auf
die Ausgabe der Teilmengen gefundener frequent Itemsets verzichtet wird. Eine tiefergehende Analyse des Quelltextes bestätigt diese Vermutung. Itemsets werden in der
Implementierung des Maximum-Heaps, der vom Parallel FP-Growth-Algorithmus verwendet wird, nur in den Maximum-Heap eingefügt, wenn es keine Teilmenge eines bereits
vorhandenen Itemsets ist oder aber einen größeren Support als dieses besitzt. Außerdem
werden Itemsets, die Teilmenge eines einzufügenden Itemsets sind und einen geringeren
oder gleichen Support als dieses besitzen, ersetzt.
Abschließend lässt sich als Fazit festhalten, dass der Parallel FP-Growth offensichtlich
nicht immer eine exakte Lösung hinsichtlich der Supportwerte liefert. Sofern sich dies
im Rahmen dieses Versuchs sagen lässt, beinhaltet die Lösung jedoch sehr wohl die
tatsächlichen existierenden frequent Itemsets.
5.1.2
Untersuchung der Performance des Partition-Algorithmus
In diesem Versuch soll für die MapReduce-Implementierung des Partition-Algorithmus
mit verschiedenen Datensätzen die Leistungsfähigkeit untersucht werden. Hierfür werden
die drei Datensätze T10I4D100K.dat, T40I10D100K.dat und accidents.dat verwendet.
Aufgrund der geringen Datengröße werden diese jeweils in vier Teile aufgeteilt, damit
der Algorithmus mehrere Eingabe-Teile erhält und eine der Menge der Eingabedateien
entsprechende Anzahl von Map-Tasks erzeugt, die parallel verarbeitet werden. Genutzt
werden alle drei Knoten des Clusters. Getestet wird die Laufzeit des Algorithmus mit
verschiedenen Eingabedaten sowie minimalen Support-Werten.
minimaler Support
0,01 (1000 Transaktionen)
0,001 (100 Transaktionen)
0,0001 (10 Transaktionen)
T10I4D100K
01:16
11:28
Fehler
T40I10D100K
Fehler
Fehler
Fehler
accidents
Fehler
Fehler
Fehler
Tabelle 5.7: Testresultate für den MR-Partition-Algorithmus
Die Ergebnisse der Test (Tabelle 5.7) sind sehr ernüchternd. Die Berechnung der frequent Itemsets gelingt nur bei dem kleinsten Testdatensatz, und selbst bei einem sehr
kleinen minimalen Support scheitert die Berechnung. Die Ursache hierfür liegt in der
Tatsache begründet, das der Algorithmus keine komprimierte Darstellung der Transaktionsdatenbank für die Suche nach frequent Itemsets verwendet. In jeder Iteration
ist es erforderlich, dass die gefundenen Itemsets der vorherigen Iteration mit den zugehörigen Transaktionslisten vorgehalten werden, um aus diesen durch die Kombination
zweier Itemsets ein neues zu generieren; es ist folglich in jedem Schritt notwendig, die
gefundenen frequent Itemsets von zwei Iterationen zusammen mit den TID-Listen zu
104
speichern. Aus dem vorherigen Versuch ist bereits bekannt, dass eine Ausführung des
MapReduce-Partition-Algorithmus auf den Datensatz T10I4D100K.dat bei einem minimalen Support von 0,0001 bereits etwa 400.000 frequent Itemsets findet (Tabelle 5.6.
Verstärkend kommt in diesen Versuchen noch hinzu, dass durch Aufteilung der Eingabedaten in vier Teile der minimale Support ebenfalls durch vier geteilt wird und somit
0,000025 beträgt, also noch weiter reduziert wird, und folglich zu noch mehr gefundenen
Itemsets führt.
Somit lässt sich an dieser Stelle bereits festhalten, dass der MapReduce-Partition-Algorithmus für eine Anwendung auf große Transaktionsdatenbanken völlig ungeeignet ist,
da der aus der Kandidatengenerierung resultierende Speicherbedarf viel zu groß ist.
5.1.3
Untersuchung der Performance des Parallel FP-GrowthAlgorithmus
Für den Parallel FP-Growth-Algorithmus sollen in dieser Versuchsreihe zwei verschiedene Aspekte betrachtet werden. Zum einen soll die Laufzeit des Algorithmus bei einer
verschiedenen Anzahl von Rechenknoten ermittelt werden. Zum anderen soll erprobt
werden, wie sich die drei Steuerparameter des Algorithmus, die Gruppenzahl, der minimale Support und die Anzahl zu auszugebender frequent Itemsets für jedes Item, auf
die Laufzeit auswirken.
Knotenzahl
Für diesen Versuch wurde der Testdatensatz webdocs.dat verwendet. Als Parameter für
den Algorithmus wurde eine Gruppenzahl von 10, ein minimaler Support von 10 sowie
die Ausgabe der 500 häufigsten Itemsets gewählt.
Die Ergebnisse für die Gesamtlaufzeit (Abbildung 5.1) zeigen, dass der Algorithmus für
den verwendeten Datensatz keinen linearen Speed-Up besitzt und dieser beim Übergang
von zwei auf drei Knoten sogar noch stärker abflacht.
Eine Betrachtung der Laufzeiten der einzelnen Phasen des Algorithmus zeigt, dass das
Zählen der häufigen 1-Itemsets die größte Zeit des Algorithmus in Anspruch nimmt. Bereits in diesen ist kein linearer Speed-Up erkennbar. Das Mining von frequent Itemsets
aus den gruppenabhängigen Transaktionsdatenbanken zeigt hier jedoch einen näherungsweisen linearen Speed-Up. Der letzte Schritt hingegen, die Aggregierung der Ergebnisse,
ist von der Laufzeit her annähernd konstant und verändert sich bei einer veränderten
Knotenzahl nicht signifikant.
105
Abbildung 5.1: Laufzeit PFP-Algorithmus (Gruppen: 10, Support: 10, K: 50)
Steuerparameter
Betrachtet werden soll in diesem Test, wie sich die verschiedenen Steuerparameter des
Algorithmus auf die Laufzeit auswirken. Die Tests werden mit allen drei Knoten des
Clusters ausgeführt und es kommt der Datensatz webdocs.dat zur Anwendung.
Zuerst wird die Laufzeit mit einer verschieden großen Zahl von gruppenabhängigen
Transaktionsdatenbanken getestet. Wie in Tabelle 5.8 zu sehen ist, hat die Variation
der Anzahl zu bildender Gruppen bei der verwendeten Knotenzahl und Datensatzgröße keinen keine gravierenden Veränderung der Laufzeit zur Folge. Zwar steigt bei einer
Anzahl von 5000 Gruppen die Laufzeit für die Zählung der häufigen 1-Itemsets leicht
an, da diese Phase des Algorithmus noch nicht von der Anzahl der Gruppen beeinflusst
wird ist dieses aller Wahrscheinlichkeit eine Folge des Nichtdeterminismus der Programmausführung auf dem Cluster.
Gruppen
100
500
2.500
5.000
Gesamtzeit
03:46
03:29
03:24
03:53
Zählen
03:03
02:46
02:45
03:15
Suchen
00:27
00:25
00:24
00:25
Aggregierung
00:16
00:18
00:15
00:13
Tabelle 5.8: Laufzeit bei variabler Gruppenzahl (minSup = 10, K = 500)
In einer weiteren Versuchsreihe, bei der mit unterschiedlichen Parameterausprägungen
des minimalen Supports getestet wird, zeigt sich, dass der minimale Support keine
106
Abbildung 5.2: Laufzeiten der einzelnen Teile des PFP-Algorithmus
messbaren Einfluss auf die Performance des Verfahrens hat (Tabelle 5.9).
Support
10
100
1.000
10.000
Laufzeit
03:37
03:38
03:58
03:41
Zählen
02:48
02:59
03:17
03:01
Suche
00:32
00:24
00:24
00:28
Aggregierung
00:17
00:15
00:17
00:12
Tabelle 5.9: Laufzeit bei unterschiedlichem minimalen Support (100 Gruppen, K = 500)
Das gleiche trifft auf die Laufzeiten zu, wenn die Anzahl der häufigen Itemsets, die für
ein Item ausgegeben werden sollen, variieren. Wie in Tabelle 5.10 zu sehen ist, gibt es
auch hier keinen deutlichen Unterschied zwischen Laufzeiten der einzelnen Versuche.
Insgesamt haben die Steuerparameter für den gewähltem Datensatz keine signifikante
Auswirkung auf die Laufzeit des Algorithmus. Dieses mag jedoch der dem geringen
Umfang des Testdatensatzes geschuldet sein.
107
K
50
250
500
2.500
Laufzeit
03:41
03:26
03:46
03:36
Zählen
02:54
02:42
03:00
02:50
Suchen
00:29
00:26
00:31
00:31
Aggregierung
00:18
00:18
00:15
00:15
Tabelle 5.10: Laufzeit bei einer verschiedenen Anzahl auszugebener frequent Itemsets
pro Item (100 Gruppen, minSup = 10)
5.1.4
Performancevergleich zwischen MR-Partitition und PFPGrowth
Effektiv ist ein Vergleich der beiden Verfahren auf großen Datenmengen nicht möglich.
Die Ausführung des MapPartition-Algorithmus mit dem Datensatz webdocs.dat bricht
bereits nach wenigen Minuten wegen Speichermangels ab. Mit einer sequentiellen Implementierung des Partition-Algorithmus wurde ermittelt, wie viele frequent Itemsets in
den einzelnen Iterationen entstehen (Tabelle 5.11). Bereits bei sehr kleinen Datenmengen werden unheimlich große Mengen (> 400.000) frequent Itemsets gefunden, wenn ein
sehr kleiner minimaler Support (hier 0,0001) gewählt.
Itemset-Länge
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Gesamt
Laufzeit
Anzahl Itemsets
867
110.793
152.805
80.710
39.954
16.172
6.225
2.473
954
318
80
13
1
0
411.365
749 Sekunden
Tabelle 5.11: Anzahl gefundener frequent Itemsets für T10I4D100K.dat
Eine Suche mit demselben minimalen Support von 0,0001 mit dem Datensatz T40I10D100K.dat wurde nach 7 Stunden und 3 Minuten abgebrochen. Zu diesem Zeitpunkt hatte der Algorithmus bereits 924 Itemsets der Länge 1, 179.740 der Länge 2 und 1.383.971
108
Itemsets der Länge 3 geliefert. Ein Ausführung des Algorithmus ist also generell nur bei
einem relativ großen Support möglich.
Im Gegensatz dazu steht die Ausführung der Versuche aus Abschnitt 5.1.2, die mit
dem MR-Partition-Algorithmus durchgeführt wurden, mit dem Parallel FP-GrowthAlgorithmus.
minimaler Support
0,01 (1.000 Transaktionen)
0,001 (100 Transaktionen)
0,0001 (10 Transaktionen)
T10I4D100K
00:51
00:44
00:54
T40I10D100K
00:49
00:47
00:49
accidents
00:53
00:54
00:49
Tabelle 5.12: PFP-Algorithmus - Test mit verschiedenen Datensätzen
Die Ergebnisse (Tabelle 5.12) zeigen auch, dass die Laufzeit des Algorithmus auch bei
diesen Versuchen weitestgehend unabhängig von der Parameterzahl ist und die Datensätze, im Gegensatz zum problemlos verarbeitet werden können.
5.2
Clusteranalyse
In den Versuchsreihen mit den beiden Clustering-Algorithen, dem MapReduce-kMeansund MapReduce-DBSCAN-Algorithmus, sollen die Verhaltensweisen der Algorithmen
unter verschiedenen Einflüssen getestet werden. Es soll zum einen untersucht werden,
wie sich unterschiedliche Blockgrößen der Eingabedaten und Knotenzahlen auf den
MapReduce-kMeans-Algorithmus auswirken, für den MapReduce-DBSCAN-Algorithmus
sollen das Laufzeitverhalten bei verschiedenen Steuerparameterausprägungen und Knotenzahlen untersucht werden. Abschließend werden beide Algorithmen in einem Versuch
gegeneinander getestet.
Als Testdaten für das Clustering kommen zwei verschiedene Geodatensätze von OpenStreetMap zum Einsatz. Bei diesen handelt es sich um die Datensätze für Deutschland
und Europa, die von der Geofabrik GmbH5 bereitgestellt werden. Aus diesen Datensätzen werden lediglich die ”Road”-Objekte genutzt. Aus dessen Geometrien werden die
Punkt-Koordinaten extrahiert und von Duplikaten befreit. Als Format für diese wird
das SequenceFileFormat genutzt, so dass als Schlüssel die Zeilennummer und als Wert
der Punkt in Form eines zweidimensionalen Vektors verwendet wird. Die Eingabedaten
besitzen die folgenden Charakteristika:
• DE - Deutschland, 7.949.433 Punkte (248 MB)
• EU - Europa, 64.684.856 Punkte (2,3 GB)
5
http://download.geofabrik.de/
109
5.2.1
Auswirkungen der Blockgröße und Knotenanzahl auf die
Laufzeit am Beispiel des MR-kMeans-Algorithmus
In diesem Versuch wird untersucht, wie sich eine variable Blockgröße für im HDFS
gespeicherten Dateien sowie eine verschiedene Anzahl von Rechenknoten auf die Laufzeit
der MapReduce-Implementierung auswirkt. Für diese Betrachtung wird explizit der MRkMeans-Algorithmus genutzt, da er so gut wie keine sequentiellen Komponenten enthält
und somit ideal für einen Vergleich verschiedener Knotenzahlen ist. Getestet werden zum
einen die Laufzeiten, wenn nur bestimmte Knoten des Clusters verwendet werden sowie
die Auswirkungen auf die Laufzeit bei Blockgrößen von 32 MB, 64 MB und 128 MB.
Für den Versuch werden die Straßendaten von Europa (Testdatensatz EU) verwendet.
Blockgröße
32 MB
64 MB
128 MB
1 Knoten
16:02 min
13:35 min
12:18 min
2 Knoten
06:41 min
04:53 min
05:16 min
3 Knoten
02:39 min
02:21 min
02:23 min
Tabelle 5.13: Ergebnisse der Versuche mit dem MR-kMeans-Algorithmus für variable
Knotenzahl und Blockgröße
Die Ergebnisse in Tabelle 5.13 zeigen hinsichtlich der Blockgröße, dass diese nur bei
der Verwendung von einem Rechenknoten gravierende Änderungen bewirken, bzw. bei
zwei Rechenknoten nur beim Sprung von 32 MB auf 64 MB eine signifikante Laufzeitverbesserung bewirken. Bei der Verwendung von drei Rechenknoten ist bereits keine
Unterscheidung der Laufzeit bei verschiedenen Blockgrößen mehr möglich. Die starken
Unterschiede der Laufzeit bei unterschiedlichen Blockgrößen bei einem Knoten erklären
sich durch die Tatsache, dass für jeden Teil der Eingabe ein Map-Tasks erzeugt wird.
Eine geringe Blockgröße bedeutet folglich, dass eine größere Anzahl von Map-Tasks generiert wird, was dann zu einem größeren Overhead führt für das Starten und Beenden
von JVMs, die die Tasks ausführen.
In Bezug auf die Laufzeit bei Verwendung einer verschiedenen Anzahl von Rechenknoten
lässt sich unabhängig von der Blockgröße festhalten, dass der Speed-Up annähernd linear
ist; eine Verdoppelung der Rechenkapazität führt grob zu einer Halbierung der Laufzeit,
was anschaulich in Abbildung 5.3 zu sehen ist.
5.2.2
Einfluss der Parameterwahl und Knotenzahl auf den MRDBSCAN-Algorithmus
In diesem Abschnitt wird eine experimentelle Untersuchung des MR-DBSCAN-Algorithmus durchgeführt. Hierbei stehen die drei Parameter zur Steuerung der maximalen
110
Abbildung 5.3: Einfluss verschiedener Blockgrößen und Knotenzahlen auf die Laufzeit
Partitionsgröße, der Nachbarschaftsgröße sowie des für Kernpunkte notwendigen Nachbarschaftsumfangs im Mittelpunkt. Die Untersuchung wird in drei verschiedene Versuche
aufgeteilt. Im ersten Versuch wird der Einfluss der Partitionsgröße auf die Laufzeit des
Algorithmus untersucht. Im zweiten Versuch wird betrachtet, welche Auswirkungen die
Wahl der Nachbarschaftsgröße sowie die Anzahl der für Kernpunkte notwendigen Punktzahl sich auf die Laufzeit auswirken. Eine Untersuchung des Speed-Up wird in einem
dritten Versuch vorgenommen. Hierbei wird bei einer festen Parameterwahl und variabler Knotenzahl untersucht, wie sich eine unterschiedliche Anzahl von Rechenknoten auf
die Gesamtlaufzeit auswirkt. Die Versuche werden auf drei Rechenknoten ausgeführt.
Map- und Reduce-Tasks stehen 1GB Speicher pro Tasks zur Verfügung, wovon 768MB
für den Heap der JVM, die den Task ausführt, bereitgestellt werden. Verwendet wird
der Datensatz DE mit den Straßendaten von OpenStreetMap für Deutschland.
Partitionsgröße
In einem ersten Test mit des MR-DBSCAN-Algorithmus soll die Auswirkung der Partitionsgröße auf die Laufzeit des Algorithmus getestet werden. Der Test wird mit 3
Rechenknoten auf dem DE-Datensatz ausgeführt für minP ts = 350 und ε = 1500, wobei für jeden Task 1GB Speicher bereitgestellt wird, von welchem 768MB als Heap für
die ausführende Java-VM verwendet werden.
111
Abbildung 5.4: Laufzeit in den einzelnen Phasen des MR-DBSCAN-Algorithmus für den
DE-Datensatz
Was auf den ersten Blick in den Ergebnissen aus Abbildung 5.4 ins Auge fällt sind die
starken Ausreißer für das Clustering und das Relabeling der Daten für eine maximale
Partitionsgröße von 5000 Punkten. Es ist klar, dass bei einer derart geringen Anzahl von
Punkten pro Partition eine große Anzahl von Partitionen gebildet wird. Für jede von
diesen wird ein Reduce-Tasks erzeugt, so dass die schiere Anzahl von abzuarbeitenden
Tasks und dem daraus resultierenden Overhead für die lange Laufzeit verantwortlich
sind. Die Ursache für die lange Laufzeit des Relabelings sind der Tatsache geschuldet,
dass es durch die große Anzahl von Partitionen eine entsprechende hohe Anzahl von aneinander angrenzenden Paaren von Partitionen gibt. Folglich existiert eine große Anzahl
von Punkten in Überlappungsbereichen, so dass für eine größere Anzahl von Punkten
mindestens zwei verschiedene Klassifikationen und Clusteridentifikatoren vorliegen, die
vereinheitlicht müssen. Zusätzlich ist in den Ergebnissen gut zu sehen, dass bereits ab
einer Partitionsgröße von 100.000-250.000 Punkten für den verwendeten Testdatensatz
eine Reduzierung der Laufzeit mehr festzustellen ist.
Abbildung 5.5 zeigt die Laufzeit der einzelnen Testläufe noch einmal reduziert auf die
Gesamtlaufzeit. Es zeigt sich, dass die Laufzeit des Algorithmus für den gewählten Datensatz bis zu einer Partitionsgröße von 250.000 Punkten mit jeder Verdoppelung der
Größe etwa um ein Drittel reduziert wird.
112
Abbildung 5.5: Gesamtlaufzeit des MR-DBSCAN-Algorithmus für den DE-Datensatz
mit 3 Knoten, minP ts = 50, ε = 250
Anzahl der Rechenknoten
In diesem Versuch wird die Laufzeit des MR-DBSCAN-Algorithmus untersucht, wenn
eine unterschiedliche Anzahl von Rechenknoten für die Berechnung verwendet. Als Parameter wurden hier eps = 1500 und minP ts = 350 und eine Partitionsgröße von 250.000
Punkten verwendet.
Die Betrachtung der Gesamtlaufzeit (Abbildung 5.6) zeigt, das das Verfahren im Rahmen
dieses Versuches einen linearen Speed-Up besitzt; eine Verdoppelung der Rechenleistung
bedeutet also eine Halbierung der Laufzeit.
Eine genauere Betrachtung der Laufzeit (Abbildung 5.7) der einzelnen Phasen des Algorithmus zeigt, dass besonders das Clustering und das Relabeling von einer Erhöhung der
Knotenzahl profitieren. Bei diesen führt eine Verdopplung der Rechenleistung hier annähernd zu einer Halbierung der Laufzeit. Wenig Einfluss hat die Knotenzahl hingegen auf
die Berechnung des MER des Datensatzes, der Partitionierung und der Vereinigungsabbildung. Für die Berechnung des MERs ist eine signifikante Auswirkung erst zu erwarten,
wenn die zu verarbeitende Datenmenge stark ansteigt, so dass das parallele Lesen der
Eingabedaten davon profitiert. Ähnliches gilt für die Partitionierung, da die Berechnung
113
Abbildung 5.6: Gesamtlaufzeit des MR-DBSCAN-Algorithmus bei variabler Knotenzahl
der Teilergebnisse zur Bestimmung eines Rasters mit der Punktdichteverteilung des Datensatzes so auf mehrere Rechenknoten verteilt werden kann. Nicht betroffen hiervon ist
jedoch die eigentliche Berechnung der Partitionen, die rein sequentiell auf einem Knoten ausgeführt; hier ist für eine Erhöhung der Knotenzahl keine Laufzeitreduktion zu
erwarten. Für das Erzeugung der Vereinigungsabbildung ist nur bei einer großen Menge von Punkten in Überlappungsbereichen mit einer Beschleunigung der Map-Phase zu
rechnen, in dessen Folge mehrere Map-Tasks für die Berechnung der Punkt-Paare aus
überlappenden Partitionsrändern erzeugt werden. Die Reduce-Phase ist hiervon unbeeinflusst, da die Erzeugung der eigentlichen Abbildung sequentiell in einem Reduce-Task
ausgeführt wird.
Variation der Nachbarschaftsgröße
In diesem Versuch wird untersucht, wie sich der Algorithmus verhält, wenn die Größe
der Nachbarschaft variiert. Verwendet wurden 3 Rechenknoten, die Partitionsgröße liegt
bei 250.000 Punkten und die Kernpunktklassifikation beträgt 350.
Die Betrachtung der Gesamtlaufzeiten in Abbildung 5.8 zeigt recht anschaulich, dass
Nachbarschaftsgrößen von weniger als 500 zu einer massiven Verlängerung der Laufzeit
führen. Die zentrale Ursache hierfür liegt in der Tatsache, dass der gesamte Datensatz
in Raster unterteilt wird, in der jede Rasterzelle die Kantenlänge ε, also die Nachbarschaftsgröße, besitzt. Eine kleine Nachbarschaft bedeutet also zwangsläufig, dass eine
Rasterisierung mit vielen Zellen entstehen. Hieraus entstehen gravierende Folgen für die
Berechnung der Partitionierung. Dies bedeutet zum einen, dass für die binäre Rechteckaufteilung viele Schnittlinien und folglich viele verschiedene Varianten von Rechteck-
114
Abbildung 5.7: Aufgeschlüsselte Laufzeiten des MR-DBSCAN-Algorithmus bei variabler
Knotenzahl
Unterteilungen in Frage kommen, die zu testen sind. Zum anderen bedeutet dies, dass
für die Kostenberechnung der Rechtecke eine große Menge von Zellen betrachtet werden
muss, die von den resultierenden Rechtecken überdeckt werden.
Festzuhalten ist weiterhin, dass für eine Nachbarschaftsgröße von 1000 oder 2000 in der
Gesamtlaufzeit keine Unterschiede vorhanden sind. Eine Betrachtung der aufgeschlüsselten Laufzeiten (Abbildung 5.9) der einzelnen Phasen zeigt jedoch, dass eine Verdoppelung der Nachbarschaftsgröße von 1000 auf 2000 zu einer Verdoppelung der Laufzeit
des Clusterings führt.
Diese wird jedoch ausgeglichen durch eine starke Reduktion der Laufzeit der Partitionierung, da durch die größere Nachbarschaft weniger Rasterzellen und folglich weniger
Schnittlinien entstehen, die für die binäre Rechteckunterteilung betrachtet werden müssen.
Gleichzeitig bedeutet eine größere Nachbarschaft auch, dass die Überlappungsbereiche
der Partitionen größer werden. In der Folge gibt es mehr Punktpaare in diesen Überlappungsbereichen, die für die Berechnung der Vereinigungsabbildung verarbeitet werden
müssen. Dies hat ebenfalls zur Folge, dass für die Umbennung mehr Punkte verarbeitet
115
Abbildung 5.8: Gesamtlaufzeit des MR-DBSCAN-Algorithmus bei variabler Nachbarschaftsgröße
werden müssen, für die mehr als ein Paar von Klassifikation und Cluster-Identifikator
existiert.
Variation der Kernpunktklassifikation
In diesem Versuch wird untersucht, wie sich die Anzahl der Punkte, die für eine Klassifikation als Kernpunkt notwendig sind, auf die Laufzeit des Algorithmus auswirkt.
Ein Blick auf die Gesamtlaufzeit (Abbildung 5.10) zeigt, dass die Kernpunktklassifikation
keine messbaren Einfluss auf die Gesamtlaufzeit des Algorithmus hat.
Wird jedoch die detaillierte Laufzeit der einzelnen Phasen (Abbildung 5.11) betrachtet,
so zeigt sich, dass die Laufzeit des Clusterings bei einer größeren Anzahl notwendiger
Kernpunkte etwas ansteigt. Außerdem bedeutet dies auch, das weniger Punkte in den
Randbereichen als Kernpunkte klassifiziert werden, so dass sich gleichzeitig die Laufzeit
zur Berechnung der Vereinigungsabbildung reduziert, da weniger Punkte in den überlappenden Partitionsbereichen betrachtet werden müssen.
116
Abbildung 5.9: Aufgeschlüsselte Laufzeiten des MR-DBSCAN-Algorithmus bei variabler
Nachbarschaftsgröße
Ein Beispiel für ein Clustering des DE-Datensatzes mit MapReduce-DBSCAN-Algorithmus ist in Abbildung 5.12 dargestellt. Dieses wurde mit einer Kernpunktklassifikation
von 350 Punkten, einer Nachbarschaftsgröße von 1500 und einer Partitionsgröße von
250.000 Punkten durchgeführt.
5.2.3
MapReduce-kMeans und MapReduce-DBSCAN im Vergleich
In diesem Versuch werden die beiden Cluster-Algorithmen direkt gegenübergestellt. Diese werden auf dem DE und EU Datensatz ausgeführt. Für den MR-kMeans-Algorithmus
werden 50 Zentroide zufällig aus der Datenmenge ausgewählt und eine Iteration ausgeführt. Für den MR-DBSCAN-Algorithmus wird eine Nachbarschaftsgröße von 3.000,
eine Kernpunktklassifikation von 1.000 und eine Partitionsgröße von 750.000 Punkten
verwendet. Die Hadoop-Konfiguration wurde für diesen Versuch dahingehend verändert,
dass jedem Container 1,5 GB Speicher zu Verfügung stehen, von denen 1,2 GB von der
JVM als Heap genutzt werden können.
Die Ergebnisse in Tabelle zeigen recht deutlich, dass der MR-kMeans-Algorithmus eine
Iteration bedeutend schneller ausführt als der MR-DBSCAN-Algorithmus. Trotz einer
etwa 8-mal größeren Punktmenge steigt die Laufzeit nicht im selben Umfang an. Die
Laufzeit des MR-DBSCAN-Algorithmus steigt um etwa das 18-fache.
117
Abbildung 5.10: Gesamtlaufzeit des MR-DBSCAN-Algorithmus bei variabler Kernpunktklassifikation
MR-DBSCAN
MR-kMeans
Europa
03:14:17
00:02:12
Deutschland
00:11:15
00:00:47
Tabelle 5.14: Laufzeitvergleich MR-DBCSCAN und MR-kMeans
MBR
Partitionierung
Clustering
Merge-Mapping
Umbenennung
Gesamtzeit
DE
18s
20s
9m 7s
30s
1m
11m 15s
EU
1m 4s
2h 58m 3s
5m 9s
2m 21s
7m 40s
3h 14m 17s
Tabelle 5.15: Einzellaufzeiten des MR-DBSCAN-Algorithmus
Eine Betrachtung der Laufzeiten der einzelnen Phasen (Tabelle 5.15) zeigt, dass die
Partitionierung für den massiven Laufzeitanstieg verantwortlich ist. Hier treten die selben Effekte auf, die bereits in den Tests des MR-DBSCAN-Algorithmus auf dem DEDatensatz mit verschiedenen Nachbarschaftsgrößen in Abschnitt 5.2.2 beobachtet wurden. In diesem Fall treten die Effekte jedoch auch bei großen Nachbarschaften wegen
118
Abbildung 5.11: Aufgeschlüsselte Laufzeiten des MR-DBSCAN-Algorithmus bei variabler Kernpunktklassifikation
der großen räumlichen Ausdehnung der Daten auf. Die Ursache hierfür liegt in der Verrasterung des minimal-einschließenden Rechtecks des Datensatzes, welches zu einer sehr
großen Anzahl von Rasterzellen mit einer Kantenlänge, die dem Radius der Nachbarschaftsgröße entspricht, führt. Entsprechend groß ist folglich die Anzahl der Zellen, die
für jede Kostenberechnung eines Rechtecks während der Partitionierung betrachtet werden muss. Ein interessanter Effekt ist weiterhin, dass die Laufzeit des Clusterings für
den DE-Datensatz fast doppelt so lange dauert wie für den EU-Datensatz.
Fazit
Auf den ersten Blick erscheint es, als sei der MapReduce-kMeans-Algorithmus für das
Clustering räumlicher Daten die deutlich bessere Lösung. Nicht vergessen werden darf
jedoch, dass in der Regel mehrere Iteration notwendig sind, bis die finalen Clusterschwerpunkte gefunden worden sind. Zusätzlich sind gegebenenfalls mehrere Anwendungen des
Verfahrens oder eine Vorabanalyse der Daten notwendig, um eine passende Anzahl sowie Position initialer Clusterschwerpunkte zu bestimmen. Die reine Reduktion auf die
Laufzeit einer Iteration ist folglich nicht geeignet, um ein Urteil über die grundsätzliche Tauglichkeit oder Untauglichkeit eines Verfahrens zu treffen, zumal die Wahl des
Verfahrens maßgeblich von der Aufgabenstellung bestimmt wird.
119
Abbildung 5.12: Visualisierung des Clusterings für die Versuche in 5.2.2
120
Kapitel 6
Fazit
Die Versuche, die in Kapitel 5 durchgeführt wurden, zeigen, dass das MapReduceParadigma nicht für jedes Problem das Mittel der Wahl und nicht jeder Algorithmus
geeignet ist. Verfahren wie der Partition-Algorithmus, die bereits bei sequentieller Ausführung mit kleinen Datenmengen Probleme habe, lassen sich auch durch MapReduceImplementierungen nicht effizient nutzen.
Grenzwertig sind Verfahren wie der MapReduce-DBSCAN-Algorithmus, welche innerhalb der Prozessausführung sequentielle Phasen durchlaufen. So wird der Algorithmus
durch die sequentielle Berechnung einer adaptiven Partitionierung des Datensatzes für
Datensätze mit einer großen räumlichen Ausdehnung massiv verlangsamt. Wie bereits
in Abschnitt 5.2.3 dargelegt, ist eine gezielte Optimierung von Engpässen notwendig,
um eine Ausführung in akzeptabler Zeit zu gewährleisten. Dieses gilt hier insbesondere
vor dem Hintergrund, dass bestimmte Problemstellungen möglicherweise einen kleinen
Nachbarschaftsradius erfordern; auf einem Datensatz mit großer räumlicher Ausdehnung
könnte dies zur Folge, dass eine Berechnung der Partitionierung eventuell gar nicht möglich ist, da der Speicherbedarf für die Repräsentation des Rasters bereits den verfügbaren
Speicher überschreitet. Folglich besteht an dieser Stelle Optimierungsbedarf. Vorstellbar
ist hierfür eine Entkopplung der Rasterzellengröße vom Radius der Nachbarschaftsanfragen, so dass die Größe der Rasterzellen separat definiert wird.
Ganz anders sieht es hingegen beim MapReduce-kMeans- und Parallel FP-GrowthAlgorithmus aus. Diese kommen weitestgehend ohne sequentielle Phasen aus und profitieren somit von der Parallelisierungsmöglichkeiten, die sich aus dem MapReduceParadigma ergibt.
Bei den Versuchen hat sich ebenfalls gezeigt, dass die Konfiguration der RessourcenNutzung aufwändig sein kann. Es ist gegebenenfalls eine Justierung der Speicherkonfiguration in Try-and-Error-Manier notwendig, bis die Ausführung einer MapReduceAnwendung gelingt, ohne dass einzelne Tasks wegen Speichermangels von der ausführen-
121
den JVM terminiert werden. Dieses tritt insbesondere verstärkt in Erscheinung, wenn
Anwendungsparameter wie die Nachbarschafts- oder Partitionsgröße des MapReduceDBSCAN-Algorithmus starken Einfluss auf den Speicherbedarf der einzelnen Jobs einer
Anwendung haben.
Zusätzlich hat sich beim Aufsetzen des Hadoop-Clusters das geringe Alter und die hohe
momentane Entwicklungsgeschwindigkeit von Hadoop bemerkbar gemacht. Die offizielle
Dokumentation für aktuelle Versionen ist teilweise sehr dürftig, während sich inoffizielle
Dokumentation beispielsweise in Blogs oftmals auf veraltete Versionen bezieht nicht ohne
weiteres auf neue Versionen übertragbar.
Zusammenfassend lässt sich sagen, dass das MapReduce-Paradigma eine sehr gute Grundlage für die Parallelisierung von Data Mining-Algorithmen ist. Dieses liegt insbesondere
in der Tatsache begründet, dass für die Implementierung von Verfahren keinerlei Kenntnisse über parallele Softwareentwicklung vorhanden sein muss und somit während der
Entwicklung der Aufwand zur Fehlersuche von beispielsweise Race-Conditions und ähnlichen Problemen, die in parallelen Programmen auftreten können, wegfällt. Zusätzlich
ist die Erweiterung eines Hadoop-Clusters um zusätzliche Rechenknoten sehr einfach; es
muss im wesentlichen nur Hadoop und die Knotenkonfiguration auf eine neue Maschine
kopiert werden. Im Gegenzug sind natürlich tiefgehende Kenntnisse über die Funktionsweise von Hadoop, bzw. der jeweils verwendeten MapReduce-Implementierung, notwendig, um effiziente Programme zu schreiben, die alle Möglichkeiten zur Parallelisierung
ausreizen.
Zwar ist das MapReduce-Paradigma und die zugehörigen Implementierungen noch verhältnismäßig jung, jedoch ist zu erwarten, dass dieses in Zukunft einen festen Platz einnehmen wird, wenn es um massiv-parallele Datenanalyse mittels Data Mining-Algorithmen
geht.
122
Abbildungsverzeichnis
2.1
Der KDD-Prozess . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
10
2.2
Beispiel eines Frequent-Pattern Baums . . . . . . . . . . . . . . . . . . .
17
2.3
Konstruktion eines FP-Baumes . . . . . . . . . . . . . . . . . . . . . . .
19
2.4
Beispiel für die Generierung von Itemsets aus einem FP-Baum . . . . . .
21
2.5
Hierarchisches Clustering - Beispiel . . . . . . . . . . . . . . . . . . . . .
23
2.6
beispielhafte Darstellung des k-Means Algorithmus . . . . . . . . . . . .
25
2.7
-Nachbarschaft . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
26
2.8
DBSCAN - Beispiel . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
29
2.9
MapReduce-Beispiel . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
34
2.10 Der MapReduce-Prozess . . . . . . . . . . . . . . . . . . . . . . . . . . .
35
2.11 Ablauf der Ausführung einer MapReduce-Anwendung mit dem YARNFramework [Whi12] . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
39
3.1
Ablaufdiagramm des PFP-Algorithmus (nach [LWZ+ 08]) . . . . . . . . .
51
3.2
Die 90-10-Regel im hierarchischen Clustering . . . . . . . . . . . . . . . .
56
3.3
POP-Algorithmus - Überlappende Partitionen . . . . . . . . . . . . . . .
57
3.4
Beispielhafte Abbildung der Phasen des DSDBSCAN-Algorithmus (nach
[PPA+ 12]) . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
62
3.5
Ablauf des MR-DBSCAN-Algorithmus [HTL+ 14] . . . . . . . . . . . . .
66
3.6
Beispiel für kostenbasierte Partitionierung [HTL+ 14]
. . . . . . . . . . .
69
3.7
Beispiel einer Partitionierung mit ε-Randbereichen [HTL+ 14] . . . . . . .
71
3.8
Beispiel einer minimal vollständigen Partitionierungsmenge [HTL+ 14] . .
72
3.9
Beispiel eines Vereinigungs-Mappings . . . . . . . . . . . . . . . . . . . .
77
3.10 Graph-Repräsentation des Vereinigungs-Mappings . . . . . . . . . . . . .
78
123
3.11 Beispiel für den Ablauf der Vereinigung sechs lokaler Dendrogramme zu
einem globalen Dendrogramm (nach [HPA+ 12]) . . . . . . . . . . . . . .
80
3.12 SHRINK-Algorithmus - Beispiel . . . . . . . . . . . . . . . . . . . . . . .
84
3.13 MapReduce k-Means - Datenfluss . . . . . . . . . . . . . . . . . . . . . .
85
4.1
Ablauf der MapReduce-Implementierung des Partition-Algorithmus . . .
91
4.2
Ablauf der Implementierung des MR-DBSCAN-Algorithmus . . . . . . .
94
5.1
Laufzeit PFP-Algorithmus (Gruppen: 10, Support: 10, K: 50)
5.2
Laufzeiten der einzelnen Teile des PFP-Algorithmus . . . . . . . . . . . . 107
5.3
Einfluss verschiedener Blockgrößen und Knotenzahlen auf die Laufzeit . . 111
5.4
Laufzeit in den einzelnen Phasen des MR-DBSCAN-Algorithmus für den
DE-Datensatz . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 112
5.5
Gesamtlaufzeit des MR-DBSCAN-Algorithmus für den DE-Datensatz mit
3 Knoten, minP ts = 50, ε = 250 . . . . . . . . . . . . . . . . . . . . . . 113
5.6
Gesamtlaufzeit des MR-DBSCAN-Algorithmus bei variabler Knotenzahl . 114
5.7
Aufgeschlüsselte Laufzeiten des MR-DBSCAN-Algorithmus bei variabler
Knotenzahl . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 115
5.8
Gesamtlaufzeit des MR-DBSCAN-Algorithmus bei variabler Nachbarschaftsgröße . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 116
5.9
Aufgeschlüsselte Laufzeiten des MR-DBSCAN-Algorithmus bei variabler
Nachbarschaftsgröße . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 117
. . . . . . 106
5.10 Gesamtlaufzeit des MR-DBSCAN-Algorithmus bei variabler Kernpunktklassifikation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 118
5.11 Aufgeschlüsselte Laufzeiten des MR-DBSCAN-Algorithmus bei variabler
Kernpunktklassifikation . . . . . . . . . . . . . . . . . . . . . . . . . . . . 119
5.12 Visualisierung des Clusterings für die Versuche in 5.2.2 . . . . . . . . . . 120
124
Tabellenverzeichnis
3.1
Übersicht und Notation der verschiedenen Itemset-Mengen . . . . . . . .
44
5.1
Beispiel-Transaktionsdatenbank . . . . . . . . . . . . . . . . . . . . . . . 101
5.2
Häufige 1-Itemsets . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 102
5.3
Häufige 2-Itemsets . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 102
5.4
3-Itemset Kandidaten . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 102
5.5
Gruppenabhängige Shards . . . . . . . . . . . . . . . . . . . . . . . . . . 103
5.6
Anzahl gefundener Itemsets . . . . . . . . . . . . . . . . . . . . . . . . . 103
5.7
Testresultate für den MR-Partition-Algorithmus . . . . . . . . . . . . . . 104
5.8
Laufzeit bei variabler Gruppenzahl (minSup = 10, K = 500) . . . . . . . 106
5.9
Laufzeit bei unterschiedlichem minimalen Support (100 Gruppen, K = 500)107
5.10 Laufzeit bei einer verschiedenen Anzahl auszugebener frequent Itemsets
pro Item (100 Gruppen, minSup = 10) . . . . . . . . . . . . . . . . . . . 108
5.11 Anzahl gefundener frequent Itemsets für T10I4D100K.dat . . . . . . . . . 108
5.12 PFP-Algorithmus - Test mit verschiedenen Datensätzen . . . . . . . . . . 109
5.13 Ergebnisse der Versuche mit dem MR-kMeans-Algorithmus für variable
Knotenzahl und Blockgröße . . . . . . . . . . . . . . . . . . . . . . . . . 110
5.14 Laufzeitvergleich MR-DBCSCAN und MR-kMeans . . . . . . . . . . . . 118
5.15 Einzellaufzeiten des MR-DBSCAN-Algorithmus . . . . . . . . . . . . . . 118
125
Literaturverzeichnis
[AHU83] A. V. Aho, J. E. Hopcroft, J. D. Ullman. Data Structures And Algorithms.
Addison-Wesley Publishing Company, 1983. ISBN 0-201-00023-7.
[AS94]
R. Agrawal, R. Srikant. Fast Algorithms for Mining Association Rules in
Large Databases. In J. B. Bocca, M. Jarke, C. Zaniolo, Hg., Proc. VLDB.
Morgan Kaufmann, 1994. ISBN 1-55860-153-8, 487–499.
[BCC10]
E. Baralis, T. Cerquitelli, S. Chiusano. A persistent HY-Tree to efficiently support itemset mining on large datasets. In S. Y. Shin, S. Ossowski,
M. Schumacher, M. J. Palakal, C.-C. Hung, Hg., SAC. ACM, 2010. ISBN
978-1-60558-639-7, 1060–1064.
[BCCG13] E. Baralis, T. Cerquitelli, S. Chiusano, A. Grand. P-Mine: Parallel itemset
mining on large datasets. In C. Y. Chan, J. Lu, K. Nørvåg, E. Tanin, Hg.,
ICDE Workshops. IEEE Computer Society, 2013. ISBN 978-1-4673-5303-8,
266–271.
[Bre13]
J. Brehm. Parallelrechner. Vorlesungsfolien, 2013.
[DG04]
J. Dean, S. Ghemawat. MapReduce: Simplified Data Processing on Large
Clusters. In Proc. OSDI. USENIX Association, 2004, 137–150.
[DL01]
M. Dash, H. Liu. Efficient Hierarchical Clustering Algorithms Using Partially
Overlapping Partitions. In D. W.-L. Cheung, G. J. Williams, Q. Li, Hg.,
PAKDD, Bd. 2035 von Lecture Notes in Computer Science. Springer, 2001.
ISBN 3-540-41910-1, 495–506.
[DPS07]
M. Dash, S. Petrutiu, P. Scheuermann. pPOP: Fast yet accurate parallel
hierarchical clustering using partitioning. Data Knowl. Eng., 61(3), 2007,
563–578.
[EKSX96] M. Ester, H.-P. Kriegel, J. Sander, X. Xu. A Density-Based Algorithm for
Discovering Clusters in Large Spatial Databases with Noise. In E. Simoudis,
J. Han, U. M. Fayyad, Hg., Proc. KDD. AAAI Press, 1996. ISBN 1-57735004-9, 226–231.
126
[ES00]
M. Ester, J. Sander. Knowledge Discovery in Databases. Springer-Verlag,
2000. ISBN 3-540-67328-8.
[HKP11]
J. Han, M. Kamber, J. Pei. Data Mining - Concepts and Techniques. Morgan
Kaufmann, 2011.
[HPA+ 12] W. Hendrix, M. M. A. Patwary, A. Agrawal, W. Liao, A. N. Choudhary.
Parallel hierarchical clustering on shared memory platforms. In Proc. HiPC.
IEEE, 2012. ISBN 978-1-4673-2372-7, 1–9.
[HPY00]
J. Han, J. Pei, Y. Yin. Mining Frequent Patterns without Candidate Generation. In W. Chen, J. F. Naughton, P. A. Bernstein, Hg., SIGMOD Conference.
ACM, 2000. ISBN 1-58113-218-2, 1–12.
[HTL+ 14] Y. He, H. Tan, W. Luo, S. Feng, J. Fan. MR-DBSCAN: a scalable
MapReduce-based DBSCAN algorithm for heavily skewed data. Frontiers
of Computer Science, 8(1), 2014, 83–99.
[KPP03]
W. A. Kosters, W. Pijls, V. Popova. Complexity Analysis of Depth First and
FP-Growth Implementations of APRIORI. In P. Perner, A. Rosenfeld, Hg.,
MLDM, Bd. 2734 von Lecture Notes in Computer Science. Springer, 2003.
ISBN 3-540-40504-6, 284–292.
[LWZ+ 08] H. Li, Y. Wang, D. Zhang, M. Zhang, E. Y. Chang. Pfp: parallel fp-growth
for query recommendation. In P. Pu, D. G. Bridge, B. Mobasher, F. Ricci,
Hg., Proc. RecSys. ACM, 2008. ISBN 978-1-60558-093-7, 107–114.
[Mei13]
A. Meier. Efficient Algorithms. Vorlesungsskript, 2013.
[OH11]
R. O. Obe, L. S. Hsu. PostGIS In Action. Manning Publications Co., 2011.
[PPA+ 12] M. A. Patwary, D. Palsetia, A. Agrawal, W.-k. Liao, F. Manne, A. Choudhary. A New Scalable Parallel DBSCAN Algorithm Using the Disjoint-set
Data Structure. In In Proceedings of the International Conference on High
Performance Computing, Networking, Storage and Analysis, SC ’12. IEEE
Computer Society Press, Los Alamitos, CA, USA, 2012. ISBN 978-1-46730804-5, 62:1–62:11.
[RDFU12] M. Riondato, J. A. DeBrabant, R. Fonseca, E. Upfal. PARMA: A Parallel Randomized Algorithm for Approximate Association Rules Mining in
MapReduce. In Proceedings of the 21st ACM International Conference on
Information and Knowledge Management, CIKM ’12. ACM, New York, NY,
USA, 2012. ISBN 978-1-4503-1156-4, 85–94.
[RU11]
A. Rajaraman, J. D. Ullman. Mining of Massive Datasets. Cambridge University Press, 2011.
127
[Sib73]
R. Sibson. SLINK: An Optimally Efficient Algorithm for the Single-Link
Cluster Method. Comput. J., 16(1), 1973, 30–34.
[SON95]
A. Savasere, E. Omiecinski, S. B. Navathe. An Efficient Algorithm for Mining
Association Rules in Large Databases. In U. Dayal, P. M. D. Gray, S. Nishio,
Hg., Proc. VLDB. Morgan Kaufmann, 1995. ISBN 1-55860-379-4, 432–444.
[TSK05]
P. Tan, M. Steinbach, V. Kumar. Introduction to Data Mining. AddisonWesley, 2005.
[WD11]
S. Wang, H. Dutta. PARABLE: A PArallel RAndom-partition Based HierarchicaL ClustEring Algorithm for the MapReduce Framework. Techn. Ber.,
Columbia University, 2011.
[Whi12]
T. White. Hadoop - The Definitive Guide. O’Reilly, 2012.
[XJK99]
X. Xu, J. Jäger, H.-P. Kriegel. A Fast Parallel Clustering Algorithm for Large
Spatial Databases. Data Min. Knowl. Discov., 3(3), 1999, 263–290.
[ZEHL01] O. R. Zaı̈ane, M. El-Hajj, P. Lu. Fast Parallel Association Rule Mining
without Candidacy Generation. In N. Cercone, T. Y. Lin, X. Wu, Hg., ICDM.
IEEE Computer Society, 2001. ISBN 0-7695-1119-8, 665–668.
[ZMH09] W. Zhao, H. Ma, Q. He. Parallel K-Means Clustering Based on MapReduce. In Proceedings of the 1st International Conference on Cloud Computing,
CloudCom ’09. Springer-Verlag, Berlin, Heidelberg, 2009. ISBN 978-3-64210664-4, 674–679.
128
Erklärung
Hiermit versichere ich, dass ich die vorliegende Arbeit und die zugehörige Implementierung selbstständig verfasst und dabei nur die angegebeben Quellen und Hilfsmittel
verwendet habe.
Hannover, 24.04.2014
Oliver Pabst
129
Zugehörige Unterlagen
Herunterladen