Fakultät für Elektrotechnik und Informatik Institut für Praktische Informatik Fachgebiet Datenbanken und Informationssysteme Parallelisierung von Data Mining Algorithmen Masterarbeit im Studiengang Informatik Oliver Pabst Matrikelnummer: 2297090 Prüfer: Prof. Dr. Udo Lipeck Zweitprüfer: Dr. Hans Hermann Brüggemann Betreuer: Prof. Dr. Udo Lipeck 13. Mai 2014 Zusammenfassung In der heutigen Gesellschaft fallen in vielen Bereichen (Onlinehandel, Web-Mining für Suchmaschinen, soziale Netzwerke, etc.) immer größere Datenmengen an, in denen potentiell nützliches Wissen enthalten sein kann. Diese bewegen sich in vielen Fällen bereits im Bereich von Petabytes. Traditionelle sequentielle Algorithmen zur Assoziations- und Clusteranalyse stoßen hier schon lange an ihre Grenzen, da eine effiziente Analyse der anfallenden Daten mit diesen Verfahren nicht möglich ist. Um diesem Problem beizukommen ist eine Parallelisierung der Problemlösung unumgänglich, wenn keine grundsätzlich effizienten Algorithmen bekannt sind. Im ersten Teil dieser Arbeit wird eine Auswahl paralleler Data Mining-Algorithmen vorgestellt, die aus dem Bereich der Assoziations- und Clusteranalyse stammen. Grundlage für diese Verfahren sind Parallelrechnerarchitekturen mit gemeinsam-globalem Speicher oder Kommunikation über Nachrichten für Informationsaustausch sowie das relativ neue MapReduce-Paradigma. Der zweite Teil der Arbeit beginnt zunächst mit der Vorstellung der Implementierung von vier parallelen Algorithmen, jeweils zwei aus dem Bereich der Assoziations- und Clusteranalyse. Der Fokus der Implementierungen und der nachfolgenden Versuche liegt hierbei auf dem MapReduce-Paradigma. In den Versuchen soll die Tauglichkeit für große Datenmengen, soweit es im Rahmen dieser Arbeit und der vorhandenen Möglichkeiten machbar ist, untersucht werden sowie der Einfluss der Steuerparameter auf die Algorithmen betrachtet werden. Inhaltsverzeichnis 1 Einleitung 7 1.1 Motivation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 7 1.2 Überblick . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 8 2 Grundlagen 9 2.1 Knowledge Discovery in Databases . . . . . . . . . . . . . . . . . . . . . 9 2.2 Assoziationsanalyse . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 12 2.2.1 Der Apriori-Algorithmus . . . . . . . . . . . . . . . . . . . . . . . 13 2.2.2 Frequent Pattern Growth . . . . . . . . . . . . . . . . . . . . . . 15 Clusteranalyse . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 22 2.3.1 Hierarchisches Clustering . . . . . . . . . . . . . . . . . . . . . . . 22 2.3.2 Partitionierendes Clustering . . . . . . . . . . . . . . . . . . . . . 24 2.4 Die UNION-Find-Struktur . . . . . . . . . . . . . . . . . . . . . . . . . . 30 2.5 Parallelrechner . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 32 2.6 MapReduce . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 33 2.6.1 Programmiermodell . . . . . . . . . . . . . . . . . . . . . . . . . . 33 2.6.2 Der Map-Reduce Prozess . . . . . . . . . . . . . . . . . . . . . . . 35 2.6.3 Hadoop . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 36 2.6.4 Ein- und Ausgabeformate . . . . . . . . . . . . . . . . . . . . . . 41 2.3 3 Parallele Algorithmen 3.1 43 Algorithmen zur Assoziationsanalyse . . . . . . . . . . . . . . . . . . . . 43 3.1.1 Partition-Algorithmus . . . . . . . . . . . . . . . . . . . . . . . . 43 3.1.2 Laufzeit . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 47 5 3.2 3.1.3 MapReduce-Partition . . . . . . . . . . . . . . . . . . . . . . . . . 48 3.1.4 MapReduce Frequent-Pattern-Growth (PFP) . . . . . . . . . . . . 50 3.1.5 Weitere Algorithmen . . . . . . . . . . . . . . . . . . . . . . . . . 55 Algorithmen zur Clusteranalyse . . . . . . . . . . . . . . . . . . . . . . . 56 3.2.1 Parallel Partially Overlapping Partitioning (pPOP) . . . . . . . . 56 3.2.2 Parallel Disjoint-Set DBSCAN (PDSDBSCAN) . . . . . . . . . . 60 3.2.3 MapReduce-DBSCAN . . . . . . . . . . . . . . . . . . . . . . . . 65 3.2.4 Shared-memory SLINK (SHRINK) . . . . . . . . . . . . . . . . . 79 3.2.5 MapReduce k-Means Clustering . . . . . . . . . . . . . . . . . . . 85 3.2.6 Weitere Algorithmen . . . . . . . . . . . . . . . . . . . . . . . . . 88 4 Implementierung 90 4.1 MapReduce-Partition . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 90 4.2 MapReduce-DBSCAN . . . . . . . . . . . . . . . . . . . . . . . . . . . . 93 4.2.1 97 MapReduce-kMeans und Parallel FP-Growth . . . . . . . . . . . . 5 Versuche 5.1 5.2 99 Assoziationsanalyse . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 100 5.1.1 Experimentelle Untersuchung der Korrektheit des Parallel FPGrowth-Algorithmus . . . . . . . . . . . . . . . . . . . . . . . . . 101 5.1.2 Untersuchung der Performance des Partition-Algorithmus . . . . . 104 5.1.3 Untersuchung der Performance des Parallel FP-Growth-Algorithmus105 5.1.4 Performancevergleich zwischen MR-Partitition und PFP-Growth . 108 Clusteranalyse . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 109 5.2.1 Auswirkungen der Blockgröße und Knotenanzahl auf die Laufzeit am Beispiel des MR-kMeans-Algorithmus . . . . . . . . . . . . . . 110 5.2.2 Einfluss der Parameterwahl und Knotenzahl auf den MR-DBSCANAlgorithmus . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 110 5.2.3 MapReduce-kMeans und MapReduce-DBSCAN im Vergleich . . . 117 6 Fazit 121 6 Kapitel 1 Einleitung 1.1 Motivation Das Erfassen von Daten in den verschiedensten Lebensbereichen und Umfeldern findet in der heutigen Zeit in immer größer werdenden Ausmaß statt. Um aus diesen gesammelten Daten potentiell nützliche Informationen zu extrahieren reichen traditionelle sequentielle Data Mining-Verfahren schon lange nicht mehr aus, da beispielsweise in hoch-frequentierten Bereichen des Internets, etwa Suchmaschinen oder Onlinehandel, Datenmengen anfallen, die bereits den Petabyte-Bereich erreicht haben. Um diese Datenmengen effizient analyisieren zu können wurden, im Wesentlichen im vergangenen Jahrzehnt, parallele Data Mining-Verfahren entwickelt. Diese basieren zum Großteil auf Parallelrechner-Architekturen, die als Grundlage einen gemeinsamen globalen Speicher oder Nachrichten für den Informationsaustausch verwenden. In den letzten Jahren ist im verstärkten Ausmaß das MapReduce-Paradigma als Grundlage neuer Verfahren hervorgetreten. Dieses bezeichnet sowohl ein Programmiermodell, nach dem Anwendungen entwickelt werden, als auch eine Software-Infrastruktur für massiv-parallele Berechnungen in großen verteilten Systemen. In dieser Arbeit wird die Parallelisierung von Data Mining betrachtet. Zuerst wird eine Auswahl verschiedener Algorithmen aus den Bereichen der Assoziations- und Clusteranalyse vorgestellt, welche auf verschiedenen Parallelisierungs-Paradigmen basieren. Anhand dieser werden verschiedene Ansätze vorgestellt, mit denen eine Parallelisierung, zum Teil bekannter sequentieller Algorithmen, erreicht wird. Mit einer experimentellen Untersuchung wird für eine Auswahl von Algorithmen analysiert, ob sich diese für die Parallelisierung und somit Verarbeitung großer Datenmengen eignen. Der Fokus für die Versuche liegt hier auf dem MapReduce-Paradigma. 7 1.2 Überblick In Kapitel 2 werden zunächst die relevanten Grundlagen der Arbeit vorgestellt. Es soll ein Überblick geliefert werden, wie die Entdeckung von Wissen aus Daten funktioniert. Daran anschließend werden die für diese Arbeit relevanten Teilbereiche, die Assoziationsund Clusteranalyse mit den bekanntesten Verfahren vorgestellt. Weiterhin werden die für die nachfolgenden Kapitel relevanten Themen Parallelrechner sowie MapReduce präsentiert. Außerdem wird mit der Union-Find-Struktur eine Datenstruktur vorgestellt, die von einigen der vorgestellten parallelen Algorithmen verwendet wird. Eine Auswahl paralleler Data Mining-Algorithmen wird in Kapitel 3 vorgestellt. Es werden Algorithmen aus den Bereichen Assoziations- und Clusteranalyse vorgestellt, die zum einen auf verschiedenen Parallelisierungsparadigmen, wie etwa MapReduce oder Shared-Memory, basieren. Zum anderen basieren diese auf vielfältigen Ansätzen, um zum Teil populäre sequentielle Algorithmen zu parallelisieren. Kapitel 4 präsentiert die Implementierungen der Algorithmen, die im Rahmen dieser Arbeit für Versuche auf Grundlage von Hadoop entwickelt worden sind. Bei diesen handelt es sich um den MapReduce-Partition- sowie den MapReduce-DBSCAN-Algorithmus, welche in Kapitel 3 vorgestellt werden. Außerdem werden oberflächlich die Implementierungen der beiden anderen untersuchten Algorithmen, MapReduce-kMeans und Parallel FP-Growth, analysiert, die ebenfalls für die Versuche verwendet werden; diese stammen aus dem Mahout-Projekt der Apache Software Foundation, welches skalierbare Algorithmen für maschinelles Lernen bereitstellt. Die Tauglichkeit der MapReduce-Implementierungen wird in Kapitel 5 betrachtet. Hierbei steht unter anderen die Skalierbarkeit der Verfahren im Vordergrund und ob diese mit verhältnismäßig großen Datenmengen umgehen können. Weiterhin wird untersucht, wie sich die Laufzeit der Algorithmen bei Variation der Steuerparameter verhält sowie jeweils ein Vergleich der Algorithmen aus dem jeweiligen Gebiet der Versuche vorgenommen. In Kapitel 6 werden abschließend die wichtigsten Punkte der Arbeit zusammengefasst. Zusätzlich wird ein abschließendes Fazit gezogen. 8 Kapitel 2 Grundlagen In diesem Kapitel werden die beiden Bestandteile des Prozesses der Knowledge Discovery in Datenbanken vorgestellt, die in dieser Arbeit betrachtet werden. Hierbei handelt es sich um die Data Mining-Techniken der Assoziationsanalyse sowie der Clusteranalyse. Der Fokus liegt auf diesen beiden Verfahren, da diese es ermöglichen, Muster in unbekannten Daten zu erkennen. Diese sind die Grundlage um Regeln zu entwickeln, mit denen Verfahren wie die Klassifikation oder die Generalisierung angewendet werden. Für diese beiden Analysearten werden grundlegende sequentielle Algorithmen präsentiert. Zusätzlich wird die weniger bekannte Union-Find-Struktur erläutert, die im späteren Verlauf in einigen Implementierungen paralleler Clusteringverfahren zum Einsatz kommt, sowie das MapReduce-Paradigma, welches für einige Implementierungen paralleler Algorithmen, die in dieser Arbeit vorgestellt werden, verwendet wird. 2.1 Knowledge Discovery in Databases Ester und Sander definieren den Begriff der Knowledge Discovery in Databases (KDD) [ES00] als einen Prozess, der, automatisch oder semi-automatisch, Wissen aus Datenbanken gewinnt. Dieses muss jedoch im statistischen Sinne gültig sein, darf bisher nicht bekannt sein und soll nach Möglichkeit Nutzen bringen. Der iterative Prozess, der in Abbildung 2.1 dargestellt ist, besteht aus fünf Schritten, die im Folgenden kurz vorgestellt werden. Fokussierung Das Ziel der Fokussierung ist es, zunächst ein Verständnis über die Anwendung zu gewinnen und in Erfahrung zu bringen, welches Anwendungswissen bereits vorhanden ist. 9 Gesamter Datenbestand Fokussieren Relevanter Datenbestand Vorverarbeitung Vorverarbeitete Daten Evaluation Entdecktes Wissen Transformation Transformierte Daten Data Mining Erkannte Muster Abbildung 2.1: Der KDD-Prozess Hieraus wird das zu erreichende Ziel des KDD-Prozesses abgeleitet, insbesondere unter dem Gesichtspunkt, dass das neu generierte Wissen der Anwendung Nutzen bringen muss und nicht bereits bekannt sein darf. Weiterhin gilt es festzulegen, welche Quellen herangezogen werden sollen für die Suche und wie diese verfügbar gemacht werden können. Ein weiterer wichtiger Punkt ist, in welcher Form die zu verwendenden Daten vorgehalten werden sollen. In vielen Fällen geschieht dies mit Hilfe eines Datenbanksystems, da dies beispielsweise im nächsten Schritt, der Vorverarbeitung, eine einfache Selektion der gewünschten Daten aus dem für die Anwendung relevanten Datenbestand erlaubt. Vorverarbeitung Im zweiten Schritt, der Vorverarbeitung, geht es um die Integration der für relevant erachteten Werte, welche gegebenenfalls in eine konsistente Form zu bringen und zu vervollständigen sind. Für Daten aus unterschiedlichen Quellen gilt, dass für diese beispielsweise verschiedene Einheiten (z.B. angloamerikanisches Maßsystem ↔ metrisches Maßsystem) oder verschiedene Namen für dasselbe Objektattribut verwendet wurden. Zusätzlich können Inkonsistenzen durch Schreibfehler auftreten oder fehlerhafte Messungen zu Rauschen führen, also zu einem Zufallsmuster, das wiederum die tatsächlich vorhandenen Muster überlagern kann. Transformation Die vorverarbeiteten Daten sind zunächst in eine Form zu bringen, die für die zur Anwendung kommenden Data Mining-Verfahren geeignet sind. In den meisten Fällen bedeutet dies, dass nur ausgewählte Attribute der vorverarbeiten Daten verwendet werden (Selektion), die für die Aufgabe relevant sind, oder dass Attributwertbereiche reduziert werden 10 (Diskretisierung), etwa die Reduktion eines Attributs ”Alter” von einem numerischen Wert auf einen kategorischen Wert (Kind|Erwachsener). Data Mining Der vierte Schritt ist die Anwendung von effizienten Algorithmen auf die vorverarbeiteten Daten, um gültige Muster in dem Datenbestand zu finden. Hierfür ist es zunächst notwendig, die Aufgaben zu identifizieren, die im Data Mining-Schritt zu bewältigen ist. Die wichtigsten Aufgaben sind: • Clusteranalyse: Die Aufgabe der Clusteranalyse ist die Gruppierung der Datenbankobjekte in Gruppen (Cluster), so dass Objekte desselben Clusters in ihren Attributen möglichst ähnlich, Objekte verschiedener Objekte jedoch möglichst unähnlich sind. Aufgrund der Bedeutung im Rahmen dieser Arbeit wird die Clusteranalyse in Abschnitt 2.3 auf Seite 22 ausführlicher beschrieben. • Klassifikation: Gegeben ist eine Menge von Trainingsobjekten mit Attributwerten, für die eine Klassenzuordnung vorhanden ist. Ziel ist das Anlernen einer Funktion z.B. basierend auf Entscheidungsbäumen oder Regeln, die eine Klassifikation zukünftiger Objekte anhand ihrer Attributausprägungen durchführt. • Assoziationsanalyse: Aus einer gegebenen Datenbank mit Transaktionen (Itemsets) sollen Regeln der Form IF A AND B THEN C “gefunden werden, die ” häufig auftretende und starke Zusammenhänge unter den vorkommenden Items beschreiben. Da diese Data Mining-Aufgabe, ebenso wie die Clusteranalyse, für diese Arbeit von zentraler Bedeutung ist, erfolgt eine tiefergehende Beschreibung der Assoziationsanalyse in Abschnitt 2.2 auf Seite 12. • Generalisierung: Das Ziel ist das Erreichen einer möglichst kompakten Beschreibung einer Datenmenge, indem Attributwerte generalisiert und die Anzahl von Datensätzen reduziert wird. Sobald die Aufgabe identifiziert ist kann basierend auf den Anwendungszielen und der Beschaffenheit der Daten ein angemessener Algorithmus ausgewählt und ausgeführt werden. Evaluation Zum Abschluss des KDD-Prozesses werden vom ausführenden System die Ergebnisse, in diesem Fall gefundene Muster, einem Experten der Anwendungsdomäne in geeigneter Weise dargestellt. Dieser entscheidet darüber, ob die gesetzten Ziele erreicht worden sind, 11 oder ob eine zusätzliche Iteration des KDD-Prozesses durchzuführen ist. Hierbei ist festzuhalten, dass in einem weiteren Iterationsschritt nicht der vollständige Prozess abermals auszuführen ist; es kann auch beispielsweise nur das Data Mining mit einer veränderten Parameterwahl ausgeführt werden, wenn die zugrunde liegenden vorverarbeiteten Daten von ausreichender Qualität sind. Damit auch zukünftige Prozesse neues Wissen generieren, ist es notwendig, gefundenes Wissen in das bestehende System einzupflegen, damit eine wiederholte Durchführung des KDD-Prozesses auch weiterhin unbekanntes Wissen liefert. 2.2 Assoziationsanalyse Assoziationsregeln, die aus Verfahren der Assoziationsanalyse gewonnen werden, werden auch als Warenkorbanalyse bezeichnet. Warenkörbe sind hier analog zu Kundeneinkäufen zu betrachten. Ein Warenkorb ist hierbei zusammengesetzt aus Objekten (Items), die zusammen gekauft werden, unabhängig davon ob es sich um materielle (Artikel im Supermarkt) oder immaterielle (Dienstleistungen, Informationen) Objekte handelt. Ein anschauliches Beispiel für eine Datenbank von Transaktionen ist die Datensammlung, die sich aus den Scanvorgängen an den Kassen eines Supermarkts ergibt. Eine Transaktion besteht in diesem Fall aus Informationen zu allen Artikeln (z.B. Barcodes), die ein Kunde zu einem Zeitpunkt gekauft hat. Die Grundlage für eine tiefer gehende Analyse von Transaktionsdatenbanken ist das Bestimmen von Objektmengen, so genannten Itemsets, die häufig in dieser Datenbank auftreten frequent Itemsets, wobei die Itemsets auch als k-Itemsets bezeichnet werden; k bezeichnet hier die Anzahl der Elemente des Itemsets. Hierfür wird zunächst der Support s(X) für ein Itemset X berechnet, welcher den Anteil von Transaktionen darstellt, die X enthalten. Von einem häufigen Itemset X spricht man nun, wenn der Support s(X) größer gleich einem festgelegten Schwellenwert minimumSupport ist, also s(X) ≥ minimumSupport. Für ein Itemset {Chips, Brezeln, Bier} wäre der Support beispielsweise s({Chips, Brezeln, Bier}) = |{Chips, Brezeln, Bier}| , |Transaktionsdatenbank| also in wie vielen Transaktionen der Datenbank das Itemset vorhanden ist. Auf Grundlage der häufigen Itemsets können Assoziationsregeln gesucht werden, welche Zusammenhänge in den Transaktionen darstellen. Ein Beispiel hierfür wäre die Regel s({Chips, Brezeln ⇒ Bier}), welche besagt, dass das Vorhandensein von Chips und Brezeln in einem Warenkorb auf das gleichzeitige Vorhandensein von Bier schließen lässt. 12 Es ist jedoch wichtig, auch die Qualität der Regeln zu bewerten. Ein Kriterium dafür ist die Konfidenz. Diese bezeichnet die relative Häufigkeit, in der die Regel korrekt ist, und bedeutet im Falle der Regel {Chips, Brezeln ⇒ Bier}, in wie vielen Regeln, die {Chips, Brezeln} enthalten, zusätzlich auch {Bier} enthalten ist (Gleichung 2.1). confidence({Chips, Brezeln ⇒ Bier}) = s{Chips, Brezeln, Bier} s{Chips, Brezeln} (2.1) Ein weiteres Kriterium ist der Lift, mit dessen Hilfe sich berechnen lässt, ob und wie weit die Konfidenz für eine Regel den Erwartungswert übertrifft (Gleichung 2.2); der Lift ist folglich ein Indikator für die generelle Bedeutung einer Regel. lift({Chips, Brezeln ⇒ Bier}) = s({Chips, Brezeln} ∪ {Bier}) s({Chips, Brezeln}) · s({Bier}) (2.2) Der Fokus ist hinsichtlich großer Datenmengen auf den Aufgabenbereich der Bestimmung häufiger Itemsets (Frequent-Itemset-Mining) gerichtet, da für diese Aufgabe zunächst alle möglichen Itemsets, die sich aus dem Datenbestand ergeben, zu bestimmen sind. Sei nun I die Menge aller Items, die in einer Transaktionsdatenbank mindestens einmal auftreten. Dann ist die Menge aller möglichen Itemsets die Potenzmenge von I ohne die leere Menge und somit |I| = 2n − 1. Deshalb betrachtet diese Arbeit nur das Frequent-Itemset-Mining, für welches im Folgenden die beiden bekannten Algorithmen Apriori und FP-Growth vorgestellt werden. 2.2.1 Der Apriori-Algorithmus Der Apriori-Algorithmus [AS94] ist ein Verfahren der Assoziationsanalyse, um häufige, bzw. interessante Itemsets aus Transaktionsdatenbanken zu berechnen. Die Grundlage dieses Algorithmus ist das Apriori-Prinzip, welches besagt, dass wenn ein Itemset häufig ist, so sind auch dessen Teilmengen häufig. Zusätzlich gilt für häufige Itemsets eine AntiMonotonie (Gleichung 2.3), aus welcher gefolgert werden kann, dass der Support eines Itemsets nicht größer sein kann als der größte Support der Teilmengen. ∀X, Y : (X ⊆ Y ) =⇒ s(X) ≥ s(Y ) (2.3) Der Apriori-Algorithmus 2.1 zählt in der ersten Iteration zunächst alle häufigen 1Itemsets durch einen Scan der Transaktionsdatenbank. Die nachfolgenden Iterationen (Zeile 2-8) bestehen jeweils aus zwei Phasen. In der ersten wird aus den im vorherigen Durchlauf berechneten häufigen (k−1)-Itemsets mit dem Apriori-Gen-Algorithmus 2.2 13 die Kandidatenmenge der häufigen k-Itemsets berechnet (Zeile 5). In der zweiten Phase (Zeile 5-8) wird der Support für die Kandidaten in Ck gezählt. Itemsets der Länge k mit einem Support größer gleich dem minimalen Support minSup werden zum Abschluss der Menge der häufigen k-Itemsets hinzugefügt (Zeile 9). Am Ende gibt der Algorithmus schließlich die Vereinigung aller häufigen Itemsets zurück (Zeile 10). Algorithmus 2.1 Apriori-Algorithmus ([AS94] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: input: T - list of transactions input: minSup - minimal support required for frequent itemsets output: set of found frequent itemsets procedure Apriori(T , minSup) L1 = {frequent 1-itemsets}; for (k = 2; Lk−1 6= ∅; k + +) do Ck = Apriori-Gen(Lk−1 ); for all transactions t ∈ T do // calculate itemsets contained in transaction t Ct = Subset(Ck , t); for all candidates c ∈ Ct do c.count++; Lk = {c ∈ Ck |c.count ≥ minSup} S return k Lk Der Algorithmus zur Erzeugung neuer, größerer Itemsets (Algorithmus 2.2) benutzt die Menge Lk−1 , die Menge aller häufigen (k−1)-Itemsets und konstruiert daraus eine Obermenge von Itemsets der Länge k. In den Zeilen 2 bis 5 werden diese erzeugt, indem zunächst zwei häufige (k − 1)-Itemsets p und q gewählt werden, die auf den Position 1, . . . , k−2 identisch sind und sich in der k−1-ten Position unterscheiden, das (k−1)-te Element von p jedoch kleiner als das von q ist. Die Positionen 1, . . . , k −2 werden nun mit den zwischen p und q gleichen Elementen besetzt, die (k −1)-te Position mit dem (k−1)-ten Element aus p und das k-te Element mit dem aus q. Im Anschluss an die Erzeugung der Itemsets ist abschließend die Apriori-Eigenschaft zu prüfen, also ob die Kandidaten häufige (k-1)-Itemsets enthalten. Dadurch lässt sich bereits die Menge der Itemsets, deren Support zu berechnen ist, reduzieren. Hierfür werden alle Kandidaten durchlaufen (Zeile 7), von denen wiederum alle (k-1)-Teilmengen durchlaufen werden. Wenn nun eine dieser Teilmengen nicht häufig ist, also nicht in Lk−1 enthalten ist, so folgt aus der Apriori-Eigenschaft, dass das k-Itemset nicht häufig sein kann. 14 Algorithmus 2.2 Apriori-Gen - Erzeugung neuer Itemsets ([AS94] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: input: Lk−1 - frequent itemsets of length k−1 output: Ck - frequent itemsets of length k procedure Apriori-Gen(Lk−1 ) // generating new itemsets insert into Ck select p.item1 , p.item2 , . . . , p.itemk−1 , q.itemk−1 from Lk−1 p, Lk−1 q where p.item1 = q.item1 , . . . , p.itemk−1 = q.itemk2 , p.itemk−1 < q.itemk−1 ; // prune candidates containing (k − 1)-subsets that are not in Lk−1 for all itemsets c ∈ Ck do for all (k−1)-subsets s of c do if (s ∈ / Lk−1 ) then delete c from Ck return Ck Laufzeit Für den ersten Schritt, die Bestimmung der frequent 1-Itemsets, ist eine Laufzeit von O(|T | · w) erforderlich, wobei w die durchschnittliche Transaktionslänge ist. Für die Erzeugung von Kandidaten werden Itemsets der Länge k −1 paarweise verglichen, wobei hierfür maximal k − 2 Vergleiche pro Paar benötigt werden. Maximal müssen hier alle gefundenen Itemsets betrachtet werden, so dass hier für die Pwder vorherigen Iteration 2 Laufzeit maximal O( k=2 (k − 2) · |Lk−1 | ) beträgt. Für das Pruning ist für jeden generierten Kandidaten zu prüfen, P ob alle (k −1) Teilmengen ebenfalls häufig sind, so dass Bei der Zählung des Supportes sich eine Laufzeit von O( w k=2 k · (k −1) · |Ck |) ergibt. |t| ergeben sich für jede Transaktion mit |t| k Itemsets. Insgesamt ist die LaufP Länge w zeit für die Supportzählung O(|T | · k · k αk ), wobei αk die Kosten zur Aktualisierung eines Itemset-Kandidaten sind. Für einen sehr niedrigen Support ergibt sich insgesamt aufgrund der Anzahl potentieller Itemsets eine exponentielle Laufzeit. Nachteile Bei einem niedrig angesetzten minimalen Support-Schwellwert kann es beim AprioriAlgorithmus passieren, dass es eine Vielzahl häufiger und langer Itemsets geben kann. Dies führt dazu, dass der Apriori-Algorithmus sehr teuer hinsichtlich des Speicherbedarfs wird, da eine große Anzahl von Kandidaten-Itemsets zu handhaben ist. Beispielsweise führt eine Menge von 104 häufigen 1-Itemsets bereits zu mehr als 107 2 10000 Itemset-Kandidaten ( 2 = 4, 9995 · 107 ). Zusätzlich ist für diese Kandidatenmenge auch der Support zu bestimmen. Ein weiterer Punkt ist, dass für die Entdeckung ei- 15 nes häufigen Itemsets {a1 , . . . , a100 } der Länge 100 der Erzeugung von 2100 − 1 ≈ 1030 Kandidaten notwendig ist, was in Folge der Kandidatengenerierung nicht zu vermeiden ist. 2.2.2 Frequent Pattern Growth Der Frequent-Pattern-Growth(FP-Growth)-Algorithmus [HPY00] versucht die Nachteile des Apriori-Algorithmus zu umgehen, in dem die Transaktionsdatenbank in Form einer Baum-Struktur dargestellt wird, aus welcher die häufigen Itemsets gewonnen werden können. Der Ansatz, eine Baum-Struktur zur Kodierung der Transaktionen zu verwenden, ermöglicht, dass die Menge der häufigen Itemsets berechnet werden kann, ohne dass, wie im Apriori-Algorithmus, iterativ Kandidatenmengen berechnet und bewertet werden müssen. Dieses wird durch drei verschiedene Ansätze erreicht. Erstens wird eine erweiterte Präfix-Baum-Struktur und quantitative Informationen der Itemsets in der Baum-Struktur gespeichert. Zusätzlich werden in einem FP-Baum nur Knoten gespeichert, die in der Menge der häufigen 1-Itemsets vorkommen. Schließlich werden die Knoten des Baumes so angeordnet, dass häufig vorkommende Items eine größere Chance haben, sich Knoten mit anderen Pfaden zu teilen. Der zweite Ansatz ist eine Methode zur Erzeugung wachsender Itemsets aus einem FPBaum, die mit häufigen 1-Itemsets beginnt (Ausgangs Suffix-Itemset) und im weiteren Verlauf nur eine bedingte Itemset-Basis betrachtet, d.h. die Teildatenbank der Menge von häufigen Itemsets, die zusammen mit dem Suffix-Pattern auftreten. Basierend auf dem Suffix-Itemset und der daraus resultierenden bedingten Itemset-Basis wird ein bedingter FP-Baum erzeugt, auf welchem die Itemset-Erzeugung rekursiv aufgeführt wird. Das Wachstum der Itemsets geschieht durch das Aneinanderhängen der Suffix-Patterns mit neuen Suffix-Patterns aus dem bedingten FP-Baum. Die Vollständigkeit dieser Methode des Pattern-Wachstums wird garantiert, da häufige Itemsets einer Transaktion immer in einem zugehörigen Pfad des FP-Baums codiert ist. Als drittes wird als Suchtechnik zur Generierung häufiger Itemsets ein Divide-andConquer-Ansatz verwendet. Dieser reduziert die Größe der bedingten Itemset-Basis, da das Problem des Findens langer, häufiger Itemsets in die Suche nach kurzen Itemsets und das Aneinanderhängen von Suffixen umgewandelt wird. Frequent-Pattern Baum Ein Frequent-Pattern-Baum (Abbildung 2.2) ist eine Baumstruktur, die als Wurzel einen als null“ bezeichneten Knoten besitzt, der als Attribut die Anzahl der im Baum gespei” cherten Transaktionen festhält. An der Wurzel wird eine Menge von Präfix-Teilbäumen 16 der Elemente eingehängt. Die Struktur verfügt zusätzlich über eine Header-Tabelle, in der die im Baum enthaltenen Items sowie Zeiger zum ersten Vorkommen des Items im Baum enthalten sind. null:5 Item A B C D E F A:2 Header C:1 F:1 B:2 D:1 E:1 D:1 C:1 F:1 F:1 Abbildung 2.2: Beispiel eines Frequent-Pattern Baums Jeder Knoten besitzt drei Attribute: das erste ist der Knotenname, der das Item angibt, welches der Knoten repräsentiert. Das zweite ist die Anzahl der Transaktionen, die durch den Teilpfad, in dem der Knoten liegt, bis zu diesem Knoten repräsentiert werden. Das dritte Element schließlich ist ein Zeiger, der auf den nächsten Knoten im Baum mit demselben Namen zeigt. Die Header-Tabelle schließlich besitzt zwei Spalten; in der ersten Spalte steht der Name des Items, die zweite enthält einen Zeiger zum ersten Vorkommen des Elements im Baum. Dieses Konstrukt ermöglicht ausgehend von der Header-Tabelle das einfache Finden jedes Vorkommens eines Items in einem FP-Baum durch Verfolgen von Zeigern. Algorithmus 2.3 Aufbau eines FP-Baums (basierend auf [HPY00] erarbeitet) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: input: tdb - database of transactions, ξ: minimum support threshold output: T - a fp-tree procedure ConstructFpTree(tdb, ξ) define list F[] . list of frequent 1-itemsets for each transaction t in tdb do for each item ai in t do F [ai ] + + sort F in descending order relating to item count create root of FP-Tree T , labeled as null“ ” for each transaction t ∈ tdb do order and filter frequent items in t according to F into f InsertTree(f, T ) return T Um den FP-Baum aufbauen zu können (Algorithmus 2.3), wird zunächst ein Scan der 17 Transaktionsdatenbank durchgeführt und eine Liste der häufigen Itemsets erstellt, welche absteigend nach der Vorkommenshäufigkeit sortiert wird. Anschließend wird die Transaktionsdatenbank nochmals vollständig durchlaufen. Jede Transaktion wird nun hinsichtlich der Liste der häufigen Itemsets sortiert und gefiltert, so dass die Transaktion nur noch häufige Items enthält und diese nach der Häufigkeit des Vorkommens sortiert ist; diese wird mit der Funktion InsertTree in den Baum eingefügt. Das Einfügen eines Itemsets in den FP-Baum (InsertTree, Algorithmus 2.4) beginnt an der Wurzel des Baumes T . Hier wird überprüft, ob T einen Kindknoten n mit dem Namen des ersten Items e aus t hat (Zeile 2). Wenn dies der Fall ist, wird im Kindknoten n der Zähler für das Element um eins inkrementiert. Algorithmus 2.4 Einfügen eines Itemsets in einen FP-Baum (basierend auf [HPY00] erarbeitet) input: t - itemset, T - a fp-tree procedure InsertTree(t, T ) let e be the first item of t if T.hasChild(n)and n.itemName == e.itemName then increase n’s item count by 1 else create new node n with name e and count 1 set n’s parent to T set n’s node link to nodes with same item name using the node link structure 9: remove item e from the itemset t 10: if t is not empty then 11: InsertTree(t, n) 1: 2: 3: 4: 5: 6: 7: 8: Falls T jedoch kein Kindknoten mit dem Namen e besitzt, so wird ein neuer Knoten n mit einen Itemzählwert von 1 und T als Vorgänger erzeugt. Zusätzlich wird n in die node link Struktur des Baumes eingebunden. Im Anschluss an beide Fälle wird das Item e aus t entfernt und erneut die Funktion InsertTree aufgerufen mit den verbliebenen Rest von t sowie n als Wurzel, sofern t nicht leer ist. Ein Beispiel für den Aufbau eines FP-Baumes ist in Abbildung 2.3 zu sehen; eingefügt wird in diesen die Transaktionsmenge {T1 = {A, B}, T2 = {A, C}, T3 = {B, C}}. Zu Beginn des Einfügevorgangs besteht der Baum nur aus der Wurzel null:0“. Wenn nun ” also T1 eingefügt wird, muss ein neuer Knoten A“ mit einem Zähler von 1 als Kind von ” der Wurzel erzeugt werden. Zusätzlich wird ein Zeiger in der Header-Tabelle angelegt, der auf A“ zeigt. Das Einfügen von B“ beginnt mit Knoten A“ als Startknoten. Da ” ” ” auch hier kein entsprechendes Kind vorhanden ist, wird analog zum ersten Item ein neuer Knoten B“ mit einem Zählerwert von 1 angelegt und ein Zeiger von der Header-Tabelle ” auf den Knoten erzeugt. Das Einfügen von T2 beginnt analog zu T1 , jedoch ist unterhalb der Wurzel von T bereits ein Kindknoten mit dem Namen A“ vorhanden, so dass von ” 18 null:1 Items Header A:1 A B B:1 null:2 Items Header A A:2 B C B:1 C:1 null:3 Items Header A A:2 B B:1 C B:1 C:1 C:1 Abbildung 2.3: Konstruktion eines FP-Baumes diesem lediglich der Zählerwert um eins inkrementiert wird. Um das Item C“ unterhalb ” von A einzufügen, ist jedoch wieder ein neuer Knoten C“ mit einem Zählerwert von ” eins anzulegen und zusätzlich wird ein Link von der Header-Tabelle auf C“ angelegt. ” Das Einfügen von T3 erfolgt weitestgehend analog, jedoch müssen zusätzliche Zeiger von den bereits vorhandenen Pfadenden der Headertabelle auf die neuen Knoten angelegt werden. Die Komplexität für das Einfügen eines Itemsets in einen FP-Baum beträgt, grob abgeschätzt, O(|t|), wobei t die Länge des Itemsets ist. Sei A nun die Menge aller in der Transaktionsdatenbank vorkommen Items mit der Anzahl ihrer Vorkommen. Dann beträgt die Komplexität für den Aufbau eines FP-Baumes grob O((|tdb| ∗ |t|) + O(A ∗ log(A)) + O(|tdb| ∗ (|t| + |f |)) unter der Annahme, dass A in O(n ∗ log(n)) sortiert werden kann. 19 Erzeugung von häufigen Itemsets aus einem FP-Baum Das Erzeugen von häufigen Itemsets aus einem FP-Baum (FPGrowth, Algorithmus 2.5) lässt sich in zwei Fälle unterteilen. Im ersten Fall (Zeile 2-6), wenn der bedingte FPBaum T nur noch einen einzigen Pfad p enthält, so werden die verbleibenden Knoten β des Pfades p in allen möglichen Kombinationen mit dem bisherigen Itemset-Fragment α kombiniert und erhalten als Support das Minimum aus β. Ausgegeben werden die so erzeugten Itemsets, wenn der Support von β den minimalen Support ξ erreicht oder überschreitet. Im anderen Fall (Zeile 7-14) ist nicht nur ein einzelner Pfad in T verbleibend, sondern ein ganzer Baum. Zunächst wird jedes im Baum vorhandene Item ai mit dem bis hierher entstandenen Itemset-Fragment α zu einen Itemset β verbunden, welches den Support von ai erhält und ausgegeben, wenn der Support des neuen Itemsets den minimalen notwendigen Support ξ erreicht. Hierfür werden alle Pfade von der Wurzel T ’s ausgehend betrachtet, die mit dem Item ai enden. Die Items, aus denen die Pfade bestehen, bilden die bedingte Itemset-Basis (conditional pattern base). Auf dieser Grundlage wird dann der bedingte FP-Baum (conditional frequent pattern tree) von Tβ für das Itemset-Frament β aufgebaut (ConstructFPTree, Algorithmus 2.3), der auf β endende Transaktionen enthält. Algorithmus 2.5 Frequent-Pattern Growth ([HPY00] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: input: T - a fp tree input: α - itemset fragment input: ξ - minimum support output: β - itemsets procedure FPGrowth(T , α, ξ) if T contains a single path p then for each combination β of nodes in path p do generate pattern β ∪ α with suppport = min{s(β)} if β.support > ξ then Output β else for each ai in header-table of T do generate pattern β = ai ∪ α with sup = ai .sup if β.sup > ξ then Output β construct ai ’s cond. pattern base and then β’s cond. FP-Tree Tβ if Tβ 6= ∅ then call FPGrowth(Tβ , β, ξ) Eine Abschätzung der Laufzeit des Algorithmus nach unten ist nicht möglich. Dieses liegt an der Tatsache, dass sich in keinem Rekursionsschritt vorhersagen lässt, wie groß die Anzahl und Größe der resultierenden Teilprobleme ist. Eine Abschätzung nach oben ist jedoch möglich. Angenommen wird hierfür der Worst-Case, in welchem jede Trans- 20 aktion aus Items besteht, die in keiner anderen Transaktion auftreten. In diese Fall ist der FP-Baum mindestens so groß wie die Transaktionsdatenbank. Für die Suche nach frequent Itemsets muss nun im schlimmsten Fall jeder Zweig des Baumes abgearbeitet werden, so dass die Laufzeit für eine Transaktionsdatenbank T mit einer durchschnittlichen Transaktionslänge maximal O(|T | · w) beträgt. Beispiel Ein einfaches Beispiel für die Erzeugung von Itemsets aus einem FP-Baum ist in Abbildung 2.4 zu sehen. In (1) ist zunächst der initiale FP-Baum dargestellt, in welchem Itemsets gesucht werden sollen, die auf c enden. Der Teilbaum in (2) ist der resultierende Teilbaum, der lediglich Transaktionen enthält, die auf c enden. null:3 A:2 (1) B:1 B:1 C:1 C:1 null:3 (2) A:2 B:1 C:1 C:1 null:3 null:3 (3) A:2 B:1 Abbildung 2.4: Beispiel für die Generierung von Itemsets aus einem FP-Baum In (3) sind schließlich die bedingten Bäume zu sehen, die aus (2) für jedes häufige Itemset in T gebildet werden. Die Abarbeitung wird rekursiv fortgesetzt, so dass schlussendlich die beiden Itemsets {AC} und {BC} als Ergebnis geliefert werden. 21 Nachteile Der FP-Growth Algorithmus ist hinsichtlich seiner Fähigkeit, große Datenmengen verarbeiten, ebenso eingeschränkt wie der Apriori-Algorithmus. Zwar entfällt der Aufwand für Erzeugung und Verwaltung der Itemset-Kandidaten, trotzdem können bei großen Transaktionsdatenbanken mit vielen Kandidaten die FP-Bäume eine Größe erreichen, die nicht mehr im Hauptspeicher handhabbar ist. Zusätzlich spielt der minimale Support eine entscheidende Rolle. Je größer der minimale Support ist, desto kleiner fallen die FP-Bäume und die Menge der gefundenen Itemsets aus. Für die Warenkorbanalyse mag dies ein gangbarer Weg sein, bei Internet-Suchmaschinen etwa können jedoch auch Itemsets mit geringem Support von Interesse sein. 2.3 Clusteranalyse Die Clusteranalyse wird mit dem Ziel durchgeführt, Daten automatisch bzw. semiautomatisch in Kategorien, Gruppen(Cluster) oder Klassen einzuordnen. Das Ziel ist hierbei, dass Objekte, die sich in demselben Cluster befinden, hinsichtlich der betrachteten Attribute möglichst ähnlich zueinander sind, Objekte aus verschiedenen Clustern jedoch möglichst unähnlich zueinander sind. Allerdings ist es zuvor notwendig, den Begriff der Ähnlichkeit, der Aufgabenstellung entsprechend, geeignet zu modellieren. Im Falle numerischer Attributwerte kann für die Berechnung der Ähnlichkeit beispielsweise die euklidische Distanz (Gleichung 2.4) verwendet werden, wenn für die Attributwerte xi der Datensätze x = (x1 , . . . , xd ) Kategorien verwendet werden, so ergibt sich die Distanz aus der Anzahl der verschiedenen Komponenten zwischen unterschiedlichen Datensätzen (Gleichung 2.5). p (x1 − y1 )2 + . . . + (xn − yn )2 (2.4) ( 0, wenn(xi = yi ) δ(xi , yi ) mit δ(xi , yi ) = 1, wenn(xi 6= yi ) (2.5) dist(x, y) = dist(x, y) = d X i=1 Im Folgenden werden drei bekannte sequentielle Algorithmen für die hierarchische und partitionierende Clusteranalyse vorgestellt und hinsichtlich ihrer Eignung unter Berücksichtung aktueller Anforderungen betrachtet. 22 2.3.1 Hierarchisches Clustering Im agglomerativen hierarchischen Clustering wird zunächst jeder Datenpunkt als ein Cluster aufgefasst. Anschließend werden diese Cluster Schritt für Schritt vereinigt, bis am Ende ein einzelner Cluster übrig bleibt (Algorithmus 2.6). Ein hierarchisches Clustering wird oftmals durch ein Dendrogram dargestellt (Abbildung 2.5). 4 5 3 2 1 1 2 3 4 5 Abbildung 2.5: Hierarchisches Clustering - Beispiel Die Vereinigung der Cluster erfolgt über die Betrachtung der Distanz zwischen den Clustern; in jeder Iteration werden die Cluster vereinigt, die die geringste Distanz, bzw. die größte Ähnlichkeit, zueinander besitzen. Algorithmus 2.6 Naiver Algorithmus für agglomeratives hierarchisches Clustering ([TSK05] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: input: set of points D output: dendrogram d representing the clustering procedure AgglomerativeHierarchical-Clustering(D: Set of points) compute proximity matrix P from point-set D initialize dendrogram d let each data point ∈ D be a cluster repeat merge closest pair of clusters adjust d to represent this merging update P to reflect changed proximity between new and old clusters until only one cluster left return dendrogram d Für die Berechnung der Distanz bzw. der Ähnlichkeit zwischen den Clustern gibt es verschiedene Ansätze, unter anderem: 23 • Min: Verwendung der Distanz zwischen den nächsten Punkten zweier verschiedener Cluster (single link) • Max: Verwendung der Distanz zwischen den am weitesten voneinander entfernten Punkten zweier verschiedener Cluster (complete link) • Group-Average: Distanz zweier verschiedener Cluster zueinander ist der Durchschnitt der paarweisen Distanz zwischen den Punkten der Cluster c1 und c2 : P dist(c1 , c2 ) : pi ∈c1 ,pj ∈c2 dist(pi ,pj ) |c1 |∗|c2 | Die Laufzeit des naiven Algorithmus aus 2.6 liegt bei Nutzung einer Distanzmatrix und n−1 Vereinigungen bei O(n3 ). Ein optimaler Algorithmus mit einer Laufzeit von O(n2 ) wird in Kapitel 3.2.4 vorgestellt. 2.3.2 Partitionierendes Clustering Beim partitionierenden Clustering wird eine Aufteilung der Daten berechnet, so dass jeder Punkt genau einem oder keinem Cluster zugeordnet ist. Zwei der bekanntesten Ansätze werden im Folgenden vorgestellt. k-Means Durch den k-Means-Algorithmus (siehe Algorithmus 2.7) wird aus einer Menge von Punkten eine Partitionierung in k disjunkte Gruppen berechnet. Hierfür werden zunächst k initiale Schwerpunkte, die nicht Teil der zu partitionierenden Punktemenge sein brauchen, bestimmt und jeder Punkt dem ihm nächsten Schwerpunkt zugewiesen. Algorithmus 2.7 k-Means Clustering ([TSK05] angepasst) 1: 2: 3: 4: 5: 6: 7: input: D - dataset to be clustered, K - initial centroids output: K - final centroids procedure KMeansClustering(D, K) assign each point to its nearest centroid k ∈ K repeat compute new centroids based on groups of points assigned to former centroids assign each point to its nearest centroid k ∈ K until centroids have not moved or centroid movement < δ return K Mittels dieser Zuordnungen werden nun neue Schwerpunkte berechnet und die Punkte wieder dem nächsten Schwerpunkt zugeordnet. Dieser Vorgang wird so lange iteriert, 24 bis sich die Schwerpunkte nicht mehr ändern oder die Änderungen einen festzulegenden Schwellwert unterschreiten. Die Berechnung der Schwerpunkte ist im k-Means Algorithmus die Summe der quadrierten Abweichungen (SSE) von den Clusterschwerpunkten; das Ziel des Algorithmus ist die Minimierung dieser Abweichung. SSE = K X X dist2 (mi , x) (2.6) i=1 x∈Ci Formal ist dies ist in der Gleichung 2.6 dargestellt. Für jeden Datenpunkt ist der Fehler die Distanz zum nächstliegenden Schwerpunkt. x ist hierbei ein Datenpunkt des Clusters Ci , mi ist der Schwerpunkt von Ci . Die Berechnung der Schwerpunkte erfolgt, nachdem die Summe der quadrierten Abweichung berechnet wurde, nach Gleichung 2.7. mi = 1 X x |Ci | x inC (2.7) i Die aufwändigste Berechnung in k-Means ist die Bestimmung der Distanzen der Datenpunkte zu den Clusterschwerpunkten. Hierbei ist zu beachten, dass diese unabhängig voneinander sind, also parallel berechnet werden können. Jedoch werden nach jeder Iteration die Clusterschwerpunkte angepasst; die Iterationen müssen folglich sequentiell berechnet werden. Abbildung 2.6: beispielhafte Darstellung des k-Means Algorithmus Ein Beispiel ist in Abbildung 2.6 gezeigt; die Schwerpunkte sind hier als Kreise, die Datenpunkte als Quadrate dargestellt und die Farbe der Quadrate symbolisiert die Zuordnung zu Schwerpunkten. Im ersten Bild sind farblich die initialen Schwerpunkte hervorgehoben. Im zweiten Bild sind die Datenpunkte hinsichtlich der aktuellen Schwerpunkte (blass gefärbt) dargestellt, während Schwerpunkte in kräftigem Grün und Rot die neu 25 berechneten abgebildet sind. Das dritte Bild stellt schließlich die zweite Iteration dar, in der erneut die Zuordnungen aktualisiert und auf Basis dieser die Schwerpunkte neu berechnet wurden. Ausschlaggebend für die Komplexität des Algorithmus ist an erster Stelle die Anzahl der clusternden Punkte n. Aber auch die Anzahl der Cluster K, die Anzahl der Punktattribute d sowie die Anzahl der Iterationen I ist zu berücksichtigen, wobei die Anzahl der Cluster üblicherweise kleiner als die Anzahl der Punkte ist und nur wenige Iterationen benötigt werden. Daraus ergibt sich, dass die Zeitkomplexität O(n ∗ K ∗ I ∗ d) ist. Nachteile Ein Problem des k-Means-Algorithmus ist, dass er ohne Eingreifen von außen keine optimale Lösung berechnet; die Qualität des Ergebnisses hängt stark von Anzahl und Lage der gewählten Schwerpunkte ab, die zudem vorgegeben werden müssen. Ein weiterer wichtiger Punkt ist, dass k-Means die Bildung kugelförmiger Cluster anstrebt. Dies liegt an der Minimierung der Abstände zwischen Datenpunkten und ClusterSchwerpunkten. DBSCAN Der DBSCAN-Algorithmus [EKSX96] ist ein dichtebasierter Algorithmus, der ein partitioniertes Clustering berechnet. Zu den wichtigsten Eigenschaften des Algorithmus zählt, dass die Anzahl der Cluster nicht vorab festgelegt werden muss, da der Algorithmus Cluster selbstständig erkennen kann. Weiterhin kann der DBSCAN-Algorithmus Cluster von beliebiger Form erkennen und unterliegt somit auch hier keiner Limitierung wie der k-Means Algorithmus. p ε Abbildung 2.7: -Nachbarschaft Ein zentraler Aspekt des Algorithmus ist die -Nachbarschaft (Abbildung 2.7). Als Nachbarschaft eines Punktes p werden alle Punkte bezeichnet, die sich innerhalb des Umkreises mit dem Radius befinden. Auf Grundlage der -Nachbarschaft lassen sich nun die 26 drei verschiedenen Klassen von Punkten definieren, die im DBSCAN-Algorithmus auftreten: • Kernpunkte, die sich dadurch auszeichnen, dass die Anzahl von Punkten in ihrer Nachbarschaft einen bestimmten Schwellwert überschreitet; diese werden als dicht bezeichnet. • Randpunkte, die selber nicht dicht sind, sich jedoch innerhalb der Nachbarschaft eines Kernpunkts befinden. Diese werden auch dichte-erreichbar genannt • Rauschpunkte, die weder dicht noch dichte-erreichbar sind Grundlage des Algorithmus ist das Prinzip der Dichte-Verbundenheit. Dieses besagt, dass zwei Punkte dichte-erreichbar sind, wenn sie durch eine Kette von Kernpunkten erreichbar sind. Die Punkte, die durch diese dichte-erreichbaren Punkte verbunden sind, bilden eine dichte-verbundene Menge, die auch als Cluster bezeichnet wird. Der DBSCAN-Algorithmus (Algorithmus 2.8) startet mit einer Menge von zu clusternden Datenpunkten, einem Wert für , der die Größe der Nachbarschaft eines Punktes festlegt, sowie einem Wert minP ts, der definiert, ab welcher Nachbarschaftsgröße ein Punkt ein Kernpunkt ist. Zu Beginn sind zusätzlich alle Punkte als unklassifiziert markiert. Der Algorithmus durchläuft die Punktmenge vollständig (Zeile 3-7), überprüft jedoch nur für noch nicht klassifizierte Punkte, ob von diesen ausgehend ein Cluster gebildet werden kann. Die Beschränkung auf unklassifizierte Punkte erfolgt, da durch das Prinzip der Dichte-Verbundenheit Cluster während ihrer Entdeckung bis zu ihrer maximalen Ausdehnung erweitert werden. Die Beschränkung verhindert folglich eine erneute Betrachtung von bereits zugeordneten Punkten zum Zwecke der Suche und Erkennung von Clustern. Algorithmus 2.8 Der DBSCAN-Algorithmus ([EKSX96] angepasst) 1: 2: 3: 4: 5: 6: 7: input: points - dataset to be clustered input: ε - size of the neighborhood input: minPts - points required for a core point procedure DBSCAN(points, ε, minPts) clusterId = 0 for i from 1 to points.size do point = points.get(i); if point.clusterId = UNCLASSIFIED then if ExpandCluster(points, point, clusterId, ε, minPts) then clusterId = clusterId + 1 Die Clustererweiterung (Algorithmus 2.9) startet mit der gesamten zu clusternden Punktmenge, dem zur Suche und Erweiterung gewählten Startpunkt, dem aktuellen Cluster- 27 Identifikator und den bereits aus Algorithmus 2.8 bekannten Größen und minP ts als Eingabe. Algorithmus 2.9 DBSCAN - Clustererweiterung ([EKSX96] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: input: points - dataset to be clustered input: point - starting point for cluster expansion input: clusterId - identificator of the current cluster input: ε - size of the neighborhood input: minPts - points required for a core point output: boolean - indicates, whether cluster was found procedure ExpandCluster(points, point, clusterId, ε, minPts) seeds = points.regionQuery(point, eps) if seeds.size < minPts then points.changeClusterId(point, NOISE ) return FALSE else points.changeClusterIds(seeds, clusterId ) seeds.delete(point) while seeds 6= ∅ do currentPoint = seeds.first() result = points.regionQuery(currentPoint, ε) if result.size ≥ minPts then for i from 1 to result.size do resultP oint = result.get(i) if resultP oint.clusterId IN (UNCLASSIFIED, NOISE ) then if resultP oint.clusterId == UNCLASSIFIED then seeds.append(resultPoint) points.changeClusterId(resultPoint, clusterId ) seeds.delete(currentP oint); return TRUE; Als erstes wird für den Startpunkt die Nachbarschaft im Umkreis, der durch definiert ist, abgefragt (Zeile 3-5) und in der Variable seeds gespeichert; diese wird später zur Erweiterung des Clusters verwendet. Wenn die Größe der Nachbarschaft größer gleich minP ts ist, so handelt es sich bei dem Punkt um einen Kernpunkt. Ist die Nachbarschaft jedoch kleiner als minP ts, so wird der Punkt vorläufig als Rauschen klassifiziert; diese Zuordnung ist jedoch noch nicht endgültig, da der Punkt noch von einem Kernpunkt dichte-erreichbar sein kann. Falls es sich bei dem Punkt um einen Kernpunkt handelt, so werden zuerst alle Punkte der Nachbarschaft mit dem aktuellen Cluster-Identifikator versehen und der aktuelle Punkt gelöscht (Zeile 7-8). In den Zeilen 9 bis 19 findet nun die eigentliche Erweiterung des Clusters statt. Aus der Menge seeds wird der erste Punkt entnommen und die Nachbarschaft hinsichtlich abgefragt und in der Variable result gespeichert. Wenn die Kardinalität von result größer gleich minP ts ist, so ist auch der 28 aktuelle Punkt ein Kernpunkt und wird zur Erweiterung des Clusters genutzt. Hierfür wird die vollständige Nachbarschaft durchlaufen (Zeile 13 - 19) und diesen die aktuelle Cluster-ID zugewiesen, wenn es sich um unklassifizierte oder Rausch-Punkte handelt (Zeile 18). Unklassifizierte Punkte werden zusätzlich der Menge seeds hinzugefügt und bei der Erweiterung des Clusters genutzt. Der DBSCAN-Algorithmus (Algorithmus 2.8) durchläuft in seiner äußeren Schleife jeden Punkt genau einmal. Die Suche nach Clustern mittels der Funktion ExpandCluster wird jedoch nur für noch nicht-klassifizierte Punkte ausgeführt. Wenn eine Clustererweiterung (Algorithmus 2.9) initiiert wird, wird zunächst die Nachbarschaft des Ausgangspunktes mit einer räumlichen Suche bestimmt. Die Punkte im Ergebnis dieser Abfrage werden zusammen mit unklassifizierten Punkten aus weiteren Nachbarschaftsanfragen für weitere Suchen verwendet. Geeignete Indexstrukturen wie etwa ein R-Baum ermöglichen eine Nachbarschaftssuche in einer Laufzeit von O(log(n)), während die Laufzeit ohne Indexunterstützung O(n2 ) beträgt. Folglich ergibt sich bei der Nutzung einer räumlichen Indexstruktur näherungsweise eine Laufzeit im Bereich von O(n ∗ log(n)) für den DBSCAN-Algorithmus. Beispiel Eine beispielhafte Szene zur Beschreibung des DBSCAN-Algorithmus ist in Abbildung 2.8 abgebildet. Die Kreise stellen hier die Nachbarschaft der Punkte p und q dar, welche einen Radius von besitzen. Weiterhin sei eine Kardinalität der Punktnachbarschaft von 5 notwendiges Kriterium, damit ein Punkt ein Kernpunkt ist. p ε q r s Abbildung 2.8: DBSCAN - Beispiel Der Punkt p ist, gemäß vorheriger Definition, ein Kernpunkt, da er über eine genü- 29 gend große Nachbarschaft verfügt. Innerhalb dessen Nachbarschaft liegt der Punkt q, der somit dichte-erreichbar von p ist. Zusätzlich ist q ein Kernpunkt, da die Kardinalität der Nachbarschaft von q ≥ 5 ist; q gehört folglich dem gleichen Cluster an wie p. Für den Punkt r gilt, dass er von q dichte-erreichbar ist. Jedoch ist die Kardinalität der Nachbarschaft von r < 5, somit ist r ein Randpunkt. Der Punkt s besitzt eine Nachbarschafts-Kardinalität von s < 5 und ist somit kein Kernpunkt. Zusätzlich ist s auch von keinem Kernpunkt erreichbar; s ist somit Rauschen bzw. ein Rauschpunkt. Nachteile Der gravierendste Nachteil des DBSCAN-Algorithmus tritt zu Tage, wenn der Datensatz Gebiete mit sehr stark voneinander abweichenden Punktdichten enthält. Der Algorithmus ist bei derartigen Datensätzen nicht in der Lage, ein brauchbares Clustering zu erzeugen, da es keine Kombination der Parameter und minP ts gibt, die für alle vorhandenen Cluster passt. 2.4 Die UNION-Find-Struktur Bestimmte Probleme, wie beispielsweise hierarchisches agglomeratives Clustering, haben zu Beginn eine Menge von Objekten, die jedes für sich eine eigene Menge bilden. Im weiteren Verlauf werden diese Mengen in einer bestimmten Reihenfolge zusammengefügt, um eine gröbere Partitionierung zu bilden, die jedoch immer vollständig und disjunkt bleibt. Zusätzlich finden Abfragen statt, um die Menge in Erfahrung zu bringen, die ein bestimmtes Objekt enthält. Diese Anforderungen lassen sich durch die Operationen Union und Find auf Mengen realisieren. Die Union-Operation führt hierbei die Vereinigung zweier Mengen durch, während die Find-Operation für eine gegebenes Element die enthaltende Menge liefert. ADT DisjointSet Der abstrakte Datentyp DisjointSet (bzw. Union-Find-Struktur oder auch Merge-FindStruktur) besteht aus einer Menge von Teilmengen, hier als Komponenten bezeichnet und den folgenden Operationen: • Union(A, B) bestimmt die Vereinigung der Teilmengen [A] und [B] und definiert als Namen ein repräsentatives Element, dass in der neu entstandenen Menge enthalten sein muss • Find(x) liefert den Namen der Komponente zurück, in der x enthalten ist 30 • Initial erzeugt eine Partitionierung, die aus disjunkten ein-elementigen Mengen besteht. Ein einfache Implementierung auf Array-Basis findet sich in Algorithmus 2.10 unter der Bezeichnung Merge-Find-Set(MFSET), basierend auf [AHU83]. Die Operation Initial erzeugt ein neues MFSET, in der sich jedes Element in einer eigenen Partition bzw. Menge befindet. Die Menge, in der ein Element x enthalten ist, liefert die Find-Operation, indem einfach der Wert für x im MFSET zurückgegeben wird. Und auch die Vereinigung einer Menge A mit einer Menge B ist in der array-basierten Implementierung des MFSETs durch die Operation Merge sehr einfach; das Array, welches die Komponenten der Struktur enthält, wird von Anfang bis Ende durchlaufen und jedes Feld, das der Menge B zugewiesen ist wird der Menge A zugewiesen. Algorithmus 2.10 ADT DisjointSet über {1, . . . , n}, Array-Implementierung ([AHU83] angepasst) output: returns new MFSET procedure Initial C := Array[1, . . . , n] of [1, . . . , n] for x := 1 to n do C[x] := x 5: return C 1: 2: 3: 4: input: x - integer in the range {1, . . . , n} input: C - an existing MFSET output: integer of the class, that includes x 6: procedure Find(x, C) 7: return C[x] 8: 9: 10: 11: 12: input: A, B - integers in the range {1, . . . , n}, that represent a class input: C - an existing MFSET output: MFSET with merged classes A and B procedure Merge(A, B; C) for x = 1 to C.length do if C[x] = B then C[x] = A return C Die Laufzeitkomplexität für die Initialisierung eines MFSET beträgt O(n), die Abfrage eines Wertes für eine Komponente x beträgt O(1). Die Implementierung zur Vereinigung zweier Klassen benötigt eine Laufzeit von O(n), da für die Suche nach einer Klasse B das vollständige MFSET durchlaufen werden muss. Wenn nun Komponente für Komponente das MFSET mit der Vereinigungs-Operation zu einer einzigen Klasse vereinigt wird, so 31 werden maximal n − 1 Operationen ausgeführt, so dass die Laufzeit insgesamt O(n2 ) beträgt. Eine deutliche Verbesserung der Laufzeit ist zu erreichen, wenn das MFSET als Baum und nicht als Array implementiert wird. Diese ermöglicht für die Suche nach der Klasse von Komponenten eine Laufzeit von O(log(n)), für die Vereinigung von zwei Klassen sogar eine konstante Laufzeit von O(1). 2.5 Parallelrechner Parallelrechner sind Computer, in denen Rechenoperationen nebenläufig ausgeführt werden können. Die Parallelität basiert auf der Grundlage, dass Probleme in kleinere Probleme zerlegt und parallel zeitgleich ausgeführt werden können. Parallelrechner lassen sich grundsätzlich in der Ebene, auf der die Parallelisierung stattfindet, unterteilen. So kann ein einzelner Computer bereits mehrere Rechenelemente beeinhalten, wobei dieser als Multi-Cores im Falle mehrerer Kerne in einem Prozessor oder Multi-Prozessor im Falle mehrerer Prozessoren bezeichnet wird; jedoch arbeiten diese Rechenelemente nicht zwingend alle an der gleichen Aufgabe sondern werden teilweise beispielsweise durch Aufgaben des Betriebssystems ausgelastet. Rechencluster und Rechengrids zeichnen sich dadurch aus, dass sie ein Verbund von Rechnern sind, die gemeinsam an der gleichen Aufgabe arbeiten. Zugleich ist die Entwicklung von Software für Parallelrechner aufwändiger als für sequentielle Rechner, da Probleme durch gleichzeitigen Zugriff auf Datenstrukturen oder Race-Conditions auftreten können, die zu nicht vorhersehbarem Programmverhalten führen können. Dieses führt dazu, dass Kommunikation und Synchronisierung zwischen den parallel laufenden Tasks stattfinden muss; dieses ist gleichzeitig einer der Faktoren, der maßgeblich für die Performance von paralleler Software verantwortlich ist, da nur ohne Kommunikation und Synchronisierung eine wirklich parallele Ausführung von Tasks erfolgt. Dieser Sachverhalt lässt sich anschaulich durch das Amdahl’sche Gesetz verdeutlichen. Ein paralleles Programm besteht aus zwei wesentlichen Komponenten, einem sequentiellen Anteil (fs ) und einem parallelen Anteil (fp ). Diese sind normiert und bewegen sich im Bereich 0 ≤ fs ≤ 1 und folglich fp = 1 − fs . Für einen Parallelrechner mit n Prozessoren ergibt sich nun eine Beschleunigung (Speed-Up) von: sp(n) ≤ 1 fp n + fs ≤ 1 fs Wenn die Anzahl der Prozessoren gegen unendlich geht, geht folglich die Ausführungszeit des parallelen Anteils gegen null, so dass für diesen Fall die Ausführungszeit des parallelen Programms nach unten durch den sequentiellen Programmanteil beschränkt 32 ist. Folglich ist während des Entwurfs und der Implementierung auf eine möglichst starke Entkopplung der Teilaufgaben zu achten, um Synchronisation und Kommunikation, und somit den sequentiellen Anteil, möglichst klein zu halten. Ein weiteres wichtiges Unterscheidungsmerkmal für Parallelrechner ist die Speicherarchitektur. Diese können einerseits einen gemeinsamen globalen Speicher besitzen (SharedMemory-Architektur), der über einen gemeinsamen Adressraum adressiert wird; die Kommunikation zwischen Prozessoren erfolgt in diesem Fall implizit über Bereiche im gemeinsamen Speicher. Andererseits kann jeder Prozessor über einen eigenen lokalen Speicher mit eigenem Adressraum verfügen, auf den andere Prozessoren keinen Zugriff haben; die Kommunikation zwischen Prozessoren erfolgt in diesem Fall explizit über Nachrichten, was als Message-Passing-Architektur bezeichnet wird. 2.6 MapReduce MapReduce[DG04] bezeichnet sowohl ein Programmiermodell als auch eine Implementierung für die Verarbeitung und Generierung großer Datenmengen. Kern des Programmiermodells sind zwei Phasen, die M ap- und die Reduce-Phase, welche inspiriert sind durch die gleichnamigen Funktionen aus der funktionalen Programmierung. Programme, die nach dem MapReduce-Programmiermodell entwickelt wurden, lassen sich automatisch durch MapReduce-Implementierungen, wie beispielsweise Hadoop, auf einem Cluster mit einer Vielzahl von Rechenknoten ausführen, ohne dass Erfahrung für die Entwicklung paralleler Software oder verteilter Systeme notwendig ist. Ein weiterer wichtiger Faktor ist, dass die Ausführung in eine Vielzahl von Tasks aufgeteilt wird, die unabhängig voneinander ausgeführt werden. Der Vorteil dieser Lösung ist, dass selbst der Ausfall von Teilen des Rechenclusters nicht einen Neustart der Berechnung bedingt; Tasks, die nach Ablauf einer bestimmten Frist nicht erledigt worden sind, werden einfach abermals zur Bearbeitung freigegeben und von verbleibenden Teilen des Rechenverbunds durchgeführt. Im Folgenden wird nun zunächst das hinter MapReduce stehende Programmiermodell erläutert, anschließend wird der hinter einem MapReduce-Programm steckende Prozess vorgestellt. Hadoop, eine MapReduce-Implementierung, wird in Abschnitt 2.6.3 präsentiert. 2.6.1 Programmiermodell In der Map-Phase (2.8) wird die Map-Funktion parallel auf jedes Schlüssel-Wert-Paar (k1 , v1 ) der Eingabe angewendet. Als Ergebnis liefert die Map-Funktion für ein eingegebenes Paar eine Liste von Schlüssel-Wert-Paaren. Im Anschluss werden vom MapReduceFramework alle Schlüssel-Wert-Paare mit gleichem Schlüssel aus allen Listen eingesam- 33 melt und unter Beachtung des Schlüssels gruppiert. M ap(hk1 , v1 i) = list(hk2 , v2 i) (2.8) In der Reduce-Phase (2.9) wird die Reduce-Funktion parallel für jeden Schlüssel und die dazugehörige Liste von Werten ausgeführt und die Liste in reduzierter Form, beispielsweise durch Aufsummierung der Werte, als Schlüssel-Wert-Paar zurückgegeben. Reduce(hk2 , list(v2 )i) = hk2 , v3 i (2.9) MapReduce - Beispiel Das kurze Beispiel in Quelltext 2.1 stellt ein MapReduce-Programm dar, welches für ein übergebenes Dokument für jedes Wort die Anzahl der Vorkommen innerhalb des Dokumentes zählt und ausgibt. 1 2 3 4 5 map ( String key , String value ): # key : document name # value : document contents for each substring w in value : Output <w ,1 >; 6 7 8 9 10 11 12 13 reduce ( String key , Iterator values ): # key : a word # value : a list of numbers int result = 0; for each v in values : result += v ; Output <key , result >; Quelltext 2.1: MapReduce - Wortvorkommen in einem Dokument zählen [DG04] Die Aufgabe der Map-Funktion (Zeile 1) ist in diesem Beispiel sehr einfach. Sie durchläuft alle Wörter des übergebenen Dokuments (Zeile 4) und gibt für jedes gefundene Wort ein Schlüssel-Wert-Paar der Form hWort, 1i aus. Die Reduce-Funktion summiert die Anzahl der Vorkommen (Liste von ”values”) eines Wortes (”key”) auf und gibt diese zurück. Eine beispielhafte Ausführung des Beispiels in Quelltext 2.1 ist in Abbildung 2.9 zu sehen. 2.6.2 Der Map-Reduce Prozess Zu Beginn des Prozesses wird zunächst eine Aufteilung der zu verarbeitenden Daten in sogenannte Shards vorgenommen. Anschließend wird der Prozess mit der Map-Phase 34 Die Sonne scheint und der Himmel ist blau. Es scheint ein schöner Tag zu werden, wenn die Sonne weiter scheint. Map < < < < < < < < < < < < < < < < < < < < die , 1> sonne , 1> scheint , 1> und , 1> der , 1> himmel , 1> ist , 1> blau , 1> es , 1> scheint , 1> ein , 1> schöner , 1> tag , 1> zu , 1> werden , 1> wenn , 1> die , 1> sonne , 1> weiter , 1> scheint , 1> Reduce < < < < < < < < < < < < < < < < blau , 1> der , 1> die , 2> ein , 1> es , 1> himmel , 1> ist , 1> scheint , 3> schöner , 1> sonne , 2> tag , 1> und , 1> weiter , 1> wenn , 1> werden , 1> zu , 1> Abbildung 2.9: MapReduce-Beispiel gestartet, in dem für jeden Shard ein Map-Task erstellt wird, die unabhängig voneinander ausgeführt werden. Map Reduce Shard 1 Shard 2 Map Ergebnis Shard 3 Reduce Map Abbildung 2.10: Der MapReduce-Prozess Wenn ein Map-Task gestartet wird, liest dieser den Teil der Eingabedaten, die ihm zugeordnet wurden. Aus diesem parst er die Schlüssel-Wert-Paare und leitet diese an die Map-Funktion weiter. Diese bearbeitet die Eingabe-Paare gemäß ihrer Definition und gibt sie wieder als Schlüssel-Wert-Paare aus. Die Ausgabedaten der Map-Tasks werden nun in der Reduce-Phase an Reduce-Tasks verteilt, was als ”Shuffling” bezeichnet wird. Die Verteilung erfolgt über eine Partitionierungsfunktion, welche die Ergebnisse der Map-Tasks auf verschiedene disjunkte Partitionen verteilt, wobei für jede Partition ein Reduce-Task erzeugt wird. Die Partitio- 35 nierungsfunktion lässt sich beispielsweise einfach mittels einer Hash-Funktion umsetzen (z.B. hash(key) mod R mit R = Anzahl der Reducer). Nun durchläuft der Reduce-Tasks die Schlüssel-Wert-Paare der ihm zugewiesenen Partition und sortiert diese nach dem Schlüssel, so dass die Werte nach diesen gruppiert sind. Die derart gruppierten Paare werden nun gruppenweise an die Reduce-Funktion übergeben, von dieser bearbeitet, und in Form eines einzelnen Schlüssel-Wert-Paares ausgegeben. Mit der Combine-Phase steht zusätzlich eine optionale Funktionalität bereit, die gegebenenfalls Kommunikationskosten reduzieren kann. Funktional ist diese ähnlich wie die Reduce-Phase, wird jedoch lokal auf den Knoten nach Abschluss der Map-Phase ausgeführt. Der Sinn dieser Phase ergibt sich aus der Betrachtung des Beispiels im Quelltext 2.1, dem Wortzählen. Jeder Knoten berechnet aus dem zugewiesenen Textfragment Schlüssel-Wert-Paare der Form h0 gefundenes Wort0 ,0 10 i. So gibt es beispielsweise eine Häufung von Wörtern, etwa ’und’, ’der’ oder ’das’. Eine Reduktion der Kommunikationskosten lässt sich erreichen, indem diese Paare bereits lokal auf den Knoten zusammengefasst werden, so dass beispielsweise nicht 50 Paare h0 das0 ,0 10 i ausgegeben werden, sondern nur h0 das0 ,0 500 i. 2.6.3 Hadoop Hadoop ist eine MapReduce-Implementierung, die unter dem Dach der Apache Software Foundation entwickelt wird. In der Version 2.2.0, die in dieser Arbeit genutzt wird, besteht Hadoop aus drei wesentlichen Komponenten: Hadoop Distributed Filesystem, Hadoop YARN und Hadoop MapReduce, wobei Hadoop MapReduce das Programmiermodell darstellt, welches in allgemeiner Form bereits in Abschnitt 2.6.1 auf Seite 33 vorgestellt wurde. Alle Komponenten wurden vor dem Hintergrund entworfen, dass Ausfälle von einzelnen Rechnern oder ganzen Racks in großen verteilten Systemen an der Tagesordnung sind; die Erkennung und Behandlung von Ressourcen-Ausfällen ist aus diesem Grund direkt in Hadoop implementiert worden. Hadoop Filesystem Das Hadoop Filesystem (HDFS) ist ein Dateisystem, welches entwickelt wurde, um verschiedene Anforderungen zu erfüllen. Zum einen soll das Datensystem in der Lage sein, sehr große Dateien mit einer Größe von mehreren hundert Terabytes zu speichern. Zum anderen soll der Dateizugriff in Form von Streams möglich sein. Hintergrund hierfür ist, dass für die Analyse Daten einmalig geschrieben, aber vielfach gelesen werden. In jeder durchgeführten Datenanalyse werden die gesamten zu analysierenden Daten oder große Teile davon gelesen, so dass der ausschlaggebende Performance-Faktor für eine Leseope- 36 ration die Gesamtzeit für das Lesen der gesamten Daten ist, nicht der Zugriff auf einzelne Teile einer Datei. Außerdem soll das Dateisystem auf normaler Standard-Hardware arbeiten, die im Gegensatz zu wesentlich teurerer hoch-zuverlässiger Hardware, bei einer großen Anzahl von Knoten in einem verteilten System eine hohe Ausfallwahrscheinlichkeit für einzelne Knoten hat. Im Falle eines Komponentenausfalls innerhalb des verteilten Systems soll das Dateisystem ohne Unterbrechungen für den Anwender weiter funktionieren; dies bedeutet, dass das Dateisystem trotz eines Ausfalls von Knoten weiter verfügbar bleibt und ein Ausfall zu keinem Datenverlust führt. HDFS benutzt als grundlegende Aufteilung des Speichers, wie andere Dateisysteme, Blöcke von Speicher. Jedoch ist die Blockgröße mit 64 Megabyte oder mehr bedeutend größer, als dies bei Dateisystemem für Standard-Massenspeicher, wie beispielsweise NTFS oder EXT4, mit 4 Kilobyte. Dateien werden in Stücke entsprechend der Blockgröße aufgeteilt und unabhängig voneinander gespeichert. Im Unterschied zu normalen Dateisystemen belegt ein Block, der nicht vollständig gefüllt ist, jedoch nicht den kompletten Speicher des Blockes im zugrunde liegenden Speichers sondern nur den tatsächlich vom Block verwendeten Speicher. Die Abstraktion des Speichers in Blöcke bringt einige Vorteile mit sich. Zum einen ist es möglich, dass Dateien größer sein können als ein einzelner Massenspeicher der lokalen Knoten. Außerdem vereinfacht die Verwendung von Blöcken die Replikation, um die Verfügbarkeit und Fehlertoleranz zu verbessern. Um mit nicht verfügbaren Blöcken in Folge von Hardware-Fehlern oder dem Ausfall von Knoten umzugehen, werden Blöcke auf eine vom Administrator festzulegende Anzahl von Knoten repliziert, so dass diese gegebenenfalls von einem anderen Knoten gelesen werden kann. Das verteilte Dateisystem wird aus zwei verschiedenen Typen von Knoten aufgebaut, einem Namenode (Master) und mehreren Datanodes (Worker). Der Namenode verwaltet das Dateisystem in Form eines Dateisystem-Baumes und Metadaten für alle Dateien und Verzeichnisse des Dateisystems. Diese werden lokal auf dem Namenode persistent gespeichert. Zusätzlich verfügt der Namenode hinsichtlich der Blöcke, aus denen eine Datei besteht, über die Information, auf welchen Datanodes diese physikalisch gespeichert sind. Diese wird jedoch nicht persistent gespeichert sondern während des Starts des Dateisystems aus den übermittelten Informationen der Datanodes rekonstruiert. Der Zugriff auf das Dateisystem durch Nutzer erfolgt über einen Client, der nach Aufruf durch den Nutzer mit dem Namenode und den Datanodes kommuniziert. Die Schnittstelle für den Dateizugriff auf das verteilte Dateisystem orientiert sich stark am Portable Operating System Interface(POSIX), so dass der Nutzer kein Wissen über die Funktionsweise des Namenodes oder der Datanodes benötigt. Wie zuvor bereits angedeutet sind die Datanodes für den Großteil der Arbeit des verteilten Dateisystems verantwortlich. Auf Anweisung von Clients oder des Namenodes speichern und Empfangen sie Datei-Blöcke und liefern periodisch eine Liste der von 37 ihnen gespeicherten Blöcke an den Namenode. Aus dem Aufbau des Dateisystems ist ersichtlich, dass der Namenode eine zentrale Komponente des verteilten Dateisystems und ein Single-Point-of-Failure ist. Tatsächlich kann das Dateisystem ohne den Namenode nicht benutzt werden und als schwerwiegender Nachteil kommt zusätzlich hinzu, dass ein Ausfall des Namenodes zum Verlust aller Dateien des Dateisystems führt, da nur der Namenode über die Informationen zur Rekonstruktion der Dateien aus den Blöcken verfügt. YARN Das Yet-Another-Resource-Negotiator (YARN)-Framework ist die zentrale Komponente von Hadoop, welche zum einen die Ressourcen in Form von Rechenknoten verwaltet und zum anderen für das Scheduling der von Benutzern gestarteten MapReduceAnwendungen verantwortlich ist. YARN erfüllt diese zwei wesentliche Aufgaben durch zwei verschiedene Dienste sind. Einerseits werden die Berechnungs-Ressourcen des Clusters durch den Resource Manager verwaltet, andererseits steuert und überwacht YARN den Lebenszyklus von Anwendungen (z.B. Verfolgung des Ausführungsstatus von Tasks, fehlgeschlagenen Tasks neu starten), die auf dem Cluster ausgeführt werden, wobei jedes MapReduce-Programm einen eigenen Application-Master besitzt. Die Grundidee ist, dass der Application-Master mit dem Resource-Manager die Nutzung von Resourcen des Clusters in Form von Containern mit festgelegten Speicherlimits aushandelt und die Prozesse der MapReduce-Anwendung in diesen Containern ausführt. Die Überwachung der Container erfolgt von den Node Managern, die auf jedem Clusterknoten laufen und dafür Sorge tragen, dass die Anwendungen sich an die durch die Container vorgegebenen Ressourcenbeschränkungen halten. Für die Ausführung einer MapReduce-Anwendung mit dem YARN-Framework sind die folgenden Komponenten notwendig: • Ein Client, der die auszuführende MapReduce-Anwendung übermittelt • Der YARN-Resource-Manager, welcher die Ressourcenzuweisung des Clusters koordiniert • Die YARN-Node-Manager, die Container für Tasks auf Clusterknoten starten und überwachen • Der Application-Master der MapReduce-Anwendung, der die Ausführung von Tasks der Anwendung überwacht. Der Application-Master und die Tasks werden in Con- 38 tainern ausgeführt, die vom Resource-Manager zugeteilt werden und durch die Node-Manager verwaltet werden. • Das verteilte HDFS-Dateisystem, welches genutzt wird, um Dateien der Tasks zwischen den Komponenten auszutauschen. MapReduce program 1: run job 2: get new application Job Resource Manager 4: submit applications client JVM client node resource manager node 5a: start container 8: allocate resources Node Manager 3: copy job resources 5b: launch 6: initialize job Node Manager 9a: start container MRAppMaster 9b: launch task JVM node manager node YarnChild 7: retrieve input splits Shared FileSystem (e.g. HDFS) 10: retrieve job resources 11: run MapTask or ReduceTask node manager node Abbildung 2.11: Ablauf der Ausführung einer MapReduce-Anwendung mit dem YARNFramework [Whi12] Der Ablauf einer MapReduce-Anwendung ist in Abbildung 2.11 dargestellt und beginnt, indem die Anwendung, auch Job genannt, auf einem Client gestartet wird (1). Für diesen Job wird vom Resource Manager ein Identifikator angefordert (2), um diesen und die zugehörigen Daten im System zuordnen zu können. Der Client berechnet nun die Input-Splits, bzw. die Aufteilung der Eingabedaten, und kopiert diese Information zusammen mit der Anwendungskonfiguration und der Anwendung, welche in Form eines JAR-Archivs an Hadoop übergeben wird, in das verteilte Dateisystem (3). 39 Die Anwendung wird nun zur Ausführung an den Resource Manager übergeben (4). Der Resource-Manager liefert die Anfrage zur Ausführung einer Anwendung an einen Job-Scheduler weiter, der einen Container anfordert, in welchem der Resource-Manager eine Instanz des Application Masters unter Verwaltung des Node Managers startet (5a und 5b). Der Application Master startet die Ausführung der Anwendung, in dem zunächst eine Reihe von Variablen zur Verfolgung des Fortschritts der Anwendung initialisiert werden, da im Verlauf der Anwendung Fortschrittsmeldungen von Tasks und Meldungen über abgeschlossene Tasks beim Application Master eingehen (6). Im Anschluss wird die vom Client berechnete Aufteilung der Eingabedaten vom Application-Master geladen und ein Map-Task für jeden Teil der Eingabe erstellt; zusätzlich wird eine festgelegte Anzahl von Reduce-Tasks erzeugt (7). Nun fordert der Application Master vom Resource Manager Ressourcen in Form von Containern für die Ausführung der Map- und Reduce-Tasks an (8). Die Anforderungen enthalten zusätzlich Information hinsichtlich der Lokalität von zu Tasks gehörenden Daten, insbesondere Informationen, auf welchen Knoten und in welchen Racks der Teil der Eingabe liegt. Diese Informationen können vom Scheduler für die Entscheidung genutzt werden, um Tasks im Idealfall auf einem Knoten auszuführen, auf dem die Daten bereits lokal vorhanden sind, mindestens jedoch auf einem Knoten im selben Rack. Mit den Anfragen sind zusätzlich Anforderungen von Speicher verbunden, so dass für einzelne Tasks gezielte Mengen von Speicher für die Container angefordert wird. Sobald einem Task durch den Resource Manager ein Container zugewiesen wurde, startet der Application Master den Container durch Benachrichtigung des Node Managers (9a und 9b). Der Task wird durch eine Java-Anwendung in einer Java Virtual Machine (JVM) ausgeführt. Jedoch müssen zuvor die Daten geladen werden, die für die Ausführung des Tasks erforderlich sind, unter anderem die Konfiguration des Tasks und das JAR-Archiv, welches die MapReduce-Anwendung enthält (10). Schließlich wird die der Map- oder Reduce-Task ausgeführt (11). Fehlerbehandlung Zusätzlich ist für MapReduce-Programme, die unter Nutzung des YARN-Frameworks ausgeführt werden, der Fehlerfall zu betrachten. Dies betrifft Tasks, den Application Master, Node Manager und den Resource Manager. Falls ein laufender Tasks fehlschlägt, wird diese Information an den Application Master 40 übermittelt und der Ausführungsversuch des Tasks als fehlgeschlagen markiert. Ebenso werden Tasks als fehlgeschlagen markiert, die sich nicht innerhalb eines bestimmten Zeitintervalls beim Application Master melden. Die fehlgeschlagenen Tasks werden vom Application Master erneut zur Ausführung vergeben und ein erneuter Ausführungsversuch unternommen, wobei die Anzahl der Versuche variabel ist. Ebenso wie einzelne Tasks kann auch für Ausführung einer kompletten MapReduceAnwendung mehrere Versuche unternommen werden. Ein Application Master sendet periodisch Lebenszeichen (Heartbeats) an den Resource Manager, so dass der Resource Manager einen Fehler eines Application Masters an ausbleibenden Lebenszeichen erkennen, und eine neue Instanz starten kann. Hierbei ist es möglich, den Status eines fehlgeschlagenen Application Masters wiederherzustellen; bereits erfolgreich ausgeführte Tasks müssen so nicht nochmals ausgeführt werden. Beim Ausfall eines Node Managers hört dieser auf, Lebenszeichen an den Resource Manager zu senden. Dieser kann ihn daraufhin aus dem Pool verfügbarer Knoten entfernen. Tasks oder Anwendungen, die auf diesem Knoten laufen, können gemäß der Vorgehensweisen von fehlgeschlagenen Tasks oder Fehlern des Applications Masters wiederhergestellt werden. Am schwerwiegendsten ist der Ausfall des Resource Managers, da ohne diesen weder Anwendungen noch Tasks gestartet werden können. Jedoch wurde dieser so entworfen, dass periodisch der Zustand persistent gespeichert wird und eine neue Instanz des Resource Managers daraus wiederhergestellt werden kann. Der Zustand umfasst dabei die Node Manager und laufende Anwendungen. 2.6.4 Ein- und Ausgabeformate Ein weiterer Aspekt sind die Ein- und Ausgabeformate der Daten, die von Hadoop verarbeitet und generiert werden. Die Eingabeformate lassen sich grob in zwei Kategorien aufteilen: Dateieingaben und Datenbankeingaben. Die Dateieingaben wiederum lassen sich unterteilen in einfache Textdateien und Binärdateien. Textdateien sind unstrukturierte Eingabedaten die auf verschiedene Arten verarbeitet werden können. Zwei Beispiele hierfür sind das TextInputFormat und das KeyValueInputFormat. In ersterem entspricht jede Zeile einem Datensatz, wobei der Schlüssel der Byteoffset des Zeilenbeginns in der Eingabedatei ist und der Wert die gelesene Zeile. Zweiteres sind zum Beispiel CSV-Dateien mit zwei Spalten pro Zeile, so dass die erste Spalte der Schlüssel und die zweite der Wert ist. Binärdateien, sogennante SequenceFiles, sind Dateien, in denen Schlüssel-Wert-Paare strukturiert gespeichert werden. Strukturiert bedeutet in diesem Fall, dass zwischen jedem Schlüssel-Wert-Paar eine Marke existiert, so dass diese sauber voneinander getrennt 41 werden können. Diese haben den zusätzlichen Vorteil, dass sie komprimiert werden können und serialisierbare Datentypen nutzen können. Hadoop nutzt für die Serialisierung ein eigenes Format, sogenannte Writables. Diese sind von zentraler Bedeutung für Hadoop, da diese oft bei der Ein- und Ausgabe von Tasks als Datentypen für die SchlüsselWert-Paare verwendet werden und somit im verteilten Dateisystem gespeichert und aus diesem gelesen werden. Die beiden wichtigsten Vorteile von diesen sind, dass sie ein kompaktes Speichern und durch geringen Overhead schnelles Lesen und Schreiben erlauben. Datenbankeingaben erlauben das Einlesen von Daten aus Datenbanken über JDBCTreiber. Dieses hat jedoch den Nachteil, dass es kein Sharding, also eine Aufteilung der Eingabe, erlaubt, welches im verteilten Dateisystem durch die Speicherung in Blöcken implizit geschieht. Die Ausgabeformate von Hadoop entsprechen den Eingabeformaten, so dass diese nicht nochmals aufgeführt werden. 42 Kapitel 3 Parallele Algorithmen In den letzten Jahren führt eine immer stärkere Nutzung der Informationstechnik dazu, dass immer größere Mengen von Daten anfallen. Sequentielle Data Mining-Algorithmen sind aus verschiedenen Gründen nicht mehr in der Lage, diese Datenmengen zu bewältigen. Zum Einen steigt die Berechnungszeit in inakzeptable Höhen, zum Anderen sind die Datenmengen derart gewaltig, so dass diese von einzelnen Computern nicht mehr zu handhaben sind. Aus diesem Grund wurden verschiedene Verfahren aus dem Bereich der Assoziations- und Clusteranalyse vorgeschlagen, um diese Herausforderungen zu bewältigen und eine effiziente Analyse der Daten zur ermöglichen. Von diesen wird in diesem Kapitel eine Auswahl von Verfahren vorgestellt, die auf verschiedenen Parallelisierungsansätzen basieren. Auf eine Betrachtung der Laufzeitkomplexität der MapReduce-Algorithmen wird in dieser Arbeit verzichtet, da sich noch kein Modell zur Beschreibung von dieser etabliert hat. 3.1 3.1.1 Algorithmen zur Assoziationsanalyse Partition-Algorithmus Der Partition-Algorithmus [SON95](Algorithmus 3.1) benötigt für das Berechnen von frequent Itemsets in Abhängigkeit eines minimalen Supports ξ in einer Menge von Transaktionen zwei vollständige Suchläufe auf diesem Datenbestand. In der ersten Phase werden alle potentiellen frequent Itemsets durch eine Suche auf dem gesamten Datenbestand bestimmt. Hierfür werden die Daten in nicht überlappende Partitionen aufgeteilt. Auf diesen Partitionen wird unabhängig voneinander die Menge 43 der lokalen frequent Itemsets berechnet. Die Mengen der generierten lokalen frequent Itemsets werden anschließend zu einer Menge zusammengeführt, der Kandidatenmenge für potentielle globale frequent Itemsets. Zu Beginn der zweiten Phase wird zunächst für jedes Itemset der globalen Kandidatenmenge ein Zähler erzeugt. Nun wird der tatsächliche Support der Kandidaten auf der gesamten Datenbank berechnet, indem die Vorkommen der Kandidaten in jeder Partition gezählt und die Zähler entsprechend erhöht werden. Anschließend lässt sich durch einen einfachen Vergleich mit dem minimalen Support die Menge der frequent Itemsets berechnen. Im Gegensatz zum Apriori-Algorithmus, der global auf der vollständigen Transaktionsdatenbank arbeitet, besitzt der Partition-Algorithmus durch die Partitionierung auch eine lokale Komponente. Es findet eine Unterscheidung zwischen lokalen und globalen frequent Itemsets statt. Die verschiedenen auftretenden Itemset-Menge sind in Tabelle 3.1 dargestellt. Zusätzlich sind einige Begriffe zu definieren, die für den PartitionAlgorithmus von Bedeutung sind. Im Kontext dieses Verfahrens ist eine Partition p ∈ D ein Ausschnitt aus der gesamten Menge von Transaktionen, wobei diese als nichtüberlappend definiert sind. Zwei Partitionen sind nicht überlappend, wenn T sie paarweise disjunkt hinsichtlich der enthaltenen Transaktionen sind, formal: pi pj = ∅ | ∀i, j : i 6= j. Für diese wird nun ebenfalls ein lokaler minimaler Support definiert: dieser bezeichnet den Anteil der Transaktionen in der Partition, die ein Itemset benötigt, um als lokales frequent Itemset bezeichnet zu werden. Festgelegt ist der Wert des lokalen minimalen Supports auf ξ/|D|; ein Itemset ist also ein potentielles globales frequent Itemset, wenn dieses in mindestens einer Partition lokal häufig ist. Itemsets, die gegen den lokalen Support auf Häufigkeit getestet werden, werden als lokale Kandidaten-Itemsets bezeichnet. Ckp Lpk Lp CkG CG LG k Menge Menge Menge Menge Menge Menge der lokalen Kandidaten der k-Itemsets in Partition p der lokalen frequent k-Itemsets in Partition p aller lokalen frequent Itemsets in Partition p aller Kandidaten der globalen k-Itemsets aller globalen Itemsets-Kandidaten aller globalen frequent k-Itemsets Tabelle 3.1: Übersicht und Notation der verschiedenen Itemset-Mengen Globaler Support, globale frequent Itemsets und globale Itemset-Kandidaten sind wie die vorher genannten lokalen Varianten definiert, sie beziehen sich jedoch nicht auf eine Partition p ∈ D sondern auf die vollständige Menge von Transaktionen D. 44 Algorithmus 3.1 Partition-Algorithmus ([SON95] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: input: D - database of transactions input: n - number of partitions input: ξ - minimum support output: LG - list of global frequent itemsets procedure Partition(D, n, ξ) P = Partition Database(D) τ = ξ/n for i = 1 to n in parallel do Read In Partition(pi ∈ P ) Li = Gen Large Itemsets(pi , τ ) for (i = 2;SLji 6= ∅, i = 1, 2, . . . , n; i + +) do CiG = j=1,2,...,n Lji for i = 1 to n in parallel do Read In Partition(pi ∈ P ) Gen Count(C G , pi ) LG = {c ∈ C G |c.count ≥ ξ} return frequent itemsets LG . τ - lokaler minimaler Support . Erzeugung der Kandidaten . Merge der lokalen Kandidaten . Berechnung des globalen Supports Ablauf Zu Beginn des Algorithmus wird die Transaktionsdatenbank D in n logische Partitionen aufgeteilt. In der ersten Phase wird in n Iterationen parallel mit dem Algorithmus 3.2 für die jeweilige Partition pi als Eingabe die Menge Li der lokalen frequent Itemsets der Längen 1, . . . , l berechnet mit Li = Li1 ∪ Li2 ∪ . . . ∪ Lil . Anschließend werden sequentiell die berechneten Kandidaten-Mengen lokaler frequent Itemsets aller n Partition zu einer großen Menge vereinigt, der Kandidatenmenge für globale frequent Itemsets. In der zweiten Phase wird nun parallel in jeder Partition der Support jedes Kandidaten mit der Funktion Gen Count gezählt und anschließend aufsummiert. Kandidaten, deren Support über dem globalen minimalen Support ξ liegen werden in die Menge LG , der Menge der globalen frequent Itemsets, eingefügt und am Ende des Algorithmus zurückgegeben. Erzeugung lokaler frequent Itemsets Dier Erzeugung lokaler frequent Itemsets mit Algorithmus 3.2 beginnt mit einer Partition der Transaktionsmenge und dem lokalen minimalen Support τ . Zunächst wird aus der Transaktionsmenge die Menge aller lokalen frequent 1-Itemsets berechnet und jedes 45 dieser Itemsets die Liste der Transaktionen angehängt, in denen das Itemset enthalten ist. Nun beginnt die Erzeugung der lokalen frequent Itemsets mit der Länge k aus Kandidaten der Länge k−1 (Zeilen 3-11). Hierfür wird in einer inneren und äußeren Schleife jeweils über die Kandidaten mit der Länge k−1 iteriert. Wenn zwei Kandidaten l1 und l2 nun bis zur k − 1-ten Stelle identisch sind und sich an dieser unterscheiden, so dass l1 [k−1] < l2 [k−1], so wird ein neues Itemset c generiert, welches bis zur k − 2-ten Stelle identisch ist und dann l1 [k−1] und anschließend l2 [k−1] angehängt. Algorithmus 3.2 Generierung großer Itemsets im Partition-Algorithmus ([SON95] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: input: p - database partition input: τ - local minimum support output: Lp - list of local frequent itemsets in p procedure Gen Large Itemsets(p, τ ) Lp1 ={large 1-itemsets along with their tidlists} for (k = 2; Lpk 6= ∅; k + +) do for all itemsets l1 ∈ Lpk−1 do for all itemsets l2 ∈ Lpk−1 do if l1 [1] = l2 [1]∧l1 [2] = l2 [2]∧. . .∧l1 [k−2] = l2 [k−2]∧l1 [k−1] < l2 [k−1] then c = l1 [1] · l1 [2] · · · l1 [k − 1] · l2 [k − 1] if Prune(c) == false then c.tidlist = l1 .tidlist∩l2 .tidlist if |c.tidlist| / |p| ≥ τ then Lpk = Lpk ∪ {c} return Lp Für diesen Kandidaten ist erst zu testen, ob dieser auf Grund des Apriori-Prinzips (Abschnitt 2.2.1) verworfen werden kann. Dieses passiert mit Algorithmus 3.3, indem für alle k − 1-Teilmengen getest wird, ob diese in Kandidaten-Menge lokaler frequent Itemsets der Länge k −1, Lk−1 enthalten ist. Wenn dies der Fall ist, wird die Schnittmenge der Transaktionen der Itemsets l1 und l2 , aus denen der Kandidat c enstanden ist, berechnet und an c angehängt. Wenn nun die Anzahl der Transaktionen, in den c enthalten ist, durch die Transaktionszahl der Partition p größer ist als der lokale minimale Support τ , so wird c zu der Menge der lokalen frequent Itemsets der Länge k hinzugefügt. Dieser Vorgang wird solange fortgesetzt, bis in einer Iteration keine neuen Itemsets mehr generiert werden. Anschließend wird die Liste Lp mit allen lokalen frequent Itemsets zurückgegeben. 46 Algorithmus 3.3 Der Pruning-Schritt im Partition-Algorithmus ([SON95] angepasst) 1: 2: 3: 4: 5: 6: input: c - k-itemset output: boolean that indicates, whether c can be pruned procedure Prune(c: k-itemset) for all (k − 1)-subsets s of c do if s ∈ / Lk−1 then return true else return false Erzeugung der globalen frequent Itemsets Das Zählen der Vorkommen aller Itemsets c aus C G in einer Partition p beginnt zunächst, in dem für jedes Vorkommende 1-Itemset eine Liste erzeugt wird, in welchen Transaktionen dieses vorkommt. Die Menge von Transaktion, die c enthält, ergibt sich, indem Schritt für Schritt aus den Items von c der Schnitt der Transaktionslisten berechnet wird. Dieses wird für alle c ∈ C G durchgeführt und abschließend C G zurückgegeben. Algorithmus 3.4 Berechnung des Supports der Kandidaten in einer Partition ([SON95] überarbeitet) 1: 2: 3: 4: 5: 6: input: C G - list of all itemset candidates input: p - partition output: count of all c ∈ C G in p procedure Gen Count(c, p) for all 1-itemsets do generate the tidlist for partition p for all c ∈ C G do T T T c.count = c[1].tidlist c[2].tidlist . . . c[k].tidlist return C G 3.1.2 Laufzeit Die Laufzeit des Partition-Algorithmus setzt sich im wesentlichen aus zwei Teilen zusammen, der Erzeugung von lokalen frequent-Itemset-Kandidatenmengen und der Bestimmung des Supports der globalen Kandidatenmenge. Als erstes ist für die Bestimmung der frequent 1-Itemsets eine Laufzeit von O(|p| · w) nötig, wobei w die durchschnittliche Transaktionslänge bezeichnet. Die Erzeugung der lokalen Kandidatenmengen erfolgt ähnlich zum Apriori-Algorithmus aus Abschnitt 2.2.1. Für die Erzeugung eines neuen Kandidaten sind maximal k − 2 Vergleichsoperationen 47 auszuführen. Diese ist für alle Itemset-Paare der vorherigen Iteration durchzuführen. Für das Pruning der Kandidaten sind, wie im Apriori-Algorithmus, alle k −1 Teilmengen hinsichtlich der Häufigkeit des Vorkommens zu überprüfen; zusätzlich sind hierfür ebenfalls k − 1 Zugriffe auf eine Datenstruktur zur Speicherung der frequent Itemset Kandidaten nötig, welche im Falle eines HashSets in O(1) erfolgen. Die Vereinigung der Listen mit den Transaktions-Identifikatoren schließlich erfordert den einmaligen Durchlauf beider Listen, sofern sind. Insgesamt erfolgt die Kandidatenerzeugung Pdiese sortiert p 2 also in O(|p| · w) + O( w |L | · ((k − 2) + k · (k − 1) + (|l1 | + |l2 |))). Wie für den k−1 k=2 Apriori-Algorithmus gilt auch hier, dass die Laufzeit für einen sehr niedrigen Support n exponentiell sein; die Laufzeit-Komplexität liegt also bei O( 2p ) Für die Berechnung der Vorkommen der Kandidatenmenge ist zunächst die Bestimmung der Listen mit den Transaktions-Identifikatoren für die 1-Itemsets erforderlich. Wie in der im Schritt zur Kandidatengenerierung benötigt dies eine Laufzeit von O(|p| · w). Für die Zählung des globalen Supports werden nun alle Kandidaten durchlaufen und die Schnittmengen der Listen mit den Transaktions-Identifikatoren gebildet. Diese kann in O(|C G | · ((k − 1) · (2 · l))) ausgeführt werden, wobei l die durchschnittliche Länge der Listen ist. Analog zum Apriori-Algorithmus ergibt sich für einen sehr niedrigen notwendigen Support eine exponentielle Laufzeit, so dass die Partition-Algorithmus im schlechtesten Fall n O( 2p ) beträgt. 3.1.3 MapReduce-Partition Der Partition-Algorithmus aus dem vorherigen Abschnitt lässt sich auf relativ einfache Weise auf das MapReduce-Paradigma übertragen [RU11]. Hierfür werden zwei aufeinander folgende MapReduce-Phasen verwendet. In der ersten Phase lesen die Mapper die zugewiesenen Eingabepartitionen ein und generieren auf diesen lokale frequent Itemsets, welche ausgegeben werden. Die Reducer der ersten Phase lesen diese ein und fassen diese zu der Kandidatenmenge für potentielle globale frequent Itemsets zusammen. In der zweiten Phase berechnen die Mapper für die zugewiesene Partition, wie oft jedes Kandidaten-Itemset in dieser vorkommt. Das Itemset wird zusammen mit dem Vorkommen als Schlüssel-Wert-Paar ausgegeben. Die Reducer summieren nun die für jedes Itemset die Vorkommen auf und geben das Itemset aus, wenn es den minimalen Support überschreitet. Der folgende Pseudocode zur Beschreibung der beiden Phasen des Partition-Algorithmus ist auf Grundlage der Ausführungen von [RU11] und mit Referenzierung der bereits vorhanden Pseudocodes aus Abschnitt 3.1.1 erstellt worden. 48 Phase 1: Generierung der Kandidaten Die Map-Funktion liest zunächst die ihr zugewiesene Partition, bzw. die darin enthaltenen Transaktionen, und berechnet die lokalen frequent Itemsets dieser Teilmenge nach Algorithmus 3.2. Algorithmus 3.5 Kandidatengenerierung input: key: null, value: list of transactions output: key: local frequent itemset, value - null 1: procedure Mapper(key, value) 2: L ← Gen Large Itemsets(value) 3: for each itemset in L do 4: Output(h itemset, null i) input: key - local frequent itemset, value - null output: key - itemset, value - null 5: procedure Reducer(key,value) 6: Output(hkey, nulli) Die so berechneten Itemsets werden als Schlüssel-Wert-Paare der Form hitemset, nulli ausgegeben. Die Funktionalität der Reducer ist im ersten Durchlauf extrem einfach gehalten; sie geben auftretende Itemsets unverändert als Schlüssel-Wert-Paar der Form h Itemset, null i aus. Die Menge von Itemsets, die nach dem Ende der Reduce-Phase vorliegt, ist die Menge der Kandidaten-Itemsets, für die getestet werden muss, ob sie auch global häufig sind. Phase 2: Berechnung des globalen Supports In der Map-Phase des zweiten Durchlaufes wird nun für jede Partition die Anzahl der Vorkommen für jedes Kandidaten-Itemset berechnet. Die Berechnung erfolgt hierfür nach Algorithmus 3.4. Für jedes Itemset wird das Vorkommen als Paar hItemset, Anzahl der Vorkommeni ausgegeben. In der Reduce-Phase werden nun von den Reducern für jedes Kandidaten-Itemset die Vorkommen aufsummiert und der Support berechnet. Itemsets, deren Support den minimal notwendigen Support erreichen oder überschreiten sind globale frequent Itemsets und werden als Schlüssel-Wert-Paar hItemset, counti ausgegeben. 49 Algorithmus 3.6 Berechnung des globalen Supports input: key - null, value - list of transactions input: candidates: list of frequent itemset candidates output: key - itemset, value - itemset count 1: procedure Mapper(key, value) 2: Gen Count(candidates, value) 3: for each c ∈ candidates do 4: Output (hc, c.counti) 5: 6: 7: 8: 9: 10: input: key - itemset i , value - list of itemset i’s counts output: key - itemset, value - itemset count procedure Reducer(key, value) count ← 0 for each v in value do count ← count + v if count ≥ minSupp then Output(hitemset, counti) 3.1.4 MapReduce Frequent-Pattern-Growth (PFP) Der MapReduce FP-Growth Algorithmus (PFP) [LWZ+ 08] ist eine parallelisierte Variante des FP-Growth Algorithmus (Abschnitt 2.2.2) für verteile Computercluster unter Nutzung des MapReduce-Frameworks. Die Grundidee des Algorithmus ist eine Partitionierung der Transaktionsdatenbank in unabhängige Teile, so dass parallel ohne äußere Abhängigkeiten mit dem FP-Growth Algorithmus häufige Itemsets gesucht werden können. Der Ablauf des Algorithmus (Abbildung 3.1) gliedert sich in fünf Phasen. In der ersten Phase wird die Datenbank in Teile zerlegt, die auf P verschiedenen Knoten in einem verteilten Dateisystem gespeichert werden. Diese Zerlegung und Verteilung der Daten wird als sharding bezeichnet, jedes einzelne Teil als Shard. Die Supportwerte aller Items aus der Datenbank werden in der zweiten Phase berechnet. Hierfür wird ein MapReduce-Durchlauf ausgeführt (Algorithmus 3.7), in welchem jeder Mapper einen Shard als Eingabe erhält und für jedes gefundene Item ai in einer Transaktion Ti ein Schlüssel-Wert-Paar der Form hItem, 1i ausgibt. Die Reducer fassen nun die Menge der Werte für jedes Vorkommen von ai zusammen und geben ein Schlüssel-Wert-Paar der Form hnull, ai + counti aus. In diesem Schritt wird außerdem das Vokabular I der Datenbank bestimmt, welches für große Datensätze oftmals nicht bekannt ist. Das Ergebnis, die häufigen 1-Itemsets und ihr Support, wird in einer Liste, der F-Liste (Frequent Itemset List) gespeichert. In der dritten Phase werden die |I| Items aus der F-Liste gleichmäßig in Q Gruppen 50 Eingabedaten Map CPU CPU CPU 1 & 2: Sharding und paralleles Zählen Reduce CPU CPU CPU Liste häufiger 1-Itemsets CPU 3: Items gruppieren Gruppenliste Neue integrierte Daten Map CPU CPU CPU QR QR QR Gruppe 1 Gruppe 2 Gruppe Q CPU CPU CPU Reduce 4: paralleles und selbstanpassendes FP-Growth Zwischenergebnis Map CPU CPU CPU 5. Aggregierung Reduce CPU CPU CPU Ergebnis Abbildung 3.1: Ablaufdiagramm des PFP-Algorithmus (nach [LWZ+ 08]) 51 Algorithmus 3.7 Berechnung des Supports aller 1-Itemsets ([LWZ+ 08] angepasst) input: key - null, value - transaction Ti output: key - item ai , value - 1 1: procedure Mapper(key, value) 2: for each item ai in Ti do 3: Output (hai ,0 10 i) 4: 5: 6: 7: 8: input: key - an item ai , value - set of values corresponding to key ai output: key - null, value - item with corresponding occurence count procedure Reducer(key, value) count ← 0; for each 0 10 in value do count ← count + 1 Output(h null, ai + counti) verteilt, wobei die Liste der Gruppen als G-Liste (Group List) bezeichnet wird und jede Gruppe einen eindeutigen Identifikator erhält. Aufgrund der geringen Größe und Zeitkomplexität kann diese Phase auf einem einzelnen Knoten ausgeführt werden. Das Kernstück des Algorithmus, das Generieren von häufigen Itemsets, erfolgt in der vierten Phase mit Algorithmus 3.8. Diese benötigt einen MapReduce-Durchlauf, wobei in der Map-Phase Transaktionen für gruppen-abhängige Partitionen erzeugt werden und in der Reduce-Phase der FP-Growth Algorithmus auf den zuvor generierten gruppenabhängigen Transaktions-Partitionen ausgeführt wird. Beim Start eines Mappers wird zunächst die Gruppen-Liste geladen, welche in Form einer Hash-Tabelle organisiert wird (Zeile 2-3). Hierdurch wird jedes Item ai auf eine Gruppe gid abgebildet. Als Eingabe erhalten die Mapper einen Shard der Eingabedaten, wobei das Eingabe-Paar für die Map-Funktion von der Form hnull, Ti i ist und somit nur eine einzige Transaktion als Wert enthält. Für jede gelesene Transaktion wird jedes Item aj durch die Gruppen-ID substituiert, welche sich aus der Anwendung des Items auf die Hash-Tabelle H ergibt (Zeile 6). Für jede nun vorkommende Gruppen-ID gid wird das am weitesten rechts stehende Vorkommen bestimmt, hier j und entsprechende Paare der Form h gid, {Ti [1] . . . Ti [L]} i ausgegeben. Jede Transaktion wird folglich Element für Element, von rechts beginnend, verkürzt und anhand der Gruppenzugehörigkeit des am weitesten rechts stehenden Element einer Gruppe zugeordnet. Wichtig hierbei ist jedoch, dass die ursprüngliche Transaktion bzw. die aus der Verkürzung resultierenden Teilmengen jeder Gruppe höchstens einmal zugeordnet werden. Dieses wird gewährleistet durch das Entfernen der Paare h HashNum, ai i aus der Hash-Tabelle H (Zeile 8). Wenn alle Map-Tasks fertiggestellt wurden, werden von der MapReduce-Infrastruktur die von den Mappern ausgegebenen Paare gesammelt und die Werte nach dem Schlüssel gruppiert. Die gruppen-abhängigen Transaktion werden an die Reducer übergeben in Form von Paaren hgid, DB gid i, wobei DB gid eine Menge von gruppen-abhängigen Transaktionen 52 Algorithmus 3.8 Suche von frequent Itemsets mit dem FP-Growth-Algorithmus ([LWZ+ 08] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: input: key - null, value - transaction Ti output: key - hash-value, value - itemset procedure Mapper(key, value) load G-List generate Hash Table H from G-List a[] ← Split(Ti ) for j = |Ti | − 1 to 0 do HashN um ← getHashNum(H, a[j]) if HashNum 6= N ull then delete all pairs whose hash value is HashNum in H output (hHashN um, a[0] + a[1] + . . . + a[j]i) input: key - group-id, value - transaction-list corresponding to group-id output: key - null, value - itemset with support value procedure Reducer(key, value) load G-List nowGroup ← G − Listgid LocalF P tree ← clear LocalF P tree = ConstructFpTree(DBgid , ξ) for each ai in nowGroup do create a max heap HP with size K TopKFPGrowth(LocalFPtree, ai , HP ) for each vi in HP do call Output(hnull, vi + supp(vi )i) mit der selben Gruppen-ID gid sind. Diese Menge wird als gruppen-abhängiger Shard, bzw. Partition, bezeichnet. In jedem Task erzeugt der Reducer einen FP-Baum (Zeile 14) und generiert mit einen leicht abgewandelten FP-Growth Algorithmus (siehe Algorithmus 2.5 auf Seite 20 für das Original) Itemsets (Zeile 15-19). Der Unterschied zum normalen FP-Growth Algorithmus liegt in der Tatsache, dass für ein Itemset ai nur die K Itemsets mit dem größten Support geliefert werden welche vom Algorithmus direkt in einem Maximum-Heap der Größe K verwaltet werden. Diese werden anschließend als Schlüssel-Wert-Paare hnull, v+supp(v)i, also ohne Schlüssel und als Wert das Itemset mit dem zugehörigen Support, ausgegeben. Eine Aggregierung der Ergebnisse aus Algorithmus 3.8 erfolgt nun in der fünften Phase mit Algorithmus 3.9. Für jedes Item ai werden die K Itemsets mit dem größten Support ausgegeben. Als Eingabe erhalten die Mapper in diesem Schritt Schlüssel-Wert-Paare der Form hnull, v + supp(v)i, also einem Itemset mit dem zugehörigen Support; aus diesem wird für jedes aj ∈ v ein Paar der Form haj , v + supp(v)i ausgegeben. 53 Algorithmus 3.9 Aggregierung der Ergebnisse ([LWZ+ 08] angepasst) input: key - null, value - itemset v with corresponing support count output: key - item, value - itemset v with corresponding support count 1: procedure Mapper(key, value) 2: for all item ai in v do 3: Call Output(hai , v + supp(v)i) 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: input: key - item, value - set of itemsets v with corresponding support counts output: key - null, value - item ai and top-k supported itemsets and support counts procedure Reducer(key, value) define and clear a size K max heap HP ; if |HP | < K then insert v + supp(v) into HP else if supp(HP [0].v) < supp(v) then delete top element in HP insert v + supp(v) into HP for each v ∈ HP do Call Output(h null, ai + vi) Die Reduce-Tasks verarbeiten als Eingabe nun Schlüssel-Wert-Paare haj , set(v+supp(v))i als Eingabe, wobei set(v + supp(v)) eine Menge von Transaktionen mit dem zugehörigen Support bezeichnet, welche das Item aj enthalten. Diese werden direkt in einen Maximum-Heap HP, der nach dem Support-Wert sortiert ist, eingefügt, solange dieser weniger als K Elemente enthält. Wenn der Heap bereits K Elemente enthält können zwei Fälle auftreten. Wenn der Support des einzufügenden Itemsets v größer als der des Itemsets mit dem kleinsten Support-Wert des Heaps ist, so wird dieses aus dem Heap entfernt und v eingefügt. Ansonsten wird v verworfen. Wenn der Reducer alle Werte gelesen hat werden zum Abschluss alle Itemsets v des Heaps als Schlüssel-Wert-Paare hnull, ai + vi ausgegeben. Abschließend ist festzuhalten, dass aus der Beschreibung des Algorithmus nicht hervorgeht, ob die Ergebnisse des Verfahrens exakt oder lediglich approximativ sind. Kernpunkt hierfür ist die Aufteilung in gruppenabhängige Shards. Zunächst erfolgt eine Aufteilung der auftretenden Items in Q Gruppen, wobei eine Gruppe mehrere Items enthalten kann. Anschließend werden die Transaktionen sukzessive gekürzt und nach dem Hashwert des letzten Transaktions-Items einem Shard, der einer Gruppe zugeordnet ist, zugewiesen. Jedoch ist es nicht möglich, dass in einer Partition mehr als eine Teilmenge der Ausgangstransaktion auftritt. Durch das Auftreten von Teilmengen einer Transaktion in verschiedenen gruppenabhängigen Shards ist deshalb anzunehmen, dass der Algorithmus nur approximative Ergeb- 54 nisse liefert. In Kapitel 5 wird deshalb experimentell versucht, in einem Vergleich mit dem MapReduce-Partition-Algorithmus eine Antwort auf diese Frage zu finden. 3.1.5 Weitere Algorithmen Der PARMA-Algorithmus (Parallel Randomized Algorithm for Approximate Association Rule Mining in MapReduce)[RDFU12] ist ein paralleler Algorithmus auf Grundlage des MapReduce-Paradigmas, der durch ein stochastisches Verfahren eine Annäherung der Menge von frequent Itemsets berechnet. Hierfür nimmt jeder Rechenknoten eine Stichprobe der Transaktionsdatenbank. Die Knoten berechnen unabhängig voneinander die frequent Itemsets der Stichprobe, die abschließend zu einem einzigen Ergebnis gefiltert und aggregiert werden. Die Qualität der Näherung ist über Parameter für Genauigkeit und Fehlerwahrscheinlichkeit der Stichprobe kontrollierbar. Diese Parameter bestimmen letzlich auch den Umfang der Stichprobe. Ein weiteres paralleles Verfahren für Multi-Core-Rechner ist der P-Mine-Algorithmus (Parallel Itemset mining on large datasets) [BCCG13], welcher als grundlegende Datenstruktur einen HY-Baum[BCC10] verwendet. Bei diesem handelt es sich, analog zum FP-Baum des FP-Growth-Algorithmus um einen Präfix-Baum, der Transaktionen nach Häufigkeit sortiert repräsentiert. Ein zentraler Aspekt ist, dass für einen Baumdurchlauf die physischen Speicherorte der Knoten über Zeiger festgehalten werden. Außerdem existiert ein sogenannter Item-Index, der es zum einen ermöglicht, alle Pfade, die ein bestimmtes Item enthalten zu ermitteln, zum anderen enthält er alle Items des HYBaumes zusammen mit ihrem Support sowie Zeiger auf den physikalischen Speicherort. Die Suche nach frequent Itemsets erfolgt, analog zum FP-Growth-Algorithmus, durch die Generierung bedingter Teilbäume und der rekursiven Abarbeitung. Der MLFPT-Algorithmus (Multiple local frequent pattern trees)[ZEHL01] ist paralleles Verfahren zur Assoziationsanalyse, welches auf den Grundlagen des FP-GrowthAlgorithmus basiert. Der Algorithmus benötigt zwei Suchläufe auf der Transaktionsdatenbank und besteht aus zwei Phasen. In der ersten Phase werden parallele FP-Bäume erzeugt. In dieser werden in einem Suchlauf parallel die frequent 1-Itemsets bestimmt, nach Häufigkeit sortiert und in einer Header-Tabelle gespeichert. In einem zweiten Durchlauf werden parallel die lokalen FP-Bäume aufgebaut. Jeder Prozessor baut aus der zugewiesenen Menge von Transaktionen einen eigenen FP-Baum auf, verwirft Items, die nicht in der Header-Tabelle vorhanden sind und sortiert die Transaktionen gemäß der Häufigkeit der Items. Jedoch ist die Header-Tabelle im MLFPT-Algorithmus global und die Zeiger verbinden so die Items verschiedener FP-Bäume. Für die Suche nach frequent Itemsets wird der Support aller Items durch die Anzahl der Prozessoren geteilt und so der durchschnittliche Support A gebildet. Die Items werden über den Support-Wert auf die Prozessoren verteilt, so dass der kumulierte Support jedes Prozessors etwa A beträgt. Die Prozessoren bilden nun auf Basis ihrer zugewiesenen Items bedingte FP-Bäume auf 55 Grundlage der lokalen FP-Bäume, die zuvor berechnet worden sind. In diesen wird dann parallel nach frequent Itemsets gesucht. 3.2 3.2.1 Algorithmen zur Clusteranalyse Parallel Partially Overlapping Partitioning (pPOP) Partial Overlapping Partitioning (POP) Minimaler Abstand Der POP-Algorithmus[DL01] nutzt die 90-10-Regel aus, um die Speicheranforderungen und die Laufzeit zu reduzieren. Diese ergibt sich aus Beobachtungen des Verlaufs hierarchischer Clusterings und besagt, dass in den ersten 90% der Iterationen des Algorithmus Cluster vereinigt werden, deren Entfernung weniger als 10% der maximalen Verschmelzungs-Distanz von allen Cluster-Vereinigungen beträgt (Abbildung 3.2). Anzahl der Schritte Abbildung 3.2: Die 90-10-Regel im hierarchischen Clustering Basierend auf dieser Annahme wird nun der zu clusternde Datensatz in p Zellen unterteilt, die sich in den Randbereichen überlappen. Dieser Überlappungsbereich wird als δ-Region bezeichnet, wobei δ die Breite dieses Bereiches bezeichnet (Abbildung 3.3), und gehört immer zu zwei Partitionen. Hierarchische Clusteringverfahren, wie etwa der SLINK-Algorithmus [Sib73], haben im besten Fall eine Komplexität von O(n2 ). Weiterhin lässt sich aus der 90-10-Regel schließen, dass zu Beginn des Algorithmus Paare mit sehr kleinen Distanzen vereinigt werden, so dass diese lokal begrenzt sind. Im Umkehrschluss bedeutet dies, dass durch die, mindestens quadratische, Komplexität die Verschmelzungsoperationen auf den unteren Ebenen des Dendrograms sehr teuer sind. Eine Aufteilung der zu clusternden Daten in Partitionen ermöglicht somit eine signifikante Reduktion der Laufzeit, da die zugehörigen 56 Distanzmatrizen einen geringeren Umfang besitzen. δ δ δ δ δ δ δ δ δ δ δ δ Abbildung 3.3: POP-Algorithmus - Überlappende Partitionen Die Grundidee des POP-Algorithmus (Algorithmus 3.10) ist die Unterteilung des Datensatzes in p Partitionen nach einem zu bestimmenden Attribut; jede der Zellen besitzt mit den angrenzenden Nachbarn einen Überlappungsbereich, der als δ-Region bezeichnet wird (Abbildung 3.3). Während jeder Iteration wird nun in jeder Zelle nach dem Cluster-Paar mit dem geringsten Abstand gesucht, aus diesen wiederum das Paar mit insgesamt geringsten Distanz ausgewählt wird; wenn diese Distanz kleiner als δ ist, so werden die beiden Cluster verbunden und die Prioritätswarteschlangen der betroffenen Zelle aktualisiert. Wenn einer der beiden Clusterschwerpunkte oder der Schwerpunkt des vereinigten Clusters in der δ-Region liegt, so wird zusätzlich die Warteschlange der anderen betroffenen Zelle aktualisiert. Diese erste Phase wird solange fortgesetzt, bis die Distanz zwischen ClusterPaaren δ überschreitet. In der zweiten Phase wird abschließend ein klassischer Algorithmus für hierarchisches Clustering auf den verbleibenden Clustern angewendet, um das Dendrogram zu vervollständigen. 57 Algorithmus 3.10 Der POP-Algorithmus ([DL01] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: input: d - dataset to be clustered input: p - number of partitions, δ - width of cell overlap output: resulting dendrogram procedure POP choose attribute to partition the data divide the dataset d into p partitions with margin δ init priority queues P for each cell while merging distance < δ do init priority queue clusterPairs for each cell c do find closest cluster pair p(C1 , C2 ) ∈ c clusterPairs.insert(p) closestPair = clusterPairs.deque() Cnew = merge(closestPair ) update corresponding priority queue P containing p if C1 ∈ p ∨ C2 ∈ p ∨ Cnew in δ-region then update priority queue P of affected cell if any of the pair merged cluster in δ-region, update P of affected cell remove duplicates from the δ-regions cluster the remaining k’ clusters return dendrogram Parallelisierung des POP-Algorithmus Eine mögliche Parallelisierung ergibt sich direkt, wenn die Struktur des POP-Verfahrens (Abschnitt 3.2.1 Algorithmus 3.10) betrachtet wird. In diesem wird der Datenraum in disjunkte Partitionen unterteilt und in jedem Schritt werden sequentiell in diesen die nächsten Cluster-Paare gesucht. Zusätzlich ist bei einer Verschmelzung die Distanzmatrix der betroffenen Partition zu aktualisieren, wobei Distanzen zwischen Clusterpaaren bestimmt werden müssen. Der pPOP -Algorithmus [DPS07] setzt an diesen Punkten an und parallelisiert die Berechnung der nächsten Paare, sowohl bei der Suche nach Verschmelzungskandidaten als auch bei der Aktualisierung der Distanzmatrizen; die charakteristischen zwei Phasen des POP-Algorithmus, der Aufbau lokaler Dendrogramme und das anschließende Verschmelzen zu einem globalen Dendrogram, bleiben im pPOPAlgorithmus erhalten. In der ersten Phase werden die Daten in überlappende Zellen partitioniert, wobei der Überlappungsbereich die Breite δ besitzt. Nun wird parallel in jeder Partition das Clusterpaar mit der kleinsten Inter-Cluster-Distanz gesucht. Das Paar mit der insgesamt kleinsten Distanz dist < δ wird von einem festgelegten Prozessor gewählt und anschließend vereinigt. Bei der Vereinigung sind nur Cluster und folglich Distanzen derselben 58 Zelle betroffen; es muss nur die Distanzmatrix einer Zelle aktualisiert werden. Wenn sich das nächste Paar jedoch innerhalb einer δ-Überlappungszone, dem Randbereich der Zelle, befindet, müssen zusätzlich zur Zelle, die die vereinigten Cluster enthält, auch die Distanzen zwischen Clustern der an den δ-Breich angrenzenden Zellen aktualisiert werden. Die erste Phase wird beendet, sobald die Distanz zwischen den nächsten Paaren δ überschreitet. In der zweiten Phase werden die verbliebenen Cluster durch Verwendung eines sequentiellen Algorithmus für hierarchisches agglomeratives Clustering vereinigt. Ablauf Die hier vorgestellte Variante des pPOP-Algorithmus (Algorithmus 3.11) verwendet für die Implementierung der Distanzmatrix Prioritätswarteschlangen. Diese werden zu Beginn parallel initialisiert. Die zu berechnenden Zellen werden den verfügbaren Prozessoren so zugeordnet, dass jeder Prozessor die Prioritätswarteschlangen für durchschnittlich c/p Zellen berechnet. Jeder Prozessor erhält so eine eigene Menge kleinster Inter-ClusterDistanzen für die Zellen, die von ihm berechnet werden. Aus dieser wählt er die kleinste aus und liefert die Zellennummer, in der dieses Paar zu finden ist. Folglich ergibt sich hieraus, dass es nach der parallel Bestimmung der kleinsten Distanz (Zeile 6-7) p kleinste Distanzen von Clusterpaaren und zugehörige Zellennummern gibt. Ein Prozessor, der quasi als Master fungiert, wählt hieraus nun das insgesamt kleinste Paar aus und verschmilzt dieses zu einem neuen Cluster (Zeile 8-10). Für den neuen Cluster ist nun eine neue Prioritätswarteschlange anzulegen, in welcher die Distanzen zu allen anderen Clustern der Zelle gespeichert sind. Auch diese Berechnung kann parallelisiert werden, indem jeder Prozessor einen Teil der Distanzen berechnet. Nach der Vereinigung des Clusterpaares sind im Anschluss auch die Warteschlangen der anderen Cluster in der Zelle zu aktualisieren, was ebenfalls parallel erfolgen kann, in (Zeile 11dem jeder Prozessor einen Teil der Warteschlangen aktualisiert, also Cclus(C) p 12). Für den Fall, dass einer der vereinigten Cluster oder der neu gebildete Cluster im δ-Randbereich der Partition lagen, so sind in den hiervon betroffenen Zellen ebenfalls die Warteschlagen zu aktualisieren (Zeile 13-17). Sobald die kleinste gefundene Distanz zwischen Clusterpaaren mindestens die Größe δ erreicht hat endet der parallele Teil des Algorithmus; die bis hierhin gebildeten lokalen Dendrogramme werden durch die Anwendung eines sequentiellen hierarchischen Clustering-Algorithmus durch die Vereinigung der verbliebenen Cluster zu einem globalen Dendrogram verschmolzen. Die Laufzeitkomplexität des pPOP-Algorithmus setzt sich aus vier Teilen zusammen. Für N2 ), die Initialisierung der Prioritätswarteschlagen ergibt sich eine Komplexität von O( c∗p wobei p die Anzahl der Prozessoren und N die Anzahl der Datenpunkte ist. Die anschließende WHILE-Schleife wird maximal N − 1-mal durchlaufen, hat also eine Laufzeit von O(N ). Die Komplexität für die Suche nach den beiden Punkten mit der geringsten 59 Algorithmus 3.11 pPOP-Algorithmus ([DPS07] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: Ccell(p) - number of cells assigned to processor p Cclus(C) - chunk of clusters in cell C input: ds: dataset, c: number of cells, δ: width of cell overlap output: dendrogram of a clustered dataset ds procedure P-POP(ds, c, δ) partition ds in c cells for each processor P in parallel do create priority queues for Ccell(P ) while overall min dist < δ do for each processor P in parallel do determine closest pair (CL1 , CL2 ) for each cell C in Ccell(p) in parallel on designated processor P̄ find overall closest pair (CL1 , CL2 ) and cell C that holds it merge CL1 and CL2 for each processor P in parallel do update the priority queues of Cclus(C) on designated processor P̄ determine if neighbor cells are affected by the merged clusters for each processor P in parallel do for each affected cell C do update priority queues of Cclus(C) merge resuming clusters with sequential hierarchical clustering algorithm return complete dendrogram Distanz ist O( np ). n bezeichnet hier die Anzahl von Clustern, die noch in den Prioritätswarteschlangen vorhanden sind. Für den vierten Schritt, das Aktualisieren der von der n∗log( n ) Vereinigung betroffenen Prioritätswarteschlagen, beträgt die Komplexität O( p c ), da eine einzelne Prioritätswarteschlange nc Cluster enthält. Insgesamt ergibt sich somit eine Laufzeit von O( 3.2.2 N 2 ∗log( N ) c ). p Parallel Disjoint-Set DBSCAN (PDSDBSCAN) Um eine Erweiterung des ursprünglichen DBSCAN-Algorithmus [EKSX96] handelt es sich bei dem PDSDBSCAN-Algorithmus[PPA+ 12], der eine Parallelisierung durch die Verwendung der Union-Find-Datenstruktur erreicht. Zunächst wird die sequentielle Variante, der Disjoint-Set DBSCAN -Algorithmus vorstellt. Mit diesem als Grundlage wird sodann eine parallele Implementierung vorgestellt. 60 Der Disjoint-Set-DBSCAN-Algorithmus Im ersten Schritt des Algorithmus 3.12 wird die Punktmenge X derart aufgeteilt, dass jeder Punkt in einer eigenen Menge enthalten ist. Zusätzlich werden für die jeweiligen Mengen die Zeiger für die Vorgänger auf die eigene Menge gesetzt (Zeile 2-3). Anschließend wird die gesamte Punktemenge durchlaufen; für jeden Punkt wird zunächst dessen Nachbarschaft in Abhängigkeit vom Punkt x und der Nachbarschaftsgröße eps bestimmt und in der Menge N gespeichert. Wenn die Anzahl der Nachbarn größer als die minimale Nachbarschaftsgröße ist, wird x als Kernpunkt markiert. Dann erfolgt eine zusätzliche Betrachtung der Punkte x0 , die in der Nachbarschaft von x enthalten sind. Für jeden dieser Punkte wird geprüft, ob x0 ein Kernpunkt ist oder ob er noch keinem Cluster zugeordnet ist. Wenn der Punkt ein Kernpunkt ist werden die Mengen, die x und x0 enthalten, vereinigt. Falls x0 noch keinem Cluster zugeordnet ist, wird wie im vorherigen Schritt die Mengen, die x und x0 enthalten vereinigt. Analog zum DBSCAN-Algorithmus ergibt sich durch Nutzung einer räumlichen Indexstruktur eine Laufzeitkomplexität von O(n ∗ log(n)) wenn eine DisjointSet-Implementierung verwendet wird, die eine Klassenvereinigung in O(1) ermöglicht. Algorithmus 3.12 DSDBSCAN-Algorithmus ([PPA+ 12] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: input: X - set of points input: eps - neighborhood size input: minpts - number of points required for classification as core point procedure DSDBSCAN(X, eps, minpts) for each point x ∈ X do p(x) ← x for each point x ∈ X do N ← GetNeighbors(x, eps) if |N | ≥ minpts then mark x as core point for each point x0 ∈ N do if x0 is a core point then Union(x, x0 ) else if x0 is not yet member of any cluster then mark x0 as member of a cluster Union(x, x0 ) Ein Beispiel für die einzelnen Phasen des DSDBSCAN-Algorithmus ist in Abbildung 3.4 dargestellt. In diesem sind drei Punkte innerhalb der Punktnachbarschaft für eine Klassifikation als Kernpunkt notwendig. Im ersten Teil der Abbildung ist die räumliche Verteilung der Punkte des Beispiels abgebildet; die Größe der ε-Nachbarschaft ist durch gestrichelte Kreise dargestellt. Das DisjointSet nach seiner Initialisierung mit den Beispielpunkten ist im zweiten Teil zu sehen. Eine Nachbarschaftssuche, ausgehend von den 61 Punkten 4, 5, 6 und 7 ist in dritten Teil zu sehen; das Ergebnis dieser spiegelt sich in den gebildeten Bäumen wieder. Das finale Clustering schließlich ist im vierten Teil der Abbildung abgebildet. Cluster sind hier durch blaue Bäume mit blauen Knoten, Rauschen durch Bäume mit roten Knoten hervorgehoben. 1 5 9 2 7 8 4 1 6 3 2 1 2 3 4 5 6 7 8 9 3 1 3 5 6 8 4 2 7 9 4 5 1 2 3 6 7 8 4 9 Abbildung 3.4: Beispielhafte Abbildung der Phasen des DSDBSCAN-Algorithmus (nach [PPA+ 12]) Der Algorithmus (PDSDBSCAN) Kernpunkt der parallelen Version des DSDBSCAN-Algorithmus ist die Union-FindDatenstruktur (Abschnitt 2.4). Hierbei wird zuerst der Datenraum partitioniert, sodass die Partitionen unabhängig von einander auf den lokalen Datenpunkten die lokalen Cluster, respektive Bäume, berechnen. Sobald die Bäume für die jeweiligen Partitionen 62 berechnet sind, werden diese verschmolzen, um aus den lokalen Clustern die globalen zu erzeugen. Zugrunde liegt zunächst die Annahme, dass der Algorithmus auf einem Rechner mit gemeinsamen Speicher und p Threads ausgeführt wird. Der Datenraum der Punktmenge soll also in p Partitionen, {X1 , X2 , . . . , Xp }, wobei jedem Thread eine Partition zugewiesen wird. Für jeden Thread t bezeichne Yt eine Menge von Punkt-Paaren (x, x0 ), so dass x ∈ Xt und x0 ∈ / Xt . Algorithmus 3.13 PDSDBSCAN-Algorithmus ([PPA+ 12] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: 25: input: X - set of points input: eps - neighborhood size input: minpts - number of points required for classification as core point procedure PDSDBSCAN-S(X, eps, minpts) for t = 1 to p in parallel do for each point x ∈ Xt do p(x) ← x Yt ← ∅ for each point x ∈ Xt do N ← GetNeighbors(x, eps) if |N | ≥ minpts then mark x as a core point for each point x0 ∈ N do if x0 ∈ Xt then if x0 is a core point then Union(x, x0 ) else if x0 ∈ / any cluster then mark x0 as member of a cluster Union(x, x0 ) else Yt ← Yt ∪ {(x, x0 )} for t = 1 to p in parallel do for each (x, x0 ) ∈ Yt do if x0 is a core point then UnionUsingLock(x, x0 ) else if x0 ∈ / any cluster then mark x0 as member of a cluster UnionUsingLock(x, x0 ) Der Algorithmus kann in zwei wesentliche Teile, einen lokalen (Zeile 2-18) und einen globalen (Zeile 19-25), aufgeteilt werden. Die Berechnungen innerhalb des lokalen Teils sind identisch zu denen aus Algorithmus 3.12, jedoch werden nicht Datenpunkte aus der ganzen Punktmenge X sondern nur aus der Partition Xt des Threads verwendet. Zu 63 beachten ist hierbei jedoch, dass in der Abfrage der Nachbarschaft eines Punktes x ∈ Xt auch Punkte x0 ∈ / Xt enthalten sein können. In diesem Fall wird das Verschmelzen der Bäume, die x und x0 enthalten, verzögert und in der Merge-Phase gelöst. Hierfür wird das Paar (x, x0 ) zu der Menge Yt hinzugefügt. Die einzigen Fälle, in denen die Threads auf nicht-lokale Punktedaten, also Punkte außerhalb der eigenen Partition, zugreifen, sind die Lesezugriffe, wenn die Nachbarschaft eines Punktes abgefragt wird. Es ist hierbei festzuhalten, dass in diesem ersten lokalen Schritt keine Kommunikation zwischen den Threads notwendig ist, ja sogar nicht erwünscht ist, damit die lokale Phase vollständig unabhängig voneinander ausgeführt werden kann, es also keine Verluste durch Kommunikationsverzögerungen gibt. Folglich werden hier lokale Union-Find-Strukturen verwendet. Algorithmus 3.14 Vereinigung von Union-Find-Strukturen mit Sperre ([PPA+ 12] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: input: points x and x0 to be merged procedure UnionUsingLock(x, x0 ) while p(x) 6= p(x0 ) do if p(x) < p(x0 ) then if x = p(x) then Lock(p(x)) if x = p(x) then p(x) ← p(x0 ) Unlock(p(x)) x = p(x) else if x0 = p(x0 ) then Lock(p(x0 )) if x0 = p(x0 ) then p(x0 ) ← p(x) Unlock(p(x0 )) x0 = p(x0 ) Im zweiten Schritt, der Vereinigung der Bäume (Algorithmus 3.14), werden die in Yt enthaltenen Punktpaare durchlaufen. Für jedes Paar (x, x0 ) ∈ Yt wird überprüft, ob x0 ein Kernpunkt ist oder noch nicht zu einem Cluster hinzugefügt wurde. Wenn dies der Fall ist, werden die Bäume, die x und x0 enthalten, mit der Union-Operation vereinigt. Implizit sorgt dieser Schritt dafür, dass x und x0 schließlich dem gleichen Cluster angehören. Während des Vereinigungsvorgangs kann es passieren, dass die Vorgänger-Zeiger von Punkten geändert werden, die einem anderen Berechnungsthread zugeordnet sind. Es ist also notwendig, Vorkehrungen für den konkurrierenden Zugriff auf die Punkte zu treffen. Der Grundgedanke bei den Sperren ist, dass jeder Punkt eine eigene Sperre besitzt, 64 was aufgrund des gemeinsamen Speichers notwendig ist. Nehmen wir an, dass wir zwei Wurzeln r1 und r2 haben. Nehmen wir weiter an, dass wir den Vorgängerzeiger von r1 auf r2 umsetzen wollen. Um nun eine Union-Operation zum Verschmelzen auszuführen ist das erfolgreiche Anfordern einer Sperre auf r1 erforderlich. Nun kann es passieren, dass der Thread, der die Sperre anfordert, warten muss, da ein anderer Thread bereits die Sperre des Punkts gesetzt hat. Nun gibt es zwei Möglichkeiten: 1. Wenn r1 weiterhin eine Wurzel ist wird der Vorgängerzeiger von r1 auf r2 gesetzt 2. Falls ein anderer Thread den Vorgängerzeiger geändert hebt der Thread die Sperre auf und setzt die Ausführung von der momentanen Position fort ), da die BerechnunDie Laufzeit für den ersten Teil des Algorithmus liegt bei O( n∗log(n) p gen identisch mit dem DSDBSCAN-Algorithmus sind und völlig unabhängig voneinander ausgeführt werden können. Für den zweiten Teil, die Vereinigung der lokalen Clusterings, hängt die Laufzeit maßgeblich von der Größe der Menge Yt ab, wobei für jedes Element aus Yt eine Vereinigung mit Algorithmus 3.14 ausgeführt wird. Da die diese solange ausgeführt wird, bis beide Bäume auf die gleiche Wurzel zeigen, beträgt die Laufzeit maximal O(log(n)). Patwary et al. verweisen in [PPA+ 12] darauf, dass in der Praxis nur in sehr wenigen Fällen Sperren andere ausführende Threads blockieren, da sich die Punktpaare in Yt oft im gleichen Baum befinden, so dass die Laufzeitkomplexität für die Vereinigung ). Folglich kann die Laufzeit des gesamten PDSDBSCAN-Algorithmus grob O( Yt ∗log(n) p ) abgeschätzt werden. mit O( (n+Yt )∗(log(n)) p 3.2.3 MapReduce-DBSCAN Massiv angestiegene Datenmengen haben zu verschiedenen Vorschlägen wie etwa PDBSCAN [XJK99] oder PDSDBSCAN [PPA+ 12] geführt, um den bekannten DBSCANAlgorithmus (Abschnitt 2.3.2) zu parallelisieren. Diese haben jedoch als Nachteil, dass die gesamte Parallelisierung des Verfahrens durch die Entwickler in Form von Nachrichtenaustausch oder gemeinsam genutztem Speicher behandelt werden muss. Eine Alternative zu klassischen Ansätzen ist das MapReduce-Framework (Abschnitt 2.5), welches die Details der Parallelisierung vor den Entwicklern verborgen hält. [HTL+ 14] präsentieren eine Variante des DBSCAN-Algorithmus, die das MapReduce-Paradigma zu Parallelisierungszwecken nutzt. Der Ablauf des Algorithmus ist in Abbildung 3.5 abgebildet. Im ersten Schritt, der Datenpartitionierung, wird ein zunächst ein Profil der zu clusternden Daten erstellt. Es wird zuerst die Anzahl der Punkte in den Rohdaten und die räumliche Verteilung ermittelt. Basierend hierauf wird ein rechteck-basiertes Partitionierungsprofil des Datenraumes erstellt. Dieses ist die Grundlage für den nächsten 65 1. Datenpartitionierung 3. Globale Vereinigung Profil der Aufteilungsbereiche 1.1: Zellen-basierte statistische Analyse 1.2: Partitionierung Mengen der Mergekandidaten Statistisches Profil 3.1: Erzeuge MergeAbbildung Punktdaten globales Abbildungsprofil Partionierungsprofil 2: lokales Clustering Rohdaten Metadaten 3.2: Umbenennung der Daten Daten mit globalen Cluster IDs Abbildung 3.5: Ablauf des MR-DBSCAN-Algorithmus [HTL+ 14] Schritt, das lokale Clustering. In diesem wird der eigentliche Clustering-Algorithmus für jeden Teilraum, der aus dem Partitionierungsprofil hervorgeht, ausgeführt. Hierfür wird eine Abwandlung des DBSCAN-Algorithmus verwendet, der an einigen Stellen an die Einschränkungen einer MapReduce-Implementierung, die fehlende Kommunikationsmöglichkeit zwischen parallelen Berechnung, angepasst wurde. Im dritten und letzten Schritt, der globalen Vereinigung, werden dann die Teilergebnisse der lokalen Clusterings mittels eines Profils der Aufteilungsbereiche zu einem globalen Clustering aggregiert. Vorbereitung der Partitionierung Eine wichtige Aufgabe fällt der Vorverarbeitung zu, da in dieser das Partitionierungsschema für den Datensatz festgelegt wird. Dieses hat aufgrund der ungleichmäßigen Verteilung von Objekten in Datensätzen zentralen Einfluss auf die Lastverteilung, da nur durch eine gleichmäßige Partitionierung, die an die Datenverteilung angepasst ist, ein optimaler Durchsatz erreicht werden kann. Um ein angepasstes Partitionierungsschema zu erstellen besteht eine mögliche Lösung darin, einen räumlichen Index, wie etwa einen R-Baum oder kD-Baum, aufzubauen. Jedoch gestaltet sich dieses wegen des großen Umfangs der Datensätze schwierig, da hierarchische Indexstrukturen bei umfangreichen Datenmengen selber einen erheblichen Platzbedarf besitzen; somit kann also bereits die räumliche Indexierung nicht mehr effizient handhabbare Dimensionen erreichen. [HTL+ 14] schlagen als Grundlage eine binäre Partitionierung des Raumes (BSP: binary space partitioning), um eine effiziente Aufteilung des Datenraumes in nicht überlappende Rechtecke zu erreichen und daraus Partitionen zu erzeugen. Auf Grundlage vordefinierter Regeln wird durch BSP rekursiv eine Aufteilung des Raumes in zwei kleinere Teilräume vorgenommen. Eine weit verbreitete Regel ist die möglichst gleichförmige 66 Aufteilung von Punkten (ESP: even split partitioning). Eine andere Regel zielt auf eine Minimierung der Anzahl von Punkten innerhalb des Randbereiches von Partitionen ab (RBP: reduced-boundary partitioning). Dies ist deshalb erstrebenswert, da die Partitionen überlappende Ränder mit einer Breite von ε, dem Radius der Punktnachbarschaft im zwei-dimensionalen Fall, haben, die auf mehrere Partitionen kopiert werden und in der globalen Clustervereinigung beachtet werden müssen. Diese Regel kann also die Kommunikationskosten reduzieren, wenn eine geringe Anzahl von Punkten in mehreren Partitionen vorkommt und somit weniger Kandidaten während der Berechnung der Clustervereinigung betrachtet werden müssen. Jedoch haben die Ansätze Nachteile. So fußt der ESP-Ansatz auf der Annahme, dass die Laufzeit des DBSCAN-Algorithmus proportional zur Anzahl der Punkte ist; faktisch hängt diese jedoch maßgeblich von der Datenverteilung ab. Der RBP-Ansatz hingegen bewertet die Kommunikationskosten des lokalen Clusterings und der globalen Clustervereinigung als am teuersten, was jedoch nur für kleine Datenmengen und eine nicht parallel durchgeführte Clustervereinigung stimmt. Um diesen Nachteilen Herr zu werden schlagen [HTL+ 14] eine kosten-abhängige Partitionierung (CBP: cost-based partitioning) vor (Algorithmus 3.15). Vor Beginn der Partitionierung wird SU , das minimal einschließende Rechteck, das alle Punkte des Datensatzes umschließt, in ein Raster mit n×n Zellen mit n = 1/2 unterteilt. Für jede der Zellen des Rasters wird die Dichte, also die Anzahl der in der Zelle enthaltenen Punkte, berechnet. Algorithmus 3.15 Kostenbasierte räumliche Aufteilung ([HTL+ 14] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: input: SU - mbr, that contains all points input: nPointsOfCells - array containing number of points for each cell input: maxCost - maximum cost allowed for a cell S output: set of non-overlapping rectangles satisfying SU = Si procedure CostBasedPartitioning(SU , nPointsOfCells, maxCost) taskQueue.add(SU ) partitions = ∅ while taskQueue is not empty do S = taskQueue.remove() if EstimateCost(S, nPointOfCells) > maxCost then S1 , S2 = CostBasedBinarySplit(S, nPointsOfCells) taskQueue.add(S1 , S2 ) else partitions.add(S) return partitions Der Algorithmus (3.15) startet, indem einer abzuarbeitenden Warteschlange taskQueue das minimal-einschließende Rechteck des gesamten Datensatzes übergeben wird. Die Warteschlange wird nun so lange abgearbeitet, bis keine Rechtecke mehr in ihr enthalten sind. Zu Beginn der Schleife (Zeile 5) wird ein Element der Warteschlange entnommen 67 und mit Hilfe der Funktion EstimateCost (Algorithmus 3.17) die Kosten der Zelle berechnet. Wenn diese über einen festen Wert maxCost liegen, wird das Rechteck S mit der Funktion CostBasedBinarySplit (Algorithmus 3.16) in zwei Teil-Rechtecke S1 , S2 aufgeteilt und der Aufgabenwarteschlange hinzugefügt (Zeile 6-8). Sind die Kosten kleiner, so wird das Rechteck einer Liste von Partitionen hinzufügt (Zeile 9-10). Diese Liste wird nach Abbruch der Schleife zurückgegeben und stellt die Partitionierung des Datensatzes in Form von nicht überlappenden Rechtecken dar, welche den Datenraum vollständig abdeckt. Die Zerteilung eines Rechtecks durch die Funktion CostBasedBinarySplit (Algorithmus 3.16) erfolgt durch vertikale und horizontale Teilungslinien, die entlang der Zellengrenzen verlaufen. In den Zeilen 5 bis 12 durchläuft der Algorithmus alle Teilungslinien und wählt die beiden aus der Teilung resultierenden Rechtecke aus, die die geringste Kostendifferenz haben. Algorithmus 3.16 Kostenbasierte binäre Raumaufteilung ([HTL+ 14] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: input: S - rectangle to be split input: nPointsOfCells - array cotaining number of points for each cell output: S1 , S2 - split result of input rectangle S function CostBasedBinarySplit(S, nP ointsOf Cells) splitLineCandidates = vertical and horizontal lines that split S in two subrectangles minCostDiff = ∞ (S1 , S2 ) = (null, null) for each splitLine in splitLineCandidates do (S10 , S20 ) = sub-rectangles corresponding to splitLine S10 .cost = EstimateCost(S10 , nPointsOfCells) S20 .cost = EstimateCost(S20 , nPointsOfCells) costDif f = |S10 .cost − S20 .cost| if costDiff < minCostDiff then minCostDiff = costDiff (S1 , S2 ) = (S10 , S20 ) return (S1 , S2 ) Ein Beispiel für eine solche Unterteilung ist in Abbildung 3.6 abgebildet. Auf der linken Seite ist die Punkt-, auf der rechten die Kostenverteilung über die Zellen dargestellt. Die Kandidaten für vertikale und horizontale Teilungslinien, von denen es in diesem Beispiel insgesamt acht gibt, sind gepunktet gezeichnet und stellen gleichzeitig die Ränder der zugrunde liegenden Zellen im Inneren des umschließenden Rechtecks dar. Die gewählte Trennungslinie, die das Rechteck in zwei Teilrechtecke unterteilt, so dass die Kostendifferenz zwischen den Partitionen minimal ist, ist fett gezeichnet und verläuft entlang der Trennlinie zwischen der ersten und zweiten Spalte. Für einen maxCost-Wert von 104 wäre die Unterteilung in Abbildung 3.6 bereits eine gültige finale Partitionierung; 68 für eine maxCost-Wert von 100 wäre eine weitere Unterteilung der rechten Partition durchzuführen. 10.8 99 10.8 27.6 10.8 10.8 10.8 10.8 10.8 Abbildung 3.6: Beispiel für kostenbasierte Partitionierung [HTL+ 14] Die Kostenabschätzung erfolgt mit der Funktion EstimateCost (Algorithmus 3.17). Sie erhält als Eingabe ein Rechteck sowie ein Array, welches Informationen zu der Punktdichte für jede Zelle enthält. Die Gesamtkosten für ein Rechteck setzen sich zusammen aus den Kosten für jede Zelle, die das Rechteck überdeckt. Algorithmus 3.17 Kostenabschätzung für ein Rechteck ([HTL+ 14] angepasst) input: S - rectangle, that represents a partition input: nP ointsOf Cells - array containing number of points in each cell output: cost - cost of given partition scheme function EstimateCost(S, nP ointsOf Cells) cost = 0 for each cell in S do nP oints = number of points in cell, gathered with help of numP ointsOf Cell costsOf Cell = cost computation using nP oints by Eq. 3.1 cost = cost + costsOf Cell 10: return cost 1: 2: 3: 4: 5: 6: 7: 8: 9: Die formale Berechnung der Kosten für ein Zelle erfolgt mit den Formeln in 3.1. Die erste Formel W (S) besagt, dass sich die Kosten eines Rechtecks S zusammensetzen aus den Kosten für alle Zellen ci , die von S überdeckt werden. Die Kosten der einzelnen Zellen wiederum setzen sich zusammen aus der Anzahl der in der Zelle enthaltenen Punkte N ci und einer Abschätzung der Speicherzugriffskosten DA(N ci ), die durch Nachbarschaftsanfragen im DBSCAN-Algorithmus zu erwarten sind. Wenn die Zugriffskosten der Einfachheit halber auf 1 gesetzt werden, so sind die Kosten einer Zelle die Anzahl der 69 darin enthaltenen Punkte. Eine detailliertere Beschreibung der Kosten ist in [HTL+ 14] ausgeführt. W (S) = X W (ci ) mit S = [ ci (3.1) W (ci ) = N ci ∗ DA(N ci ) Die Laufzeit der Partitionierung setzt sich aus drei wesentlichen Teilen zusammen, der aufrufenden Funktion, der Funktion zur binären Rechteck-Teilung und der Kostenschätzung. Die Laufzeit für die Kostenabschätzung, vorausgesetzt, dass einfach nur die Summe der in einem Raster enthaltenen Punkte gebildet wird, beträgt im schlechtesten Fall O(n)2 , wenn ohne Optimierung das ganze Raster durchsucht werden muss. Jedoch ist es möglich, aus den Koordinaten des Rechtecks die überdeckten Zellen des Rasters zu berechnen, so dass in diesem Fall lediglich eine Laufzeit von O(n) benötigt wird. Für binäre Rechteckteilung müssen jeweils für jede horizontale und vertikale Teilungslinie die Kosten für die resultierenden Rechtecke geschätzt werden, so dass eine Laufzeit von O(4 · n2 ) für die optimierte Fassung der Kostenschätzung benötigt wird. Wenn für die binäre Rechteckaufteilung eine Laufzeit von O(n2 ) angenommen wird, so lässt sich die rekursive Funktion zur Erzeugung der kostenbasierten Partitionierung mit Hilfe des Master-Theorems auf eine Laufzeit von Θ(n2 ) abschätzen. Partitionierung der Daten Ziel der Partitionierung ist eine Aufteilung des Datensatzes, damit die Clustering-Aufgabe auf mehrere Berechnungen verteilt wird, die unabhängig voneinander durchgeführt werden können, also keinen Zugriff auf Punkte außerhalb der Partition benötigen. Ein einfaches Beispiel für eine Partitionierung ist in Abbildung 3.7 dargestellt. Das minimal einschließende Rechteck des Datenraumes SU wurde hier bereits in zwei Rechtecke S1 und S2 unterteilt. Am Beispiel der Nachbarschaft des Punktes px wird deutlich, dass, wenn eine Partition P1 nur die Punkte aus S1 enthält, nicht feststellbar ist, ob px ein Kernpunkt ist, da die Nachbarschaft von px eventuell Punkte aus S2 enthält. Folglich ist es notwendig, Punkte aus benachbarten Rechtecken zu einer Partition hinzuzufügen, um für jeden Punkt einer Partition die Klasse bestimmen zu können. Zu diesem Zweck wird ein Rechteck Si um ε nach außen erweitert, welches als äußeres Rechteck von Si (OR - outer rectangle) bezeichnet wird und einen äußeren Rand (OM - outer margin) besitzt. Analog wird ein weiteres Rechteck um ε nach innen verkleinert, welches als inneres Rechteck von Si (IR - inner rectangle) mit einem inneren Rand (IM - inner margin) bezeichnet wird. Punkte, die innerhalb von IR liegen, zeichnen sich dadurch aus, dass sie für die Vereinigung der lokalen Cluster nicht mehr betrachtet werden müssen, da diese außerhalb der Überlappungsbereiche angrenzender Partitionen liegen. 70 OR(S1) OR(S2) S1 S2 IR(S2) IR(S1) px ε ε Abbildung 3.7: Beispiel einer Partitionierung mit ε-Randbereichen [HTL+ 14] Insgesamt bildet die Menge aller Rechtecke eine minimal vollständige Partitionierungsmenge (MCPS: minimum complete partition set). Für eine Punktmenge DB und ein diese einschließendes Rechteck SU gilt, dass P = {P1 , P2 , . . . , Pm } ein MCPS von DB ist (Abbildung 3.8), wenn für die Rechtecke S1 , S2 , . . . , Sm der Durchschnitt Si ∩ Sj = ∅ für i 6= j und ∪m i=1 Si = SU ist, wobei Pi = OR(Si ) ist. Für die Implementierung der Partitionierung ist zuerst ein MapReduce-Job auszuführen, der die Statistiken der Datenverteilung erfasst. In der Map-Funktion wird das minimal einschließende Rechteck des Datensatzes SU in kleine quadratische Zellen mit der Kantenlänge 2 aufgeteilt. In der Shuffle-Phase des MapReduce-Frameworks werden die Punkte nun nach den Partitionen, in denen sie enthalten sind, sortiert. Die ReduceFunktion generiert hieraus ein Profil, welches die Anzahl von Punkten in jeder Zelle enthält. Abschließend wird ein MCPS berechnet, welches mit den Algorithmen aus den geschätzten Berechnungskosten für die Datenverteilung abgeleitet wird. Das Ergebnis wird als Datei (file-partition-index) ausgegeben und enthält die unteren linken und oberen rechten Ecken der Rechtecke des MCPS und bildet so einen Partitionierungsindex, welcher für die Zuweisungen der Punkte zu Partitionen in Form von Identifikatoren im nächsten Schritt benötigt wird. 71 S4 S0 S5 S3 S1 S2 Abbildung 3.8: Beispiel einer minimal vollständigen Partitionierungsmenge [HTL+ 14] Lokales Clustering Für das Clustering der Partitionen wird eine leicht abgewandelte Version des DBSCANAlgorithmus (Seite 27) verwendet, in der eine Modifikationen in der Suche und Erweiterung von Clustern vorgenommen wird (Algorithmus 3.18). Es wird für jeden Punkt zusätzlich zum Cluster-Identifikator ein Flag eingeführt, welches explizit die Klassifikation in Kern-, Rand- und Rauschpunkte enthält und eine spätere Unterscheidung der Punkte ermöglicht. Die erste Änderung findet sich in Zeile 4, in welcher der Rauschpunkt keinen Clusteridentifikator zugewiesen bekommt, der ihn implizit als Rauschpunkt identifiziert, sondern explizit ein Flag gesetzt wird, dass den Punkt explizit als Rauschpunkt ausweist. Eine weitere kleine Änderung findet sich in Zeile 8, in der für einen Punkt mit einer ausreichend großen Nachbarschaft das Flag ’CORE’ gesetzt wird, um ihn als Kernpunkt zu markieren. Die größte Änderung findet sich jedoch in der Clusterweiterung in den Zeilen 10 bis 22. So wird zum einen bei Punkten, die über die Nachbarschaftabfrage erreicht wurden, explizit überprüft, ob diese noch nicht besucht worden sind (’UNCLASSIFIED’) oder bereits als Rauschpunkte (’NOISE’) markiert wurden. Zum anderen wird für erreichte unklassifizierte Punkte, die nun ihrerseits eine Abfrage starten, ein Flag ’CORE’ gesetzt (Zeile 16), wenn die Größe der Nachbarschaft den Schwellenwert überschreitet, und ein Flag ’BORDER’ gesetzt, wenn dies nicht der Fall ist (Zeile 19). Die Laufzeitkomplexität des modifizierten DBSCAN-Algorithmus liegt, wie beim normalen DBSCAN-Algorithmus bei O(n · log(n)). 72 Algorithmus 3.18 Modified DBSCAN - Clustererweiterung ([HTL+ 14] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: input: points - dataset to be clustered input: point - starting point for cluster expansion input: clusterId - identificator of the current cluster input: ε - size of the neighborhood input: minPts - points required for a core point output: boolean - indicates, whether cluster was found procedure ExpandCluster(points, point, clusterId, ε, minP ts) seeds = points.regionQuery(point, eps) if seeds.size < minPts then point.setFlag(NOISE ) return FALSE else points.changeClusterIds(seeds, clusterId ) points.setFlag(CORE ) seeds.delete(point) while seeds 6= empty do currentP oint = seeds.first() if currentP oint.clusterId == UNCLASSIFIED then currentP oint.setClusterId(clusterId ) result = points.regionQuery(currentPoint, ε) if result.size ≥ minPts then currentP oint.flag == CORE seeds.append(resultPoint) else currentP oint.flag == BORDER else if currentP oint.flag == NOISE then currentP oint.setClusterId(clusterId ) currentP oint.setFlag(BORDER) seeds.delete(currentP oint) return TRUE; Das eigentliche Clustering findet in einem eigenen MapReduce-Auftrag statt. Zuerst wird mittels der Map-Funktion (Algorithmus 3.19) der Datenbestand auf verschiedene Partitionen verteilt. Dieses geschieht, indem die eingegebenen Schlüssel-Wert-Paare, die keinen Schlüssel und als Wert den Punkt enthalten, mit dem Partitions-Identifikator versehen und ausgegeben werden. Der Partitionierungsindex partitionIndex wird zu Beginn der Ausführung des Map-Tasks aus den Ausgabedaten des ersten Schrittes, der Datenpartitionierung, erzeugt. Für jeden 73 Algorithmus 3.19 Lokales Clustering([HTL+ 14] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: input: - hkey : null, value : pointi output: key - partition id, value - point object procedure Map(key, value) partitionIndex = readFromFile(partition-index-file) partitionIDs = getPartitionIDsForPoint(partitionIndex, point) for each partitionId in partitionIDs do Output (h partitionId, point i) input: key - partitionId, value - all points in partition P(partitionId) input: context - minP ts: points required for core points, ε: neighborhood size procedure Reduce(key, value) rT ree = createRTree(value) S = rectangle of partitionId for each unvisited p inside of S do cluster points with algorithm 3.18 using rT ree for each p in P do OutputToFile(hp, (p.clusterId, p.f lag)i, file-results-partitionID) for each p in P do if p.f lag == CORE and p inside IM (S) then OutputToFile (hp, (p.clusterId, p.f lag)i, file-AP-partitionID) else if p.f lag != N OISE and p inside OM (S) then OutputToFile (hp, (p.clusterId, p.f lag)i, file-BP-partitionID) Punkt point kann über den Index partitionIndex mit der Funktion getPartititionIDsForPoint aus den Koordinaten der Partitionierungsrechteckee bestimmt werden, in welchem Rechteck und somit in welcher Partitionen dieser enthalten ist. Daraus wird für jede enthaltene Partition ein Schlüssel-Wert-Paar hpartitionId, pointi erzeugt und ausgegeben. Nach Abschluss der Map-Phase werden die Schlüssel-Wert-Paare dann vom MapReduceFramework gemischt und nach den Partitions-Identifikatoren gruppiert. Punkte mit dem gleichen Identifikator werden an die Reduce-Funktion übergeben, in welcher das lokale Clustering durchgeführt wird. Das Clustering erfolgt in der Reduce-Phase (Algorithmus 3.19) mit dem modifizierten DBSCAN-Algorithmus (3.18). Für eine effiziente Unterstützung der Nachbarschaftsanfragen verwendet der Algorithmus einen R-Baum als räumlichen Index, der alle Punkte der Partition enthält. Die Ergebnisse des Clustering einer Partition werden in drei verschiedene Ausgabedateien geschrieben. Das Gesamtergebnisse, alle Punkte innerhalb von S, werden eine Datei ’file-results-partitionID’ geschrieben. Kernpunkte, die innerhalb des inneren Randes (S − IR(S)) liegen, werden in eine Datei ’file-AP-partitionID’ geschrieben, Kern- und 74 Randpunkte im äußeren Rand (OR(S) − S) in eine Datei ’file-BP-partitionID’. Lediglich diese Bereiche, die von anderen angrenzenden Partitionen überdeckt werden oder andere angrenzende Partitionen überdecken, sind zu betrachten, wenn die Vereinigung der lokalen Cluster durchgeführt wird. Außerdem werden die Partitions-Identifikatoren in die Cluster-Identifikatoren eingearbeitet, so dass alle lokalen Cluster global eindeutige Identifikatoren besitzen. Globale Clustervereinigung Um das Clustering zu vervollständigen ist eine Vereingung der lokalen Clusterings in ein globales Clustering notwendig. Grob lässt sich dieser Schritt aufteilen in die Erzeugung einer Vereinigungs-Abbildung und das Umbenennen der Daten. Für die VereinigungsAbbildung ist es zunächst notwendig, alle Partitionspaare zu finden, die eine gemeinsame Schnittmenge besitzen. Ziel ist es, für zwei Partitionen P1 und P2 alle Paare lokaler Cluster C1 ⊂ P1 und C2 ⊂ P2 zu finden, so dass C1 und C2 dem selben globalen Cluster angehören. Anschließend werden die globalen Cluster berechnet und eine Abbildung bestimmt, die die lokalen Cluster auf globale Cluster abbildet. Hierbei gilt es festzuhalten, dass ein Cluster C1 mit einem Cluster C2 vereinigt werden kann, wenn es einen Punkt p ∈ C1 ∩ C2 gibt, wobei p ein Kernpunkt in C1 oder C2 ist. p wird in diesem Kontext auch als Vereinigungs-Punkt von C1 oder C2 bezeichnet. Auf dieser Grundlage ist es einfach, alle Vereinigungspunkte der Partitionen P1 und P2 zu bestimmen, zumal zum Abschluss des lokalen Clusterings bereits alle Punkte innerhalb der Randbereiche gesondert als Kandidaten für die Vereinigung von Clustern ausgegeben werden. Es ist lediglich zu überprüfen, dass einer der Punkte ein Kernpunkt in P1 oder P2 ist. Diese Kandidaten lassen sich relativ einfach über den Join (AP1 ∩ BP2 ) ∪ (AP2 ∩ BP1 ) bestimmen. Weiterhin gilt, dass die Vereinigung lokaler Cluster zwischen zwei überlappenden Partitionen unabhängig von allen anderen Partitionen ist und somit parallel ausgeführt werden kann. Sobald alle Paare zu vereinigender lokaler Cluster bekannt sind, ist es einfach, mit einem graph-basierten Algorithmus die globalen Cluster zu berechnen. Diese Abbildungen lassen sich durch einen MapReduce-Job berechnen der aus der MapFunktion in Algorithmus 3.20 und der Reduce-Funktion in Algorithmus 3.20 besteht. In der Map-Funktion (3.20) für zwei Partitionen A und B werden zunächst die jeweiligen Listen mit Kernpunkten im inneren Rand und Kern- und Randpunkten des äußeren Randes geladen. Mit diesen wird über den bereits zuvor beschrieben Join in Zeile 6 die Menge der zu vereinigenden Cluster-Paare berechnet. Die ausgegebenen Schlüssel-WertPaare enthalten keinen Schlüssel, damit diese zum Aufbau der globalen VereinigungsAbbildung nur an einen Reducer übergeben werden. Eine beispielhafte Situation ist in Abbildung 3.9 dargestellt, anhand derer der Aufbau eines Vereinigungs-Mappings verdeutlicht werden soll. Nachdem für jede Partition der innere Rand (S − OR(S)) und der äußere Rand (OR(S) − S) eingelesen wurde, gilt es 75 Algorithmus 3.20 Erzeugung einer Vereinigungs-Abbildung ([HTL+ 14] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: 25: 26: 27: input: key - partition id, value - partition id output: key - null, value - pair of cluster ids procedure Map(key, value) AP1 = read file-AP-key BP1 = read file-BP-key AP2 = read file-AP-value BP2 = read file-BP-value for each p ∈ (AP1 ∩ BP2 ) ∪ (AP2 ∩ BP1 ) do A.clusterId = p.clusterId in partition A B.clusterId = p.clusterId in partition B Output (hnull, (A.clusterId, B.clusterId)i) input: key - null, value - pair of cluster ids output: key - local cluster id, value - global cluster id procedure Reduce(key, value) initialize empty graph G for each item in values do if vertex item.A.clusterId is not in G then G.addVertex(item.A.clusterId) if vertex item.B.clusterId is not in G then G.addVertex(item.B.clusterId) if edge (item.A.clusterId, item.B.clusterId) is not in G then G.addEdge((item.A.clusterId, item.B.clusterId)) for each connected component C in G do globalClusterId = next unique global cluster id for each vertex localClusterId in C do Output hkey=localClusterId, value=globalClusterId i for each localClusterId not in G do globalClusterId = next unique global cluster id Output (hlocalClusterId, globalClusterId i) die Schnittmenge (AP1 ∩ BP2 ) ∪ (AP2 ∩ BP1 ) zu berechnen. Hierbei ist (AP1 ∩ BP2 ) = {p1 , p2 , p3 , p4 } und (AP2 ∩ BP1 ) = {p5 , p6 , p7 }, so dass insgesamt (AP1 ∩ BP2 ) ∪ (AP2 ∩ BP1 ) = {p1 , p2 , p3 , p4 } ∪ {p5 , p6 , p7 } ist. Für jeden Punkt diese Menge wird nun ein Schlüssel-Wert-Paar ausgegeben, welches als Wert das Paar der Clusteridentifikatoren enthält, dem der Punkt in der jeweiligen Partition angehört. Für die Punkte p1 . . . p7 ergeben sich so die folgenden Paare: • h null, (S1 1, S2 1) i • h null, (S1 1, S2 1) i 76 • h null, (S1 2, S2 1) i • h null, (S1 2, S2 1) i • h null, (S2 1, S1 2) i • h null, (S2 1, S1 2) i • h null, (S2 1, S1 1) i OR(S1) OR(S2) S1_1 S1 S2 S1_2 IR(S2) IR(S1) p1 ε ε S2_1 minPts=3 p7 p2 ε p6 p3 p5 p4 Abbildung 3.9: Beispiel eines Vereinigungs-Mappings Der Reducer (3.20) baut aus diesen Abbildungen einen ungerichteten Graph auf. Jeder Cluster-Identifikator in den Schlüssel-Wert-Paaren wird durch einen Knoten im Graph repräsentiert (Zeile 4-7), jedes übergebene Paar von Cluster-Identifikatoren wird durch eine Kante im Graph dargestellt (Zeile 8-9). Aus dem Graph lassen sich schließlich die globalen Cluster einfach ableiten, da jeder zusammenhängende Teilgraph einem globalen Cluster entspricht. Für jeden zusammenhängenden Teilgraphen werden Schlüssel-WertPaare ausgegeben, die die Identifikatoren der lokalen Cluster auf einen globalen Identifikator abbilden (Zeile 10-13). Außerdem werden auch für jeden alleinstehenden Cluster ein Schlüssel-Wert-Paar ausgegeben, welches den lokalen Cluster-Identifikator auf einen globalen Identifikator abbildet (Zeile 14-16). Aus den zuvor generierten Schlüssel-Wert-Paaren wird nun nach Algorithmus 3.20 ein ungerichteter Graph aufgebaut, der in Abbildung 3.10 dargestellt ist. Folglich gehören alle Punkte aus beiden Partitionen zum selben Cluster, da die jeweiligen lokalen Cluster eine zusammenhängende Komponente im Graphen bilden. 77 S1_1 S2_1 S1_2 Abbildung 3.10: Graph-Repräsentation des Vereinigungs-Mappings Algorithmus 3.21 Umbenennung der Daten ([HTL+ 14] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: input: key - point object, value - cluster id and classfication flag input: mergeMapping - stored in a hash map output: key - point object, value - cluster id and classification flag procedure Map(key, value) globalClusterId = mergeMapping[p.clusterId] p.clusterId = globalClusterId Output (hp, (p.clusterId, p.flag)i) input: key - point p, value: set of (clusterId, flag)-pairs output: key - point p, value: (clusterId, flag)-pair procedure Reduce(key, value) clusterId = null f lag = N OISE for each pair in value do if clusterId is null then clusterId = pair.p.clusterId if pair.p.flag == BORDER then f lag = BORDER else if pair.p.flag == CORE then f lag == CORE break Output (hp, (clusterId, flag)i) Um das finale Ergebnis zu erhalten müssen die lokalen Cluster-Identifikatoren aller Punkte substituiert werden und der Typ dieser bestimmt werden. Auch dies wird durch einen MapReduce-Job gelöst. Die Eingabe für den Job besteht aus den Dateien ’file-resultspartitionId’, welche die vollständigen Ergebnisse der jeweiligen lokalen Clusterings enthält. In der Map-Phase (Algorithmus 3.21) werden die lokalen Cluster-Identifikatoren mit Hilfe der Vereinigungs-Abbildung ersetzt (Zeile 2-3) und der Punkt zusammen mit dem 78 finalen Cluster-Identifikator und dem Typ, der im lokalen Clustering bestimmt wurde, ausgegeben. Für Punkte in den Randbereichen gilt hierbei, dass diese an mehreren lokalen Clusterings beteiligt gewesen sein und somit verschiedene Typen besitzen können; das Ergebnis der Map-Phase kann also Paare mit gleichem Schlüssel jedoch verschiedenen Werten liefern. Durch die Shuffle-Phase werden die Paare nach dem Schlüssel gruppiert, so dass sich Duplikate leicht identifizieren lassen. Für diese ist im Detail der Typ zu bestimmen. Die Typisierung folgt einer einfachen Regel: der Typ eines Punktes wird durch die signifikanteste gefundene Klassifikation definiert. Diese ist absteigend geordnet nach ’CORE’, ’BORDER’ und ’NOISE’. Punkte ohne Duplikate werden direkt ausgegeben. Die so produzierte Menge von Schlüssel-Wert-Paaren repräsentiert nun das finale Ergebnis und somit das globale Clustering. 3.2.4 Shared-memory SLINK (SHRINK) Der SHRINK-Algorithmus [HPA+ 12] ist ein paralleler Algorithmus zur Berechnung eines hierarchischen Clusterings, der für Systeme mit gemeinsamen Speicher entworfen wurde. Die Grundidee des Algorithmus besteht in einer Aufteilung des eigentlichen Problems, der Erzeugung eines globalen hierarchischen Clusterings, in überlappende Teilprobleme annähernd gleicher Größe, die schließlich zu einer Gesamtlösung vereinigt werden. Hierfür wird eine Partitionierung des Datenraumes vorgenommen und auf den Partitionen mit einer modifizierten Version des SLINK-Algorithmus [Sib73] das jeweilige hierarchische Clustering in Form eines Dendrogramms bestimmt. Die Gesamtlösung wird abschließend aus den überlappenden Teillösungen kombiniert, indem die Dendrogramme der lokalen Clusterings zu einem globalen Dendrogramm vereinigt werden. Algorithmus 3.22 Ablauf des SHRINK-Algorithmus ([HPA+ 12] überarbeitet) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: input: D - dataset to be clustered, p - number of partitions output: complete dendrogram of dataset D procedure Shrink(D, p) partition D into p subsets S = {S1 , S2 , . . . , Sp } let M be an empty set for all (Si , Sj ) ∈ S|i 6= j in parallel do dendrogram d = Mod-Slink(Si ∪ Sj ) add dendrogram d to set M repeat merge dendrograms in M pairwise as shown in 3.11 with the function Merge until M contains only one dendrogram in parallel return remaining dendrogram in M Zunächst werden in Algorithmus 3.22 die Teilmengen S des Datensatzes gebildet. Diese 79 entstehen durch die Aufteilung des Datenbestandes in p Teile. Jedes Paar (Si , Sj ) ∈ S für i 6= j wird einem separaten Thread zugewiesen, der für die Punktmenge ein hierarchisches Clustering berechnet (Zeile 5-7). Für das Clustering wird eine modifizierte Variante des SLINK-Algorithmus verwendet. Diese unterscheidet sich von dem Original in drei Punkten. Als erstes werden die für Distanzberechnungen verwendeten Punkte mitverfolgt und gespeichert. Dann werden bei gleichen Distanzen die Identifikatoren der Datenpunkte verglichen und der kleinere Identifikator gewählt. Als Drittes und Letztes werden die Dendrogramme repräsentiert durch eine Folge von Vereinigungen von nächsten Clusterpaaren. In Zeile 8-10 werden die Dendrogramme der Partitionen-Paare d aus der Liste M in ein einziges Dendrogramm zusammengeführt, das die Originallösung darstellt. Die Kombination erfolgt, indem über die Clustervereinigungen in aufsteigender Distanz iteriert wird und die Vereinigungen übernommen werden, die Datenpunkte in verschiedenen Clustern vereinigen; Vereinigungen hingegen, die Punkte zusammenführen, welche bereits demselben Cluster angehören, werden verworfen. Die Reihenfolge, in der die Teilprobleme zusammengeführt werden, ist hierbei nicht von Bedeutung. d1 d2 d3 d1 d4 d5 d3 d6 d5 d1 d1 Abbildung 3.11: Beispiel für den Ablauf der Vereinigung sechs lokaler Dendrogramme zu einem globalen Dendrogramm (nach [HPA+ 12]) Die Berechnung der Dendrogramme erfolgt über den Modified-SLINK-Algorithmus (Algorithmus 3.23). Zu Beginn werden drei Arrays erzeugt, welche das Endergebnis des Clusterings sowie Zwischenergebnisse speichern. Die Arrays result und M enthalten jeweils 3-Tupel der Form (P unkt, P unkt, Distanz), wobei ein 3-Tupel eine Clustervereinigung repräsentiert; das Array result wird für jeden Punkt mit Einträgen (−, −, ∞), das Array M mit Einträgen (−, −, −) initialisiert. Das Array clusterId enthält für einen Punkt die Information, zu welchem welchem Cluster er gehört; repräsentiert wird ein Cluster durch den Punkt mit der kleinsten ID, so dass Punkte eines Clusters schlussendlich auf diesen zeigen. Initialisiert wird dieses so, dass jeder Punkt sich selber als Vereinigungspartner besitzt (Zeile 4-6). Der Algorithmus durchläuft nun die ganze Punktmenge (Zeile 7-20), wobei die Position des Punktes in der Punktliste gleichzeitig seine globale ID darstellt. 80 Für jeden Punkt i ∈ D wird die Distanz zu jedem anderen Punkt j in der Liste M in der Form (D[i], D[j], d(D[i], D[j])) gespeichert (Zeile 8-9). d(D[j], D[i]) repräsentiert hierbei implizit über die Distanz die Höhe, in der die Vereinigung im Dendrogramm passiert. Es erfolgen jetzt zwei Durchläufe über alle Punkte j ∈ D\{i} (Zeilen 10-17 und 18-20). Zuerst wird überprüft, ob die Distanz der bisherigen Vereinigung der Punkte i und j in result[j] kleiner ist als die zuvor berechnete in M [j]. Wenn jedoch die Distanz der Vereinigung von i und j größer ist als die berechnete Vereinigung des Vorgängers von j, so wird dessen Zwischenergebnis M [j] auf das bisherige Ergebnis result[j] gesetzt (Zeile 12-13). Anschließend wird das Ergebnis für j in result[j] auf M [j] gesetzt und der Vorgänger von j auf i gesetzt (Zeile 14-15). Wenn weiterhin die Vereinigung von i und j in M [j] eine geringere Distanz als der Vorgänger von j besitzt, so wird das Ergebnis von M [clusterId [j]] auf M [j] gesetzt. In einer weiteren Schleife (Zeile 18-20) wird nun getestet, ob die Distanz der Clustervereinigung von Punkt j größer ist als die des Vorgänger von j. Ist dieses der Fall, so wird der Vorgänger von j auf i gesetzt. Zuletzt werden die Vereinigungen der Cluster, die in result gespeichert sind, nach aufsteigender Distanz sortiert zurückgegeben (Zeile 21-22). Algorithmus 3.23 Modified SLINK ([HPA+ 12] überarbeitet 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: input: D - dataset of points output: result: list of cluster merges sorted by increasing height procedure Mod-Slink(D) initialize array clusterId [i] with i for each i ∈ D initialize array result with (−, −, ∞) for each i ∈ D initialize array M with (−, −, −) for each i ∈ D for i ← 1 to |D| − 1 do for j ← 0 to i − 1 do M [j] ← (D[j], D[i], d(D[j], D[i])), at distance d(D[j], D[i]) for j ← 0 to i − 1 do if result[j] has a higher distance/id than M [j] then if result[j] has a lower distance/id than M [clusterId [j]] then M [clusterId [j]] ← result[j] result[j] ← M [j] clusterId[j] ← i else if M [j] has a lower distance/id than M [clusterId[j]] then M [clusterId[j]] ← M [j] for j ← 0 to i − 1 do if result[j] has a higher distance/id than result[clusterId[j]] then clusterId[j] ← i Sort the merges of result by increasing distance return result Der Algorithmus zur Vereinigung zweier Dendrogramme (Algorithmus 3.24) nutzt im Kern eine Union-Find Struktur, um die Cluster-Zugehörigkeit für jeden Datenpunkt zu 81 verwalten. Zu Beginn werden zwei Datenstrukturen initialisiert, eine Liste M3 , in welcher die Folge der Clustervereinigungen gespeichert wird und eine Union-Find-Struktur, die die Clusterzuordnungen der Datenpunkte festhält. Für die Union-Find-Struktur wird zudem angenommen, dass eine Vereinigung nur erfolgreich ist, wenn die beiden zu vereinigenden Punkte in verschiedenen Clustern liegen. Das Verfahren beginnt, indem aus den Vereinigungslisten der Cluster M1 und M2 die Vereinigungen mit der kleinsten Distanz zusammen mit den vereinigten Punkten ausgewählt werden. Zuvor ist zu überprüfen, ob Duplikate vorhanden sind, Vereinigungen also, die in beiden Vereinigungslisten enthalten sind. Wenn Duplikate vorhanden sind, ist eines der beiden Vorkommen zu entfernen. Wenn die Distanz h1 ∈ m1 nun kleiner als h2 ∈ m2 ist, so wird m1 zu M3 hinzugefügt, vorausgesetzt, dass die Cluster u1 , v1 ∈ m1 in der Union-Find-Struktur verschiedenen Clustern zugeordnet sind; m1 wird anschließend auf die Vereinigung mit der nächst kleinsten Distanz gesetzt. Das Vorgehen ist analog für den Fall, dass h1 keine geringere Höhe als h2 aufweist. Das Vorgehen wird solange fortgesetzt, wie M1 und M2 verbleibende Vereinigungen haben. Sobald nur noch eine Liste Vereinigungen enthält werden diese zu M3 hinzugefügt, sofern diese in verschiedenen Clustern enthalten sind. Abschließend wird als Ergebnis die neue Liste der Vereinigungen M3 zurückgegeben. Algorithmus 3.24 Vereinigung zweier Dendrogramme ([HPA+ 12] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: input: M1 , M2 - dendrograms output: M3 - merged dendrogram procedure Merge((M1 , M2 )) Initialize M3 to be empty Initialize Union-Find data structure with each unique point from M1 , M2 Let m1 = (u1 , v1 , h1 ) be the data points and distance of the lowest merge in M1 Let m2 = (u2 , v2 , h2 ) be the data points and distance of the lowest merge in M2 while M1 and M2 have merges remaining do if h1 < h2 then if union(u1 , v1 ) succeeds then Add m1 to M3 Let m1 = (u1 , v1 , h1 ) be the next lowest merge in M1 else if union(u2 , v2 ) succeeds then Add m2 to M3 Let m2 = (u2 , v2 , h2 ) be the next lowest merge in M2 Add remaining merges (u, v, h) of M1 or M2 to M3 as long as union(u, v) succeeds return M3 Die Laufzeit des SHRINK-Algorithmus setzt sich aus drei wesentlichen Teilen zusammen, der Partitionierung, dem Clustering und der Vereinigung. Hendrix et al. schätzen die Laufzeit für die Partitonierung mit O( √np ) ab, wobei p die Anzahl der Prozessoren, bzw. Threads ist. Die Laufzeit für das Clustering mit dem ModSLINK-Algorithmus beträgt O(n20 ) mit n0 als Anzahl der Punkte pro Partition. In diesem wird eine äußere Schleife 82 (Zeilen 7-20) n0 -mal durchlaufen, die beiden inneren Schleifen (Zeilen 8-9 sowie 10-20) werden ebenso maximal n0 -mal und benötigen jeweils nur konstante Zeit. Das Sortieren der Vereinigungen nach der Distanz wird in O(n0 ∗ log(n0 )) ausgeführt. Insgesamt kann 2 das Clustering somit in O( np ) ausgeführt werden. Die Vereinigung zweier Dendrogramme wird mit O(n ∗ α(n)) abgeschätzt, wobei α die inverse Ackermann-Funktion ist. Da die Vereinigung der Dendrogramme in den Zeile 7-9 jeweils paarweise in Form eines binären Baumes erfolgt, benötigt diese eine Laufzeit von O(log(p)), so dass sich für die gesamte Dendrogrammvereinigung eine Laufzeit von O(n ∗ log(p)) ergibt. Insgesamt beträgt die 2 Laufzeitkomplexität somit nach [HPA+ 12] O( np + √np + n ∗ α(n) ∗ log(p). Diese lässt n Prozessoren verwendet sich sogar auf O(n ∗ α(n) ∗ log(p)) reduzieren, indem p ∼ log(n) werden. Beispiel Um die Funktionsweise des SHRINK-Algorithmus zu verdeutlichen ist in Abbildung 3.12 ein Beispiel für ein solches abgebildet. Erzeugt werden soll das hierarchische Clustering eines Datensatzes, bestehend aus vier Punkten, der in drei Partitionen aufgeteilt ist. Im oberen Teil ist zunächst der Datenraum mit den Datenpunkten und Partitionierungen auf der linken Seite abgebildet, auf der rechten Seite die Resultate der drei möglichen Clusterings von paarweise disjunkten Partitionen in Form einer Liste. Wie bereits zuvor in der Beschreibung des Algorithmus erwähnt, ist die Reihenfolge, in der Teilergebnisse der paarweisen Clusterings zusammengefügt werden, irrelevant. In diesem Beispiel wird zunächst das Ergebnis A mit B vereinigt, anschließend {A, B} mit C. Für die erste Vereinigung werden die Vereinigungs-Listen aus A (M1 ) und B (M2 ) verwendet und zusätzlich eine Union-Find-Struktur für die Datenpunkte, die in M1 und M2 enthalten sind, initialisiert. Algorithmus 3.24 folgend, werden m1 und m2 mit den Vereinigungen initialisiert, die in den jeweiligen Teilergebnissen die kleinste Distanz besitzen. Da m1 < m2 wird die Vereinigung (2, 3, 0.5) gewählt und zu M3 hinzugefügt. Folglich wird auch die Union-Find-Struktur aktualisiert, um zu repräsentieren, dass die Punkte 2 und 3 dem gleichen Cluster angehören. m1 wird nun auf die Vereinigung mit der nächst größeren Distanz gesetzt. Auch im zweitem Schritt ist m1 < m2 und es wird m1 zu M3 hinzugefügt und die Union-Find-Struktur aktualisiert. Da M1 nun keine Vereinigungen mehr enthält werden nun die verbleibenden aus M2 zu M3 hinzugefügt, sofern sich die Punkte, die vereinigt werden, in verschiedenen Clustern befinden. In der zweiten Vereinigung werden nun die Vereinigungs-Listen aus dem vorherigen Schritt {A, B} sowie des Ergebnisses C aus dem Clustering der Partitionen D2 ∪ D3 zusammengeführt, um das finale Clustering zu erhalten. Auch hier werden m1 und m2 mit den Vereinigungen mit kleinster Distanz aus M1 und M2 initialisiert. Hier ist sofort zu sehen, dass die erste Vereinigung in M1 und M2 identisch ist; eine der beiden ist zu eliminieren. In diesem Beispiel wird die Vereinigung aus M2 entfernt. Die Vereinigungen werden nun nach dem bekannten Muster zu M3 hinzugefügt. Nachdem die Vereinigung (1, {2, 3}, 1.1) zu M3 hinzugefügt wurde, 83 D1 A = MOD-SLINK(D1 ᴜ D2): [(2, 3, 0.5), (1, {2,3}, 1.1)] 1 D2 2 3 B = MOD-SLINK(D1 ᴜ D 3): [(1, 4, 2.1)] D3 C = MOD-SLINK(D2 ᴜ D 3): [(2, 3, 0.5), ({2,3}, 4, 1)] 4 MERGE(A,B) M1 = [(2, 3, 0.5), (1, {2, 3}, 1.1)] M2 = [(1, 4, 2.1)] M3 = [(2, 3, 0.5)] 1 2 3 4 1 2 2 4 M3 = [(2, 3, 0.5), (1, {2, 3}, 1.1)] 1 2 3 4 1 1 1 4 M3 = [(2, 3, 0.5), (1, {2, 3}, 1.1), (1, 4, 2.1)] 1 2 3 4 1 1 1 1 m1(u1, v1, h1) = (2, 3, 0.5) m2(u2, v2, h2) = (1, 4, 2.1) m1(u1, v1, h1) = (1, {2,3}, 1.1) m2(u2, v2, h2) = (1, 4, 2.1) da A leer, Rest von B zu M hinzufügen MERGE({A,B}, C) M1 = [(2, 3, 0.5), (1, {2, 3}, 1.1), (1, 4, 2.1)] M2 = [(2, 3, 0.5), ({2, 3},4, 1)] M3 = [(2, 3, 0.5)] 1 2 3 4 1 2 2 4 M3 = [(2, 3, 0.5), ({2, 3}, 4, 1)] 1 2 3 4 1 2 2 2 M3 = [(2, 3, 0.5), ({2, 3}, 4, 1), (1, {2, 3}, 1.1)] 1 2 3 4 1 1 1 1 m1(u1, v1, h1) = (2, 3, 0.5) m2(u2, v2, h2) = ({2, 3}, 4, 1) m1(u1, v1, h1) = (1, {2, 3}, 1.1) m2(u2, v2, h2) = ({2, 3}, 4, 1) m1(u1, v1, h1) = (1, {2, 3}, 1.1) m2(u2, v2, h2) = (1, 4, 2.1) 1 2 3 4 Abbildung 3.12: SHRINK-Algorithmus - Beispiel lässt aus der mitgeführten Union-Find-Struktur ablesen, dass alle Datenpunkte dem selben Cluster angehören. Die verbleibende Vereinigung, (1, 4, 2.1) wird abschließend 84 nicht mehr zu M3 hinzugefügt, da die Punkte 1 und 4 bereits dem gleichen Cluster angehören. M3 enthält nun das finale Clustering des Datensatzes, dargestellt in Form einer Abfolge von Vereinigungen, welches am unteren Rand von Abbildung 3.12 als Dendrogramm dargestellt ist. 3.2.5 MapReduce k-Means Clustering Auch der sequentielle k-Means-Algorithmus (Abschnitt 2.3.2) ist nicht in der Lage, große Datenmengen effizient zu clustern, da der Hauptspeicher zu klein ist und alleine das Einlesen der Daten zu lange dauern würde. [ZMH09] stellen eine Implementierung des k-Means-Algorithmus für das MapReduce-Framework vor, der mit diesen Anforderungen umgehen kann. Das zugrundeliegende Prinzip des Algorithmus ist, dass die Zuweisung von Punkten zum nächsten Clusterschwerpunkt unabhängig von der Berechnung eines neuen Schwerpunktes durchgeführt wird. Zusätzlich kann die Zuweisung zum nächsten Clusterschwerpunkt parallel ausgeführt werden, da unter den Datenpunkten keine Abhängigkeiten vorhanden sind; jeder Punkt kann individuell dem nächsten Cluster zugewiesen werden. Eine Parallelisierung der Berechnung der neuen Schwerpunkte kann ebenso erreicht werden, indem jeder Clusterschwerpunkt unabhängig von den anderen berechnet wird. Erreichen lässt sich dies, indem die Sortier-Fähigkeit des MapReduceFrameworks ausgenutzt und Schlüssel-Wert-Paare derart sortiert werden, dass diese nach ihrem Schlüssel gruppiert werden. Das iterative Vorgehen des k-Means-Algorithmus wird in MapReduce durch mehrere Durchläufe des Prozesses erreicht, eine wiederholte Ausführung der Map-, Combine- und Reduce-Phase, wobei als Eingabe immer die SchlüsselWert-Paare und Cluster-Schwerpunkte der vorherigen Iteration genutzt werden. Ablauf Der Ablauf der MapReduce-Implementierung ist in Abbildung 3.13 dargestellt. Wenn ein Task für einen Datensatz und eine Menge von vorgegebenen Centroiden gestartet wird erfolgt zuerst eine Unterteilung des Datensatzes in Teilstücke. Diese werden von den Mappern zusammen mit den Centroiden als Eingabe verwendet, um darauf parallel für jeden Punkt des Datensatzes zu berechnen, welchem Clusterschwerpunkt dieser am nächsten ist. Diese Zuordnung wird als Schlüssel-Wert-Paar ausgegeben und auf demselben Rechenknoten durch die Combine-Funktion lokal zusammengefasst, quasi die lokalen Clusterschwerpunkte jeder Datenpartition berechnet. Im Reduce-Schritt werden diese Zwischenergebnisse schließlich für jeden Cluster mit der Reduce-Funktion zusammengefasst, um die globalen Clusterschwerpunkte zu erhalten. Analog zum klassichen k-Means- 85 Cluster Centroids Map Combine Reduce Shard 1 Shard 2 Map Combine Result Shard 3 Reduce Map Combine Abbildung 3.13: MapReduce k-Means - Datenfluss Algorithmus können diese neuen Centroide für einen erneuten MapReduce-Durchlauf verwendet werden. Map-Funktion Die Map-Funktion in Algorithmus 3.25 berechnet für jeden Punkt, welches der nächste Cluster-Schwerpunkt ist und ordnet den Punkt diesem zu. Algorithmus 3.25 MapReduce-k-Means: Map-Phase ([ZMH09] angepasst) 1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: input: key - null, value - point object output: key - cluster id, value - point object procedure Map(hkey, valuei) minDistance = Double.MAX VALUE index = −1 for i = 0 to centroids.length do distance = computeDistance(value, centroids[i]) if distance < minDistance then minDistance = distance index = i Output hindex, valuei Für ein eingegebenes Schlüssel-Wert-Paar hnull, pointi, bei dem der Schlüssel nicht be- 86 trachtet wird und der Wert die Repräsentation eines Punktes ist, wird über eine Schleife die Distanz zu jedem Cluster-Schwerpunkt berechnet und der nächste in der Variable index festgehalten (Zeile 4-8). Die so berechnete Cluster-Zuordnung wird nun ausgegeben in Form eines Schlüssel-Wert-Paares hclusterID, pointi (Zeile 9-11). Kombinations-Funktion Die Combine-Funktion (Algorithmus 3.26) führt bereits lokal eine Reduzierung der Schlüssel-Wert-Paare durch, äquivalent zur Reduce-Funktion. Die lokal-generierten Paare werden nach den Cluster-Identifikatoren gruppiert und zusammengefasst. Hierfür wird zum einen ein Array mit so vielen Feldern verwendet wie die Dimensionalität der Daten ist und ein Zähler für die Anzahl der Punkte. Algorithmus 3.26 MapReduce-k-Means: Combine-Phase ([ZMH09] angepasst) input: key - cluster id, value - set of points output: key - cluster id, value - number of cluster points and centroid coordinates procedure Combine(hkey, valuei) init values[dimCount] init pointCount = 0 while value.hasNext() do v = value.next() for i = 0 to v.dimCount do values[i]+ = v[i] 10: pointCount + + 11: for j = 0 to values.length do values[i] 12: values[i] = pointCount 1: 2: 3: 4: 5: 6: 7: 8: 9: 13: Output hkey, pointCount + values.toString()i Für einen Cluster-Identifikator werden jetzt in den jeweiligen Dimensionen die Koordinaten der Punkte aufsummiert (Zeile 6-7) und über einen Zähler mitgeführt, wieviele Punkte aufsummiert worden sind. Abschließend wird jede Dimensionssumme durch die Anzahl der Punkte geteilt um den lokalen Clusterschwerpunkt zu erhalten (Zeile 9-10). Das Ergebnis wird als Paar h clusterID, |Punkte| + localCentroid i ausgegeben. Reduce-Funktion Am Ende des MapReduce-Durchlaufs werden dann durch die Reduce-Funktion (Algorithmus 3.27) die Zwischenergebnisse, die durch die Combine-Funktion bereits lokal reduziert wurden, global zusammengefasst. Als Eingabe erhält die Funktion das Paar 87 hclusterID, set(|points| + localCentroid )i, die, analog zur Combine-Funktion (Algorithmus 3.26) reduziert werden. Algorithmus 3.27 MapReduce-k-Means: Reduce-Phase ([ZMH09] angepasst) input: key - cluster id, value - set of partial centroid calculations output: key - cluster id, value - centroid coordinates procedure Procedure(hkey, valuei) init values[dimCount] init pointCount = 0 while value.hasNext() do v = value.next() for i = 0 to v.dimCount do values[i]+ = v[i] 10: pointCount+ = v.pointCount 11: for j = 0 to values.length do values[i] 12: values[i] = pointCount 1: 2: 3: 4: 5: 6: 7: 8: 9: 13: Output hkey, values.toString()i Jedoch werden nicht Punkte, sondern lokale Cluster-Schwerpunkte koordinatenweise aufsummiert (Zeile 4-8) und anschließend durch die Gesamtzahl der Punkte geteilt (Zeile 9-10). Die Ausgabe der Funktion ist nun das Schlüssel-Wert-Paar h clusterID, centroidCoordinates i, also der neue Schwerpunkt des bearbeiteten Clusters, der für die nächste Iteration verwendet wird. 3.2.6 Weitere Algorithmen Ein weiteres Verfahren zur Berechnung eines approximativen hierarchischen Clusterings auf Grundlage des MapReduce-Paradigmas ist der PARABLE(PArallel RAndompartition Based hierarchicaL clustEring)-Algorithmus [WD11], der in zwei Phasen gegliedert ist. In der ersten Phase wird eine zufällige Partitionierung mit homogener Partitionsgröße des Datensatzes vorgenommen. Auf diesen Partitionen wird parallel mit einen sequentiellen Verfahren ein hierarchisches Clustering berechnet, wobei das Clustering durch ein Dendrogramm repräsentiert wird. In der zweiten Phase wird sequentiell eine Vereinigung der berechneten Dendrogramme zu einem Gesamtergebnis durchgeführt. Diese werden zunächst sequentiell in einem rekursiven Verfahren an einem VorlageDendrogramm ausgerichtet. Anschließend wird das Vorlage-Dendrogramm in Teilbäume zerlegt und in den anderen Dendrogrammen nach Teilbäumen gesucht, die an denen der Vorlage ausgerichtet sind und mit dem Clusteridentifikator der Vorlage versehen. Der PDBSCAN-Algorithmus[XJK99] ist eine weitere Variante des DBSCAN-Algorithmus, welches als Master-Slave-Architektur umgesetzt ist. Zentrale Komponente dieses Ver- 88 fahrens ist die Verwendung eines verteilten R-Baums als räumliche Indexstruktur. Das Verfahren ist in drei Phasen gegliedert. In der ersten Phase wird eine Aufteilung der Eingabedaten in mehrere Partitionen vorgenommen, welche an die verfügbaren Rechenknoten (Slaves) verteilt werden. In der zweiten Phase führen die Rechenknoten parallel das Clustering auf der zugewiesenen Partition mit einer modifizierten Variante des DBSCANAlgorithmus durch. Dieser ist dahingehend verändert, dass er in Randbereichen, die über die Partition herausragen, über den verteilten räumlichen Index im Punkte aus angrenzenden Partitionen abfragen kann und so Punkt-Paare ermittelt, die für eine Vereinigung von Clustern betrachtet werden müssen. Diese wird in der dritten Phase unter Nutzung der Punkte-Paare aus den lokalen Clusterings vom Master ausgeführt. 89 Kapitel 4 Implementierung In diesem Kapitel werden die Implementierungen der MapReduce-Varianten des Partition-Algorithmus sowie des DBSCAN-Algorithmus vorgestellt. Diese sind in der Programmiersprache Java geschrieben und nutzen als Grundlage das MapReduce-Framework der MapReduce-Implementierung Hadoop. In den Versuchen werden zusätzlich der MapReduce-kMeans- sowie Parallel-FP-Growth-Algorithmus untersucht. Für diese werden die Implementierungen des Apache-Projekts Mahout1 genutzt, welches unter anderen Verfahren aus dem Bereich Clustering und Klassifikation für maschinelles Lernen bereitstellt. Diese können unter anderen auf der Hadoop-Laufzeitumgebung ausgeführt werden können. 4.1 MapReduce-Partition Die Implementierung des Algorithmus hält sich an die Beschreibung aus Abschnitt 3.1.3. Die Implementierung setzt sich aus drei MapReduce-Jobs zusammen, die sequentiell ausgeführt werden (Abbildung 4.1). Es ist festzuhalten, dass die Anzahl von Transaktionen, in denen frequent Itemsets gesucht werden sollen, vorab nicht zwangsläufig bekannt ist, insbesondere vor dem Hintergrund, dass sehr große Datenmengen eventuell auf viele Dateien verteilt sind. Aus diesem Grund wird zunächst ein MapReduce-Job ausgeführt, der die gesamten Eingabedaten durchläuft und ausschließlich die Menge der Transaktionen zählt. Im Anschluss beginnt die Berechnung der lokalen frequent Itemsets. Diese wird in einem seperaten Job ausgeführt, wobei die Eingabedaten in disjunkte Teile aufgeteilt werden und für jeden Teil der Eingabe ein Mapper die lokalen frequent Itemsets der Partition berechnet. Diese werden von einem Reducer in eine Menge zusammengefasst und als 1 http://mahout.apache.org/ 90 Daten Transaktionen (Eingabe) MapReduceJobs Datenfluss Prozessfluss Zählung der Transaktionen Generierung der Kandidaten Berechnung des globalen Supports Globale frequent Itemsets (Ausgabe) Kandidaten für global-häufige Itemsets Abbildung 4.1: Ablauf der MapReduce-Implementierung des Partition-Algorithmus Datei im verteilten Dateisystem zwischengespeichert. Anschließend werden in einem dritten Job die Kandidaten für globale frequent Itemsets gegen die gesamte Menge von Transaktionen getestet. Hierfür liest jeder Mapper die Kandidatenmenge ein und einen Teil der Menge von Transaktionen. In diesem Anteil von Transaktionen wird nun für jeden Kandidaten berechnet, wie oft dieser vorkommt und dieses als Schlüssel-Wert-Paar ausgegeben. Der Reducer fasst in diesem Schritt die Vorkommen für jeden Kandidaten zusammen und berechnet, ob der Anteil von notwendigen Vorkommen für jeden Kandidaten erreicht wurde. Kandidaten, die den minimalen Support erreichen, werden ausgegeben und bilden so die Menge der frequent Itemsets. Berechnung der Transaktions-Anzahl Die Berechnung des Umfangs der Transaktionsdaten erfolgt, indem jeder Mapper die Anzahl der gelesenen Zeilen mitzählt und die Summe ausgibt. Ein Reducer sammelt die von den Mappern gelieferten Werte ein, addiert diese, und bildet so die Gesamtsumme der gelesenen Zeilen, respektive Transaktionen. Die Anzahl der Transaktionen wird abschließend im verteilten Dateisystem gespeichert. 91 Berechnung der lokalen frequent Itemsets Ein Mapper beginnt mit dem Einlesen des zugewiesenen Teils der Transaktionsmenge. Während des Einlesens werden die Transaktionen direkt in Listen von TransaktionsIdentifikatoren (TID-Liste) zerlegt, so dass für jedes Element der Transaktionsmenge festgehalten wird, in welcher Transaktion es vorkommt. Die TID-Liste wird als HashMap gespeichert, welches als Schlüssel das Element und als Wert die Liste enthält. Die initiale Menge von lokalen frequent 1-Itemsets lässt sich direkt aus der TID-Liste generieren, indem alle Elemente als 1-Itemsets übernommen werden, deren Vorkommen in Relation zu der Anzahl der Transaktionen des Mappers den minimalen lokalen Support überschreiten. Mit diesen Transaktionen geschehen nun mehrere Dinge. Zum einen werden diese direkt als Schlüssel-Wert-Paare ausgegeben. Zum anderen bilden diese Itemsets die Grundlage, auf der größere Itemsets gebildet werden. Sie werden also zusätzlich in einer Array-List gespeichert. Die Itemsets werden durch Objekt repräsnetiert, welches sowohl das Itemset als auch die TID-Liste in Form eines Arrays enthält. Zur Vereinfachung des Prunings von Itemset-Kandidaten wird zusätzlich die String-Repräsentation des Arrays in einem Hash-Set gespeichert, so dass die Prüfung der häufigen Itemset-Teilmengen durch eine indizierte Suche optimiert wird. Die Kandidatengenerierung erfolgt analog zu Algorithmus 3.2. Zusätzlich werden immer nur die Itemsets der vorherigen Iteration vorgehalten, um den Speicherbedarf zu reduzieren. Das Pruning eines Itemset-Kandidaten gestaltet sich durch die Speicherung der StringRepräsentation aller bisher generierten lokal-häufigen Itemsets relativ einfach. Für einen Kandidaten werden alle Teilmengen berechnet und in dem HashSet gesucht. Sobald eine Teilmenge nicht in dem HashSet enthalten ist, kann der Kandidat direkt verworfen werden. Die Implementierung des Reduce-Tasks ist sehr einfach gehalten, da dieser lediglich die von den Mappern generierten Kandidaten als Eingabe erhält und daraus die Menge der lokalen frequent Itemset-Kandidaten liefert. Diese werden anschließend zur Zwischenspeicherung in das verteilte Dateisystem geschrieben. Berechnung der globalen frequent Itemsets Das initiale Vorgehen der Mapper in der dritten Phase ist identisch zu dem der zweiten Phase. Zunächst wird aus den Transaktionen der zugewiesenen Eingabe eine TID-Liste für jedes Item erstellt, die in Form eines HashSets als Schlüssel das Element und als Wert in Form einer Array-List die Vorkommen in Transaktionen enthält. Zusätzlich wird die Menge der lokalen frequent Itemsets eingelesen. Nun wird für jedes Kandidaten-Itemset der Support berechnet. Dies geschieht, indem das Itemset Item für Item durchlaufen wird 92 und die Schnittmenge der TID-Listen für alle Elemente des Itemsets sukzessive berechnet wird, so dass am Ende für das Itemset die Menge der Transaktionen übrig bleibt, die alle Elemente des Itemsets enthalten. Die Ausgabe erfolgt nun als Schlüssel-Wert-Paar, wobei als Schlüssel das Itemset und als Wert die Anzahl der lokalen Vorkommen ausgegeben werden. Die Reducer fassen abschließend die Ergebnisse der Mapper gemäß Algorithmus 3.6 zusammen. Ausgegeben werden schlussendlich die globalen frequent Itemsets zusammen mit der Anzahl ihrer Vorkommen. 4.2 MapReduce-DBSCAN Die Implementierung des MR-DBSCAN-Algorithmus folgt der Verfahrensbeschreibung in Abschnitt 3.2.3 auf Seite 65. Als zusätzliche Phase ist hier jedoch die Bestimmung eines minimal-einschließenden Rechtecks der zu clusternden Punktmenge eingeführt (Abbildung 4.2). Dieses ist notwendig, da die geometrischen Ausmaße der zu clusternden Punktmenge nicht zwingend bekannt ist. Die Erzeugung einer adaptiven Partitionierung ist jedoch auf die Bekanntheit der Datensatzausmaße angewiesen. Bestimmung eines MERs Die Bestimmung des minimal-einschließenden Rechtecks (MER) für die Eingabedaten erfolgt auf triviale Weise, indem die Mapper den zugewiesenen Teil der Eingabe punktweise durchlaufen und die minimalen und maximalen x- und y-Koordinaten festhalten. Diese wird von jedem Mapper jeweils als Schlüssel-Wert-Paar h null, (x, y) i ausgegeben werden. Ein Reducer verarbeitet diese Zwischenergebnisse nun und bestimmt daraus die linke untere und rechte obere Ecke des einschließenden Rechtecks. Berechnung einer Partitionierung Die Mapper berechnen in der Initialisierung zunächst unabhängig voneinander aus dem MER ein Raster, indem nacheinander in positiver x- und y-Richtung mit Schrittweite ε2 vom Mittelpunkt des minimal einschließenden Rechtecks ausgehend gelaufen wird, bis die erreichte Koordinate der jeweiligen Dimension echt größer als die maximal x-, bzw. y-Koordinate, sind. Die gleiche Zellenzahl kommt auch in negativer Richtung hinzu, so dass die Größe des Rasters [2∗X][2∗Y ] beträgt, wenn X und Y die Anzahl der gezählten Zellen in der jeweiligen Dimension sei. Im Anschluss berechnet jeder Mapper für jeden 2 Die Schrittweite beträgt hier nicht, im Gegensatz zur Beschreibung in 3.2.3, um das Raster nicht zu groß werden zu lassen. 93 Daten Rohdaten (Eingabe) MapReduceJobs Datenfluss MER der Eingabedaten berechnen Prozessfluss Koordinaten des MER Datenpartitionierung Angrenzende Partitionenpaare Partitionierungsprofil Lokales Clustering Ergebnisse der lokalen Clusterings Menge der lokalen Clusteridentifikatoren Erzeuge MergeAbbildung Merge-Abbildung Umbenennung der Daten Daten mit globalen Cluster IDs (Ausgabe) Abbildung 4.2: Ablauf der Implementierung des MR-DBSCAN-Algorithmus 94 Punkt der zugewiesenen Eingabe, in welcher Rasterzelle ein Punkt enthalten ist, und inkrementiert einen Zähler für diese Zelle. Nachdem die Eingabe vollständig gelesen und bearbeitet ist, gibt der Mapper für jede Rasterzelle die Anzahl der enthaltenen Punkte in der Form h ID Rasterzelle, Punktzahl i aus. Ein Reducer sammelt nun die Schlüssel-Wert-Paare der Mapper ein und erzeugt aus diesen analog zu den Mappern ein Raster für den Datensatz, in welchem die Ergebnisse der Mapper zusammengefasst werden und so die Punktverteilung für den ganzen Datensatz repräsentiert wird. Auf Grundlage des Rasters wird eine binäre Unterteilung des minimal-einschließenden Rechtecks, welches das gesamte Raster einschließt, vorgenommen. Hierfür wird eine Rechtecke enthaltende Queue so lange abgearbeitet, bis diese leer ist. Initialisiert wird diese mit dem MER des Rasters. Aus der Queue wird nun das erste Rechteck entnommen und mit Hilfe des Rasters berechnet, wie viele Punkte es enthält. Dieses geschieht, in dem über die Koordinaten beider Objekte die vom Rechteck überdeckten Zellen berechnet und die darin enthaltenen Punktsummen aufsummiert werden. Wenn diese Punktsumme nun die vorgegebenen Kosten überschreitet, so ist das Rechteck in zwei neue Rechtecke zu unterteilen. Hierfür werden parallel zu der x- und y-Achse verlaufende Unterteilungslinien aus den Rechteck-Koordinaten berechnet, die parallel im Abstand ε zueinander achsparallel verlaufen. Für jede dieser Linien wird die Kostendifferenz wie zuvor durch die Bestimmung der überdeckten Rasterzellen für die zwei sich aus der Unterteilung ergebenden Rechtecke berechnet und die Linie gewählt, die zu einer minimalen Kostendifferenz führt; die resultierenden Rechtecke werden zurückgeliefert und in die Queue für eine eventuelle weitere Unterteilung eingefügt. Ein Rechteck, dass die maximalen Kosten für eine Partition nicht überschreitet, stellt eine gültige Partition dar und wird zwischengespeichert. Sobald die Queue keine Rechtecke mehr enthält ist die Partitionierung abgeschlossen und die Partitionsliste wird im verteilten Dateisystem gespeichert. Zusätzlich wird für jede Partition der berechneten Partitionierung die Menge der direkt angrenzenden Partitionen bestimmt und diese als Schlüssel-Wert-Paar ausgegeben. Lokales Clustering Die Mapper lesen zu Beginn zuerst die Partitionsliste ein. Für jeden Punkt der Eingabe wird gegen die Partitionsliste über die Koordinaten der Partition und des Punktes die Partition bestimmt, in der der Punkt enthalten ist, und der Partitions-Identifikator mit dem Punkt als Schlüssel-Wert-Paar ausgegeben. Für die Reduce-Phase werden nun so viele Reduce-Tasks erzeugt, wie es Partitionen gibt; dieses soll gewährleisten, dass das Clustering jeder Partition in einem eigenen Task stattfindet. Ein Reducer liest zunächst die ihm zugeordneten Punkte ein und speichert diese als Clusterobjekte, so dass einem Punkt zusätzlich ein Klassifikationsflag und ein Cluster-Identifikator zugeordnet ist. Sobald die Eingabe vollständig gelesen ist wird aus der Punktmenge ein R-Baum aufgebaut, wofür die Implementierung Sort-Tile-Recursive 95 R-Tree(STRtree) aus der Java Topology Suite(JTS) verwendet wird. Auf diesem wird das Clustering gemäß des modifizierten DBSCAN-Algorithmus (siehe 3.18 auf Seite 73) durchgeführt. Mit den Ergebnissen des Clusterings passieren nun drei Dinge. Zum einen werden die geclusterten Punkte als Schlüssel-Wert-Paare der Form h Punkt, (lokale Cluster-ID, Punkt-Klassifikation) i ausgegeben; der lokale Clusteridentifikator setzt sich zusammen aus einem Identifikator für die Partition und dem numerischen Wert, der sich aus dem Clustering-Prozess ergibt. Zusätzlich schreibt jeder Reducer die Menge aller lokale vorkommenden Clusteridentifikatoren in das Dateisystem. Außerdem wird für jede Partition die Menge der Punkte innerhalb der inneren und äußeren Randbereiche (siehe Abbildung 3.7 auf Seite 71) bestimmt und in das verteilte Dateisystem geschrieben. Berechnung einer Vereinigungsabbildung Die Mapper erhalten als Eingabe einen Teil der Schlüssel-Wert-Paare mit angrenzenden Partitionspaaren, die in der Partitionserzeugung generiert worden sind. Für jedes Paar wird aus dem verteilten Dateisystem für jede Partition die Liste der Kernpunkte im inneren Randbereich und die Liste der Rand- und Kernpunkte im äußeren Randbereich der Partition gelesen. Die Berechnung des Durchschnitts von jeweils zwei Listen erfolgt, indem eine der beiden zu vergleichenden Clusterobjekt-Listen in eine HashMap umgewandelt wird, die als Schlüssel eine String-Repräsentation des Punktes und als Wert das Clusterobjekt besitzt. Anschließend wird die Liste durchlaufen und für jeden Punkt gegen die HashMap getestet, ob die String-Repräsentation in der Map enthalten ist; ist dies der Fall werden beide Clusterobjekte zu einer Ergebnisliste von ClusterobjektPaaren hinzugefügt. Abschließend wird die Liste durchlaufen und Schlüssel-Wert-Paare für jedes Punktpaar ausgegeben. Ein Reducer sammelt nun die Paare der Mapper ein und generiert aus diesen zunächst einen ungerichteten Graphen, für den die Implementierung des Java Universal Network/Graph(JUNG)-Frameworks verwendet wird. Hierfür wird für jedes gelesene Identifikatoren-Paar jeweils ein Knoten sowie eine die beiden Knoten verbindende Kante generiert und in den Graphen eingefügt; der Bezeichner der Knoten ist hierbei der lokale Clusteridentifikator. Sobald alle Identifikatoren-Paare gelesen und in den Graph eingefügt worden sind, werden die Zusammenhangskomponenten des Graphen berechnet; für die Berechnung wird ein WeakComponentClusterer des JUNG-Frameworks genutzt, der eine Menge von berechneten Zusammenhangskomponenten zurückliefert, wobei jede Zusammenhangskomponente wiederum aus einer Menge von Knoten der Komponente besteht. Es wird nun eine HashMap für die Vereinigungsabbildung erstellt, die als Schlüssel den lokalen und als Wert den globale Clusteridentifikator enthält. Zunächst wird ein Zähler für die globalen Clusteridentifikatoren mit eins initialisiert. Nun wird jede Zusammenhangskomponente durchlaufen und für jeden Knoten ein Eintrag mit dem lokalen und dem globalen Clusteridentifikator zu der Map hinzugefügt; nach jeder Komponente wird der Zähler um eins inkrementiert. Anschließend wird nun ebenfalls aus 96 dem Dateisystem für jede Partition die Liste lokaler Clusteridentifikatoren gelesen, aus denen bereits im Graph vorhandene Identifikatoren entfernt werden. Auch hier wird für jeden lokalen Clusteridentifikator ein Eintrag mit dem lokalen und dem globalen Clusteridentifikator in die Map eingefügt und anschließend der Zähler inkrementiert; Punkte, die bereits lokal als Rauschen markiert worden sind (Partitions-Identifikator + ”0”), erhalten global den Identifikator ”0” als Rausch-Marker. Abschließend wird die Map mit der Abbildung serialisiert und im verteilten Dateisystem gespeichert. Umbenennung der Ergebnisse des lokalen Clusterings Die Mapper lesen in der Initialisierung die Vereinigungsabbildung, die im vorherigen Schritt erzeugt wurde. Diese wurde als HashMap angelegt, damit die Suche nach einem globalen Cluster-Identifikator ausgehend von einem lokalen Cluster-Identifikator über einen Index beschleunigt werden kann. Anschließend erhalten sie einen Teil der Ausgabe, die im lokalen Clusterings erzeugt wurde. Mit Hilfe der HashMap wird der globale Identifikator des Clusterobjekts gesucht und im Clusterobjekt aktualisiert. Schließlich wird das modifizierte Schlüssel-Wert-Paar ausgegeben. Die Reducer führen nun die finale Bestimmung der Punktklassifikation für Punkte aus den Überlappungsbereichen der Partitionen durch. Die implementierte Vorgehensweise orientiert sich direkt an 3.21 auf Seite 78 und wird deshalb nicht weiter ausgeführt. 4.2.1 MapReduce-kMeans und Parallel FP-Growth In diesem Abschnitt sollen die Implementierungen der beiden Algorithmen, die aus dem Apache Mahout Projekt stammen, kurz analysiert werden. Es soll betrachtet werden, ob und, wenn ja, welche Unterschiede es zwischen den vorgestellten Beschreibungen in 3.2.5 und 3.1.3 gibt. MapReduce-kMeans Die Implementierung des MapReduce-kMeans-Algorithmus vom Mahout-Projekt entspricht nicht der Beschreibung in [ZMH09], die in Abschnitt 3.2.5 vorgestellt wurde. Im Gegensatz zu der gelieferten Beschreibung besteht die Implementierung lediglich aus einem einzigen Map-Task pro Iteration. In diesem Map-Task werden die eingegeben Punkte mittels eines Cluster-Klassifikators dem nächstgelegenen Cluster-Zentroiden zugeordnet. Die im Laufe der Berechnung aktualisierten Zentroide werden am Ende jeder Iteration ausgegeben und als Eingabe-Zentroide der nächsten Iteration genutzt. 97 Parallel FP-Growth Die Implementierung des Parallel FP-Growth-Algorithmus folgt der Verfahrensbechreibung von [LWZ+ 08], die in Abschnitt 3.1.3 vorgestellt wurde. Die Implementierung ist ebenfalls in drei Phasen aufgeteilt für die Zählung der frequent 1-Itemsets, die Suche nach frequent Itemsets in gruppenabhängigen Shards und die Aggregierung der Teilergebnisse. Es wird ebenfalls die zentrale Datenstruktur zur Speicherung der frequent Itemsets, ein Maximum-Heap mit einer begrenzten Anzahl von Elementen, verwendet. Eine oberflächliche Betrachtung des Quelltextes der Implementierung hat keine Auffälligkeiten zu Tage gefördert. 98 Kapitel 5 Versuche In diesem Kapitel werden vier verschiedene Verfahren auf MapReduce-Basis, jeweils zwei aus dem Bereich der Assoziations- und Clusteranalyse, untersucht, die in Kapitel 3 vorgestellt wurden. Für die Assoziationsanalyse werden hier die Algorithmen MRPartition sowie Parallel FP-Growth, für die Clusteranalyse die Algorithmen MR-kMeans sowie MR-DBSCAN untersucht. Für die Algorithmen zur Assoziationsanalyse wird hierbei zum einen experimentell die Korrektheit des Parallel FP-Growth-Algorithmus untersucht, zum anderen wird für beide Algorithmen die Tauglichkeit am Beispiel von Transaktionsdatenbanken verschiedener Größe untersucht. Für die Algorithmen der Clusteranalyse wird am Beispiel des MR-kMeans-Algorithmus die Auswirkung der Blockgröße der Eingabedaten auf die Laufzeit des Algorithmus betrachtet. Für den MR-DBSCAN-Algorithmus wird experimentell die Auswirkung der Parameterwahl auf die Laufzeit untersucht, insbesondere im Hinsicht auf die sequentiellen Anteile des Algorithmus. Für beide Verfahren wird zusätzlich der Speed-Up betrachtet. Schließlich werden beide Verfahren in einem Versuch mit großen Testdaten direkt gegenübergestellt und verglichen. Für die Versuche wurde ein Hadoop-Cluster auf drei Rechnern aus dem Rechnerpool des Fachgebiets Datenbanken und Informationssysteme aufgesetzt. Hierbei handelt es sich um zwei Rechner mit Intel Core i3-540 2-Kern-Prozessoren mit 3,07 GHz, 4 virtuellen Kernen und 4 GB Arbeitsspeicher (Typ-1) sowie einen Rechner mit einem Intel Core i7-2600 4-Kern Prozessor mit 8 virtuellen Kernen und 16 GB Arbeitsspeicher (Typ-2). Vom ersten Typ werden dem Hadoop-Cluster jeweils 2 virtuelle Prozessorkerne und 3 GB Arbeitsspeicher, vom zweiten Typ 4 virtuelle Kerne sowie 6 GB Arbeitsspeicher zu Verfügung gestellt. Durch diese Ressourcenzuweisung lässt sich durch sukzessives Hinzufügen von Knoten 99 vom Typ 1 und schließlich Typ 2 zum Cluster grob eine Verdopplung der Leistung mit jedem hinzugefügten Knoten darstellen: • Konfiguration 1 (1 Knoten): Typ-1 • Konfiguration 2 (2 Knoten): Typ-1 + Typ-1 • Konfiguration 3 (3 Knoten): Typ-1 + Typ-1 + Typ-2 Zusätzlich ist der Hadoop-Cluster so konfiguriert, dass der Nodemanager jedes Knotens maximal 3 GB Speicher zur Verfügung stellt. Das verteilte Dateisystem wird aus den lokalen Festplatten der Knoten aufgebaut und besitzt eine standardmäßige Blockgröße von 128MB und einem Replikationsfaktor von 3, so dass im Betrieb mit 3 Knoten jeder Block auf jedem Knoten vorliegt. Auf jedem Knoten des Clusters laufen folglich als Dienste ein Namenode für die Bereitstellung von Speicher für das verteilte Dateisystem sowie ein Nodemanager, der die Rechenressourcen des Knotens verwaltet. Die zentralen Komponenten, der ResourceManager für die Verwaltung der Rechenressourcen des Clusters und der Namenode für die Metadatenverwaltung des Clusters, laufen auf dem Typ-2 Knoten. Sofern nicht anders angegeben beträgt die Blockgröße für die Dateien im verteilten Dateisystem 64 MB. Zusätzlich beträgt die Größe für einen Container 1 GB, wovon 768 MB von der JVM, die im Container ausgeführt wird, für den Heap-Speicher verwendet werden können. 5.1 Assoziationsanalyse In den Versuchen mit dem MapReduce-Partition und Parallel FP-Growth-Algorithmus sollen verschiedene Aspekte untersucht werden. Zum Abschluss der Vorstellung des Parallel FP-Growth-Algorithmus in Abschnitt 3.1.3 blieb die Frage offen, ob dieser eine exakte oder ungefähre Lösung liefert. Dies soll in einem ersten Experiment untersucht werden. In zwei weiteren Versuchen soll unabhängig voneinander die Performance beider Algorithmen untersucht werden. Dies geschieht durch Tests der Implementierungen in unterschiedlichen Konfigurationen. Zum Abschluss sollen schließlich beide in einem Test einem Vergleich hinsichtlich der Tauglichkeit antreten. Für die Versuche mit den Verfahren der Assoziationsanalyse werden die folgenden Testdatensätze von Frequent Itemset Mining Implementations(FIMI)1 verwendet, bei denen es sich sowohl maschinell erzeugte als erfasste Realwelt-Daten handelt: • Synthetische Testdaten: 1 http://fimi.ua.ac.be/ 100 – T10I4D100K.dat (3,9 MB) - 100.000 Transaktion, 1000 verschiedene Items, durchschnittliche Transaktionslänge: 10, durchschnittliche Länge der häufigsten Itemsets: 4 – T40I10D100K.dat (15 MB) - 100.000 Transaktionen, 1000 verschiedene Items, durchschnittliche Transaktionslänge: 40, durchschnittliche Länge der häufigsten Itemsets: 10 • Realwelt-Testdaten: – accidents.dat2 (34 MB) - Offizielle Unfallstatistiken des National Institute of Statistics (NIS) der Region Flandern (Belgien), 340.183 Transaktionen, 572 verschiedene Items, durchschnittlich Transaktionslänge: 54 Items – webdocs.dat3 (1,4 GB) - Sammlung von 1,7 Million HTML-Dokumenten aus dem Internet, 1.692.082 Transaktionen, 5.267.656 verschiedene Items, maximale Transaktionslänge: 71.412 Items Bei diesen handelt es sich um einfache Textdateien, in denen jede Zeile einer Transaktion entspricht. 5.1.1 Experimentelle Untersuchung der Korrektheit des Parallel FP-Growth-Algorithmus Am Ende der Beschreibung des Parallel FP-Growth-Algorithmus (Abschnitt 3.1.3, Seite 49) ist die Frage offen geblieben, ob dieser ein exaktes oder approximatives Ergebnis liefert. Dies soll anhand eines kleinen Beispiels untersucht werden. Verwendet wird hierfür die Transaktionsdatenbank in Tabelle 5.1, welche aus dem Artikel über den AprioriAlgorithmus4 aus der englischen Wikipedia stammt. Itemsets {1, 2, 3, 4} {1, 2, 4} {1, 2} {2, 3, 4} {2, 3} {3, 4} {2, 4} Tabelle 5.1: Beispiel-Transaktionsdatenbank 2 http://fimi.ua.ac.be/data/accidents.pdf http://fimi.ua.ac.be/data/webdocs.pdf 4 http://en.wikipedia.org/wiki/Apriori algorithm 3 101 Damit ein Itemset häufig ist wird verlangt, dass dieses in mindestens drei Transaktionen auftritt. Folglich ergeben sich als frequent 1-Itemsets die vier Itemsets aus Tabelle 5.2 mit der entsprechenden Anzahl von Vorkommen. Itemset {1} {2} {3} {4} Anzahl 3 6 4 5 Tabelle 5.2: Häufige 1-Itemsets Es sind somit alle vier vorkommenden 1-Itemsets der Transaktionsdatenbank häufig. Die sich daraus ergebenden Kombinationen von 2-Itemsets sind mit ihrem Support in Tabelle 5.3 dargestellt. Itemset {1, 2} {1, 3} {1, 4} {2, 3} {2, 4} {3, 4} Support 3 1 2 3 4 3 Tabelle 5.3: Häufige 2-Itemsets Von diesen sechs 2-Itemsets sind jedoch nur vier häufig. Das einzige sich daraus ergebende 3-Itemset, welches nicht aufgrund nicht-häufiger Teilmengen verworfen wird, ist in Tabelle 5.4 abgebildet. Da es jedoch nur einen Support von zwei hat wird es verworfen. Itemset {2, 3, 4} Support 2 Tabelle 5.4: 3-Itemset Kandidaten Auf dieser Transaktionsdatenbank werden nun der MapReduce-Partition- sowie der Parallel-FP-Growth-Algorithmus ausgeführt. Hierbei zeigt sich, dass der MR-PartitionAlgorithmus die gleichen häufigen Itemsets der Länge eins und zwei liefert, die in Tabelle 5.2 und 5.3 aufgeführt sind. Die Ausführung des Parallel-FP-Growth liefert keine Ausgabe; die Ursache hierfür ließ sich nicht klären. Aus diesem Grund wird eine theoretische Betrachtung des Algorithmus mit der Transaktionsdatenbank (Tabelle 5.1) als Eingabe vorgenommen. Es wird zunächst festgelegt, dass der Algorithmus vier Gruppen verwendet; alle Items werden folglich durch die HashFunktion verschiedenen Gruppen zugewiesen. 102 Zunächst wird die Menge aller häufigen 1-Itemsets bestimmt und diese nach ihrem Support absteigend sortiert. Diese sind bereits in Tabelle 5.2 aufgelistet und ergeben, absteigend nach der Anzahl der Vorkommen sortiert die Liste {2, 4, 3, 1}. Transaktion {2, 4, 3, 1} {2, 4, 1} {2, 1} {2, 4, 3} {2, 3} {4, 3} {2, 4} G-1 {2, 4, 3, 1} {2, 4, 1} {2, 1} G-2 {2} {2} {2} {2} {2} G-3 {2, 4, 3} G-4 {2, 4} {2, 4} {2, 4, 3} {2, 3} {4, 3} {2, 4} {2} {4} {2, 4} Tabelle 5.5: Gruppenabhängige Shards . Mit dieser Liste wird nun die Transaktionsdatenbank nach Algorithmus 3.8 in vier gruppenabhängige Shards transformiert transformiert; diese ist in Tabelle 5.5 dargestellt. Ein Berechnen der frequent Itemsets für jeden gruppenabhängigen Shard von Hand liefert exakt das gleiche Ergebnis, welches beispielsweise eine Berechnung mit Hilfe des Apriorioder FP-Growth-Algorithmus auf der vollständigen Transaktionsdatenbank ergibt. Dies zeigt, dass der Parallel-FP-Growth zumindestens für dieses kleine Beispiel ein exaktes Ergebniss liefert. Es werden alle vorhandenen frequent Itemsets mit dem exakten Support-Wert gefunden. Ein zweiter Versuch wird mit dem synthetischen Testdatensatz T10I4D100K.dat durchgeführt. Hierfür wird ein relativer Support von 0.0001, was einem absoluten Support von 10 Transaktionen bei 100.000 Transaktionen in der Transaktionsdatenbank entspricht, verwendet. Zusätzlich soll der PFP-Algorithmus die 100 häufigsten Itemsets für jedes Item ausgegeben und die vom Parallel FP-Growth vorgegebene standardmäßige Anzahl von 50 Gruppen verwenden. Partition 411.365 Itemsets Parallel FP-Growth 362 Itemsets Tabelle 5.6: Anzahl gefundener Itemsets Bei den Anzahl der gefundenen Itemsets für diesen Versuch (Tabelle 5.6) ist zunächst erkennbar, dass sich die Anzahl der gefundenen Itemsets stark unterscheidet. Die Menge der vom PFP-Algorithmus erhöht sich auch nicht, wenn man die Anzahl der auszugebenden frequent Itemsets für ein Item massiv erhöht, beispielsweise auf 10.000. Eine Stichprobenartige Prüfung zeigte, dass die gefundenen frequent Itemsets des Parallel FPGrowth-Algorithmus auch in den Ergebnissen des MapReduce-Partition-Algorithmus auftreten. Jedoch stimmten die berechneten Supportwerte beider Ergebnisse nicht überein. 103 Hinsichtlich der Anzahl gefundener Itemsets ist die Vermutung naheliegend, dass auf die Ausgabe der Teilmengen gefundener frequent Itemsets verzichtet wird. Eine tiefergehende Analyse des Quelltextes bestätigt diese Vermutung. Itemsets werden in der Implementierung des Maximum-Heaps, der vom Parallel FP-Growth-Algorithmus verwendet wird, nur in den Maximum-Heap eingefügt, wenn es keine Teilmenge eines bereits vorhandenen Itemsets ist oder aber einen größeren Support als dieses besitzt. Außerdem werden Itemsets, die Teilmenge eines einzufügenden Itemsets sind und einen geringeren oder gleichen Support als dieses besitzen, ersetzt. Abschließend lässt sich als Fazit festhalten, dass der Parallel FP-Growth offensichtlich nicht immer eine exakte Lösung hinsichtlich der Supportwerte liefert. Sofern sich dies im Rahmen dieses Versuchs sagen lässt, beinhaltet die Lösung jedoch sehr wohl die tatsächlichen existierenden frequent Itemsets. 5.1.2 Untersuchung der Performance des Partition-Algorithmus In diesem Versuch soll für die MapReduce-Implementierung des Partition-Algorithmus mit verschiedenen Datensätzen die Leistungsfähigkeit untersucht werden. Hierfür werden die drei Datensätze T10I4D100K.dat, T40I10D100K.dat und accidents.dat verwendet. Aufgrund der geringen Datengröße werden diese jeweils in vier Teile aufgeteilt, damit der Algorithmus mehrere Eingabe-Teile erhält und eine der Menge der Eingabedateien entsprechende Anzahl von Map-Tasks erzeugt, die parallel verarbeitet werden. Genutzt werden alle drei Knoten des Clusters. Getestet wird die Laufzeit des Algorithmus mit verschiedenen Eingabedaten sowie minimalen Support-Werten. minimaler Support 0,01 (1000 Transaktionen) 0,001 (100 Transaktionen) 0,0001 (10 Transaktionen) T10I4D100K 01:16 11:28 Fehler T40I10D100K Fehler Fehler Fehler accidents Fehler Fehler Fehler Tabelle 5.7: Testresultate für den MR-Partition-Algorithmus Die Ergebnisse der Test (Tabelle 5.7) sind sehr ernüchternd. Die Berechnung der frequent Itemsets gelingt nur bei dem kleinsten Testdatensatz, und selbst bei einem sehr kleinen minimalen Support scheitert die Berechnung. Die Ursache hierfür liegt in der Tatsache begründet, das der Algorithmus keine komprimierte Darstellung der Transaktionsdatenbank für die Suche nach frequent Itemsets verwendet. In jeder Iteration ist es erforderlich, dass die gefundenen Itemsets der vorherigen Iteration mit den zugehörigen Transaktionslisten vorgehalten werden, um aus diesen durch die Kombination zweier Itemsets ein neues zu generieren; es ist folglich in jedem Schritt notwendig, die gefundenen frequent Itemsets von zwei Iterationen zusammen mit den TID-Listen zu 104 speichern. Aus dem vorherigen Versuch ist bereits bekannt, dass eine Ausführung des MapReduce-Partition-Algorithmus auf den Datensatz T10I4D100K.dat bei einem minimalen Support von 0,0001 bereits etwa 400.000 frequent Itemsets findet (Tabelle 5.6. Verstärkend kommt in diesen Versuchen noch hinzu, dass durch Aufteilung der Eingabedaten in vier Teile der minimale Support ebenfalls durch vier geteilt wird und somit 0,000025 beträgt, also noch weiter reduziert wird, und folglich zu noch mehr gefundenen Itemsets führt. Somit lässt sich an dieser Stelle bereits festhalten, dass der MapReduce-Partition-Algorithmus für eine Anwendung auf große Transaktionsdatenbanken völlig ungeeignet ist, da der aus der Kandidatengenerierung resultierende Speicherbedarf viel zu groß ist. 5.1.3 Untersuchung der Performance des Parallel FP-GrowthAlgorithmus Für den Parallel FP-Growth-Algorithmus sollen in dieser Versuchsreihe zwei verschiedene Aspekte betrachtet werden. Zum einen soll die Laufzeit des Algorithmus bei einer verschiedenen Anzahl von Rechenknoten ermittelt werden. Zum anderen soll erprobt werden, wie sich die drei Steuerparameter des Algorithmus, die Gruppenzahl, der minimale Support und die Anzahl zu auszugebender frequent Itemsets für jedes Item, auf die Laufzeit auswirken. Knotenzahl Für diesen Versuch wurde der Testdatensatz webdocs.dat verwendet. Als Parameter für den Algorithmus wurde eine Gruppenzahl von 10, ein minimaler Support von 10 sowie die Ausgabe der 500 häufigsten Itemsets gewählt. Die Ergebnisse für die Gesamtlaufzeit (Abbildung 5.1) zeigen, dass der Algorithmus für den verwendeten Datensatz keinen linearen Speed-Up besitzt und dieser beim Übergang von zwei auf drei Knoten sogar noch stärker abflacht. Eine Betrachtung der Laufzeiten der einzelnen Phasen des Algorithmus zeigt, dass das Zählen der häufigen 1-Itemsets die größte Zeit des Algorithmus in Anspruch nimmt. Bereits in diesen ist kein linearer Speed-Up erkennbar. Das Mining von frequent Itemsets aus den gruppenabhängigen Transaktionsdatenbanken zeigt hier jedoch einen näherungsweisen linearen Speed-Up. Der letzte Schritt hingegen, die Aggregierung der Ergebnisse, ist von der Laufzeit her annähernd konstant und verändert sich bei einer veränderten Knotenzahl nicht signifikant. 105 Abbildung 5.1: Laufzeit PFP-Algorithmus (Gruppen: 10, Support: 10, K: 50) Steuerparameter Betrachtet werden soll in diesem Test, wie sich die verschiedenen Steuerparameter des Algorithmus auf die Laufzeit auswirken. Die Tests werden mit allen drei Knoten des Clusters ausgeführt und es kommt der Datensatz webdocs.dat zur Anwendung. Zuerst wird die Laufzeit mit einer verschieden großen Zahl von gruppenabhängigen Transaktionsdatenbanken getestet. Wie in Tabelle 5.8 zu sehen ist, hat die Variation der Anzahl zu bildender Gruppen bei der verwendeten Knotenzahl und Datensatzgröße keinen keine gravierenden Veränderung der Laufzeit zur Folge. Zwar steigt bei einer Anzahl von 5000 Gruppen die Laufzeit für die Zählung der häufigen 1-Itemsets leicht an, da diese Phase des Algorithmus noch nicht von der Anzahl der Gruppen beeinflusst wird ist dieses aller Wahrscheinlichkeit eine Folge des Nichtdeterminismus der Programmausführung auf dem Cluster. Gruppen 100 500 2.500 5.000 Gesamtzeit 03:46 03:29 03:24 03:53 Zählen 03:03 02:46 02:45 03:15 Suchen 00:27 00:25 00:24 00:25 Aggregierung 00:16 00:18 00:15 00:13 Tabelle 5.8: Laufzeit bei variabler Gruppenzahl (minSup = 10, K = 500) In einer weiteren Versuchsreihe, bei der mit unterschiedlichen Parameterausprägungen des minimalen Supports getestet wird, zeigt sich, dass der minimale Support keine 106 Abbildung 5.2: Laufzeiten der einzelnen Teile des PFP-Algorithmus messbaren Einfluss auf die Performance des Verfahrens hat (Tabelle 5.9). Support 10 100 1.000 10.000 Laufzeit 03:37 03:38 03:58 03:41 Zählen 02:48 02:59 03:17 03:01 Suche 00:32 00:24 00:24 00:28 Aggregierung 00:17 00:15 00:17 00:12 Tabelle 5.9: Laufzeit bei unterschiedlichem minimalen Support (100 Gruppen, K = 500) Das gleiche trifft auf die Laufzeiten zu, wenn die Anzahl der häufigen Itemsets, die für ein Item ausgegeben werden sollen, variieren. Wie in Tabelle 5.10 zu sehen ist, gibt es auch hier keinen deutlichen Unterschied zwischen Laufzeiten der einzelnen Versuche. Insgesamt haben die Steuerparameter für den gewähltem Datensatz keine signifikante Auswirkung auf die Laufzeit des Algorithmus. Dieses mag jedoch der dem geringen Umfang des Testdatensatzes geschuldet sein. 107 K 50 250 500 2.500 Laufzeit 03:41 03:26 03:46 03:36 Zählen 02:54 02:42 03:00 02:50 Suchen 00:29 00:26 00:31 00:31 Aggregierung 00:18 00:18 00:15 00:15 Tabelle 5.10: Laufzeit bei einer verschiedenen Anzahl auszugebener frequent Itemsets pro Item (100 Gruppen, minSup = 10) 5.1.4 Performancevergleich zwischen MR-Partitition und PFPGrowth Effektiv ist ein Vergleich der beiden Verfahren auf großen Datenmengen nicht möglich. Die Ausführung des MapPartition-Algorithmus mit dem Datensatz webdocs.dat bricht bereits nach wenigen Minuten wegen Speichermangels ab. Mit einer sequentiellen Implementierung des Partition-Algorithmus wurde ermittelt, wie viele frequent Itemsets in den einzelnen Iterationen entstehen (Tabelle 5.11). Bereits bei sehr kleinen Datenmengen werden unheimlich große Mengen (> 400.000) frequent Itemsets gefunden, wenn ein sehr kleiner minimaler Support (hier 0,0001) gewählt. Itemset-Länge 1 2 3 4 5 6 7 8 9 10 11 12 13 14 Gesamt Laufzeit Anzahl Itemsets 867 110.793 152.805 80.710 39.954 16.172 6.225 2.473 954 318 80 13 1 0 411.365 749 Sekunden Tabelle 5.11: Anzahl gefundener frequent Itemsets für T10I4D100K.dat Eine Suche mit demselben minimalen Support von 0,0001 mit dem Datensatz T40I10D100K.dat wurde nach 7 Stunden und 3 Minuten abgebrochen. Zu diesem Zeitpunkt hatte der Algorithmus bereits 924 Itemsets der Länge 1, 179.740 der Länge 2 und 1.383.971 108 Itemsets der Länge 3 geliefert. Ein Ausführung des Algorithmus ist also generell nur bei einem relativ großen Support möglich. Im Gegensatz dazu steht die Ausführung der Versuche aus Abschnitt 5.1.2, die mit dem MR-Partition-Algorithmus durchgeführt wurden, mit dem Parallel FP-GrowthAlgorithmus. minimaler Support 0,01 (1.000 Transaktionen) 0,001 (100 Transaktionen) 0,0001 (10 Transaktionen) T10I4D100K 00:51 00:44 00:54 T40I10D100K 00:49 00:47 00:49 accidents 00:53 00:54 00:49 Tabelle 5.12: PFP-Algorithmus - Test mit verschiedenen Datensätzen Die Ergebnisse (Tabelle 5.12) zeigen auch, dass die Laufzeit des Algorithmus auch bei diesen Versuchen weitestgehend unabhängig von der Parameterzahl ist und die Datensätze, im Gegensatz zum problemlos verarbeitet werden können. 5.2 Clusteranalyse In den Versuchsreihen mit den beiden Clustering-Algorithen, dem MapReduce-kMeansund MapReduce-DBSCAN-Algorithmus, sollen die Verhaltensweisen der Algorithmen unter verschiedenen Einflüssen getestet werden. Es soll zum einen untersucht werden, wie sich unterschiedliche Blockgrößen der Eingabedaten und Knotenzahlen auf den MapReduce-kMeans-Algorithmus auswirken, für den MapReduce-DBSCAN-Algorithmus sollen das Laufzeitverhalten bei verschiedenen Steuerparameterausprägungen und Knotenzahlen untersucht werden. Abschließend werden beide Algorithmen in einem Versuch gegeneinander getestet. Als Testdaten für das Clustering kommen zwei verschiedene Geodatensätze von OpenStreetMap zum Einsatz. Bei diesen handelt es sich um die Datensätze für Deutschland und Europa, die von der Geofabrik GmbH5 bereitgestellt werden. Aus diesen Datensätzen werden lediglich die ”Road”-Objekte genutzt. Aus dessen Geometrien werden die Punkt-Koordinaten extrahiert und von Duplikaten befreit. Als Format für diese wird das SequenceFileFormat genutzt, so dass als Schlüssel die Zeilennummer und als Wert der Punkt in Form eines zweidimensionalen Vektors verwendet wird. Die Eingabedaten besitzen die folgenden Charakteristika: • DE - Deutschland, 7.949.433 Punkte (248 MB) • EU - Europa, 64.684.856 Punkte (2,3 GB) 5 http://download.geofabrik.de/ 109 5.2.1 Auswirkungen der Blockgröße und Knotenanzahl auf die Laufzeit am Beispiel des MR-kMeans-Algorithmus In diesem Versuch wird untersucht, wie sich eine variable Blockgröße für im HDFS gespeicherten Dateien sowie eine verschiedene Anzahl von Rechenknoten auf die Laufzeit der MapReduce-Implementierung auswirkt. Für diese Betrachtung wird explizit der MRkMeans-Algorithmus genutzt, da er so gut wie keine sequentiellen Komponenten enthält und somit ideal für einen Vergleich verschiedener Knotenzahlen ist. Getestet werden zum einen die Laufzeiten, wenn nur bestimmte Knoten des Clusters verwendet werden sowie die Auswirkungen auf die Laufzeit bei Blockgrößen von 32 MB, 64 MB und 128 MB. Für den Versuch werden die Straßendaten von Europa (Testdatensatz EU) verwendet. Blockgröße 32 MB 64 MB 128 MB 1 Knoten 16:02 min 13:35 min 12:18 min 2 Knoten 06:41 min 04:53 min 05:16 min 3 Knoten 02:39 min 02:21 min 02:23 min Tabelle 5.13: Ergebnisse der Versuche mit dem MR-kMeans-Algorithmus für variable Knotenzahl und Blockgröße Die Ergebnisse in Tabelle 5.13 zeigen hinsichtlich der Blockgröße, dass diese nur bei der Verwendung von einem Rechenknoten gravierende Änderungen bewirken, bzw. bei zwei Rechenknoten nur beim Sprung von 32 MB auf 64 MB eine signifikante Laufzeitverbesserung bewirken. Bei der Verwendung von drei Rechenknoten ist bereits keine Unterscheidung der Laufzeit bei verschiedenen Blockgrößen mehr möglich. Die starken Unterschiede der Laufzeit bei unterschiedlichen Blockgrößen bei einem Knoten erklären sich durch die Tatsache, dass für jeden Teil der Eingabe ein Map-Tasks erzeugt wird. Eine geringe Blockgröße bedeutet folglich, dass eine größere Anzahl von Map-Tasks generiert wird, was dann zu einem größeren Overhead führt für das Starten und Beenden von JVMs, die die Tasks ausführen. In Bezug auf die Laufzeit bei Verwendung einer verschiedenen Anzahl von Rechenknoten lässt sich unabhängig von der Blockgröße festhalten, dass der Speed-Up annähernd linear ist; eine Verdoppelung der Rechenkapazität führt grob zu einer Halbierung der Laufzeit, was anschaulich in Abbildung 5.3 zu sehen ist. 5.2.2 Einfluss der Parameterwahl und Knotenzahl auf den MRDBSCAN-Algorithmus In diesem Abschnitt wird eine experimentelle Untersuchung des MR-DBSCAN-Algorithmus durchgeführt. Hierbei stehen die drei Parameter zur Steuerung der maximalen 110 Abbildung 5.3: Einfluss verschiedener Blockgrößen und Knotenzahlen auf die Laufzeit Partitionsgröße, der Nachbarschaftsgröße sowie des für Kernpunkte notwendigen Nachbarschaftsumfangs im Mittelpunkt. Die Untersuchung wird in drei verschiedene Versuche aufgeteilt. Im ersten Versuch wird der Einfluss der Partitionsgröße auf die Laufzeit des Algorithmus untersucht. Im zweiten Versuch wird betrachtet, welche Auswirkungen die Wahl der Nachbarschaftsgröße sowie die Anzahl der für Kernpunkte notwendigen Punktzahl sich auf die Laufzeit auswirken. Eine Untersuchung des Speed-Up wird in einem dritten Versuch vorgenommen. Hierbei wird bei einer festen Parameterwahl und variabler Knotenzahl untersucht, wie sich eine unterschiedliche Anzahl von Rechenknoten auf die Gesamtlaufzeit auswirkt. Die Versuche werden auf drei Rechenknoten ausgeführt. Map- und Reduce-Tasks stehen 1GB Speicher pro Tasks zur Verfügung, wovon 768MB für den Heap der JVM, die den Task ausführt, bereitgestellt werden. Verwendet wird der Datensatz DE mit den Straßendaten von OpenStreetMap für Deutschland. Partitionsgröße In einem ersten Test mit des MR-DBSCAN-Algorithmus soll die Auswirkung der Partitionsgröße auf die Laufzeit des Algorithmus getestet werden. Der Test wird mit 3 Rechenknoten auf dem DE-Datensatz ausgeführt für minP ts = 350 und ε = 1500, wobei für jeden Task 1GB Speicher bereitgestellt wird, von welchem 768MB als Heap für die ausführende Java-VM verwendet werden. 111 Abbildung 5.4: Laufzeit in den einzelnen Phasen des MR-DBSCAN-Algorithmus für den DE-Datensatz Was auf den ersten Blick in den Ergebnissen aus Abbildung 5.4 ins Auge fällt sind die starken Ausreißer für das Clustering und das Relabeling der Daten für eine maximale Partitionsgröße von 5000 Punkten. Es ist klar, dass bei einer derart geringen Anzahl von Punkten pro Partition eine große Anzahl von Partitionen gebildet wird. Für jede von diesen wird ein Reduce-Tasks erzeugt, so dass die schiere Anzahl von abzuarbeitenden Tasks und dem daraus resultierenden Overhead für die lange Laufzeit verantwortlich sind. Die Ursache für die lange Laufzeit des Relabelings sind der Tatsache geschuldet, dass es durch die große Anzahl von Partitionen eine entsprechende hohe Anzahl von aneinander angrenzenden Paaren von Partitionen gibt. Folglich existiert eine große Anzahl von Punkten in Überlappungsbereichen, so dass für eine größere Anzahl von Punkten mindestens zwei verschiedene Klassifikationen und Clusteridentifikatoren vorliegen, die vereinheitlicht müssen. Zusätzlich ist in den Ergebnissen gut zu sehen, dass bereits ab einer Partitionsgröße von 100.000-250.000 Punkten für den verwendeten Testdatensatz eine Reduzierung der Laufzeit mehr festzustellen ist. Abbildung 5.5 zeigt die Laufzeit der einzelnen Testläufe noch einmal reduziert auf die Gesamtlaufzeit. Es zeigt sich, dass die Laufzeit des Algorithmus für den gewählten Datensatz bis zu einer Partitionsgröße von 250.000 Punkten mit jeder Verdoppelung der Größe etwa um ein Drittel reduziert wird. 112 Abbildung 5.5: Gesamtlaufzeit des MR-DBSCAN-Algorithmus für den DE-Datensatz mit 3 Knoten, minP ts = 50, ε = 250 Anzahl der Rechenknoten In diesem Versuch wird die Laufzeit des MR-DBSCAN-Algorithmus untersucht, wenn eine unterschiedliche Anzahl von Rechenknoten für die Berechnung verwendet. Als Parameter wurden hier eps = 1500 und minP ts = 350 und eine Partitionsgröße von 250.000 Punkten verwendet. Die Betrachtung der Gesamtlaufzeit (Abbildung 5.6) zeigt, das das Verfahren im Rahmen dieses Versuches einen linearen Speed-Up besitzt; eine Verdoppelung der Rechenleistung bedeutet also eine Halbierung der Laufzeit. Eine genauere Betrachtung der Laufzeit (Abbildung 5.7) der einzelnen Phasen des Algorithmus zeigt, dass besonders das Clustering und das Relabeling von einer Erhöhung der Knotenzahl profitieren. Bei diesen führt eine Verdopplung der Rechenleistung hier annähernd zu einer Halbierung der Laufzeit. Wenig Einfluss hat die Knotenzahl hingegen auf die Berechnung des MER des Datensatzes, der Partitionierung und der Vereinigungsabbildung. Für die Berechnung des MERs ist eine signifikante Auswirkung erst zu erwarten, wenn die zu verarbeitende Datenmenge stark ansteigt, so dass das parallele Lesen der Eingabedaten davon profitiert. Ähnliches gilt für die Partitionierung, da die Berechnung 113 Abbildung 5.6: Gesamtlaufzeit des MR-DBSCAN-Algorithmus bei variabler Knotenzahl der Teilergebnisse zur Bestimmung eines Rasters mit der Punktdichteverteilung des Datensatzes so auf mehrere Rechenknoten verteilt werden kann. Nicht betroffen hiervon ist jedoch die eigentliche Berechnung der Partitionen, die rein sequentiell auf einem Knoten ausgeführt; hier ist für eine Erhöhung der Knotenzahl keine Laufzeitreduktion zu erwarten. Für das Erzeugung der Vereinigungsabbildung ist nur bei einer großen Menge von Punkten in Überlappungsbereichen mit einer Beschleunigung der Map-Phase zu rechnen, in dessen Folge mehrere Map-Tasks für die Berechnung der Punkt-Paare aus überlappenden Partitionsrändern erzeugt werden. Die Reduce-Phase ist hiervon unbeeinflusst, da die Erzeugung der eigentlichen Abbildung sequentiell in einem Reduce-Task ausgeführt wird. Variation der Nachbarschaftsgröße In diesem Versuch wird untersucht, wie sich der Algorithmus verhält, wenn die Größe der Nachbarschaft variiert. Verwendet wurden 3 Rechenknoten, die Partitionsgröße liegt bei 250.000 Punkten und die Kernpunktklassifikation beträgt 350. Die Betrachtung der Gesamtlaufzeiten in Abbildung 5.8 zeigt recht anschaulich, dass Nachbarschaftsgrößen von weniger als 500 zu einer massiven Verlängerung der Laufzeit führen. Die zentrale Ursache hierfür liegt in der Tatsache, dass der gesamte Datensatz in Raster unterteilt wird, in der jede Rasterzelle die Kantenlänge ε, also die Nachbarschaftsgröße, besitzt. Eine kleine Nachbarschaft bedeutet also zwangsläufig, dass eine Rasterisierung mit vielen Zellen entstehen. Hieraus entstehen gravierende Folgen für die Berechnung der Partitionierung. Dies bedeutet zum einen, dass für die binäre Rechteckaufteilung viele Schnittlinien und folglich viele verschiedene Varianten von Rechteck- 114 Abbildung 5.7: Aufgeschlüsselte Laufzeiten des MR-DBSCAN-Algorithmus bei variabler Knotenzahl Unterteilungen in Frage kommen, die zu testen sind. Zum anderen bedeutet dies, dass für die Kostenberechnung der Rechtecke eine große Menge von Zellen betrachtet werden muss, die von den resultierenden Rechtecken überdeckt werden. Festzuhalten ist weiterhin, dass für eine Nachbarschaftsgröße von 1000 oder 2000 in der Gesamtlaufzeit keine Unterschiede vorhanden sind. Eine Betrachtung der aufgeschlüsselten Laufzeiten (Abbildung 5.9) der einzelnen Phasen zeigt jedoch, dass eine Verdoppelung der Nachbarschaftsgröße von 1000 auf 2000 zu einer Verdoppelung der Laufzeit des Clusterings führt. Diese wird jedoch ausgeglichen durch eine starke Reduktion der Laufzeit der Partitionierung, da durch die größere Nachbarschaft weniger Rasterzellen und folglich weniger Schnittlinien entstehen, die für die binäre Rechteckunterteilung betrachtet werden müssen. Gleichzeitig bedeutet eine größere Nachbarschaft auch, dass die Überlappungsbereiche der Partitionen größer werden. In der Folge gibt es mehr Punktpaare in diesen Überlappungsbereichen, die für die Berechnung der Vereinigungsabbildung verarbeitet werden müssen. Dies hat ebenfalls zur Folge, dass für die Umbennung mehr Punkte verarbeitet 115 Abbildung 5.8: Gesamtlaufzeit des MR-DBSCAN-Algorithmus bei variabler Nachbarschaftsgröße werden müssen, für die mehr als ein Paar von Klassifikation und Cluster-Identifikator existiert. Variation der Kernpunktklassifikation In diesem Versuch wird untersucht, wie sich die Anzahl der Punkte, die für eine Klassifikation als Kernpunkt notwendig sind, auf die Laufzeit des Algorithmus auswirkt. Ein Blick auf die Gesamtlaufzeit (Abbildung 5.10) zeigt, dass die Kernpunktklassifikation keine messbaren Einfluss auf die Gesamtlaufzeit des Algorithmus hat. Wird jedoch die detaillierte Laufzeit der einzelnen Phasen (Abbildung 5.11) betrachtet, so zeigt sich, dass die Laufzeit des Clusterings bei einer größeren Anzahl notwendiger Kernpunkte etwas ansteigt. Außerdem bedeutet dies auch, das weniger Punkte in den Randbereichen als Kernpunkte klassifiziert werden, so dass sich gleichzeitig die Laufzeit zur Berechnung der Vereinigungsabbildung reduziert, da weniger Punkte in den überlappenden Partitionsbereichen betrachtet werden müssen. 116 Abbildung 5.9: Aufgeschlüsselte Laufzeiten des MR-DBSCAN-Algorithmus bei variabler Nachbarschaftsgröße Ein Beispiel für ein Clustering des DE-Datensatzes mit MapReduce-DBSCAN-Algorithmus ist in Abbildung 5.12 dargestellt. Dieses wurde mit einer Kernpunktklassifikation von 350 Punkten, einer Nachbarschaftsgröße von 1500 und einer Partitionsgröße von 250.000 Punkten durchgeführt. 5.2.3 MapReduce-kMeans und MapReduce-DBSCAN im Vergleich In diesem Versuch werden die beiden Cluster-Algorithmen direkt gegenübergestellt. Diese werden auf dem DE und EU Datensatz ausgeführt. Für den MR-kMeans-Algorithmus werden 50 Zentroide zufällig aus der Datenmenge ausgewählt und eine Iteration ausgeführt. Für den MR-DBSCAN-Algorithmus wird eine Nachbarschaftsgröße von 3.000, eine Kernpunktklassifikation von 1.000 und eine Partitionsgröße von 750.000 Punkten verwendet. Die Hadoop-Konfiguration wurde für diesen Versuch dahingehend verändert, dass jedem Container 1,5 GB Speicher zu Verfügung stehen, von denen 1,2 GB von der JVM als Heap genutzt werden können. Die Ergebnisse in Tabelle zeigen recht deutlich, dass der MR-kMeans-Algorithmus eine Iteration bedeutend schneller ausführt als der MR-DBSCAN-Algorithmus. Trotz einer etwa 8-mal größeren Punktmenge steigt die Laufzeit nicht im selben Umfang an. Die Laufzeit des MR-DBSCAN-Algorithmus steigt um etwa das 18-fache. 117 Abbildung 5.10: Gesamtlaufzeit des MR-DBSCAN-Algorithmus bei variabler Kernpunktklassifikation MR-DBSCAN MR-kMeans Europa 03:14:17 00:02:12 Deutschland 00:11:15 00:00:47 Tabelle 5.14: Laufzeitvergleich MR-DBCSCAN und MR-kMeans MBR Partitionierung Clustering Merge-Mapping Umbenennung Gesamtzeit DE 18s 20s 9m 7s 30s 1m 11m 15s EU 1m 4s 2h 58m 3s 5m 9s 2m 21s 7m 40s 3h 14m 17s Tabelle 5.15: Einzellaufzeiten des MR-DBSCAN-Algorithmus Eine Betrachtung der Laufzeiten der einzelnen Phasen (Tabelle 5.15) zeigt, dass die Partitionierung für den massiven Laufzeitanstieg verantwortlich ist. Hier treten die selben Effekte auf, die bereits in den Tests des MR-DBSCAN-Algorithmus auf dem DEDatensatz mit verschiedenen Nachbarschaftsgrößen in Abschnitt 5.2.2 beobachtet wurden. In diesem Fall treten die Effekte jedoch auch bei großen Nachbarschaften wegen 118 Abbildung 5.11: Aufgeschlüsselte Laufzeiten des MR-DBSCAN-Algorithmus bei variabler Kernpunktklassifikation der großen räumlichen Ausdehnung der Daten auf. Die Ursache hierfür liegt in der Verrasterung des minimal-einschließenden Rechtecks des Datensatzes, welches zu einer sehr großen Anzahl von Rasterzellen mit einer Kantenlänge, die dem Radius der Nachbarschaftsgröße entspricht, führt. Entsprechend groß ist folglich die Anzahl der Zellen, die für jede Kostenberechnung eines Rechtecks während der Partitionierung betrachtet werden muss. Ein interessanter Effekt ist weiterhin, dass die Laufzeit des Clusterings für den DE-Datensatz fast doppelt so lange dauert wie für den EU-Datensatz. Fazit Auf den ersten Blick erscheint es, als sei der MapReduce-kMeans-Algorithmus für das Clustering räumlicher Daten die deutlich bessere Lösung. Nicht vergessen werden darf jedoch, dass in der Regel mehrere Iteration notwendig sind, bis die finalen Clusterschwerpunkte gefunden worden sind. Zusätzlich sind gegebenenfalls mehrere Anwendungen des Verfahrens oder eine Vorabanalyse der Daten notwendig, um eine passende Anzahl sowie Position initialer Clusterschwerpunkte zu bestimmen. Die reine Reduktion auf die Laufzeit einer Iteration ist folglich nicht geeignet, um ein Urteil über die grundsätzliche Tauglichkeit oder Untauglichkeit eines Verfahrens zu treffen, zumal die Wahl des Verfahrens maßgeblich von der Aufgabenstellung bestimmt wird. 119 Abbildung 5.12: Visualisierung des Clusterings für die Versuche in 5.2.2 120 Kapitel 6 Fazit Die Versuche, die in Kapitel 5 durchgeführt wurden, zeigen, dass das MapReduceParadigma nicht für jedes Problem das Mittel der Wahl und nicht jeder Algorithmus geeignet ist. Verfahren wie der Partition-Algorithmus, die bereits bei sequentieller Ausführung mit kleinen Datenmengen Probleme habe, lassen sich auch durch MapReduceImplementierungen nicht effizient nutzen. Grenzwertig sind Verfahren wie der MapReduce-DBSCAN-Algorithmus, welche innerhalb der Prozessausführung sequentielle Phasen durchlaufen. So wird der Algorithmus durch die sequentielle Berechnung einer adaptiven Partitionierung des Datensatzes für Datensätze mit einer großen räumlichen Ausdehnung massiv verlangsamt. Wie bereits in Abschnitt 5.2.3 dargelegt, ist eine gezielte Optimierung von Engpässen notwendig, um eine Ausführung in akzeptabler Zeit zu gewährleisten. Dieses gilt hier insbesondere vor dem Hintergrund, dass bestimmte Problemstellungen möglicherweise einen kleinen Nachbarschaftsradius erfordern; auf einem Datensatz mit großer räumlicher Ausdehnung könnte dies zur Folge, dass eine Berechnung der Partitionierung eventuell gar nicht möglich ist, da der Speicherbedarf für die Repräsentation des Rasters bereits den verfügbaren Speicher überschreitet. Folglich besteht an dieser Stelle Optimierungsbedarf. Vorstellbar ist hierfür eine Entkopplung der Rasterzellengröße vom Radius der Nachbarschaftsanfragen, so dass die Größe der Rasterzellen separat definiert wird. Ganz anders sieht es hingegen beim MapReduce-kMeans- und Parallel FP-GrowthAlgorithmus aus. Diese kommen weitestgehend ohne sequentielle Phasen aus und profitieren somit von der Parallelisierungsmöglichkeiten, die sich aus dem MapReduceParadigma ergibt. Bei den Versuchen hat sich ebenfalls gezeigt, dass die Konfiguration der RessourcenNutzung aufwändig sein kann. Es ist gegebenenfalls eine Justierung der Speicherkonfiguration in Try-and-Error-Manier notwendig, bis die Ausführung einer MapReduceAnwendung gelingt, ohne dass einzelne Tasks wegen Speichermangels von der ausführen- 121 den JVM terminiert werden. Dieses tritt insbesondere verstärkt in Erscheinung, wenn Anwendungsparameter wie die Nachbarschafts- oder Partitionsgröße des MapReduceDBSCAN-Algorithmus starken Einfluss auf den Speicherbedarf der einzelnen Jobs einer Anwendung haben. Zusätzlich hat sich beim Aufsetzen des Hadoop-Clusters das geringe Alter und die hohe momentane Entwicklungsgeschwindigkeit von Hadoop bemerkbar gemacht. Die offizielle Dokumentation für aktuelle Versionen ist teilweise sehr dürftig, während sich inoffizielle Dokumentation beispielsweise in Blogs oftmals auf veraltete Versionen bezieht nicht ohne weiteres auf neue Versionen übertragbar. Zusammenfassend lässt sich sagen, dass das MapReduce-Paradigma eine sehr gute Grundlage für die Parallelisierung von Data Mining-Algorithmen ist. Dieses liegt insbesondere in der Tatsache begründet, dass für die Implementierung von Verfahren keinerlei Kenntnisse über parallele Softwareentwicklung vorhanden sein muss und somit während der Entwicklung der Aufwand zur Fehlersuche von beispielsweise Race-Conditions und ähnlichen Problemen, die in parallelen Programmen auftreten können, wegfällt. Zusätzlich ist die Erweiterung eines Hadoop-Clusters um zusätzliche Rechenknoten sehr einfach; es muss im wesentlichen nur Hadoop und die Knotenkonfiguration auf eine neue Maschine kopiert werden. Im Gegenzug sind natürlich tiefgehende Kenntnisse über die Funktionsweise von Hadoop, bzw. der jeweils verwendeten MapReduce-Implementierung, notwendig, um effiziente Programme zu schreiben, die alle Möglichkeiten zur Parallelisierung ausreizen. Zwar ist das MapReduce-Paradigma und die zugehörigen Implementierungen noch verhältnismäßig jung, jedoch ist zu erwarten, dass dieses in Zukunft einen festen Platz einnehmen wird, wenn es um massiv-parallele Datenanalyse mittels Data Mining-Algorithmen geht. 122 Abbildungsverzeichnis 2.1 Der KDD-Prozess . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 10 2.2 Beispiel eines Frequent-Pattern Baums . . . . . . . . . . . . . . . . . . . 17 2.3 Konstruktion eines FP-Baumes . . . . . . . . . . . . . . . . . . . . . . . 19 2.4 Beispiel für die Generierung von Itemsets aus einem FP-Baum . . . . . . 21 2.5 Hierarchisches Clustering - Beispiel . . . . . . . . . . . . . . . . . . . . . 23 2.6 beispielhafte Darstellung des k-Means Algorithmus . . . . . . . . . . . . 25 2.7 -Nachbarschaft . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 26 2.8 DBSCAN - Beispiel . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 29 2.9 MapReduce-Beispiel . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 34 2.10 Der MapReduce-Prozess . . . . . . . . . . . . . . . . . . . . . . . . . . . 35 2.11 Ablauf der Ausführung einer MapReduce-Anwendung mit dem YARNFramework [Whi12] . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 39 3.1 Ablaufdiagramm des PFP-Algorithmus (nach [LWZ+ 08]) . . . . . . . . . 51 3.2 Die 90-10-Regel im hierarchischen Clustering . . . . . . . . . . . . . . . . 56 3.3 POP-Algorithmus - Überlappende Partitionen . . . . . . . . . . . . . . . 57 3.4 Beispielhafte Abbildung der Phasen des DSDBSCAN-Algorithmus (nach [PPA+ 12]) . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 62 3.5 Ablauf des MR-DBSCAN-Algorithmus [HTL+ 14] . . . . . . . . . . . . . 66 3.6 Beispiel für kostenbasierte Partitionierung [HTL+ 14] . . . . . . . . . . . 69 3.7 Beispiel einer Partitionierung mit ε-Randbereichen [HTL+ 14] . . . . . . . 71 3.8 Beispiel einer minimal vollständigen Partitionierungsmenge [HTL+ 14] . . 72 3.9 Beispiel eines Vereinigungs-Mappings . . . . . . . . . . . . . . . . . . . . 77 3.10 Graph-Repräsentation des Vereinigungs-Mappings . . . . . . . . . . . . . 78 123 3.11 Beispiel für den Ablauf der Vereinigung sechs lokaler Dendrogramme zu einem globalen Dendrogramm (nach [HPA+ 12]) . . . . . . . . . . . . . . 80 3.12 SHRINK-Algorithmus - Beispiel . . . . . . . . . . . . . . . . . . . . . . . 84 3.13 MapReduce k-Means - Datenfluss . . . . . . . . . . . . . . . . . . . . . . 85 4.1 Ablauf der MapReduce-Implementierung des Partition-Algorithmus . . . 91 4.2 Ablauf der Implementierung des MR-DBSCAN-Algorithmus . . . . . . . 94 5.1 Laufzeit PFP-Algorithmus (Gruppen: 10, Support: 10, K: 50) 5.2 Laufzeiten der einzelnen Teile des PFP-Algorithmus . . . . . . . . . . . . 107 5.3 Einfluss verschiedener Blockgrößen und Knotenzahlen auf die Laufzeit . . 111 5.4 Laufzeit in den einzelnen Phasen des MR-DBSCAN-Algorithmus für den DE-Datensatz . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 112 5.5 Gesamtlaufzeit des MR-DBSCAN-Algorithmus für den DE-Datensatz mit 3 Knoten, minP ts = 50, ε = 250 . . . . . . . . . . . . . . . . . . . . . . 113 5.6 Gesamtlaufzeit des MR-DBSCAN-Algorithmus bei variabler Knotenzahl . 114 5.7 Aufgeschlüsselte Laufzeiten des MR-DBSCAN-Algorithmus bei variabler Knotenzahl . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 115 5.8 Gesamtlaufzeit des MR-DBSCAN-Algorithmus bei variabler Nachbarschaftsgröße . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 116 5.9 Aufgeschlüsselte Laufzeiten des MR-DBSCAN-Algorithmus bei variabler Nachbarschaftsgröße . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 117 . . . . . . 106 5.10 Gesamtlaufzeit des MR-DBSCAN-Algorithmus bei variabler Kernpunktklassifikation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 118 5.11 Aufgeschlüsselte Laufzeiten des MR-DBSCAN-Algorithmus bei variabler Kernpunktklassifikation . . . . . . . . . . . . . . . . . . . . . . . . . . . . 119 5.12 Visualisierung des Clusterings für die Versuche in 5.2.2 . . . . . . . . . . 120 124 Tabellenverzeichnis 3.1 Übersicht und Notation der verschiedenen Itemset-Mengen . . . . . . . . 44 5.1 Beispiel-Transaktionsdatenbank . . . . . . . . . . . . . . . . . . . . . . . 101 5.2 Häufige 1-Itemsets . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 102 5.3 Häufige 2-Itemsets . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 102 5.4 3-Itemset Kandidaten . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 102 5.5 Gruppenabhängige Shards . . . . . . . . . . . . . . . . . . . . . . . . . . 103 5.6 Anzahl gefundener Itemsets . . . . . . . . . . . . . . . . . . . . . . . . . 103 5.7 Testresultate für den MR-Partition-Algorithmus . . . . . . . . . . . . . . 104 5.8 Laufzeit bei variabler Gruppenzahl (minSup = 10, K = 500) . . . . . . . 106 5.9 Laufzeit bei unterschiedlichem minimalen Support (100 Gruppen, K = 500)107 5.10 Laufzeit bei einer verschiedenen Anzahl auszugebener frequent Itemsets pro Item (100 Gruppen, minSup = 10) . . . . . . . . . . . . . . . . . . . 108 5.11 Anzahl gefundener frequent Itemsets für T10I4D100K.dat . . . . . . . . . 108 5.12 PFP-Algorithmus - Test mit verschiedenen Datensätzen . . . . . . . . . . 109 5.13 Ergebnisse der Versuche mit dem MR-kMeans-Algorithmus für variable Knotenzahl und Blockgröße . . . . . . . . . . . . . . . . . . . . . . . . . 110 5.14 Laufzeitvergleich MR-DBCSCAN und MR-kMeans . . . . . . . . . . . . 118 5.15 Einzellaufzeiten des MR-DBSCAN-Algorithmus . . . . . . . . . . . . . . 118 125 Literaturverzeichnis [AHU83] A. V. Aho, J. E. Hopcroft, J. D. Ullman. Data Structures And Algorithms. Addison-Wesley Publishing Company, 1983. ISBN 0-201-00023-7. [AS94] R. Agrawal, R. Srikant. Fast Algorithms for Mining Association Rules in Large Databases. In J. B. Bocca, M. Jarke, C. Zaniolo, Hg., Proc. VLDB. Morgan Kaufmann, 1994. ISBN 1-55860-153-8, 487–499. [BCC10] E. Baralis, T. Cerquitelli, S. Chiusano. A persistent HY-Tree to efficiently support itemset mining on large datasets. In S. Y. Shin, S. Ossowski, M. Schumacher, M. J. Palakal, C.-C. Hung, Hg., SAC. ACM, 2010. ISBN 978-1-60558-639-7, 1060–1064. [BCCG13] E. Baralis, T. Cerquitelli, S. Chiusano, A. Grand. P-Mine: Parallel itemset mining on large datasets. In C. Y. Chan, J. Lu, K. Nørvåg, E. Tanin, Hg., ICDE Workshops. IEEE Computer Society, 2013. ISBN 978-1-4673-5303-8, 266–271. [Bre13] J. Brehm. Parallelrechner. Vorlesungsfolien, 2013. [DG04] J. Dean, S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. In Proc. OSDI. USENIX Association, 2004, 137–150. [DL01] M. Dash, H. Liu. Efficient Hierarchical Clustering Algorithms Using Partially Overlapping Partitions. In D. W.-L. Cheung, G. J. Williams, Q. Li, Hg., PAKDD, Bd. 2035 von Lecture Notes in Computer Science. Springer, 2001. ISBN 3-540-41910-1, 495–506. [DPS07] M. Dash, S. Petrutiu, P. Scheuermann. pPOP: Fast yet accurate parallel hierarchical clustering using partitioning. Data Knowl. Eng., 61(3), 2007, 563–578. [EKSX96] M. Ester, H.-P. Kriegel, J. Sander, X. Xu. A Density-Based Algorithm for Discovering Clusters in Large Spatial Databases with Noise. In E. Simoudis, J. Han, U. M. Fayyad, Hg., Proc. KDD. AAAI Press, 1996. ISBN 1-57735004-9, 226–231. 126 [ES00] M. Ester, J. Sander. Knowledge Discovery in Databases. Springer-Verlag, 2000. ISBN 3-540-67328-8. [HKP11] J. Han, M. Kamber, J. Pei. Data Mining - Concepts and Techniques. Morgan Kaufmann, 2011. [HPA+ 12] W. Hendrix, M. M. A. Patwary, A. Agrawal, W. Liao, A. N. Choudhary. Parallel hierarchical clustering on shared memory platforms. In Proc. HiPC. IEEE, 2012. ISBN 978-1-4673-2372-7, 1–9. [HPY00] J. Han, J. Pei, Y. Yin. Mining Frequent Patterns without Candidate Generation. In W. Chen, J. F. Naughton, P. A. Bernstein, Hg., SIGMOD Conference. ACM, 2000. ISBN 1-58113-218-2, 1–12. [HTL+ 14] Y. He, H. Tan, W. Luo, S. Feng, J. Fan. MR-DBSCAN: a scalable MapReduce-based DBSCAN algorithm for heavily skewed data. Frontiers of Computer Science, 8(1), 2014, 83–99. [KPP03] W. A. Kosters, W. Pijls, V. Popova. Complexity Analysis of Depth First and FP-Growth Implementations of APRIORI. In P. Perner, A. Rosenfeld, Hg., MLDM, Bd. 2734 von Lecture Notes in Computer Science. Springer, 2003. ISBN 3-540-40504-6, 284–292. [LWZ+ 08] H. Li, Y. Wang, D. Zhang, M. Zhang, E. Y. Chang. Pfp: parallel fp-growth for query recommendation. In P. Pu, D. G. Bridge, B. Mobasher, F. Ricci, Hg., Proc. RecSys. ACM, 2008. ISBN 978-1-60558-093-7, 107–114. [Mei13] A. Meier. Efficient Algorithms. Vorlesungsskript, 2013. [OH11] R. O. Obe, L. S. Hsu. PostGIS In Action. Manning Publications Co., 2011. [PPA+ 12] M. A. Patwary, D. Palsetia, A. Agrawal, W.-k. Liao, F. Manne, A. Choudhary. A New Scalable Parallel DBSCAN Algorithm Using the Disjoint-set Data Structure. In In Proceedings of the International Conference on High Performance Computing, Networking, Storage and Analysis, SC ’12. IEEE Computer Society Press, Los Alamitos, CA, USA, 2012. ISBN 978-1-46730804-5, 62:1–62:11. [RDFU12] M. Riondato, J. A. DeBrabant, R. Fonseca, E. Upfal. PARMA: A Parallel Randomized Algorithm for Approximate Association Rules Mining in MapReduce. In Proceedings of the 21st ACM International Conference on Information and Knowledge Management, CIKM ’12. ACM, New York, NY, USA, 2012. ISBN 978-1-4503-1156-4, 85–94. [RU11] A. Rajaraman, J. D. Ullman. Mining of Massive Datasets. Cambridge University Press, 2011. 127 [Sib73] R. Sibson. SLINK: An Optimally Efficient Algorithm for the Single-Link Cluster Method. Comput. J., 16(1), 1973, 30–34. [SON95] A. Savasere, E. Omiecinski, S. B. Navathe. An Efficient Algorithm for Mining Association Rules in Large Databases. In U. Dayal, P. M. D. Gray, S. Nishio, Hg., Proc. VLDB. Morgan Kaufmann, 1995. ISBN 1-55860-379-4, 432–444. [TSK05] P. Tan, M. Steinbach, V. Kumar. Introduction to Data Mining. AddisonWesley, 2005. [WD11] S. Wang, H. Dutta. PARABLE: A PArallel RAndom-partition Based HierarchicaL ClustEring Algorithm for the MapReduce Framework. Techn. Ber., Columbia University, 2011. [Whi12] T. White. Hadoop - The Definitive Guide. O’Reilly, 2012. [XJK99] X. Xu, J. Jäger, H.-P. Kriegel. A Fast Parallel Clustering Algorithm for Large Spatial Databases. Data Min. Knowl. Discov., 3(3), 1999, 263–290. [ZEHL01] O. R. Zaı̈ane, M. El-Hajj, P. Lu. Fast Parallel Association Rule Mining without Candidacy Generation. In N. Cercone, T. Y. Lin, X. Wu, Hg., ICDM. IEEE Computer Society, 2001. ISBN 0-7695-1119-8, 665–668. [ZMH09] W. Zhao, H. Ma, Q. He. Parallel K-Means Clustering Based on MapReduce. In Proceedings of the 1st International Conference on Cloud Computing, CloudCom ’09. Springer-Verlag, Berlin, Heidelberg, 2009. ISBN 978-3-64210664-4, 674–679. 128 Erklärung Hiermit versichere ich, dass ich die vorliegende Arbeit und die zugehörige Implementierung selbstständig verfasst und dabei nur die angegebeben Quellen und Hilfsmittel verwendet habe. Hannover, 24.04.2014 Oliver Pabst 129