Towards Haskell in the Cloud - AG Programmiersprachen und

Werbung
Arbeitsgruppe Programmiersprachen und Übersetzerkonstruktion
Institut für Informatik
Christian-Albrechts-Universität zu Kiel
Seminararbeit
Towards Haskell in the Cloud
Lennart Spitzner
WS 2012/2013
Inhaltsverzeichnis
1. Einführung
2
2. Erlang
2.1. Beispielprogramm in Erlang . . . . . . . . . . . . . . . . . . . . . . . . . .
2.2. Knoten . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
2.3. Robuste Programmierung . . . . . . . . . . . . . . . . . . . . . . . . . . .
4
5
6
7
3. Herausforderungen
3.1. Vorausschau . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
8
9
4. Das Interface von Cloud Haskell
4.1. Knoten und Prozesse . . . . . . . . . . . . . . . . . . . . . . . . .
4.2. Das Starten von Prozessen . . . . . . . . . . . . . . . . . . . . . .
4.2.1. runProcess . . . . . . . . . . . . . . . . . . . . . . . . . .
4.2.2. spawn . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
4.3. Robuste Programmierung . . . . . . . . . . . . . . . . . . . . . .
4.4. Ungetypte Nachrichten . . . . . . . . . . . . . . . . . . . . . . . .
4.5. Matchen von Nachrichten - feiner gesteuertes Empfangen . . . . .
4.5.1. Gleichzeitiges Empfangen mehrere Typen von Nachrichten
4.5.2. Bedingtes Empfangen von Nachrichten eines Typs . . . .
4.5.3. Empfangen mit Timeout . . . . . . . . . . . . . . . . . . .
4.6. Getypte Channel . . . . . . . . . . . . . . . . . . . . . . . . . . .
5. Closures
5.1. Static . . . . . . . .
5.2. Closure . . . . . . .
5.3. Das Wörterbuch . .
5.4. Die Template-Haskell
. . . . . . . . .
. . . . . . . . .
. . . . . . . . .
Hilfsfunktionen
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
10
10
11
11
11
12
13
13
14
14
14
15
.
.
.
.
16
17
18
19
19
6. Beispielprogramm nun in Haskell
21
7. Performanz
24
8. Zusammenfassung
25
A. Übersicht über das Cloud Haskell Interface
26
B. Beispiel für Senden und Empfangen
28
ii
Inhaltsverzeichnis
C. Beispiel für Closures
30
D. Ein etwas komplexeres Beispiel
32
iii
Vorbemerkung
In dem Paper, das dieser Arbeit zugrunde liegt [EBPJ11], wird das vorgestellte Framework mit „Cloud Haskell“ bezeichnet. Die Entwicklung des Projekts ist seit Erscheinen
des Papers vorangeschritten: unter anderem hat sich die Bezeichnung des Repositories
geändert zu „distributed-process“. Zitat der Projekt-Readme: „This repository holds an
implementation of Cloud Haskell“. Dieser Begriff wird nun also als Name des Gesamtkonzeptes (abgegrenzt gegenüber konkreten Implementierungen) verwendet. Ich werde in
dieser Arbeit - parallel zum Paper - den Begriff allgemein verwenden.
Die entsprechenden aktuellen Links sind:
1. die Frontpage:
http://haskell-distributed.github.com/
2. die Repositories:
https://github.com/haskell-distributed/distributed-process/
https://github.com/haskell-distributed/distributed-static/
https://github.com/haskell-distributed/distributed-process-simplelocalnet/
3. und die Hackage-Pakete:
http://hackage.haskell.org/package/distributed-process
http://hackage.haskell.org/package/distributed-static
http://hackage.haskell.org/package/distributed-process-simplelocalnet
Die genauen Versionen der Pakete, mit denen ich die Beispiele erstellt habe, sind:
• distributed-process: 0.4.1
• distributed-static: 0.2.1.1
• distributed-process-simplelocalnet: 0.2.0.8
Man beachte, dass diese teilweise schon nicht mehr die neuesten Versionen sind.
1
1. Einführung
Cloud Haskell hat das Ziel, für Haskell ein hochsprachliches Interface für die Programmierung verteilter Systeme bereitzustellen. Mit einem verteilten System ist hier eine große
Anzahl von Prozessoren gemeint, welche keinen Arbeitsspeicher teilen und welche durch
ein Netzwerk verbunden sind.
Es soll zum Beispiel möglich sein, ein Programm zu implementieren, welches von einem
Rechner aus eine rechenintensive Aufgabe löst und dabei Teilaufgaben an andere Rechner
verteilt, welche parallel bearbeitet werden. Hochsprachlich bedeutet unter Anderem, dass
die Kommunikation zwischen den Systemen typsicher (in Haskells Typsystem) ist und der
Programmierer sich nicht etwa auf die Ebene konkreter TCP-Verbindungen herabbegeben
muss.
Das Programmiermodell von Cloud Haskell lehnt sich stark an die Programmiersprache
Erlang [AVWW93] an. Dies bedeutet, dass Kommunikation zwischen den Komponenten
nicht über irgendeine Form von geteilten Speicher stattfinden, sondern alleinig durch
das (explizite) Senden von Nachrichten an andere Komponenten. Des Weiteren existiert
keine implizite Abhängigkeit zwischen den Komponenten, d.h. falls auf einem System ein
Fehler auftritt, beeinflusst dies nicht direkt andere Systeme1 .
Nach [EBPJ11] ist ein System mit geteilten Speicher ungeeignet für Cloud-Programmierung. Bezüglich der Performanz sei der Hauptkostenpunkt die Datenübertragung welche folglich explizit sein sollte. Dies ist der Grund dafür, dass Cloud Haskell das
Konzept der Nachrichtenübertragung als alleiniges Mittel der Kommunikation zwischen
Prozessen übernimmt. Vorläufer zu diesem Aspekt sind Erlang für zuverlässige Echtzeitanwendungen und MPI [GLS94] für Hochperformanz-Berechnungen.
Cloud Haskell ist implementiert als eine DSL für Haskell2 . Das zentrale Problem, das
bei der Implementierung dieser Art von verteilter Programmierung in Haskell auftritt, ist
die Delegierung von Code (Funktionsaufrufen) an entfernte Systeme. Da sich Funktionen
in Haskell nativ nicht serialisieren lassen, ist es einer der zentralen Errungenschaften
von Cloud Haskell, eine Möglichkeit dafür aufzuzeigen und zu implementieren. Ohne den
Compiler von Haskell zu verändern wird ein Interface bereitgestellt, das in der Bedienung
durch den Programmierer sicher und angenehm ist.
Eine andere wichtige Eigenschaft verteilter Systeme sollte es sein, Fehler in den Teilkomponenten zu verkraften. Denn sogar in dem (unwahrscheinlichen) Fall, dass das Programm frei von jeglichen Bugs ist, sind bei einer genügenden Anzahl von beteiligten
1
Aber das aktive verteilte Programm wird natürlich generell auf den Ausfall eines Systems reagieren
müssen.
2
Genauer gesagt eine embedded DSL, also eine DSL, die Haskell selbst als Umgebung nutzt, um dem
Programmierer Abstraktionen bereitzustellen.
2
1. Einführung
Knoten Hardwareprobleme unvermeidbar. Auch in diesem Punkt wird Erlangs bewährtes Konzept übernommen, siehe dazu den Abschnitt 4.3.
Da sich der Aufbau von Cloud Haskell sehr stark an Erlang anlehnt, möchte ich diese
Sprache und ihre Konstrukte zur nebenläufigen Programmierung im Folgenden beschreiben, um dann eine Abgrenzung zu Haskell machen zu können und um darzustellen, welche
Herausforderungen sich ergeben. Danach werde ich auf Cloud Haskell selbst eingehen.
3
2. Erlang
Zunächst einige Stichpunkte zur Sprache:
Erlang ..
• ist eine funktionale Sprache mit strikter Auswertung
• ist schwach/dynamisch typisiert
• hat garbage-collection
• übernimmt syntaktisch viel von Prolog
• ist aber in keiner Weise eine logische Programmiersprache
Ich werde hier nicht die gesamte Syntax von Erlang versuchen einzuführen; aber als
kleines Codebeispiel die Fakultätsfunktion einmal in Haskell und daneben in Erlang:
Haskell
fac :: Int -> Int
fac 0 = 1
fac n = n * fac (n -1)
Erlang
% dynamisch getypt
fac (0) -> 1;
fac ( N ) -> N * fac (N -1).
Das Kernkonstrukt für nebenläufige Programmierung in Erlang sind Prozesse. Diese
sind in Erlang leichtgewichtig, was unter Anderem durch einen eigenen Scheduler ermöglicht wird. Prozesse bestehen aus einem Term, der gerade ausgewertet wird (man könnte
auch Stack sagen), aus einem Prozess-Identifier und einer Mailbox.
Man kann ohne Probleme Tausende starten. Die Syntax1 dafür ist
spawn ( Modulname , Funktionsname , ListeDerParameter )
welche als Rückgabewert einen Prozess-Identifier liefert. Um also beispielsweise einen
sequentiellen Aufruf
example1 . worker ( fac , 5 , OwnPID , worker1 )
als eigenen Prozess auszuführen, müsste man Folgendes schreiben:
OtherPid = spawn ( example1 , worker , [ fac , 5 , OwnPID , worker1 ])
Identifier und Mailbox haben beide den Zweck, die Kommunikation zwischen Prozessen
zu ermöglichen: Der Prozess-Identifier ist eindeutig einem Prozess zuzuordnen und dient
(vorwiegend) als Adresse beim Senden von Nachrichten. Diesem Zweck ist in Erlang ein
eigener Operator gewidmet, !, „bang“ genannt. Auf dessen linker Seite steht die Adresse
(eine Prozess-Identifier), auf der rechten Seite die Nachricht (ein beliebiger Wert):
1
Es gibt eigentlich mehrere Varianten der spawn-Funktion, aber dies ist die allgemeinste.
4
2. Erlang
OtherPid ! 42
Die Mailbox enthält die empfangenen Nachrichten und kann nur genau von dem Prozess
ausgelesen werden, dem sie zugeordnet ist. Es gibt also keinen „Prozess-freien“ Mailboxen.
Das entsprechende Konstrukt ist das receive:
receive
Pattern1 [ when GuardSeq1 ] -> Body1 ;
...;
PatternN [ when GuardSeqN ] -> BodyN
end
Es können also Werte in der Mailbox durch Pattern-Matching (und Guards) gesucht und
ausgewählt werden; bei Erfolg wird dann der entsprechende Rumpf ausgewertet.
Die Kommunikation ist N-zu-1, da verschiedene (beliebige2 ) Prozesse an die gleiche
Mailbox senden, aber nur genau ein Prozess diese ausliest.
2.1. Beispielprogramm in Erlang
Das folgende Programm soll zeigen, wie in Erlang Berechnungen nebenläufig ausgeführt
werden können. Dazu werden mit spawn Prozesse gestartet, welche das Ergebnis der
Berechnung an den aufrufenden Prozess per Nachricht melden.
- module ( example1 ).
- export ( [ worker /4 , master /0 ] ).
% worker ( to be started in its on process ) that takes as
% parameters a function , a parameter , a processID and
% an identifier for the worker , so the master can
% identify our result .
% It applies the function to the parameter and sends
% the result to the other process .
worker (F , X , ResultPID , WorkerID ) ->
Result = F ( X ) ,
ResultPID ! { WorkerID , Result } .
% factorial
fac (0) -> 1;
fac ( N ) -> N * fac (N -1).
% master that starts two workers to calculate two factorials .
master () ->
% use erlang ’s self function to get the ID of the current
% process , so that the workers have an address when sending
2
mit der Einschränkung, dass der Prozess-Identifier bekannt ist
5
2. Erlang
% the result .
OwnPID = self () ,
% start a new process to calculate fac (5)
spawn ( example1 , worker , [ fac , 5 , OwnPID , worker1 ] ) ,
% and another process to calculate fac (3)
spawn ( example1 , worker , [ fac , 3 , OwnPID , worker2 ] ) ,
% sequentially , receive ( from our mailbox ) the two messages
% the workers ( will ) send us .
receive
% worker1 is not a variable : it is an atom . This
% pattern matches only on messages where the first
% element is worker1 , i . e .: the result of fac (5).
{ worker1 , Result1 } ->
receive
% same as above : matches on the result of fac (3).
{ worker2 , Result2 } ->
% we got two results , print them .
putStrLn ( show ( Result1 ) ++ " ␣ " ++ show ( Result2 ))
end
end .
Was zeigt dieses Beispiel?
• Es ist keine explizite Synchronisation nötig: Die Mailbox ist threadsafe. Es sei des
Weiteren angemerkt, dass mit dem gegebenen Pattern-Matching es kein Problem
ist, falls der zweite Worker vor dem ersten seine Nachricht sendet - Nachrichten
müssen nicht „in order“ aus der Mailbox genommen werden3 .
• Parallel zu der leichtgewichtigen Implementierung für Prozesse ist auch der syntaktische Aufwand für das Erzeugen von Prozessen und das Senden und Empfangen
von Nachrichten gering.
• Es ist möglich, Funktionen als Parameter an andere Prozesse zu senden. Dies mag
hier noch wenig überraschen; allerdings wäre es durchaus auch möglich, die Worker
auf fremden Systemen (d.h. exakter: auf fremden Erlang-Knoten) zu starten, ohne
dass irgendwelche Probleme aufträten.
2.2. Knoten
Ein Begriff/Konzept aus Erlang sollte noch angesprochen werden: Der Knoten (engl. node). Knoten stellen eine Abstraktionsebene des verteilten Systems oberhalb der Prozesse
dar. Ein Knoten arbeitet auf einem (physikalischen) System (allerdings können durchaus
3
Die genaue Semantik des receive ist nicht ganz einfach (vorallem wenn auch Pattern-Matching benutzt
wird), bei Interesse sei auf die Erlang-Dokumentation [erlb] verwiesen.
6
2. Erlang
mehrere Knoten auf dem selben laufen). Er dient als Umgebung für die Prozesse. Jeder
Prozess läuft also auf einem Knoten.
Knoten teilen bestimmte Resourcen und können so bestimmte Aufgaben effizienter
lösen, beispielsweise die Kommunikation zwischen Prozessen auf dem gleichen Knoten.
2.3. Robuste Programmierung
Eine der Möglichkeiten, mit Fehlschlägen (abstürzenden Prozessen, ausfallenden Knoten, Verbindungsproblemen) umzugehen, ist das Linken von Prozessen. Prozesse werden
damit (bidirektional) verbunden. Gelinkte Prozesse terminieren, falls die andere Seite
terminiert oder nicht mehr erreichbar ist. Außerdem können Prozesse sich gegenseitig
überwachen. Bei Erreignissen, die den anderen Prozess betreffen, bekommt der überwachende Prozess dann bestimmte Nachrichten gesendet.
7
3. Herausforderungen
Die Zielsetzung von Cloud Haskell ist es, Konstrukte in Haskell zur Verfügung zu stellen,
die möglichst viel von dem unterstützen, was in Erlang möglich ist.
Als zentrale Schwierigkeiten dieses Vorhabens lassen sich zwei Punkte identifizieren:
• Haskell ist streng getypt - schon die (ungetypten) Mailboxen scheinen damit inkompatibel zu sein; des Weiteren z.B. das Senden von Nachrichten ein Seiteneffekt,
der als solcher in Haskell speziell gehandhabt werden muss.
• Das Übermitteln (oder auch nur die Serialisierung) beliebiger Werte ist in Haskell nicht möglich. Als Stichwort seien hier Funktionen genannt: Solche an andere
Prozesse senden zu können ist wichtiger Bestandteil des Nachrichtensystems von
Erlang.
In [EBPJ11] werden folgende Lösungen für diese Schwierigkeiten vorgestellt:
• Die einstellige Typklasse Serializable vereinigt die Typklassen Binary und
Typeable1 . Werte von Binary-Instanzen lassen sich in einen ByteString konvertieren, welcher „ungetypt“ an andere Prozesse versendet (und dort wieder dekodiert)
werden kann. Die Typklasse Typeable bietet eine Form von Reflektion von Typen.
Sie enthält genau eine Methode: typeOf :: a -> TypeRep. Der Parameter ist dabei nur zur Festlegung des Typs vorhanden; der Wert soll nicht benutzt werden
(er dürfte also auch undefiniert sein). Der Rückgabewert ist eine Repräsentation
des Typs. Dieser kann an andere Prozesse gesendet werden2 und erlaubt es dann
auf Empfangsseite, den Typ der einzelnen Objekte zu identifizieren, wodurch auf
bestimmte Typen gematcht werden kann.
class ( Binary a , Typeable a ) = > Serializable a
instance ( Binary a , Typeable a ) = > Serializable a
• Zusätzlich können getypte Nachrichtenkanäle (Channel s) angelegt werden
• Es wird eine Monade bereitgestellt (Process3 ), auf der alle Operationen (mit Seiteneffekten) des Frameworks arbeiten.
instance Monad Process
instance MonadIO Process
1
Binary und Typeable sind Teil der Haskell-Standardbibliothek.
TypeRep ist nicht Binary-Instanz, er erlaubt aber die Zuordnung zu einem eindeutigen 128-Bit-Wert.
3
in [EBPJ11] noch ProzessM
2
8
3. Herausforderungen
• Es wird (mit einigen Tricks) ermöglicht, Funktionen (inklusive der Closure, also
den Werten der in dem Funktionsausdruck freien Variablen) zu versenden. Dieses
Thema benötigt eine eingehendere Erklärung und wird deshalb in dem Kapitel 5
genauer betrachtet.
Es sei angemerkt, dass bei dem Übertragen von Funktionen nicht ganz die Fähigkeiten von Erlang erreicht werden. Der Grund ist alleinig, dass Erlang die Übertragung
von Funktionen nativ unterstützt und Haskell nicht. In Cloud Haskell wird in keiner
Form Funktionscode übertragen, sondern alleinig ein Identifier, der auf der Gegenseite
dem dortigen Code zugeordnet wird. Die logische Folgerung ist, dass jede zu versendende Funktion schon auf dem entfernten Knoten vorhanden sein muss. Dies bedeutet (im
Moment), dass auf den miteinander kommunizierenden Knoten das gleiche Programm
ausgeführt wird, allerdings mit unterschiedlichen Parametern (durch welche der Anwender beispielsweise den Knoten Rollen zuordnen kann)4 .
Diese Einschränkung existiert so in Erlang nicht - es ist möglich, Funktionen an Knoten
zu senden, die diese Funktion noch überhaupt nicht „kannten“.
3.1. Vorausschau
In den folgenden Abschnitten wird nun zunächst das Interface von Cloud Haskell genauer
eingeführt:
• die Handhabung von Knoten und Prozessen (und deren Ids)
• robuste Programmierung: Linken und Monitoring
• das Senden und Empfangen von „ungetypten“ Nachrichten
• getypte Channels
Darauf folgend wird die Vorgehensweise für die Übermittlung von Funktionen bzw.
den Closures dargestellt.
4
Das letzte Beispiel im Anhang zeigt, wie ein Programm mit Rollen eines Master und mehrerer Slaves
aussehen könnte.
9
4. Das Interface von Cloud Haskell
4.1. Knoten und Prozesse
Knoten stellen die äußerste Strukturierungseinheit des Systems; sie sind lokal zu einem
physikalischen System. Auf einem Knoten läuft eine beliebige Anzahl von Prozessen. Knoten und Prozesse besitzen Ids, die unter Anderem zur Nachrichtenübertragung genutzt
werden können.
Cloud Haskell erlaubt es innerhalb eines Prozesses, noch nebenläufige Programmierung
in Haskell zu betreiben (Concurrent Haskell, mit forkIO usw.). Die Kommunikation zwischen den einzelnen Haskell-Threads kann dann auch über Shared-Memory-Konstrukte
wie MVars erfolgen. Insofern können traditionelle verteilte Programmierung in Haskell
Prozess A
Prozess B
Prozess C
Prozess D
Nachrichte
Versand
Mailbox
Prozess E
Knoten
Thread 2
Thread 1
Prozess
Thread 3
Abbildung 4.1.: Die Prozesse A und B teilen keinen Speicher, obwohl sie auf dem gleichen System (und Knoten) operieren. Prozess E nutzt intern Concurrent
Haskell; die andere Threads können dabei allerdings keine Nachrichten
senden oder empfangen.
10
4. Das Interface von Cloud Haskell
und Cloud Haskell zusammen genutzt werden.
Für die Kommunikation zwischen Prozessen sollen ausschließlich Nachrichten genutzt
werden. Deshalb wird (durch passende Typisierung) verhindert, dass Shared-MemoryKonstrukte zwischen Prozessen ausgetauscht werden (auch wenn diese Prozesse beispielsweise auf dem gleichen Knoten laufen). Genauer gesagt wird dies gesteuert durch die
Serializable-Typklasse, von welcher z.B. MVar und ThreadIds keine Instanzen sind.
4.2. Das Starten von Prozessen
Es gibt zwei leicht unterschiedliche Szenarien, in denen der Programmierer neue Prozesse
anlegen möchte:
1. Beim Starten des Programms, wenn man beispielsweise einen Knoten angelegt hat,
aber noch keinen Prozess. In diesem Fall befindet man sich also in der IO-Monade
und möchte einen Prozess anlegen (und dann in der Process-Monade weiterarbeiten).
2. Man ist schon in der Process-Monade, und möchte weitere Prozesse starten. Dieser Fall entspricht dem spawn in Erlang. Man ist hier mit hoher Wahrscheinlichkeit interessiert an der ProcessId des neu gestarteten Prozesses, um beispielsweise
Nachrichten an den neuen Prozess senden zu können.
4.2.1. runProcess
Für den ersten Fall stellt das Interface von Cloud Haskell mehrere Varianten zur Verfügung, von denen ich hier nur eine vorstellen möchte:
runProcess :: LocalNode -> Process () -> IO ()
Eine LocalNode erhält man dabei mit der Funktion newLocalNode1 .
Diese Funktion erlaubt es sozusagen, aus der IO-Monade in die Process-Monade zu
wechseln. runProcess ist nicht asynchron; terminiert also wenn der Prozess terminiert.
Die Einschränkung dieser Funktion ist, dass sie nur lokal anwendbar ist - LocalNode ist
nicht serialisierbar (im Gegensatz zu NodeId). Dafür ist der Typ aber auch entsprechend
einfach.
4.2.2. spawn
Im Gegensatz zu runProcess soll es bei spawn nun möglich sein, Prozesse auf beliebigen (also auch entfernten) Knoten zu starten. Deshalb sollte ein Parameter von spawn
nun eine NodeId sein. Der andere Parameter sollte angeben, was ausgeführt wird also Code in der Process-Monade. Der Rückgabetyp ist dann eine ProcessId, bzw.
Process ProcessId, da spawn in der Process-Monade arbeiten soll.
1
Dabei braucht man wieder zwei Parameter: ein backend und eine RemoteTable. Dies führt aber zu sehr
in Implementierungsdetails; An den ersten beiden Beispiele im Anhang kann man sich die Nutzung
ein wenig klar machen.
11
4. Das Interface von Cloud Haskell
Ein erster Versuch für Signatur wäre also
-- noch falsch !
spawn :: NodeId -> Process () -> Process ProcessId
Das Problem hier ist, dass der auszuführende Code ggf. übertragen werden muss und wie schon angesprochen ist dies nur mit Tricks möglich, in diesem Fall mit dem
Typkonstruktor Closure. Die korrekte Signatur ist
spawn :: NodeId -> Closure ( Process ()) -> Process ProcessId
Closure a beschreibt also eine übertragbare Variante des Typs a. Eine Closure enthält
in irgendeiner Form also einen Wert (eine Funktion) und dessen Umgebung - genaueres
dazu folgt später.
4.3. Robuste Programmierung
Auch in Haskell treten Programmierfehler auf und gerade ein verteiltes System muss auf
solche Fehler geeignet reagieren können (und nicht etwa Fehler einzelner Systeme zum
Absturz des gesamten verteilten Programms werden lassen). Cloud Haskell stellt mehrere
Methoden bereit, um auf gewollte oder ungewollte Terminierung anderer Prozesse zu
reagieren. Die wichtigsten zwei2 sind:
link :: ProcessId -> Process ()
monitor :: ProcessId -> Process MonitorRef
Beide Methoden starten eine Überwachung des anderen Prozesses durch den aktuellen
Prozess; sie blockieren also nicht. Die Verbindung ist in beiden Fällen unidirektional3 .
Der Unterschied der zwei Methoden ist die Art, wie auf ein Erreignis (Terminierung,
Absturz oder Verbindungsproblem) reagiert wird:
• Bei link wird im überwachenden Prozess eine Exception geworfen. link ist so gedacht, dass die Exception nicht gefangen wird, sondern zur Terminierung des Prozesses führt - Terminierung wird also propagiert.
• dagegen ist monitor mehr für den Fall gedacht, wenn spezifischer reagiert werden
soll: Es wird eine Nachricht an die Mailbox des Prozesses gesendet, die wie andere
Nachrichten empfangen werden kann. Die Nachricht enthält in diesem Fall dann
zusätzliche Informationen wie z.B. den genauen Grund.
Der Rückgabewert von monitor dient als Handle für das Beenden einer Überwachung;
auf diese weiteren Methoden werde ich hier aber nicht eingehen.
2
An diesem Punkt gibt es leichte Abweichungen zwischen [EBPJ11] und dem aktuellen Interface von
Cloud Haskell; dort werden zwei Methoden angeführt, welche zwar ähnlich zu den hier vorgestellten
Methoden, aber doch semantisch leicht unterschiedlich sind.
3
dies ist ein Unterschied zu den gleichnamigen Methoden in Erlang: link ist dort bidirektional, siehe
[erla]
12
4. Das Interface von Cloud Haskell
4.4. Ungetypte Nachrichten
Jeder Prozess hat einen ihm zugeordneten, ungetypten Channel, der genau der Mailbox
aus Erlang entspricht.
Nachrichten sind asynchron, zuverlässig und gepuffert. Die entsprechenden Operationen arbeiten auf der Process-Monade; d.h. sie können auch nur in dieser Monade genutzt
werden. Die Signatur für das Senden und Empfangen ist:
send :: Serializable a = > ProcessId -> a -> Process ()
expect :: Serializable a = > Process a
Beim Senden wird das Objekt serialisiert und zusammen mit der Repräsentation seines
Typs versendet. Beim Empfangen mit expect wird solange blockiert, bis eine der Nachrichten in der Queue den erwarteten (ggf. inferierten) Typ hat. Dann wird diese Nachricht
aus der Queue entfernt und das entsprechende Object deserialisiert und zurückgegeben.
Wie in Erlang muss die zurückgegebene Nachricht nicht die erste sein; es ist also nicht
zwingend, in Sende-Reihenfolge auszulesen.
Als Beispiel betrachten wir die folgende einfache Kommunikation: Prozess 1 sendet
zwei Nachrichten an Prozess 2:
-- p ist die ProzessId von Prozess 2.
send p ( " hello " :: String )
send p (3:: Int )
Und Prozess 2 empfängt diese nun (in umgekehrter Reihenfolge) und gibt beide aus:
( i :: Int ) <- expect
( s :: String ) <- expect
liftIO $ print i
-- liftIO , da wir uns
liftIO $ print s
-- in der Process - Monade befinden .
Die Ausgabe ist 3 und "hello". Dieses Beispiel soll zum einen das Empfangen in umgekehrter Reihenfolge veranschaulichen, zum anderen die „Ungetyptheit“: Es wurde hier
zum besseren Verständnis die Typen der Nachrichten mit angegeben, aber dies wäre natürlich wie gewöhnlich in Haskell nicht nötig (solange die Typen inferiert werden können).
Im Anhang findet sich das komplette Programm zu den Codeauschnitten oben.
4.5. Matchen von Nachrichten - feiner gesteuertes
Empfangen
In dem Beispiel oben waren die Typen der zu empfangenden Nachrichten (Int und
String) fest gesetzt. In echten Programmen wird man allerdings oft mehr wollen: Gleichzeitig mehrere unterschiedliche Nachrichten empfangen. Mit den bis hierhin vorgestellten
Methoden ist die Granularität des Empfanges aber gleich mit den Typen der Nachrichten:
• Gleichzeitig kann nur genau ein Typ von Nachricht empfangen werden.
• Es ist nicht möglich, nur bestimmte Nachrichten eines Typs zu empfangen.
13
4. Das Interface von Cloud Haskell
Diese Einschränkungen werden in bestimmten Situationen unschön sein; deshalb bietet
Cloud Haskell zusätzliche Methoden, die in diese beiden Richtungen mehr Flexibilität
bringen.
4.5.1. Gleichzeitiges Empfangen mehrere Typen von Nachrichten
Als erstes wird die Funktion match eingeführt:
match :: Serializable a = > ( a -> Process b ) -> Match b
Diese tut nicht mehr, als den Code, der beim Empfangen von einer Nachricht des Typs
a ausgeführt werden soll, zu verpacken. Das Ergebnis ist ein neuer Datentyp4 , der als
allgemeines Interface für Matching verwendet wird.
Die Funktion receiveWait erlaubt es nun, mehrere Matches zu kombinieren:
receiveWait :: [ Match b ] -> Process b
Dabei werden die Matches in der gegebenen Reihenfolge getestet und der erste passende
wird ausgeführt (durch den Parameter a -> Process b). Solange keine Nachricht passt,
blockiert die Methode.
Das „einfache“ expect lässt sich - nebenbei bemerkt - durch diese beiden Funktionen
ausdrücken:
expect :: Serializable a = > Process a
expect = receiveWait [ match return ]
4.5.2. Bedingtes Empfangen von Nachrichten eines Typs
Die Methode matchIf besitzt im Vergleich zu match ein zusätzliches Prädikat als Parameter, durch welches nur bestimmte Nachrichten eines Typs empfangen werden können.
matchIf :: Serializable a = >
( a -> Bool ) -> ( a -> Process b ) -> Match b
Zurückgegeben wird ein Match, der sich wie gehabt mit receiveWait nutzen (und mit
anderen Matches kombinieren) lässt.
4.5.3. Empfangen mit Timeout
Die Funktion receiveWait wartet unendlich auf passende Nachrichten. In bestimmten
Situationen möchte man als Programmierer aber ein Maximum für die Wartezeit angeben. Deshalb bietet Cloud Haskell eine entsprechende Variante5 :
receiveTimeout :: Int -> [ Match b ] -> Process ( Maybe b )
Der erste Parameter ist dabei das Timeout in Millisekunden.
Durch receiveTimeout lässt sich mit einem Timeout von 0 natürlich auch der Effekt
eines „tryReceive“, also einer nicht-blockierenden Variante, erreichen.
4
In [EBPJ11] war der entsprechende Typkonstruktor MatchM zweistellig und Monadeninstanz; hier ist
das aktuelle Interface also etwas einfacher geworden.
5
Das parallele Konstrukt in Erlang ist receive ... after x -> ... end
14
4. Das Interface von Cloud Haskell
4.6. Getypte Channel
Wenn schon ein strenges Typsystem zur Verfügung steht, wäre es unschön, nur ungetypte
Kommunikation in Form der Mailboxen zu ermöglichen. Daher bietet Cloud Haskell die
Möglichkeit, Channels anzulegen, die fest typisiert sind:
newChan
:: Serializable a = >
Process ( SendPort a , ReceivePort a )
sendChan
:: Serializable a = >
SendPort a -> a -> Process ()
receiveChan :: Serializable a = >
ReceivePort a -> Process a
Getypte Channels behalten die Form der N-zu-1-Kommunikation bei. Dies ist realisiert
dadurch, dass SendPorts serialisierbar sind und ReceivePorts nicht. Folglich kann ein
ReceivePort nur von dem Prozess genutzt werden, in welchem der Channel angelegt
worden ist. Es gibt des Weiteren Funktionen, um Channels des gleichen Typs zu vereinen:
mergePortsBiased :: Serializable a
[ ReceivePort
mergePortsRR
:: Serializable a
[ ReceivePort
=>
a ] -> Process ( ReceivePort a )
=>
a ] -> Process ( ReceivePort a )
Die beiden Varianten unterscheiden sich dabei im internen Scheduling: Bei der ersteren
wird bei jedem receiveChan die Liste der Ports in Reihenfolge abgefragt, was dazu
führen kann, dass nur Nachrichten aus dem ersten Port benutzt werden (falls dieser Port
andauernd Nachrichten enthält). mergePortsRR ist „gerechter“: Durch ein Robin-RoundVerfahren wird ausgeschlossen, dass einzelne Ports nie gelesen werden.
15
5. Closures
Bisher können wir bestimmte Arten von Werten, wie z.B. Funktionen oder Prozesse,
nicht serialisieren.
Betrachte als Beispiel folgende Moduldefinition:
module M1 where
f :: Int -> Int
f x = x + 1
Angenommen, es soll die Funktion f übertragen werden. Wie schon anfangs verraten
ist die Grundidee, intern den String „M1.f“ zu übertragen. Auf Empfangsseite kann dann
mit Hilfe eines Wörterbuchs, d.h. einer Zuordnung von Strings zu Funktionen (bzw. zu
beliebigen Werten), dieser String zugeordnet werden zu der dortigen Funktion.
Klappt dies für beliebige Funktionen? Die Antwort ist Nein:
p1 :: Process ()
p1 = do
( x :: Int ) <- expect
let g y = x + y
send p2 g -- nur um die Absicht klar zu machen ,
-- Typmaessig passt das so ja nicht .
Bei dem Versuch, die Funktion g zu übertragen, tritt nämlich folgendes Problem auf:
Damit g in das Wörterbuch hinzufügt werden kann, muss die Funktion auf Top-Level
definiert sein. g kann nicht einfach auf Top-Level-Ebene verschoben werden, da die Variable x dann frei und ungebunden wäre. Und wenn wir g dabei entsprechend abändern,
um die Abhängigkeit explizit zu machen?
g x y = x + y
p1 = do
( x :: Int ) <- expect
send p2 ( g x )
Dadurch haben wir nichts gewonnen, denn jetzt müsste nicht mehr g übertragen werden,
sondern ein Ausdruck (die partielle Applikation von g und x ). Das Problem ist in beiden Fällen dasselbe: Es sind die freien (und nicht auf Top-Level gebundenen) Variablen
innerhalb des Ausdrucks, der übertragen werden soll:
g y = x + y -- x ist frei !
(g x)
-- g und x sind frei ,
-- aber g ist auf Top - Level gebunden
16
5. Closures
Dem entsprechend sind es gerade die Werte der freien Variablen, die auf Empfangsseite
fehlen, wenn ausschließlich g übertragen wird. Die triviale Konsequenz ist es, diese Werte
hinzuzunehmen - unsere Überlegung ist bei Closures angelangt. Closures sind nichts
anderes als die Kombination der Repräsentationen
• eines (Funktions-)Ausdrucks, und
• der Werte der freien Variablen innerhalb des Ausdrucks (bezeichnet als „Umgebung“).
Unter Nutzung von Closures lässt sich nun der Ausdruck oben übertragen: Für die
Funktion g wird ein Identifikator übertragen, das auf der Gegenseite zugeordnet werden
kann; als Umgebung wird der Wert von x übertragen. Wichtig zu beachten ist, dass alle
Werte der Umgebung bei dieser Vorgehensweise serialisierbar sein müssen.
Soweit zu der Überlegung, was grundsätzlich möglich ist. Es sind nun (in Richtung
auf die tatsächliche Implementierung) zunächst zwei Typkonstruktoren, Static und
Closure, einzuführen.
5.1. Static
Der erste, einstellige Typkonstruktur, Static, dient zur Kennzeichnung von Identifikatoren für Werte (etwas in der Art vov „M1.f“), in Abgrenzung zu „echten“ Werten. Ein
solcher Identifikator für einen Wert vom Typ a hat den Typ Static a.
Wichtige Eigenschaft des Typs Static a ist, dass er serialisierbar ist, also dass er Instanz der Typklasse Serializable ist; und dies unabhängig von a: Es geht ja allgemein
darum, bestimmte Ausdrücke, die von sich aus nicht Serializable sind, doch übertragbar zu machen. Eine Einschränkung wird allerdings doch gemacht: a muss Instanz von
Typeable sein. Dadurch können bestimmte Typfehler im Zusammenspiel mit Static erkannt werden1 .
Um dies noch etwas konkreter zu machen:
• Der Typ a kann eine Funktion sein, z.B. a = b → c
• Der Typ a kann ein Datenkonstruktor sein, z.B. a = [Int]
• Der Typ a kann sogar selbst Serializable sein. Dies macht z.B. Sinn, wenn der Wert
groß (oder sogar unendlich) ist. Wenn dieser Wert auf der Empfangsseite eigentlich
schon vorliegt, ist es u.U. effizienter, eine Referenz zu übertragen.
Die nächste Frage, die sich stellt, ist, welche Ausdrücke sich in einen Static verpacken lassen. Wie oben erkannt, hängt dies von den freien Variablen des Ausdrucks ab.
Es lassen sich (mindestens) Ausdrücke, die ausschließlich aus einer Variablen bestehen,
welche auf Top-Level gebunden ist, in Static verpacken. Aber es ist durchaus möglich,
bestimmte komplexere Ausdrücke zuzulassen: Wenn alle freien Variablen des Ausdrucks
1
allerdings erst zur Laufzeit, siehe den Abschnitt Dynamic Type Checking auf [chs].
17
5. Closures
auf Top-Level gebunden sind, sollte es prinzipiell möglich sein, den gesamten Ausdruck
zu verpacken.
Die konkreten Methoden, die Cloud Haskell zur Verfügung stellt, um Static-Werte zu
erzeugen, werden in Abschnitt 5.4 vorgestellt.
5.2. Closure
Eine Closure besteht aus einem Wert und einer Umgebung. Die offene Frage ist, welche Typen dies genau sind bzw. welche Signaturen die entsprechenden Methoden des
Interfaces haben.
Zunächst können wir den einfachen Fall betrachten, dass die Umgebung leer ist. Dann
besteht die Closure nur aus einem Wert, welcher vom Typ Static a ist für einen Typ
a. Dieser Typ macht auch gerade den Typ der Closure aus - somit wäre Closure ein
einstelliger Typkonstruktor.
data Closure a
-- Methode , um Closures zu erzeugen ,
-- deren Umgebung leer ist
staticClosure :: Typeable a = > Static a -> Closure a
Tatsächlich entspricht das schon der wirklichen Closure-Deklaration; auch die Signatur
der Methode staticClosure ist so dem echten Interface entnommen.
Betrachten wir nun den Fall, dass die Umgebung nicht leer ist. Als erste Vereinfachung
ist es möglich, das Problem einzuschränken auf den Fall, dass die Umgebung nur genau
eine Bindung enthält. Notfalls ist dieser eine Wert ein Tupel von mehreren anderen
Werten. Dann könnte eine Methode zum Erzeugen vom Closures so aussehen:
-- noch falsch
-- Methode zur Erzeugung einer Closure
closure :: Serializable env = >
Static ( env -> a ) -> env -> Closure a
Die Methode ist also polymorph im Typ env. Es mag hier die Idee auftreten, den Typkonstruktor Closure zweistellig zu machen. Dies wäre aber nicht wirklich das, was wir
wollen. Betrachte dazu ein altes Beispiel:
spawn :: NodeId -> Closure ( Process ()) -> Process ProcessId
Der Typ der Closure ist ein Prozess. Auf Empfangsseite werden wir die Closure entpacken,
und sind dann auch nur noch an dem Prozess interessiert. Der Typ der Umgebung sollte
also explizit nicht in die den Typen einer Closure einfließen.
Der Typ env darf aber auch nicht universell sein, da er dadurch nicht mehr beim
Serialisieren und Deserialisieren der Closure bekannt wäre. Es wäre also gar nicht mehr
möglich, die Serializable-Instanz für Closure a zu schreiben. Die Lösung besteht in einer
weiteren Vereinfachung: Die Serialisierung der Umgebung wird vorgezogen. Serialisierung
18
5. Closures
ist nichts anderes als die Umwandlung in einen festen Typ: ByteString. Die Signatur für
das Anlegen von Closures mit Umgebung ist also folgende:
closure :: Static ( ByteString -> a ) -> ByteString -> Closure a
Durch diese Vereinfachung ist der Typ einer Closure hinreichend einfach. Intern besteht eine Closure aus genau diesen beiden Werten. Das Unschöne ist allerdings, dass
der Programmierer für alle Arten von Umgebungen, die er in eine Closure verpacken
möchte, die entsprechende Funktion zum Auslesen des ByteStrings definieren müsste
(und genauso müsste er beim Verpacken die Umwandlung in den ByteString ausführen).
Daher bietet Cloud Haskell bestimmte Methoden (implementiert in Template Haskell),
die automatisch solche Funktionen erzeugen. Dieses wird in Kapitel 5.4 vorgestellt.
5.3. Das Wörterbuch
Das Wörterbuch zum Nachschlagen von den serialisierten Static-Werten ist das einzige
Konstrukt, was noch fehlt. Der entsprechende Datentyp ist
data RemoteTable
Dieses Wörterbuch enthält Zuordnungen, sodass sich Static-Werte wieder „entpacken“
lassen. Die entsprechenden Methoden sind:
unstatic
:: Typeable a = >
RemoteTable -> Static a -> Either String a
unclosure :: Typeable a = >
RemoteTable -> Closure a -> Either String a
Dabei benutzt unclosure intern unstatic. Der Rückgabewert ist ein Either, der durch
den String einen Fehler kennzeichnen kann.
Für diese beiden Methoden existiert jeweils auch noch eine Variante, die in der ProcessMonade arbeiten und das Wörterbuch indirekt aus dem Prozess bezieht:
unStatic :: Typeable a = > Static a -> Process a
unClosure :: Typeable a = > Closure a -> Process a
Der Rückgabetyp ist hier kein Either mehr; im Fehlerfall wird eine Exception geworfen.
Wie gleich beschrieben, kann dieses Wörterbuch weitgehend automatisiert angelegt
werden. Es ist allerdings auch möglich, das Wörterbuch per Hand zu füllen; insgesamt
reichen die bis hierhin eingeführten Konstrukte aus, um in Cloud Haskell Closures zu
benutzen. Da aber die Hilfsfunktionen, die durch Template-Haskell geboten werden, sowohl knapperen als auch weniger fehleranfälligen Code erlauben, werde ich mich in der
Erklärung auf das Template-Haskell-Interface beschränken.
5.4. Die Template-Haskell Hilfsfunktionen
Eine Anmerkung zu Template-Haskell: Zur Nutzung in GHC ist das Compiler-Flag „TemplateHaskell“ notwendig.
19
5. Closures
Die Funktion remotable bildet die Basis für die Automatisierung. Sie muss auf ModulEbene genutzt werden; die Parameter sind dann eine Liste von Werten (Funktionen) des
aktuellen Moduls, welche umwandelbar in Static-Werte sein sollen. Die Funktionsnamen
werden dabei durch ein Apostroph als Präfix escaped, am Besten wird dies durch ein
Beispiel klar:
{ - # LANGUAGE TemplateHaskell # -}
module Main where
printHello :: Process ()
printHello = liftIO $ putStrLn $ " Hello "
remotable [’printHello ]
Die Anwendung von remotable erzeugt bestimmte neue Funktionen und Werte; beispielsweise die Funktionen, welche die Umgebung deserialisieren (diese waren nötig, um die
Closures zu erzeugen). Ein anderer wichtiger Wert, der angelegt wird, ist __remoteTable.
Dieses ist die automatisiert gefüllte RemoteTable.
Um Closures (und die enthaltenen Static-Werte) zu erzeugen, gibt es zwei Varianten2 :
• bei leerer Umgebung kann die (schon erwähnte) Methode staticClosure kombiniert werden mit mkStatic. Letzteres ist ein Makro, der durch folgende Syntax
benutzt wird: $(mkStatic f).
• bei nicht-leerer Umgebung dient der Makro mkClosure.
Auch hier ein Beispiel:
printHello :: Process ()
printHello = liftIO ( putStrLn " Hello " )
printInteger :: Int -> Process ()
printInteger i = liftIO ( putStrLn ( show i ))
remotable [’printHello , ’printInteger ]
closureList :: [ Closure ( Process ())]
closureList =
[ staticClosure $ ( mkStatic ’printHello ) ,
$ ( mkClosure ’printInteger ) 42 ]
Diese beiden Closures hätten auch gerade den Typ, der bei spawn verlangt ist, sodass
letztlich printHello und (printInteger 42) als eigene Prozesse gestartet werden könnten.
Das zweite Beispiel im Anhang ist ein komplettes Programm, in dem verschiedene
Arten des Anlegens von Closures benutzt werden.
2
Die genauen Signaturen der Template-Haskell Funktionen stimmten nicht mehr mit den in [EBPJ11]
vorgestellten überein; insgesamt scheint es auch jetzt noch häufiger leichte Änderungen zu geben.
20
6. Beispielprogramm nun in Haskell
Als Demonstration der Fähigkeit von Cloud Haskell folgt nun eine Haskell-Variante des
Beispielprogramms in Erlang aus Kapitel 2.1. Ich habe bei der Übertragung darauf geachtet, möglichst nah an dem Original zu bleiben. Dennoch sind einige Unterschiede
unvermeidbar:
1. Der Worker ist nicht ganz so frei in seinem Typ: Der Rückgabewert wird fest auf
Int gesetzt.
2. Statt einer Funktion und einem Parameter wird direkt eine Closure übertragen.
Dies ist ganz einfach knapper und wäre so in Erlang nicht möglich, da dort nicht
unterschieden werden kann zwischen einem Wert und der Closure des Wertes (d.h.
man könnte die Auswertung der Funktionsapplikation nicht verhindern).
3. Der Worker nimmt als Parameter ein Tupel, der die eigentlichen drei Werte enthält. Grund dafür ist, dass der mkClosure-Makro genau einen Parameter erwartet.
Würde man die Closure per Hand generieren, könnte man so etwas umgehen.
4. Es ist (natürlich) gewisser Overhead nötig, um in die Process-Monade zu gelangen
(und die verschiedenen nötigen Imports sind schon weggelassen).
5. Beim receive muss das etwas längere matchIf verwendet werden, da PatternMatching in dieser Form in Cloud Haskell nicht möglich ist.
-- worker mit drei Parametern : Ein Closure , die eine
-- Berechnung eines Ints enthaelt ; die ProcessId des
-- Prozesses der das Ergebnis zugesendet bekommen soll ; und
-- ein Int welcher diesen worker identifiziert .
worker :: ( Closure Int , ProcessId , Int ) -> Process ()
worker (f , resultPid , workerId ) = do
-- unClosure enthaelt die Funktionsapplikation die in Erlang
-- explizit war .
result <- unClosure f
-- der Typ der Nachricht ist ein Tupel , genauso wie in
-- Erlang .
send resultPid ( workerId , result )
fac :: Int -> Int
fac 0 = 1
fac n = n * fac (n -1)
21
6. Beispielprogramm nun in Haskell
remotable [ ’ worker , ’ fac ]
main :: IO ()
main = do
-- starte lokalen Knoten und Prozess auf diesem
backend <- initializeBackend " localhost " " 0 " rtable
node <- newLocalNode backend
runProcess node $ do
-- beziehe beide Ids ( fuer Knoten und Prozess )
nodeId <- getSelfNode
selfPid <- getSelfPid
-- starte ersten " worker " , der ( fac 5) berechnet .
spawn nodeId (
$ ( mkClosure ’ worker )
( ( $ ( mkClosure ’ fac ) (5:: Int )) ,
selfPid ,
1:: Int
)
)
-- starte ersten " worker " , der ( fac 3) berechnet .
spawn nodeId (
$ ( mkClosure ’ worker )
( ( $ ( mkClosure ’ fac ) (3:: Int )) ,
selfPid ,
2:: Int
)
)
-- warte auf beide Ergebnisse . Nutze das bedingte
-- Matching , um zuerst das Ergebnis des ersten Workers zu
-- empfangen .
receiveWait [
matchIf
(\( workerId1 :: Int , result1 :: Int ) -> workerId1 == 1)
(\( workerId1 :: Int , result1 :: Int ) ->
receiveWait [
match
(\( _ :: Int , result2 :: Int ) ->
liftIO (
putStrLn (
show result1 ++ " ␣ " ++ show result2
)
)
)
22
6. Beispielprogramm nun in Haskell
]
)
]
where
rtable = __remoteTable initRemoteTable
23
7. Performanz
Als Test für die Performanz von Cloud Haskell diente der k-means-Algorithmus, welcher
eine Verallgemeinerung von MapReduce [DG08] darstellt. Als Referenz wurde die Apache
Mahout Implementierung genutzt, welche auf dem Hadoop-Framwork aufbaut1 .
Die beiden Implementierungen zeigen ungefähr gleiche Performanz - weder Cloud Haskell noch Hadoop kann als allgemein besser bezeichnet werden. Cloud Haskell ist bei
geringer Anzahl von verwendeten Knoten langsamer (bei nur einem Knoten etwa um den
Faktor 2.5). Während Hadoop ab einer Knotenanzahl von 20 keine Laufzeitverbesserung
mehr zeigt, profitiert Cloud Haskell bis hin zu dem gemessenen Maximum mit 80 Knoten von den zusätzlichen Knoten und hat dann auch eine deutlich geringere Laufzeit als
Hadoop.
1
die genauen Versuchsparameter entnehme man bitte [EBPJ11]
24
8. Zusammenfassung
Cloud Haskell erlaubt es, in Haskell verteilte Programme im Stil von Erlang zu schreiben.
Die gesamte Kommunikation wird dabei von Cloud Haskell implementiert und in Form
der Nachrichtenübertragung dem Programmierer zur Verfügung gestellt.
Bei der Übertragung der Konzepte aus dem dynamisch typisierten Erlang traten zwei
zentrale Herausforderungen auf: der Umgang mit Haskells strengen Typsystem und die
Übertragung von (Funktions)Closures an andere Knoten. In beiden Punkten wurde eine
annehmbare Lösung gefunden und implementiert.
Zur Handhabung der Seiteneffekte wurde die Process-Monade eingeführt, in der die
meisten der von Cloud Haskell bereitgestellten Funktionen arbeiten. Dadurch kann gleichzeitig auch gewähleistet werden, dass diese Funktionen nicht missbraucht werden - sie sind
nur innerhalb der Process-Monade nutzbar. Mit Hilfe der Haskell-Typklassen Binary
und Typeable lassen sich dynamisch getypte Nachrichten realisieren.
Statt von Funktionen werden Identifikatoren für diese übertragen, die auf Empfangsseite wieder den eigentlichen Werten zugeordnet werden. Dieser Trick hat gewisse Einschränkungen. Closures wurden eingeführt, um Funktionen um eine Umgebung zu erweitern. Die Realisierung des entsprechenden Interfaces nutzt Template Haskell und kommt
dann ohne die Modifikation des Haskell-Compilers aus. Die Autoren haben allerdings die
Absicht ausgedrückt, den GHC-Compiler leicht zu modifizieren, um beispielsweise die
Typklasse Static direkt in die Sprache zu verankern und damit die Implementierung
und Nutzung noch einfacher zu gestalten.
25
A. Übersicht über das Cloud Haskell
Interface
allgemeine Klassen und Instanzen
instance Monad Process
instance MonadIO Process
class ( Binary a , Typeable a ) = > Serializable a
Knoten und Prozesse
newLocalNode :: Transport -> RemoteTable -> IO LocalNode
localNodeId :: LocalNode -> NodeId
runProcess
:: LocalNode -> Process () -> IO ()
spawn :: NodeId -> Closure ( Process () ) -> Process ProcessId
call :: Serializable a = >
Static ( SerializableDict a ) -> NodeId -> Closure ( Process a )
-> Process a
terminate :: Process a
getSelfNode :: Process NodeId
getSelfPid :: Process ProcessId
robuste Programmierung
link
:: ProcessId -> Process ()
monitor :: ProcessId -> Process MonitorRef
ungetypte Nachrichten
send
:: Serializable a = > ProcessId -> a -> Process ()
expect :: Serializable a = > Process a
match
:: Serializable a = > ( a -> Process b ) -> Match b
matchif :: Serializable a = > ( a -> Bool ) -> ( a -> Process b ) -> Match b
receiveWait
::
[ Match b ] -> Process b
receiveTimeout :: Int -> [ Match b ] -> Process b
getypte Channel
newChan
:: Serializable a = > Process ( SendPort a , ReceivePort a )
sendChan
:: Serializable a = > SendPort a -> a -> Process ()
receiveChan :: Serializable a = > ReceivePort a -> Process a
mergePortsBiased :: Serializable a = >
[ ReceivePort a ] -> Process ( ReceivePort a )
mergePortsRR
:: Serializable a = >
[ ReceivePort a ] -> Process ( ReceivePort a )
26
A. Übersicht über das Cloud Haskell Interface
Static und Closure
data Static a
instance Typeable1 Static
instance Typeable a = > Binary ( Static a )
staticLabel :: String -> Static a
staticApply :: Static ( a -> b ) -> Static a -> Static b
data Closure a
staticClosure :: Typeable a = > Static a -> Closure a
closure :: Static ( ByteString -> a )
-> ByteString
-> Closure a
data RemoteTable
initRemoteTable :: RemoteTable
unstatic :: Typeable a = > RemoteTable -> Static a -> Either String a
unclosure :: Typeable a = > RemoteTable -> Closure a -> Either String a
unStatic :: Typeable a = > Static a -> Process a
unClosure :: Typeable a = > Closure a -> Process a
Template Haskell Magic
remotable :: [ Name ] -> Q [ Dec ]
mkStatic :: Name -> Q Exp
mkClosure :: Name -> Q Exp
27
B. Beispiel für Senden und Empfangen
{ - # LANGUAGE TemplateHaskell , ScopedTypeVariables # -}
{ - - Beispiel fuer Cloud Haskell
Senden und Empfangen von der ( ungetypten ) Mailbox
Die main - Funktion enthaelt einiges an Setup - Code und benutzt Closures ;
dies geht inhaltlich ueber die Nachrichten hinaus . Wirklich relevant
fuer die N ac hr ic h te nu eb e rt ra g un g sind nur die Funktionen senderInt ,
senderString und receiveAndPrint .
- -}
module Main where
import System . Environment ( getArgs )
import Control . Distributed . Process (
Process ,
ProcessId ,
send ,
spawn ,
expect ,
liftIO ,
getSelfNode ,
getSelfPid )
import Control . Distributed . Static ( staticClosure )
import Control . Distributed . Process . Closure (
mkClosure ,
mkStatic ,
remotable ,
functionTDict )
import Control . Distributed . Process . Node ( initRemoteTable , runProcess )
import Control . Distributed . Process . Backend . SimpleLocalnet
import Control . Concurrent ( threadDelay )
-- ein ( Worker -) Prozess , der einen Int sendet
senderInt :: ProcessId -> Process ()
senderInt p = send p (3:: Int )
-- ein ( Worker -) Prozess , der einen String sendet
senderString :: ProcessId -> Process ()
senderString p = send p ( " hello " :: String )
remotable [ ’ senderInt , ’ senderString ]
-- der interessante Teil des masters :
-es werden zwei Werte ( unterschiedlichen Typs ) empfangen und
28
B. Beispiel für Senden und Empfangen
-ausgegeben
receiveAndPrint :: Process ()
receiveAndPrint = do
-- zwei Werte empfangen ( wahrscheinlich nicht in Sende - Reihenfolge ,
-aber das hat so oder so keine Auswirkung )
( i :: Int ) <- expect
( s :: String ) <- expect
-- Ausgabe
liftIO $ print i
liftIO $ print s
-- enthaelt viel Setup - Code ; Haupteffekt ist , dass zwei
-- Worker - Prozesse gestartet werden ; dann wird die obige Funktion
-- receiveAndPrint aufgerufen .
main :: IO ()
main = do
-- starte lokalen Knoten und Prozess auf diesem
backend <- initializeBackend " localhost " " 0 " rtable
node <- newLocalNode backend
runProcess node $ do
-- beziehe beide Ids ( fuer Knoten und Prozess )
nodeId <- getSelfNode
selfPid <- getSelfPid
-- starte ersten " worker " ( der einen String sendet )
spawn nodeId ( $ ( mkClosure ’ senderString ) selfPid )
-- eine Verzoegerung , um ein Empfangen in umgekehrter
-- Sendereihenfolge zu provozieren . Provozieren ist keine
-- Garantie , aber es soll nur dem Leser das Prinzip klar
-- gemacht werden .
liftIO $ threadDelay 1000000
-- starte zweiten " worker " ( der einen Int sendet )
spawn nodeId ( $ ( mkClosure ’ senderInt ) selfPid )
receiveAndPrint -- tut was der Name vermuten laesst
where
rtable = __remoteTable initRemoteTable
29
C. Beispiel für Closures
{ - # LANGUAGE TemplateHaskell , ScopedTypeVariables # -}
{ - - Beispiel fuer Cloud Haskell
Static - Werte und das Anlegen und Benutzen von Closures
Dieses Beispiel enthaelt drei Wege , um eine Closure anzulegen :
1. Aus einem ( einfachen ) Static - Wert , also mit einer leeren Umgebung
2. Aus einem Static - Wert , welcher sich als Komposition zweier
Static - Werte ergibt ( mit der Funktion staticApply )
3. Mit einem Static - Wert und einer Umgebung .
Um das Beispiel einfach zu halten , ist fuer die Nachrichten dieses
Beispiels der Sender gleich dem Empfaenger , d . h . der Prozess sendet
sich selbst Nachrichten . Es wuerde sich aber nichts aendern , wenn der
Empfaenger ein anderer Prozess waere .
- -}
module Main where
import System . Environment ( getArgs )
import Control . Distributed . Process (
Process ,
ProcessId ,
send ,
spawn ,
expect ,
liftIO ,
getSelfNode ,
getSelfPid ,
unClosure )
import Control . Distributed . Static (
Closure ,
staticClosure ,
staticApply )
import Control . Distributed . Process . Closure (
mkClosure ,
mkStatic ,
remotable ,
functionTDict )
import Control . Distributed . Process . Node ( initRemoteTable , runProcess )
import Control . Distributed . Process . Backend . SimpleLocalnet
plusTwo :: Int -> Int
plusTwo = (+) 2
30
C. Beispiel für Closures
plus :: Int -> Int -> Int
plus = (+)
two :: Int
two = 2
remotable [ ’ plus , ’two , ’ plusTwo ]
sendT hreeClosures :: Process ()
sendT hreeClosures = do
selfPid <- getSelfPid
-- Option Nr 1: Closure aus einfachem Static - Wert
send selfPid ( staticClosure $ ( mkStatic ’ plusTwo ) )
-- Option Nr 2: Closure aus ( komplexen ) Static - Wert , mit staticApply
send selfPid ( staticClosure ( staticApply
$ ( mkStatic ’ plus )
$ ( mkStatic ’ two ) ) )
-- Option Nr 3: Closure mit Environment ( als Static + Environment )
send selfPid ( $ ( mkClosure ’ plus ) (3 -1:: Int ) )
main :: IO ()
main = do
backend <- initializeBackend " localhost " " 0 " rtable
node <- newLocalNode backend
runProcess node $ do
sendThreeClosures
-- empfange drei Closures ( Int -> Int )
functionClosures <- sequence ( replicate 3 (
expect :: Process ( Closure ( Int -> Int ) ) ) )
-- wende auf alle unClosure an ,
-- sodass wir nun einen Liste von Funktionen haben .
functions <- mapM unClosure functionClosures
-- benutze alle Funktionen und gib jeweils das Ergebnis aus .
liftIO ( mapM_ ( print . ( $ 40) ) functions )
where
rtable = __remoteTable initRemoteTable
31
D. Ein etwas komplexeres Beispiel
{ - # LANGUAGE TemplateHaskell , DeriveDataTypeable # -}
{ - - Beispiel fuer
etwas komplexeres
unter schiedlichen
Architektur , fuer
bereitstellt .
Cloud Haskell
Beispiel mit mehreren Knoten ( potentiell auf
Systemen ) . Benutzt wird eine Master - Slaves
welche Cloud Haskell bestimmte Methoden
Verteilt berechnet werden mit diesem Programm Wurzeln ( hui hui ) .
Die Slaves melden sich dabei beim Master , wenn sie nichts zu tun
haben . Der Master wartet fuer jede Zahl , deren Wurzel berechnet
werden soll , dass sich ein Slave meldet . Dann schickt er die
Berechnung ab .
Die Ergebnisse sammeln sich in der Mailbox des Masters an ;
schliesslich liest dieser alle aus und printet sie .
FUER DIE AUSFUEHRUNG
Angenommen diese Datei ist abgespeichert als example . hs
- compilieren :
ghc - threaded example . hs
- starte einige Slaves ( in mehreren Konsolen auf dem selben
Rechner )
./ example slave localhost 8080 &
./ example slave localhost 8081 &
./ example slave localhost 8082 &
./ example slave localhost 8083 &
( oder auch im LAN , dann statt localhost die jeweilige
eigene LAN - IP des Rechners )
- starte den Master , und uebergebe hinter dem Port ( hier 8084)
eine Liste von Zahlen
./ example master localhost 8084 36 25 16 9 4 1 42
- -}
module Main where
import System . Environment ( getArgs )
import Control . Distributed . Process
import Control . Distributed . Static ( staticClosure )
import Control . Distributed . Process . Closure (
mkClosure ,
mkStatic ,
remotable ,
functionTDict ,
)
32
D. Ein etwas komplexeres Beispiel
import
import
import
import
import
Control . Distributed . Process . Node ( initRemoteTable )
Control . Distributed . Process . Backend . SimpleLocalnet
Control . Concurrent ( threadDelay )
Data . Binary ( Binary , put , get , Get , Word8 )
Data . Typeable ( Typeable (..) )
-- Datentyp , der Befehle an die Slaves darstellt
data Command = CalcSqrt ProcessId Float
| SayHello
deriving ( Typeable )
-- Binary - Instanz , damit Command Serializable ist .
instance Binary Command where
put SayHello = put (0:: Word8 )
put ( CalcSqrt p f ) = put (1:: Word8 ) >> put p >> put f
get = do
t <- get :: Get Word8
case t of
0 -> return SayHello
1 -> do
p <- get
f <- get
return ( CalcSqrt p f )
slaveProcess :: ProcessId -> Process ()
slaveProcess master = do
getSelfPid > >= send master
receiveWait [
matchIf
(\ x -> case x of SayHello -> True ; _ -> False )
(\ SayHello
-> do
liftIO $ putStrLn " hello " ) ,
match
(\( CalcSqrt pid x ) -> do
liftIO $ putStrLn $ " calculating ␣ the ␣ root ␣ of ␣ " ++ show x
send pid $ (x , sqrt x ) )
]
slaveProcess master
remotable [ ’ slaveProcess ]
master :: Backend -> [ Float ] -> [ NodeId ] -> Process ()
master backend numbers slaveNodes@ ( s : _ ) = do
self <- getSelfPid
-- Do something interesting with the slaves
liftIO $ putStrLn $ " Slaves : ␣ " ++ show slaveNodes
let slaveClosure = ( $ ( mkClosure ’ slaveProcess ) self )
-- starte die Slaves
slaves <- mapM (\ s -> spawn s slaveClosure ) slaveNodes
-- warte , dass alle Slaves bereit sind
mapM_ (\ _ -> expect :: Process ProcessId ) slaves
-- sende SayHello - Befehle
mapM_ (\ s -> send s SayHello ) slaves
33
D. Ein etwas komplexeres Beispiel
-- fuehre die Aufgaben aus ; verteile diese dabei jeweils auf
-- verfuegbare Slaves
mapM_ (\ f -> do
-- warte jeweils auf einen bereiten Slave
slaveId <- expect
-- gib ihm die Aufgabe
send slaveId ( CalcSqrt self f ) ) numbers
-- mache eine Ausgabe fuer alle Ergebnisse
-- ( in falscher Reihenfolge , h oechst wahrs cheinl ich )
mapM_
(\ _ -> do
(x , xroot ) <- expect :: Process ( Float , Float )
liftIO $ putStrLn $ " the ␣ root ␣ of ␣ " ++ show x
++ " ␣ is ␣ " ++ show xroot
return () )
numbers
-- beende die Slaves
te rminateAllSlaves backend
-- Initialisierungen
main :: IO ()
main = do
args <- getArgs
case args of
( " master " : host : port : numbers ) -> do
backend <- initializeBackend host port rtable
startMaster backend ( master backend ( map read numbers ) )
[ " slave " , host , port ] -> do
backend <- initializeBackend host port rtable
startSlave backend
where
rtable = __remoteTable initRemoteTable
34
Literaturverzeichnis
[AVWW93] Armstrong, Joe ; Virding, Robert ; Wikström, Claes ; Williams, Mike:
Concurrent Programming in ERLANG. 1993
[chs]
Cloud Haskell Dokumentation zum Paket Control.Distributed.Static.
http://hackage.haskell.org/packages/archive/distributed-static/
0.2.1.1/doc/html/Control-Distributed-Static.html#t:Static,
[DG08]
Dean, Jeffrey ; Ghemawat, Sanjay: MapReduce: simplified data processing on large clusters. In: Commun. ACM 51 (2008), Januar, Nr.
1, 107–113. http://dx.doi.org/10.1145/1327452.1327492. – DOI
10.1145/1327452.1327492
[EBPJ11]
Epstein, Jeff ; Black, Andrew P. ; Peyton-Jones, Simon: Towards
Haskell in the cloud. In: SIGPLAN Not. 46 (2011), September, Nr.
12, 118–129. http://dx.doi.org/10.1145/2096148.2034690. – DOI
10.1145/2096148.2034690
[erla]
Erlang Dokumentation zu Link.
erlang.html#link-1,
[erlb]
Erlang Dokumentation zu Receive. http://erlang.org/doc/reference_
manual/expressions.html#receive,
[GLS94]
Gropp, William ; Lusk, Ewing ; Skjellum, Anthony: Using MPI: Portable
Parallel Programming with the Message-Passing Interface. Cambridge, MA
: MIT Press, 1994. – xx + 307 S.
35
http://www.erlang.org/doc/man/
Herunterladen