Arbeitsgruppe Programmiersprachen und Übersetzerkonstruktion Institut für Informatik Christian-Albrechts-Universität zu Kiel Seminararbeit Towards Haskell in the Cloud Lennart Spitzner WS 2012/2013 Inhaltsverzeichnis 1. Einführung 2 2. Erlang 2.1. Beispielprogramm in Erlang . . . . . . . . . . . . . . . . . . . . . . . . . . 2.2. Knoten . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 2.3. Robuste Programmierung . . . . . . . . . . . . . . . . . . . . . . . . . . . 4 5 6 7 3. Herausforderungen 3.1. Vorausschau . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 8 9 4. Das Interface von Cloud Haskell 4.1. Knoten und Prozesse . . . . . . . . . . . . . . . . . . . . . . . . . 4.2. Das Starten von Prozessen . . . . . . . . . . . . . . . . . . . . . . 4.2.1. runProcess . . . . . . . . . . . . . . . . . . . . . . . . . . 4.2.2. spawn . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 4.3. Robuste Programmierung . . . . . . . . . . . . . . . . . . . . . . 4.4. Ungetypte Nachrichten . . . . . . . . . . . . . . . . . . . . . . . . 4.5. Matchen von Nachrichten - feiner gesteuertes Empfangen . . . . . 4.5.1. Gleichzeitiges Empfangen mehrere Typen von Nachrichten 4.5.2. Bedingtes Empfangen von Nachrichten eines Typs . . . . 4.5.3. Empfangen mit Timeout . . . . . . . . . . . . . . . . . . . 4.6. Getypte Channel . . . . . . . . . . . . . . . . . . . . . . . . . . . 5. Closures 5.1. Static . . . . . . . . 5.2. Closure . . . . . . . 5.3. Das Wörterbuch . . 5.4. Die Template-Haskell . . . . . . . . . . . . . . . . . . . . . . . . . . . Hilfsfunktionen . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 10 10 11 11 11 12 13 13 14 14 14 15 . . . . 16 17 18 19 19 6. Beispielprogramm nun in Haskell 21 7. Performanz 24 8. Zusammenfassung 25 A. Übersicht über das Cloud Haskell Interface 26 B. Beispiel für Senden und Empfangen 28 ii Inhaltsverzeichnis C. Beispiel für Closures 30 D. Ein etwas komplexeres Beispiel 32 iii Vorbemerkung In dem Paper, das dieser Arbeit zugrunde liegt [EBPJ11], wird das vorgestellte Framework mit „Cloud Haskell“ bezeichnet. Die Entwicklung des Projekts ist seit Erscheinen des Papers vorangeschritten: unter anderem hat sich die Bezeichnung des Repositories geändert zu „distributed-process“. Zitat der Projekt-Readme: „This repository holds an implementation of Cloud Haskell“. Dieser Begriff wird nun also als Name des Gesamtkonzeptes (abgegrenzt gegenüber konkreten Implementierungen) verwendet. Ich werde in dieser Arbeit - parallel zum Paper - den Begriff allgemein verwenden. Die entsprechenden aktuellen Links sind: 1. die Frontpage: http://haskell-distributed.github.com/ 2. die Repositories: https://github.com/haskell-distributed/distributed-process/ https://github.com/haskell-distributed/distributed-static/ https://github.com/haskell-distributed/distributed-process-simplelocalnet/ 3. und die Hackage-Pakete: http://hackage.haskell.org/package/distributed-process http://hackage.haskell.org/package/distributed-static http://hackage.haskell.org/package/distributed-process-simplelocalnet Die genauen Versionen der Pakete, mit denen ich die Beispiele erstellt habe, sind: • distributed-process: 0.4.1 • distributed-static: 0.2.1.1 • distributed-process-simplelocalnet: 0.2.0.8 Man beachte, dass diese teilweise schon nicht mehr die neuesten Versionen sind. 1 1. Einführung Cloud Haskell hat das Ziel, für Haskell ein hochsprachliches Interface für die Programmierung verteilter Systeme bereitzustellen. Mit einem verteilten System ist hier eine große Anzahl von Prozessoren gemeint, welche keinen Arbeitsspeicher teilen und welche durch ein Netzwerk verbunden sind. Es soll zum Beispiel möglich sein, ein Programm zu implementieren, welches von einem Rechner aus eine rechenintensive Aufgabe löst und dabei Teilaufgaben an andere Rechner verteilt, welche parallel bearbeitet werden. Hochsprachlich bedeutet unter Anderem, dass die Kommunikation zwischen den Systemen typsicher (in Haskells Typsystem) ist und der Programmierer sich nicht etwa auf die Ebene konkreter TCP-Verbindungen herabbegeben muss. Das Programmiermodell von Cloud Haskell lehnt sich stark an die Programmiersprache Erlang [AVWW93] an. Dies bedeutet, dass Kommunikation zwischen den Komponenten nicht über irgendeine Form von geteilten Speicher stattfinden, sondern alleinig durch das (explizite) Senden von Nachrichten an andere Komponenten. Des Weiteren existiert keine implizite Abhängigkeit zwischen den Komponenten, d.h. falls auf einem System ein Fehler auftritt, beeinflusst dies nicht direkt andere Systeme1 . Nach [EBPJ11] ist ein System mit geteilten Speicher ungeeignet für Cloud-Programmierung. Bezüglich der Performanz sei der Hauptkostenpunkt die Datenübertragung welche folglich explizit sein sollte. Dies ist der Grund dafür, dass Cloud Haskell das Konzept der Nachrichtenübertragung als alleiniges Mittel der Kommunikation zwischen Prozessen übernimmt. Vorläufer zu diesem Aspekt sind Erlang für zuverlässige Echtzeitanwendungen und MPI [GLS94] für Hochperformanz-Berechnungen. Cloud Haskell ist implementiert als eine DSL für Haskell2 . Das zentrale Problem, das bei der Implementierung dieser Art von verteilter Programmierung in Haskell auftritt, ist die Delegierung von Code (Funktionsaufrufen) an entfernte Systeme. Da sich Funktionen in Haskell nativ nicht serialisieren lassen, ist es einer der zentralen Errungenschaften von Cloud Haskell, eine Möglichkeit dafür aufzuzeigen und zu implementieren. Ohne den Compiler von Haskell zu verändern wird ein Interface bereitgestellt, das in der Bedienung durch den Programmierer sicher und angenehm ist. Eine andere wichtige Eigenschaft verteilter Systeme sollte es sein, Fehler in den Teilkomponenten zu verkraften. Denn sogar in dem (unwahrscheinlichen) Fall, dass das Programm frei von jeglichen Bugs ist, sind bei einer genügenden Anzahl von beteiligten 1 Aber das aktive verteilte Programm wird natürlich generell auf den Ausfall eines Systems reagieren müssen. 2 Genauer gesagt eine embedded DSL, also eine DSL, die Haskell selbst als Umgebung nutzt, um dem Programmierer Abstraktionen bereitzustellen. 2 1. Einführung Knoten Hardwareprobleme unvermeidbar. Auch in diesem Punkt wird Erlangs bewährtes Konzept übernommen, siehe dazu den Abschnitt 4.3. Da sich der Aufbau von Cloud Haskell sehr stark an Erlang anlehnt, möchte ich diese Sprache und ihre Konstrukte zur nebenläufigen Programmierung im Folgenden beschreiben, um dann eine Abgrenzung zu Haskell machen zu können und um darzustellen, welche Herausforderungen sich ergeben. Danach werde ich auf Cloud Haskell selbst eingehen. 3 2. Erlang Zunächst einige Stichpunkte zur Sprache: Erlang .. • ist eine funktionale Sprache mit strikter Auswertung • ist schwach/dynamisch typisiert • hat garbage-collection • übernimmt syntaktisch viel von Prolog • ist aber in keiner Weise eine logische Programmiersprache Ich werde hier nicht die gesamte Syntax von Erlang versuchen einzuführen; aber als kleines Codebeispiel die Fakultätsfunktion einmal in Haskell und daneben in Erlang: Haskell fac :: Int -> Int fac 0 = 1 fac n = n * fac (n -1) Erlang % dynamisch getypt fac (0) -> 1; fac ( N ) -> N * fac (N -1). Das Kernkonstrukt für nebenläufige Programmierung in Erlang sind Prozesse. Diese sind in Erlang leichtgewichtig, was unter Anderem durch einen eigenen Scheduler ermöglicht wird. Prozesse bestehen aus einem Term, der gerade ausgewertet wird (man könnte auch Stack sagen), aus einem Prozess-Identifier und einer Mailbox. Man kann ohne Probleme Tausende starten. Die Syntax1 dafür ist spawn ( Modulname , Funktionsname , ListeDerParameter ) welche als Rückgabewert einen Prozess-Identifier liefert. Um also beispielsweise einen sequentiellen Aufruf example1 . worker ( fac , 5 , OwnPID , worker1 ) als eigenen Prozess auszuführen, müsste man Folgendes schreiben: OtherPid = spawn ( example1 , worker , [ fac , 5 , OwnPID , worker1 ]) Identifier und Mailbox haben beide den Zweck, die Kommunikation zwischen Prozessen zu ermöglichen: Der Prozess-Identifier ist eindeutig einem Prozess zuzuordnen und dient (vorwiegend) als Adresse beim Senden von Nachrichten. Diesem Zweck ist in Erlang ein eigener Operator gewidmet, !, „bang“ genannt. Auf dessen linker Seite steht die Adresse (eine Prozess-Identifier), auf der rechten Seite die Nachricht (ein beliebiger Wert): 1 Es gibt eigentlich mehrere Varianten der spawn-Funktion, aber dies ist die allgemeinste. 4 2. Erlang OtherPid ! 42 Die Mailbox enthält die empfangenen Nachrichten und kann nur genau von dem Prozess ausgelesen werden, dem sie zugeordnet ist. Es gibt also keinen „Prozess-freien“ Mailboxen. Das entsprechende Konstrukt ist das receive: receive Pattern1 [ when GuardSeq1 ] -> Body1 ; ...; PatternN [ when GuardSeqN ] -> BodyN end Es können also Werte in der Mailbox durch Pattern-Matching (und Guards) gesucht und ausgewählt werden; bei Erfolg wird dann der entsprechende Rumpf ausgewertet. Die Kommunikation ist N-zu-1, da verschiedene (beliebige2 ) Prozesse an die gleiche Mailbox senden, aber nur genau ein Prozess diese ausliest. 2.1. Beispielprogramm in Erlang Das folgende Programm soll zeigen, wie in Erlang Berechnungen nebenläufig ausgeführt werden können. Dazu werden mit spawn Prozesse gestartet, welche das Ergebnis der Berechnung an den aufrufenden Prozess per Nachricht melden. - module ( example1 ). - export ( [ worker /4 , master /0 ] ). % worker ( to be started in its on process ) that takes as % parameters a function , a parameter , a processID and % an identifier for the worker , so the master can % identify our result . % It applies the function to the parameter and sends % the result to the other process . worker (F , X , ResultPID , WorkerID ) -> Result = F ( X ) , ResultPID ! { WorkerID , Result } . % factorial fac (0) -> 1; fac ( N ) -> N * fac (N -1). % master that starts two workers to calculate two factorials . master () -> % use erlang ’s self function to get the ID of the current % process , so that the workers have an address when sending 2 mit der Einschränkung, dass der Prozess-Identifier bekannt ist 5 2. Erlang % the result . OwnPID = self () , % start a new process to calculate fac (5) spawn ( example1 , worker , [ fac , 5 , OwnPID , worker1 ] ) , % and another process to calculate fac (3) spawn ( example1 , worker , [ fac , 3 , OwnPID , worker2 ] ) , % sequentially , receive ( from our mailbox ) the two messages % the workers ( will ) send us . receive % worker1 is not a variable : it is an atom . This % pattern matches only on messages where the first % element is worker1 , i . e .: the result of fac (5). { worker1 , Result1 } -> receive % same as above : matches on the result of fac (3). { worker2 , Result2 } -> % we got two results , print them . putStrLn ( show ( Result1 ) ++ " ␣ " ++ show ( Result2 )) end end . Was zeigt dieses Beispiel? • Es ist keine explizite Synchronisation nötig: Die Mailbox ist threadsafe. Es sei des Weiteren angemerkt, dass mit dem gegebenen Pattern-Matching es kein Problem ist, falls der zweite Worker vor dem ersten seine Nachricht sendet - Nachrichten müssen nicht „in order“ aus der Mailbox genommen werden3 . • Parallel zu der leichtgewichtigen Implementierung für Prozesse ist auch der syntaktische Aufwand für das Erzeugen von Prozessen und das Senden und Empfangen von Nachrichten gering. • Es ist möglich, Funktionen als Parameter an andere Prozesse zu senden. Dies mag hier noch wenig überraschen; allerdings wäre es durchaus auch möglich, die Worker auf fremden Systemen (d.h. exakter: auf fremden Erlang-Knoten) zu starten, ohne dass irgendwelche Probleme aufträten. 2.2. Knoten Ein Begriff/Konzept aus Erlang sollte noch angesprochen werden: Der Knoten (engl. node). Knoten stellen eine Abstraktionsebene des verteilten Systems oberhalb der Prozesse dar. Ein Knoten arbeitet auf einem (physikalischen) System (allerdings können durchaus 3 Die genaue Semantik des receive ist nicht ganz einfach (vorallem wenn auch Pattern-Matching benutzt wird), bei Interesse sei auf die Erlang-Dokumentation [erlb] verwiesen. 6 2. Erlang mehrere Knoten auf dem selben laufen). Er dient als Umgebung für die Prozesse. Jeder Prozess läuft also auf einem Knoten. Knoten teilen bestimmte Resourcen und können so bestimmte Aufgaben effizienter lösen, beispielsweise die Kommunikation zwischen Prozessen auf dem gleichen Knoten. 2.3. Robuste Programmierung Eine der Möglichkeiten, mit Fehlschlägen (abstürzenden Prozessen, ausfallenden Knoten, Verbindungsproblemen) umzugehen, ist das Linken von Prozessen. Prozesse werden damit (bidirektional) verbunden. Gelinkte Prozesse terminieren, falls die andere Seite terminiert oder nicht mehr erreichbar ist. Außerdem können Prozesse sich gegenseitig überwachen. Bei Erreignissen, die den anderen Prozess betreffen, bekommt der überwachende Prozess dann bestimmte Nachrichten gesendet. 7 3. Herausforderungen Die Zielsetzung von Cloud Haskell ist es, Konstrukte in Haskell zur Verfügung zu stellen, die möglichst viel von dem unterstützen, was in Erlang möglich ist. Als zentrale Schwierigkeiten dieses Vorhabens lassen sich zwei Punkte identifizieren: • Haskell ist streng getypt - schon die (ungetypten) Mailboxen scheinen damit inkompatibel zu sein; des Weiteren z.B. das Senden von Nachrichten ein Seiteneffekt, der als solcher in Haskell speziell gehandhabt werden muss. • Das Übermitteln (oder auch nur die Serialisierung) beliebiger Werte ist in Haskell nicht möglich. Als Stichwort seien hier Funktionen genannt: Solche an andere Prozesse senden zu können ist wichtiger Bestandteil des Nachrichtensystems von Erlang. In [EBPJ11] werden folgende Lösungen für diese Schwierigkeiten vorgestellt: • Die einstellige Typklasse Serializable vereinigt die Typklassen Binary und Typeable1 . Werte von Binary-Instanzen lassen sich in einen ByteString konvertieren, welcher „ungetypt“ an andere Prozesse versendet (und dort wieder dekodiert) werden kann. Die Typklasse Typeable bietet eine Form von Reflektion von Typen. Sie enthält genau eine Methode: typeOf :: a -> TypeRep. Der Parameter ist dabei nur zur Festlegung des Typs vorhanden; der Wert soll nicht benutzt werden (er dürfte also auch undefiniert sein). Der Rückgabewert ist eine Repräsentation des Typs. Dieser kann an andere Prozesse gesendet werden2 und erlaubt es dann auf Empfangsseite, den Typ der einzelnen Objekte zu identifizieren, wodurch auf bestimmte Typen gematcht werden kann. class ( Binary a , Typeable a ) = > Serializable a instance ( Binary a , Typeable a ) = > Serializable a • Zusätzlich können getypte Nachrichtenkanäle (Channel s) angelegt werden • Es wird eine Monade bereitgestellt (Process3 ), auf der alle Operationen (mit Seiteneffekten) des Frameworks arbeiten. instance Monad Process instance MonadIO Process 1 Binary und Typeable sind Teil der Haskell-Standardbibliothek. TypeRep ist nicht Binary-Instanz, er erlaubt aber die Zuordnung zu einem eindeutigen 128-Bit-Wert. 3 in [EBPJ11] noch ProzessM 2 8 3. Herausforderungen • Es wird (mit einigen Tricks) ermöglicht, Funktionen (inklusive der Closure, also den Werten der in dem Funktionsausdruck freien Variablen) zu versenden. Dieses Thema benötigt eine eingehendere Erklärung und wird deshalb in dem Kapitel 5 genauer betrachtet. Es sei angemerkt, dass bei dem Übertragen von Funktionen nicht ganz die Fähigkeiten von Erlang erreicht werden. Der Grund ist alleinig, dass Erlang die Übertragung von Funktionen nativ unterstützt und Haskell nicht. In Cloud Haskell wird in keiner Form Funktionscode übertragen, sondern alleinig ein Identifier, der auf der Gegenseite dem dortigen Code zugeordnet wird. Die logische Folgerung ist, dass jede zu versendende Funktion schon auf dem entfernten Knoten vorhanden sein muss. Dies bedeutet (im Moment), dass auf den miteinander kommunizierenden Knoten das gleiche Programm ausgeführt wird, allerdings mit unterschiedlichen Parametern (durch welche der Anwender beispielsweise den Knoten Rollen zuordnen kann)4 . Diese Einschränkung existiert so in Erlang nicht - es ist möglich, Funktionen an Knoten zu senden, die diese Funktion noch überhaupt nicht „kannten“. 3.1. Vorausschau In den folgenden Abschnitten wird nun zunächst das Interface von Cloud Haskell genauer eingeführt: • die Handhabung von Knoten und Prozessen (und deren Ids) • robuste Programmierung: Linken und Monitoring • das Senden und Empfangen von „ungetypten“ Nachrichten • getypte Channels Darauf folgend wird die Vorgehensweise für die Übermittlung von Funktionen bzw. den Closures dargestellt. 4 Das letzte Beispiel im Anhang zeigt, wie ein Programm mit Rollen eines Master und mehrerer Slaves aussehen könnte. 9 4. Das Interface von Cloud Haskell 4.1. Knoten und Prozesse Knoten stellen die äußerste Strukturierungseinheit des Systems; sie sind lokal zu einem physikalischen System. Auf einem Knoten läuft eine beliebige Anzahl von Prozessen. Knoten und Prozesse besitzen Ids, die unter Anderem zur Nachrichtenübertragung genutzt werden können. Cloud Haskell erlaubt es innerhalb eines Prozesses, noch nebenläufige Programmierung in Haskell zu betreiben (Concurrent Haskell, mit forkIO usw.). Die Kommunikation zwischen den einzelnen Haskell-Threads kann dann auch über Shared-Memory-Konstrukte wie MVars erfolgen. Insofern können traditionelle verteilte Programmierung in Haskell Prozess A Prozess B Prozess C Prozess D Nachrichte Versand Mailbox Prozess E Knoten Thread 2 Thread 1 Prozess Thread 3 Abbildung 4.1.: Die Prozesse A und B teilen keinen Speicher, obwohl sie auf dem gleichen System (und Knoten) operieren. Prozess E nutzt intern Concurrent Haskell; die andere Threads können dabei allerdings keine Nachrichten senden oder empfangen. 10 4. Das Interface von Cloud Haskell und Cloud Haskell zusammen genutzt werden. Für die Kommunikation zwischen Prozessen sollen ausschließlich Nachrichten genutzt werden. Deshalb wird (durch passende Typisierung) verhindert, dass Shared-MemoryKonstrukte zwischen Prozessen ausgetauscht werden (auch wenn diese Prozesse beispielsweise auf dem gleichen Knoten laufen). Genauer gesagt wird dies gesteuert durch die Serializable-Typklasse, von welcher z.B. MVar und ThreadIds keine Instanzen sind. 4.2. Das Starten von Prozessen Es gibt zwei leicht unterschiedliche Szenarien, in denen der Programmierer neue Prozesse anlegen möchte: 1. Beim Starten des Programms, wenn man beispielsweise einen Knoten angelegt hat, aber noch keinen Prozess. In diesem Fall befindet man sich also in der IO-Monade und möchte einen Prozess anlegen (und dann in der Process-Monade weiterarbeiten). 2. Man ist schon in der Process-Monade, und möchte weitere Prozesse starten. Dieser Fall entspricht dem spawn in Erlang. Man ist hier mit hoher Wahrscheinlichkeit interessiert an der ProcessId des neu gestarteten Prozesses, um beispielsweise Nachrichten an den neuen Prozess senden zu können. 4.2.1. runProcess Für den ersten Fall stellt das Interface von Cloud Haskell mehrere Varianten zur Verfügung, von denen ich hier nur eine vorstellen möchte: runProcess :: LocalNode -> Process () -> IO () Eine LocalNode erhält man dabei mit der Funktion newLocalNode1 . Diese Funktion erlaubt es sozusagen, aus der IO-Monade in die Process-Monade zu wechseln. runProcess ist nicht asynchron; terminiert also wenn der Prozess terminiert. Die Einschränkung dieser Funktion ist, dass sie nur lokal anwendbar ist - LocalNode ist nicht serialisierbar (im Gegensatz zu NodeId). Dafür ist der Typ aber auch entsprechend einfach. 4.2.2. spawn Im Gegensatz zu runProcess soll es bei spawn nun möglich sein, Prozesse auf beliebigen (also auch entfernten) Knoten zu starten. Deshalb sollte ein Parameter von spawn nun eine NodeId sein. Der andere Parameter sollte angeben, was ausgeführt wird also Code in der Process-Monade. Der Rückgabetyp ist dann eine ProcessId, bzw. Process ProcessId, da spawn in der Process-Monade arbeiten soll. 1 Dabei braucht man wieder zwei Parameter: ein backend und eine RemoteTable. Dies führt aber zu sehr in Implementierungsdetails; An den ersten beiden Beispiele im Anhang kann man sich die Nutzung ein wenig klar machen. 11 4. Das Interface von Cloud Haskell Ein erster Versuch für Signatur wäre also -- noch falsch ! spawn :: NodeId -> Process () -> Process ProcessId Das Problem hier ist, dass der auszuführende Code ggf. übertragen werden muss und wie schon angesprochen ist dies nur mit Tricks möglich, in diesem Fall mit dem Typkonstruktor Closure. Die korrekte Signatur ist spawn :: NodeId -> Closure ( Process ()) -> Process ProcessId Closure a beschreibt also eine übertragbare Variante des Typs a. Eine Closure enthält in irgendeiner Form also einen Wert (eine Funktion) und dessen Umgebung - genaueres dazu folgt später. 4.3. Robuste Programmierung Auch in Haskell treten Programmierfehler auf und gerade ein verteiltes System muss auf solche Fehler geeignet reagieren können (und nicht etwa Fehler einzelner Systeme zum Absturz des gesamten verteilten Programms werden lassen). Cloud Haskell stellt mehrere Methoden bereit, um auf gewollte oder ungewollte Terminierung anderer Prozesse zu reagieren. Die wichtigsten zwei2 sind: link :: ProcessId -> Process () monitor :: ProcessId -> Process MonitorRef Beide Methoden starten eine Überwachung des anderen Prozesses durch den aktuellen Prozess; sie blockieren also nicht. Die Verbindung ist in beiden Fällen unidirektional3 . Der Unterschied der zwei Methoden ist die Art, wie auf ein Erreignis (Terminierung, Absturz oder Verbindungsproblem) reagiert wird: • Bei link wird im überwachenden Prozess eine Exception geworfen. link ist so gedacht, dass die Exception nicht gefangen wird, sondern zur Terminierung des Prozesses führt - Terminierung wird also propagiert. • dagegen ist monitor mehr für den Fall gedacht, wenn spezifischer reagiert werden soll: Es wird eine Nachricht an die Mailbox des Prozesses gesendet, die wie andere Nachrichten empfangen werden kann. Die Nachricht enthält in diesem Fall dann zusätzliche Informationen wie z.B. den genauen Grund. Der Rückgabewert von monitor dient als Handle für das Beenden einer Überwachung; auf diese weiteren Methoden werde ich hier aber nicht eingehen. 2 An diesem Punkt gibt es leichte Abweichungen zwischen [EBPJ11] und dem aktuellen Interface von Cloud Haskell; dort werden zwei Methoden angeführt, welche zwar ähnlich zu den hier vorgestellten Methoden, aber doch semantisch leicht unterschiedlich sind. 3 dies ist ein Unterschied zu den gleichnamigen Methoden in Erlang: link ist dort bidirektional, siehe [erla] 12 4. Das Interface von Cloud Haskell 4.4. Ungetypte Nachrichten Jeder Prozess hat einen ihm zugeordneten, ungetypten Channel, der genau der Mailbox aus Erlang entspricht. Nachrichten sind asynchron, zuverlässig und gepuffert. Die entsprechenden Operationen arbeiten auf der Process-Monade; d.h. sie können auch nur in dieser Monade genutzt werden. Die Signatur für das Senden und Empfangen ist: send :: Serializable a = > ProcessId -> a -> Process () expect :: Serializable a = > Process a Beim Senden wird das Objekt serialisiert und zusammen mit der Repräsentation seines Typs versendet. Beim Empfangen mit expect wird solange blockiert, bis eine der Nachrichten in der Queue den erwarteten (ggf. inferierten) Typ hat. Dann wird diese Nachricht aus der Queue entfernt und das entsprechende Object deserialisiert und zurückgegeben. Wie in Erlang muss die zurückgegebene Nachricht nicht die erste sein; es ist also nicht zwingend, in Sende-Reihenfolge auszulesen. Als Beispiel betrachten wir die folgende einfache Kommunikation: Prozess 1 sendet zwei Nachrichten an Prozess 2: -- p ist die ProzessId von Prozess 2. send p ( " hello " :: String ) send p (3:: Int ) Und Prozess 2 empfängt diese nun (in umgekehrter Reihenfolge) und gibt beide aus: ( i :: Int ) <- expect ( s :: String ) <- expect liftIO $ print i -- liftIO , da wir uns liftIO $ print s -- in der Process - Monade befinden . Die Ausgabe ist 3 und "hello". Dieses Beispiel soll zum einen das Empfangen in umgekehrter Reihenfolge veranschaulichen, zum anderen die „Ungetyptheit“: Es wurde hier zum besseren Verständnis die Typen der Nachrichten mit angegeben, aber dies wäre natürlich wie gewöhnlich in Haskell nicht nötig (solange die Typen inferiert werden können). Im Anhang findet sich das komplette Programm zu den Codeauschnitten oben. 4.5. Matchen von Nachrichten - feiner gesteuertes Empfangen In dem Beispiel oben waren die Typen der zu empfangenden Nachrichten (Int und String) fest gesetzt. In echten Programmen wird man allerdings oft mehr wollen: Gleichzeitig mehrere unterschiedliche Nachrichten empfangen. Mit den bis hierhin vorgestellten Methoden ist die Granularität des Empfanges aber gleich mit den Typen der Nachrichten: • Gleichzeitig kann nur genau ein Typ von Nachricht empfangen werden. • Es ist nicht möglich, nur bestimmte Nachrichten eines Typs zu empfangen. 13 4. Das Interface von Cloud Haskell Diese Einschränkungen werden in bestimmten Situationen unschön sein; deshalb bietet Cloud Haskell zusätzliche Methoden, die in diese beiden Richtungen mehr Flexibilität bringen. 4.5.1. Gleichzeitiges Empfangen mehrere Typen von Nachrichten Als erstes wird die Funktion match eingeführt: match :: Serializable a = > ( a -> Process b ) -> Match b Diese tut nicht mehr, als den Code, der beim Empfangen von einer Nachricht des Typs a ausgeführt werden soll, zu verpacken. Das Ergebnis ist ein neuer Datentyp4 , der als allgemeines Interface für Matching verwendet wird. Die Funktion receiveWait erlaubt es nun, mehrere Matches zu kombinieren: receiveWait :: [ Match b ] -> Process b Dabei werden die Matches in der gegebenen Reihenfolge getestet und der erste passende wird ausgeführt (durch den Parameter a -> Process b). Solange keine Nachricht passt, blockiert die Methode. Das „einfache“ expect lässt sich - nebenbei bemerkt - durch diese beiden Funktionen ausdrücken: expect :: Serializable a = > Process a expect = receiveWait [ match return ] 4.5.2. Bedingtes Empfangen von Nachrichten eines Typs Die Methode matchIf besitzt im Vergleich zu match ein zusätzliches Prädikat als Parameter, durch welches nur bestimmte Nachrichten eines Typs empfangen werden können. matchIf :: Serializable a = > ( a -> Bool ) -> ( a -> Process b ) -> Match b Zurückgegeben wird ein Match, der sich wie gehabt mit receiveWait nutzen (und mit anderen Matches kombinieren) lässt. 4.5.3. Empfangen mit Timeout Die Funktion receiveWait wartet unendlich auf passende Nachrichten. In bestimmten Situationen möchte man als Programmierer aber ein Maximum für die Wartezeit angeben. Deshalb bietet Cloud Haskell eine entsprechende Variante5 : receiveTimeout :: Int -> [ Match b ] -> Process ( Maybe b ) Der erste Parameter ist dabei das Timeout in Millisekunden. Durch receiveTimeout lässt sich mit einem Timeout von 0 natürlich auch der Effekt eines „tryReceive“, also einer nicht-blockierenden Variante, erreichen. 4 In [EBPJ11] war der entsprechende Typkonstruktor MatchM zweistellig und Monadeninstanz; hier ist das aktuelle Interface also etwas einfacher geworden. 5 Das parallele Konstrukt in Erlang ist receive ... after x -> ... end 14 4. Das Interface von Cloud Haskell 4.6. Getypte Channel Wenn schon ein strenges Typsystem zur Verfügung steht, wäre es unschön, nur ungetypte Kommunikation in Form der Mailboxen zu ermöglichen. Daher bietet Cloud Haskell die Möglichkeit, Channels anzulegen, die fest typisiert sind: newChan :: Serializable a = > Process ( SendPort a , ReceivePort a ) sendChan :: Serializable a = > SendPort a -> a -> Process () receiveChan :: Serializable a = > ReceivePort a -> Process a Getypte Channels behalten die Form der N-zu-1-Kommunikation bei. Dies ist realisiert dadurch, dass SendPorts serialisierbar sind und ReceivePorts nicht. Folglich kann ein ReceivePort nur von dem Prozess genutzt werden, in welchem der Channel angelegt worden ist. Es gibt des Weiteren Funktionen, um Channels des gleichen Typs zu vereinen: mergePortsBiased :: Serializable a [ ReceivePort mergePortsRR :: Serializable a [ ReceivePort => a ] -> Process ( ReceivePort a ) => a ] -> Process ( ReceivePort a ) Die beiden Varianten unterscheiden sich dabei im internen Scheduling: Bei der ersteren wird bei jedem receiveChan die Liste der Ports in Reihenfolge abgefragt, was dazu führen kann, dass nur Nachrichten aus dem ersten Port benutzt werden (falls dieser Port andauernd Nachrichten enthält). mergePortsRR ist „gerechter“: Durch ein Robin-RoundVerfahren wird ausgeschlossen, dass einzelne Ports nie gelesen werden. 15 5. Closures Bisher können wir bestimmte Arten von Werten, wie z.B. Funktionen oder Prozesse, nicht serialisieren. Betrachte als Beispiel folgende Moduldefinition: module M1 where f :: Int -> Int f x = x + 1 Angenommen, es soll die Funktion f übertragen werden. Wie schon anfangs verraten ist die Grundidee, intern den String „M1.f“ zu übertragen. Auf Empfangsseite kann dann mit Hilfe eines Wörterbuchs, d.h. einer Zuordnung von Strings zu Funktionen (bzw. zu beliebigen Werten), dieser String zugeordnet werden zu der dortigen Funktion. Klappt dies für beliebige Funktionen? Die Antwort ist Nein: p1 :: Process () p1 = do ( x :: Int ) <- expect let g y = x + y send p2 g -- nur um die Absicht klar zu machen , -- Typmaessig passt das so ja nicht . Bei dem Versuch, die Funktion g zu übertragen, tritt nämlich folgendes Problem auf: Damit g in das Wörterbuch hinzufügt werden kann, muss die Funktion auf Top-Level definiert sein. g kann nicht einfach auf Top-Level-Ebene verschoben werden, da die Variable x dann frei und ungebunden wäre. Und wenn wir g dabei entsprechend abändern, um die Abhängigkeit explizit zu machen? g x y = x + y p1 = do ( x :: Int ) <- expect send p2 ( g x ) Dadurch haben wir nichts gewonnen, denn jetzt müsste nicht mehr g übertragen werden, sondern ein Ausdruck (die partielle Applikation von g und x ). Das Problem ist in beiden Fällen dasselbe: Es sind die freien (und nicht auf Top-Level gebundenen) Variablen innerhalb des Ausdrucks, der übertragen werden soll: g y = x + y -- x ist frei ! (g x) -- g und x sind frei , -- aber g ist auf Top - Level gebunden 16 5. Closures Dem entsprechend sind es gerade die Werte der freien Variablen, die auf Empfangsseite fehlen, wenn ausschließlich g übertragen wird. Die triviale Konsequenz ist es, diese Werte hinzuzunehmen - unsere Überlegung ist bei Closures angelangt. Closures sind nichts anderes als die Kombination der Repräsentationen • eines (Funktions-)Ausdrucks, und • der Werte der freien Variablen innerhalb des Ausdrucks (bezeichnet als „Umgebung“). Unter Nutzung von Closures lässt sich nun der Ausdruck oben übertragen: Für die Funktion g wird ein Identifikator übertragen, das auf der Gegenseite zugeordnet werden kann; als Umgebung wird der Wert von x übertragen. Wichtig zu beachten ist, dass alle Werte der Umgebung bei dieser Vorgehensweise serialisierbar sein müssen. Soweit zu der Überlegung, was grundsätzlich möglich ist. Es sind nun (in Richtung auf die tatsächliche Implementierung) zunächst zwei Typkonstruktoren, Static und Closure, einzuführen. 5.1. Static Der erste, einstellige Typkonstruktur, Static, dient zur Kennzeichnung von Identifikatoren für Werte (etwas in der Art vov „M1.f“), in Abgrenzung zu „echten“ Werten. Ein solcher Identifikator für einen Wert vom Typ a hat den Typ Static a. Wichtige Eigenschaft des Typs Static a ist, dass er serialisierbar ist, also dass er Instanz der Typklasse Serializable ist; und dies unabhängig von a: Es geht ja allgemein darum, bestimmte Ausdrücke, die von sich aus nicht Serializable sind, doch übertragbar zu machen. Eine Einschränkung wird allerdings doch gemacht: a muss Instanz von Typeable sein. Dadurch können bestimmte Typfehler im Zusammenspiel mit Static erkannt werden1 . Um dies noch etwas konkreter zu machen: • Der Typ a kann eine Funktion sein, z.B. a = b → c • Der Typ a kann ein Datenkonstruktor sein, z.B. a = [Int] • Der Typ a kann sogar selbst Serializable sein. Dies macht z.B. Sinn, wenn der Wert groß (oder sogar unendlich) ist. Wenn dieser Wert auf der Empfangsseite eigentlich schon vorliegt, ist es u.U. effizienter, eine Referenz zu übertragen. Die nächste Frage, die sich stellt, ist, welche Ausdrücke sich in einen Static verpacken lassen. Wie oben erkannt, hängt dies von den freien Variablen des Ausdrucks ab. Es lassen sich (mindestens) Ausdrücke, die ausschließlich aus einer Variablen bestehen, welche auf Top-Level gebunden ist, in Static verpacken. Aber es ist durchaus möglich, bestimmte komplexere Ausdrücke zuzulassen: Wenn alle freien Variablen des Ausdrucks 1 allerdings erst zur Laufzeit, siehe den Abschnitt Dynamic Type Checking auf [chs]. 17 5. Closures auf Top-Level gebunden sind, sollte es prinzipiell möglich sein, den gesamten Ausdruck zu verpacken. Die konkreten Methoden, die Cloud Haskell zur Verfügung stellt, um Static-Werte zu erzeugen, werden in Abschnitt 5.4 vorgestellt. 5.2. Closure Eine Closure besteht aus einem Wert und einer Umgebung. Die offene Frage ist, welche Typen dies genau sind bzw. welche Signaturen die entsprechenden Methoden des Interfaces haben. Zunächst können wir den einfachen Fall betrachten, dass die Umgebung leer ist. Dann besteht die Closure nur aus einem Wert, welcher vom Typ Static a ist für einen Typ a. Dieser Typ macht auch gerade den Typ der Closure aus - somit wäre Closure ein einstelliger Typkonstruktor. data Closure a -- Methode , um Closures zu erzeugen , -- deren Umgebung leer ist staticClosure :: Typeable a = > Static a -> Closure a Tatsächlich entspricht das schon der wirklichen Closure-Deklaration; auch die Signatur der Methode staticClosure ist so dem echten Interface entnommen. Betrachten wir nun den Fall, dass die Umgebung nicht leer ist. Als erste Vereinfachung ist es möglich, das Problem einzuschränken auf den Fall, dass die Umgebung nur genau eine Bindung enthält. Notfalls ist dieser eine Wert ein Tupel von mehreren anderen Werten. Dann könnte eine Methode zum Erzeugen vom Closures so aussehen: -- noch falsch -- Methode zur Erzeugung einer Closure closure :: Serializable env = > Static ( env -> a ) -> env -> Closure a Die Methode ist also polymorph im Typ env. Es mag hier die Idee auftreten, den Typkonstruktor Closure zweistellig zu machen. Dies wäre aber nicht wirklich das, was wir wollen. Betrachte dazu ein altes Beispiel: spawn :: NodeId -> Closure ( Process ()) -> Process ProcessId Der Typ der Closure ist ein Prozess. Auf Empfangsseite werden wir die Closure entpacken, und sind dann auch nur noch an dem Prozess interessiert. Der Typ der Umgebung sollte also explizit nicht in die den Typen einer Closure einfließen. Der Typ env darf aber auch nicht universell sein, da er dadurch nicht mehr beim Serialisieren und Deserialisieren der Closure bekannt wäre. Es wäre also gar nicht mehr möglich, die Serializable-Instanz für Closure a zu schreiben. Die Lösung besteht in einer weiteren Vereinfachung: Die Serialisierung der Umgebung wird vorgezogen. Serialisierung 18 5. Closures ist nichts anderes als die Umwandlung in einen festen Typ: ByteString. Die Signatur für das Anlegen von Closures mit Umgebung ist also folgende: closure :: Static ( ByteString -> a ) -> ByteString -> Closure a Durch diese Vereinfachung ist der Typ einer Closure hinreichend einfach. Intern besteht eine Closure aus genau diesen beiden Werten. Das Unschöne ist allerdings, dass der Programmierer für alle Arten von Umgebungen, die er in eine Closure verpacken möchte, die entsprechende Funktion zum Auslesen des ByteStrings definieren müsste (und genauso müsste er beim Verpacken die Umwandlung in den ByteString ausführen). Daher bietet Cloud Haskell bestimmte Methoden (implementiert in Template Haskell), die automatisch solche Funktionen erzeugen. Dieses wird in Kapitel 5.4 vorgestellt. 5.3. Das Wörterbuch Das Wörterbuch zum Nachschlagen von den serialisierten Static-Werten ist das einzige Konstrukt, was noch fehlt. Der entsprechende Datentyp ist data RemoteTable Dieses Wörterbuch enthält Zuordnungen, sodass sich Static-Werte wieder „entpacken“ lassen. Die entsprechenden Methoden sind: unstatic :: Typeable a = > RemoteTable -> Static a -> Either String a unclosure :: Typeable a = > RemoteTable -> Closure a -> Either String a Dabei benutzt unclosure intern unstatic. Der Rückgabewert ist ein Either, der durch den String einen Fehler kennzeichnen kann. Für diese beiden Methoden existiert jeweils auch noch eine Variante, die in der ProcessMonade arbeiten und das Wörterbuch indirekt aus dem Prozess bezieht: unStatic :: Typeable a = > Static a -> Process a unClosure :: Typeable a = > Closure a -> Process a Der Rückgabetyp ist hier kein Either mehr; im Fehlerfall wird eine Exception geworfen. Wie gleich beschrieben, kann dieses Wörterbuch weitgehend automatisiert angelegt werden. Es ist allerdings auch möglich, das Wörterbuch per Hand zu füllen; insgesamt reichen die bis hierhin eingeführten Konstrukte aus, um in Cloud Haskell Closures zu benutzen. Da aber die Hilfsfunktionen, die durch Template-Haskell geboten werden, sowohl knapperen als auch weniger fehleranfälligen Code erlauben, werde ich mich in der Erklärung auf das Template-Haskell-Interface beschränken. 5.4. Die Template-Haskell Hilfsfunktionen Eine Anmerkung zu Template-Haskell: Zur Nutzung in GHC ist das Compiler-Flag „TemplateHaskell“ notwendig. 19 5. Closures Die Funktion remotable bildet die Basis für die Automatisierung. Sie muss auf ModulEbene genutzt werden; die Parameter sind dann eine Liste von Werten (Funktionen) des aktuellen Moduls, welche umwandelbar in Static-Werte sein sollen. Die Funktionsnamen werden dabei durch ein Apostroph als Präfix escaped, am Besten wird dies durch ein Beispiel klar: { - # LANGUAGE TemplateHaskell # -} module Main where printHello :: Process () printHello = liftIO $ putStrLn $ " Hello " remotable [’printHello ] Die Anwendung von remotable erzeugt bestimmte neue Funktionen und Werte; beispielsweise die Funktionen, welche die Umgebung deserialisieren (diese waren nötig, um die Closures zu erzeugen). Ein anderer wichtiger Wert, der angelegt wird, ist __remoteTable. Dieses ist die automatisiert gefüllte RemoteTable. Um Closures (und die enthaltenen Static-Werte) zu erzeugen, gibt es zwei Varianten2 : • bei leerer Umgebung kann die (schon erwähnte) Methode staticClosure kombiniert werden mit mkStatic. Letzteres ist ein Makro, der durch folgende Syntax benutzt wird: $(mkStatic f). • bei nicht-leerer Umgebung dient der Makro mkClosure. Auch hier ein Beispiel: printHello :: Process () printHello = liftIO ( putStrLn " Hello " ) printInteger :: Int -> Process () printInteger i = liftIO ( putStrLn ( show i )) remotable [’printHello , ’printInteger ] closureList :: [ Closure ( Process ())] closureList = [ staticClosure $ ( mkStatic ’printHello ) , $ ( mkClosure ’printInteger ) 42 ] Diese beiden Closures hätten auch gerade den Typ, der bei spawn verlangt ist, sodass letztlich printHello und (printInteger 42) als eigene Prozesse gestartet werden könnten. Das zweite Beispiel im Anhang ist ein komplettes Programm, in dem verschiedene Arten des Anlegens von Closures benutzt werden. 2 Die genauen Signaturen der Template-Haskell Funktionen stimmten nicht mehr mit den in [EBPJ11] vorgestellten überein; insgesamt scheint es auch jetzt noch häufiger leichte Änderungen zu geben. 20 6. Beispielprogramm nun in Haskell Als Demonstration der Fähigkeit von Cloud Haskell folgt nun eine Haskell-Variante des Beispielprogramms in Erlang aus Kapitel 2.1. Ich habe bei der Übertragung darauf geachtet, möglichst nah an dem Original zu bleiben. Dennoch sind einige Unterschiede unvermeidbar: 1. Der Worker ist nicht ganz so frei in seinem Typ: Der Rückgabewert wird fest auf Int gesetzt. 2. Statt einer Funktion und einem Parameter wird direkt eine Closure übertragen. Dies ist ganz einfach knapper und wäre so in Erlang nicht möglich, da dort nicht unterschieden werden kann zwischen einem Wert und der Closure des Wertes (d.h. man könnte die Auswertung der Funktionsapplikation nicht verhindern). 3. Der Worker nimmt als Parameter ein Tupel, der die eigentlichen drei Werte enthält. Grund dafür ist, dass der mkClosure-Makro genau einen Parameter erwartet. Würde man die Closure per Hand generieren, könnte man so etwas umgehen. 4. Es ist (natürlich) gewisser Overhead nötig, um in die Process-Monade zu gelangen (und die verschiedenen nötigen Imports sind schon weggelassen). 5. Beim receive muss das etwas längere matchIf verwendet werden, da PatternMatching in dieser Form in Cloud Haskell nicht möglich ist. -- worker mit drei Parametern : Ein Closure , die eine -- Berechnung eines Ints enthaelt ; die ProcessId des -- Prozesses der das Ergebnis zugesendet bekommen soll ; und -- ein Int welcher diesen worker identifiziert . worker :: ( Closure Int , ProcessId , Int ) -> Process () worker (f , resultPid , workerId ) = do -- unClosure enthaelt die Funktionsapplikation die in Erlang -- explizit war . result <- unClosure f -- der Typ der Nachricht ist ein Tupel , genauso wie in -- Erlang . send resultPid ( workerId , result ) fac :: Int -> Int fac 0 = 1 fac n = n * fac (n -1) 21 6. Beispielprogramm nun in Haskell remotable [ ’ worker , ’ fac ] main :: IO () main = do -- starte lokalen Knoten und Prozess auf diesem backend <- initializeBackend " localhost " " 0 " rtable node <- newLocalNode backend runProcess node $ do -- beziehe beide Ids ( fuer Knoten und Prozess ) nodeId <- getSelfNode selfPid <- getSelfPid -- starte ersten " worker " , der ( fac 5) berechnet . spawn nodeId ( $ ( mkClosure ’ worker ) ( ( $ ( mkClosure ’ fac ) (5:: Int )) , selfPid , 1:: Int ) ) -- starte ersten " worker " , der ( fac 3) berechnet . spawn nodeId ( $ ( mkClosure ’ worker ) ( ( $ ( mkClosure ’ fac ) (3:: Int )) , selfPid , 2:: Int ) ) -- warte auf beide Ergebnisse . Nutze das bedingte -- Matching , um zuerst das Ergebnis des ersten Workers zu -- empfangen . receiveWait [ matchIf (\( workerId1 :: Int , result1 :: Int ) -> workerId1 == 1) (\( workerId1 :: Int , result1 :: Int ) -> receiveWait [ match (\( _ :: Int , result2 :: Int ) -> liftIO ( putStrLn ( show result1 ++ " ␣ " ++ show result2 ) ) ) 22 6. Beispielprogramm nun in Haskell ] ) ] where rtable = __remoteTable initRemoteTable 23 7. Performanz Als Test für die Performanz von Cloud Haskell diente der k-means-Algorithmus, welcher eine Verallgemeinerung von MapReduce [DG08] darstellt. Als Referenz wurde die Apache Mahout Implementierung genutzt, welche auf dem Hadoop-Framwork aufbaut1 . Die beiden Implementierungen zeigen ungefähr gleiche Performanz - weder Cloud Haskell noch Hadoop kann als allgemein besser bezeichnet werden. Cloud Haskell ist bei geringer Anzahl von verwendeten Knoten langsamer (bei nur einem Knoten etwa um den Faktor 2.5). Während Hadoop ab einer Knotenanzahl von 20 keine Laufzeitverbesserung mehr zeigt, profitiert Cloud Haskell bis hin zu dem gemessenen Maximum mit 80 Knoten von den zusätzlichen Knoten und hat dann auch eine deutlich geringere Laufzeit als Hadoop. 1 die genauen Versuchsparameter entnehme man bitte [EBPJ11] 24 8. Zusammenfassung Cloud Haskell erlaubt es, in Haskell verteilte Programme im Stil von Erlang zu schreiben. Die gesamte Kommunikation wird dabei von Cloud Haskell implementiert und in Form der Nachrichtenübertragung dem Programmierer zur Verfügung gestellt. Bei der Übertragung der Konzepte aus dem dynamisch typisierten Erlang traten zwei zentrale Herausforderungen auf: der Umgang mit Haskells strengen Typsystem und die Übertragung von (Funktions)Closures an andere Knoten. In beiden Punkten wurde eine annehmbare Lösung gefunden und implementiert. Zur Handhabung der Seiteneffekte wurde die Process-Monade eingeführt, in der die meisten der von Cloud Haskell bereitgestellten Funktionen arbeiten. Dadurch kann gleichzeitig auch gewähleistet werden, dass diese Funktionen nicht missbraucht werden - sie sind nur innerhalb der Process-Monade nutzbar. Mit Hilfe der Haskell-Typklassen Binary und Typeable lassen sich dynamisch getypte Nachrichten realisieren. Statt von Funktionen werden Identifikatoren für diese übertragen, die auf Empfangsseite wieder den eigentlichen Werten zugeordnet werden. Dieser Trick hat gewisse Einschränkungen. Closures wurden eingeführt, um Funktionen um eine Umgebung zu erweitern. Die Realisierung des entsprechenden Interfaces nutzt Template Haskell und kommt dann ohne die Modifikation des Haskell-Compilers aus. Die Autoren haben allerdings die Absicht ausgedrückt, den GHC-Compiler leicht zu modifizieren, um beispielsweise die Typklasse Static direkt in die Sprache zu verankern und damit die Implementierung und Nutzung noch einfacher zu gestalten. 25 A. Übersicht über das Cloud Haskell Interface allgemeine Klassen und Instanzen instance Monad Process instance MonadIO Process class ( Binary a , Typeable a ) = > Serializable a Knoten und Prozesse newLocalNode :: Transport -> RemoteTable -> IO LocalNode localNodeId :: LocalNode -> NodeId runProcess :: LocalNode -> Process () -> IO () spawn :: NodeId -> Closure ( Process () ) -> Process ProcessId call :: Serializable a = > Static ( SerializableDict a ) -> NodeId -> Closure ( Process a ) -> Process a terminate :: Process a getSelfNode :: Process NodeId getSelfPid :: Process ProcessId robuste Programmierung link :: ProcessId -> Process () monitor :: ProcessId -> Process MonitorRef ungetypte Nachrichten send :: Serializable a = > ProcessId -> a -> Process () expect :: Serializable a = > Process a match :: Serializable a = > ( a -> Process b ) -> Match b matchif :: Serializable a = > ( a -> Bool ) -> ( a -> Process b ) -> Match b receiveWait :: [ Match b ] -> Process b receiveTimeout :: Int -> [ Match b ] -> Process b getypte Channel newChan :: Serializable a = > Process ( SendPort a , ReceivePort a ) sendChan :: Serializable a = > SendPort a -> a -> Process () receiveChan :: Serializable a = > ReceivePort a -> Process a mergePortsBiased :: Serializable a = > [ ReceivePort a ] -> Process ( ReceivePort a ) mergePortsRR :: Serializable a = > [ ReceivePort a ] -> Process ( ReceivePort a ) 26 A. Übersicht über das Cloud Haskell Interface Static und Closure data Static a instance Typeable1 Static instance Typeable a = > Binary ( Static a ) staticLabel :: String -> Static a staticApply :: Static ( a -> b ) -> Static a -> Static b data Closure a staticClosure :: Typeable a = > Static a -> Closure a closure :: Static ( ByteString -> a ) -> ByteString -> Closure a data RemoteTable initRemoteTable :: RemoteTable unstatic :: Typeable a = > RemoteTable -> Static a -> Either String a unclosure :: Typeable a = > RemoteTable -> Closure a -> Either String a unStatic :: Typeable a = > Static a -> Process a unClosure :: Typeable a = > Closure a -> Process a Template Haskell Magic remotable :: [ Name ] -> Q [ Dec ] mkStatic :: Name -> Q Exp mkClosure :: Name -> Q Exp 27 B. Beispiel für Senden und Empfangen { - # LANGUAGE TemplateHaskell , ScopedTypeVariables # -} { - - Beispiel fuer Cloud Haskell Senden und Empfangen von der ( ungetypten ) Mailbox Die main - Funktion enthaelt einiges an Setup - Code und benutzt Closures ; dies geht inhaltlich ueber die Nachrichten hinaus . Wirklich relevant fuer die N ac hr ic h te nu eb e rt ra g un g sind nur die Funktionen senderInt , senderString und receiveAndPrint . - -} module Main where import System . Environment ( getArgs ) import Control . Distributed . Process ( Process , ProcessId , send , spawn , expect , liftIO , getSelfNode , getSelfPid ) import Control . Distributed . Static ( staticClosure ) import Control . Distributed . Process . Closure ( mkClosure , mkStatic , remotable , functionTDict ) import Control . Distributed . Process . Node ( initRemoteTable , runProcess ) import Control . Distributed . Process . Backend . SimpleLocalnet import Control . Concurrent ( threadDelay ) -- ein ( Worker -) Prozess , der einen Int sendet senderInt :: ProcessId -> Process () senderInt p = send p (3:: Int ) -- ein ( Worker -) Prozess , der einen String sendet senderString :: ProcessId -> Process () senderString p = send p ( " hello " :: String ) remotable [ ’ senderInt , ’ senderString ] -- der interessante Teil des masters : -es werden zwei Werte ( unterschiedlichen Typs ) empfangen und 28 B. Beispiel für Senden und Empfangen -ausgegeben receiveAndPrint :: Process () receiveAndPrint = do -- zwei Werte empfangen ( wahrscheinlich nicht in Sende - Reihenfolge , -aber das hat so oder so keine Auswirkung ) ( i :: Int ) <- expect ( s :: String ) <- expect -- Ausgabe liftIO $ print i liftIO $ print s -- enthaelt viel Setup - Code ; Haupteffekt ist , dass zwei -- Worker - Prozesse gestartet werden ; dann wird die obige Funktion -- receiveAndPrint aufgerufen . main :: IO () main = do -- starte lokalen Knoten und Prozess auf diesem backend <- initializeBackend " localhost " " 0 " rtable node <- newLocalNode backend runProcess node $ do -- beziehe beide Ids ( fuer Knoten und Prozess ) nodeId <- getSelfNode selfPid <- getSelfPid -- starte ersten " worker " ( der einen String sendet ) spawn nodeId ( $ ( mkClosure ’ senderString ) selfPid ) -- eine Verzoegerung , um ein Empfangen in umgekehrter -- Sendereihenfolge zu provozieren . Provozieren ist keine -- Garantie , aber es soll nur dem Leser das Prinzip klar -- gemacht werden . liftIO $ threadDelay 1000000 -- starte zweiten " worker " ( der einen Int sendet ) spawn nodeId ( $ ( mkClosure ’ senderInt ) selfPid ) receiveAndPrint -- tut was der Name vermuten laesst where rtable = __remoteTable initRemoteTable 29 C. Beispiel für Closures { - # LANGUAGE TemplateHaskell , ScopedTypeVariables # -} { - - Beispiel fuer Cloud Haskell Static - Werte und das Anlegen und Benutzen von Closures Dieses Beispiel enthaelt drei Wege , um eine Closure anzulegen : 1. Aus einem ( einfachen ) Static - Wert , also mit einer leeren Umgebung 2. Aus einem Static - Wert , welcher sich als Komposition zweier Static - Werte ergibt ( mit der Funktion staticApply ) 3. Mit einem Static - Wert und einer Umgebung . Um das Beispiel einfach zu halten , ist fuer die Nachrichten dieses Beispiels der Sender gleich dem Empfaenger , d . h . der Prozess sendet sich selbst Nachrichten . Es wuerde sich aber nichts aendern , wenn der Empfaenger ein anderer Prozess waere . - -} module Main where import System . Environment ( getArgs ) import Control . Distributed . Process ( Process , ProcessId , send , spawn , expect , liftIO , getSelfNode , getSelfPid , unClosure ) import Control . Distributed . Static ( Closure , staticClosure , staticApply ) import Control . Distributed . Process . Closure ( mkClosure , mkStatic , remotable , functionTDict ) import Control . Distributed . Process . Node ( initRemoteTable , runProcess ) import Control . Distributed . Process . Backend . SimpleLocalnet plusTwo :: Int -> Int plusTwo = (+) 2 30 C. Beispiel für Closures plus :: Int -> Int -> Int plus = (+) two :: Int two = 2 remotable [ ’ plus , ’two , ’ plusTwo ] sendT hreeClosures :: Process () sendT hreeClosures = do selfPid <- getSelfPid -- Option Nr 1: Closure aus einfachem Static - Wert send selfPid ( staticClosure $ ( mkStatic ’ plusTwo ) ) -- Option Nr 2: Closure aus ( komplexen ) Static - Wert , mit staticApply send selfPid ( staticClosure ( staticApply $ ( mkStatic ’ plus ) $ ( mkStatic ’ two ) ) ) -- Option Nr 3: Closure mit Environment ( als Static + Environment ) send selfPid ( $ ( mkClosure ’ plus ) (3 -1:: Int ) ) main :: IO () main = do backend <- initializeBackend " localhost " " 0 " rtable node <- newLocalNode backend runProcess node $ do sendThreeClosures -- empfange drei Closures ( Int -> Int ) functionClosures <- sequence ( replicate 3 ( expect :: Process ( Closure ( Int -> Int ) ) ) ) -- wende auf alle unClosure an , -- sodass wir nun einen Liste von Funktionen haben . functions <- mapM unClosure functionClosures -- benutze alle Funktionen und gib jeweils das Ergebnis aus . liftIO ( mapM_ ( print . ( $ 40) ) functions ) where rtable = __remoteTable initRemoteTable 31 D. Ein etwas komplexeres Beispiel { - # LANGUAGE TemplateHaskell , DeriveDataTypeable # -} { - - Beispiel fuer etwas komplexeres unter schiedlichen Architektur , fuer bereitstellt . Cloud Haskell Beispiel mit mehreren Knoten ( potentiell auf Systemen ) . Benutzt wird eine Master - Slaves welche Cloud Haskell bestimmte Methoden Verteilt berechnet werden mit diesem Programm Wurzeln ( hui hui ) . Die Slaves melden sich dabei beim Master , wenn sie nichts zu tun haben . Der Master wartet fuer jede Zahl , deren Wurzel berechnet werden soll , dass sich ein Slave meldet . Dann schickt er die Berechnung ab . Die Ergebnisse sammeln sich in der Mailbox des Masters an ; schliesslich liest dieser alle aus und printet sie . FUER DIE AUSFUEHRUNG Angenommen diese Datei ist abgespeichert als example . hs - compilieren : ghc - threaded example . hs - starte einige Slaves ( in mehreren Konsolen auf dem selben Rechner ) ./ example slave localhost 8080 & ./ example slave localhost 8081 & ./ example slave localhost 8082 & ./ example slave localhost 8083 & ( oder auch im LAN , dann statt localhost die jeweilige eigene LAN - IP des Rechners ) - starte den Master , und uebergebe hinter dem Port ( hier 8084) eine Liste von Zahlen ./ example master localhost 8084 36 25 16 9 4 1 42 - -} module Main where import System . Environment ( getArgs ) import Control . Distributed . Process import Control . Distributed . Static ( staticClosure ) import Control . Distributed . Process . Closure ( mkClosure , mkStatic , remotable , functionTDict , ) 32 D. Ein etwas komplexeres Beispiel import import import import import Control . Distributed . Process . Node ( initRemoteTable ) Control . Distributed . Process . Backend . SimpleLocalnet Control . Concurrent ( threadDelay ) Data . Binary ( Binary , put , get , Get , Word8 ) Data . Typeable ( Typeable (..) ) -- Datentyp , der Befehle an die Slaves darstellt data Command = CalcSqrt ProcessId Float | SayHello deriving ( Typeable ) -- Binary - Instanz , damit Command Serializable ist . instance Binary Command where put SayHello = put (0:: Word8 ) put ( CalcSqrt p f ) = put (1:: Word8 ) >> put p >> put f get = do t <- get :: Get Word8 case t of 0 -> return SayHello 1 -> do p <- get f <- get return ( CalcSqrt p f ) slaveProcess :: ProcessId -> Process () slaveProcess master = do getSelfPid > >= send master receiveWait [ matchIf (\ x -> case x of SayHello -> True ; _ -> False ) (\ SayHello -> do liftIO $ putStrLn " hello " ) , match (\( CalcSqrt pid x ) -> do liftIO $ putStrLn $ " calculating ␣ the ␣ root ␣ of ␣ " ++ show x send pid $ (x , sqrt x ) ) ] slaveProcess master remotable [ ’ slaveProcess ] master :: Backend -> [ Float ] -> [ NodeId ] -> Process () master backend numbers slaveNodes@ ( s : _ ) = do self <- getSelfPid -- Do something interesting with the slaves liftIO $ putStrLn $ " Slaves : ␣ " ++ show slaveNodes let slaveClosure = ( $ ( mkClosure ’ slaveProcess ) self ) -- starte die Slaves slaves <- mapM (\ s -> spawn s slaveClosure ) slaveNodes -- warte , dass alle Slaves bereit sind mapM_ (\ _ -> expect :: Process ProcessId ) slaves -- sende SayHello - Befehle mapM_ (\ s -> send s SayHello ) slaves 33 D. Ein etwas komplexeres Beispiel -- fuehre die Aufgaben aus ; verteile diese dabei jeweils auf -- verfuegbare Slaves mapM_ (\ f -> do -- warte jeweils auf einen bereiten Slave slaveId <- expect -- gib ihm die Aufgabe send slaveId ( CalcSqrt self f ) ) numbers -- mache eine Ausgabe fuer alle Ergebnisse -- ( in falscher Reihenfolge , h oechst wahrs cheinl ich ) mapM_ (\ _ -> do (x , xroot ) <- expect :: Process ( Float , Float ) liftIO $ putStrLn $ " the ␣ root ␣ of ␣ " ++ show x ++ " ␣ is ␣ " ++ show xroot return () ) numbers -- beende die Slaves te rminateAllSlaves backend -- Initialisierungen main :: IO () main = do args <- getArgs case args of ( " master " : host : port : numbers ) -> do backend <- initializeBackend host port rtable startMaster backend ( master backend ( map read numbers ) ) [ " slave " , host , port ] -> do backend <- initializeBackend host port rtable startSlave backend where rtable = __remoteTable initRemoteTable 34 Literaturverzeichnis [AVWW93] Armstrong, Joe ; Virding, Robert ; Wikström, Claes ; Williams, Mike: Concurrent Programming in ERLANG. 1993 [chs] Cloud Haskell Dokumentation zum Paket Control.Distributed.Static. http://hackage.haskell.org/packages/archive/distributed-static/ 0.2.1.1/doc/html/Control-Distributed-Static.html#t:Static, [DG08] Dean, Jeffrey ; Ghemawat, Sanjay: MapReduce: simplified data processing on large clusters. In: Commun. ACM 51 (2008), Januar, Nr. 1, 107–113. http://dx.doi.org/10.1145/1327452.1327492. – DOI 10.1145/1327452.1327492 [EBPJ11] Epstein, Jeff ; Black, Andrew P. ; Peyton-Jones, Simon: Towards Haskell in the cloud. In: SIGPLAN Not. 46 (2011), September, Nr. 12, 118–129. http://dx.doi.org/10.1145/2096148.2034690. – DOI 10.1145/2096148.2034690 [erla] Erlang Dokumentation zu Link. erlang.html#link-1, [erlb] Erlang Dokumentation zu Receive. http://erlang.org/doc/reference_ manual/expressions.html#receive, [GLS94] Gropp, William ; Lusk, Ewing ; Skjellum, Anthony: Using MPI: Portable Parallel Programming with the Message-Passing Interface. Cambridge, MA : MIT Press, 1994. – xx + 307 S. 35 http://www.erlang.org/doc/man/