Skript zur Vorlesung Nebenläufige und verteilte Programmierung WS 2012/2013 Priv. -Doz. Dr. Frank Huch 17. Juni 2014 Version: 1.1 Dieses Skript basiert auf einer studentischen Vorlesungsmitschrift von Lutz Seemann, welche er dem Dozenten dankenswerter Weise zur Verfügung gestellt hat. Inhaltsverzeichnis 1 Einleitung 1.1 Sequentielle Programmierung . . . . . . . . . . . . . . . . . . 1.2 Begriffe . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 1.2.1 Nebenläufigkeit (Concurrency) . . . . . . . . . . . . . 1.2.2 Parallelität . . . . . . . . . . . . . . . . . . . . . . . . 1.2.3 Verteilte Programmierung (Distributed Programming) 1.3 Inhalt der Vorlesung . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 1 1 1 1 1 2 2 . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . Verwendung von Semaphoren) . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 2 3 4 4 5 6 . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 7 7 8 8 8 9 10 10 11 12 12 14 14 17 17 19 in Haskell . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 22 22 23 24 2 Nebenläufige Systeme 2.1 Kommunikation . . . . . . . . . . . . . . . . . 2.1.1 Semaphor (Dijkstra ’68) . . . . . . . . 2.1.2 Producer-/Consumer Problem (andere 2.1.3 Dinierende Philosophen . . . . . . . . 2.1.4 Monitore (Dijkstra ’71, Hoare ’74) . . 3 Nebenläufige Programmierung in Java 3.1 Java . . . . . . . . . . . . . . . . . . . . . . . . . . . 3.2 Interface der Thread-Objekte . . . . . . . . . . . . . 3.2.1 Dämonthreads . . . . . . . . . . . . . . . . . 3.2.2 sleep()-Methode: . . . . . . . . . . . . . . . . 3.3 Synchronisation . . . . . . . . . . . . . . . . . . . . . 3.4 Synchronized Klassen-Methoden . . . . . . . . . . . . 3.5 Synchronized Anweisungen . . . . . . . . . . . . . . . 3.6 Unterscheidung der Synchronisation im OO-Kontext 3.7 Kommunikation zwischen Threads . . . . . . . . . . 3.7.1 Semantik . . . . . . . . . . . . . . . . . . . . 3.7.2 Variante von wait() mit Timeout: . . . . . . 3.7.3 Fallstudie: einelementiger Puffer . . . . . . . 3.8 Beenden von Threadausführungen . . . . . . . . . . 3.9 Warten auf Ergebnisse . . . . . . . . . . . . . . . . . 3.10 ThreadGroups . . . . . . . . . . . . . . . . . . . . . 4 Nebenläufige Programmierung 4.1 Concurrent Haskell . . . . 4.2 Kommunikation . . . . . . 4.3 Kanäle in Haskell . . . . . 5 Erlang 5.1 Sequentielle Programmierung in Erlang . . . 5.2 Nebenläufige Programmierung . . . . . . . . . 5.3 Ein einfacher Key-Value-Store . . . . . . . . . 5.4 Wie können Prozesse in Erlang synchronisiert 5.5 MVar . . . . . . . . . . . . . . . . . . . . . . 5.6 Nebenläufige Programmierung in Erlang . . . 5.7 Verteilte Programmierung in Erlang . . . . . 5.8 Verbinden unabhängiger Erlang-Prozesse . . . 5.8.1 Veränderungen des Clients . . . . . . . 5.8.2 Veränderungen des Servers . . . . . . 5.9 Robuste Programmierung . . . . . . . . . . . 5.10 Systemabstraktionen . . . . . . . . . . . . . . . . . . . . . . . . . . . . . werden? . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 6 Grundlagen der Verteilte Programmierung 6.1 7 Schichten des ISO-OSI-Referenzmodells . . . . 6.2 Protokolle des Internets . . . . . . . . . . . . . . 6.3 Darstellung von IP-Adressen/Hostnames in Java 6.4 Netzwerkkommunikation . . . . . . . . . . . . . . 6.4.1 UDP in Java . . . . . . . . . . . . . . . . 6.5 Socket-Optionen . . . . . . . . . . . . . . . . . . i . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 25 25 26 27 28 29 30 30 30 30 31 32 33 . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 34 34 35 36 36 36 40 6.6 6.7 Sockets in Erlang . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . Verteilung von Kommunikationsabstraktionen . . . . . . . . . . . . . . . . . . . . . . . 40 41 7 Synchronisation durch Tupelräume 7.1 Java-Spaces . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 7.2 Implementierung von Tupelräumen in Erlang . . . . . . . . . . . . . . . . . . . . . . . 42 44 44 8 Spezifikation und Testen von Systemeigenschaften 46 9 Linear Time Logic (LTL) 9.1 Implementierung von LTL zum Testen . . . . . . . . . . . . . . . . . . . . . . . . . . . 9.2 Verifikation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 9.3 Simulation einer Turing-Maschine . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 46 48 52 53 10 Transaktionsbasierte Kommunikation 10.1 Ein Bankbeispiel in Concurrent Haskell . . . . . . . 10.1.1 Gefahren bei der Programmierung mit Locks 10.2 Transaktionsbasierte Kommunikation (in Concurrent 10.2.1 Beispielprogramm . . . . . . . . . . . . . . . 10.3 Synchronisation mit Transaktionen . . . . . . . . . . 54 54 55 56 57 57 ii . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . Haskell als anderen Ansatz) . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 1 Einleitung 1.1 Sequentielle Programmierung In der sequentiellen Programmierung führen Programme Berechnungen Schritt für Schritt aus. Hierdurch ergeben sich insbesondere Probleme bei der Entwicklung reaktiver Systeme, wie z.B. GUIs, Betriebssystemprogrammen oder verteilten/Web-Applikationen. Hier sollen alle möglichen Anfragen/Eingaben quasi “gleichzeitig” behandelt werden können. Ein erster Ansatz hierzu war Polling. Alle möglichen Anfragen werden in einer großen Schleife bearbeitet. Ein reaktives System läuft potentiell unendlich lange und reagiert auf seine Umgebung (z. B. GUI, Textverarbeitung, Webserver, Betriebssystemprozesse). Eine Besonderheit ist hierbei dass meist mehrere Anfragen/Eingaben möglichst gleichzeitig bearbeitet werden sollen. Wie ist es möglich Reaktivität zu gewährleisten? Ein erster Ansatz wäre mittels Polling möglich, d. h. der Abfrage aller möglichen Anfragen in einer großen Schleife, welcher jedoch folgende Nachteile hat: • Busy Waiting verbraucht Systemressourcen. • Code ist schlecht strukturiert, da alle alle Anfragen in die Schleife integriert werden müssen. Modularität ist nur eingeschränkt über Unterprozeduren möglich. • Bearbeitung einer Anfrage blockiert andere Anfragen, da Rückkehr in Schleife erst nach Beendigung vorheriger Anfragen. Somit sequentielles Bearbeiten der Anfragen und keine Reaktivität während der Bearbeitung einer Anfrage. Lösung:Nebenläufigkeit (Concurrency) Mehrere Threads (Fäden) können nebenläufig ausgeführt werden und die einzelnen Dienste eines reaktiven Systems zur Verfügung stellen. Hierdurch ergeben sich folgende Vorteile: • kein Busy Waiting (auf Interruptebene verschoben) • Bessere Strukturierung: Dienst = b (ggf. mehreren) Threads • eventuell können mehrere Anfragen gleichzeitig bearbeitet werden • System ist immer reaktiv Beachte hierbei, dass die letzten beiden Punkte von der Implementierung der Threads auf der (Betriebs-)Systemebene abhängen. 1.2 Begriffe 1.2.1 Nebenläufigkeit (Concurrency) Durch die Verwendung von Threads/Prozessen können einzelne Aufgaben/Dienste eines Programms unabhängig von anderen Aufgaben/Diensten programmiert werden. 1.2.2 Parallelität Meist in Verbindung mit High-Performance-Computing. Durch die parallele Ausführung mehrerer Prozesse (auf in der Regel mehreren Prozessoren) wird eine schnellere Ausführung angestrebt. Beachte aber, dass manchmal auch auf Ein-Prozessor-Systemen durch Nebenläufigkeit Performance-Gewinne erreicht werden können, z. B. bei geringerer Performance von Netzwerkkarten. Früher wurden insbesondere Shared-Memory-Systeme betrachtet, welche heute aber wieder in Form der Multicore-Processoren an Relevanz gewinnen. Allerdings werden oft auch Rechner-Cluster zur parallelen Programmierung eingesetzt (Distributed-Memory-Systeme) bis hin zu Netzwerken im Internet. Der Vorteil ist die Unabhängigkeit von Spezialhardware, sowie eine einfache Steigerung der zur Verfügung stehenden Prozessoren. Das größte Problem bei der parallelen Programmierung ist in der Regel eine möglichst gleichmäßige Ausnutzung der Prozessoren (Vermeidung von Sequentialisierung) und eine Minimierung der Kommunikation. 1 Wir werden in dieser Vorlesung nicht weiter speziell auf die parallele Programmierung eingehen. Die präsentierten Techniken finden hierbei aber durchaus auch Anwendung, wenngleich der Fokus sicherlich meist auf andere Aspekte gerichtet werden muss. 1.2.3 Verteilte Programmierung (Distributed Programming) Ein verteiltes System besteht aus mehreren physisch getrennten Prozessoren, welche über ein Netzwerk verbunden sind. Zum Einen werden solche Systeme wie zuvor erwähnt zur parallelen Programmierung eingesetzt. Darüber hinaus sind aber viele Anwendungen von ihrer Aufgabenstellung schon verteilt, so dass sie auch entsprechend implementiert werden müssen. Beispiele sind Telefonsysteme oder Chats. Manchmal stehen auch bestimmte Ressourcen nur an bestimmten Stellen zur Verfügung, so dass ebenfalls eine Verteilung der Anwendung notwendig ist, z. B. Geldautomaten, spezielle Datenserver. Eine Verteilung wird manchmal auch zur Performancesteigerung, z. B. Lastverteilung eingesetzt und kann auch zur Fehlertoleranz durch redundante Informationsvorhaltung eingesetzt werden. Verteilte Systeme werden in der Regel mit lokaler Nebenläufigkeit kombiniert, um Reaktivität gegenüber der Netzwerkkommunikation zu ermöglichen. Durch mehrere verteilte Prozesse kann es aber auch ohne Nebenläufigkeit zu den gleichen Problemen wie bei rein nebenläufigen Systemen kommen. 1.3 Inhalt der Vorlesung • Vergleich unterschiedlicher Konzepte zur nebenläufigen und verteilten Programmierung am Beispiel von Java, Erlang, Concurrent Haskell und weiteren Konzepten. • Ausgewählte verteilte Algorithmen. 2 Nebenläufige Systeme Generell sind immer drei Aspekte zu berücksichtigen: • Definition und Start von Threads (ggf. auch deren Terminierung) • Kommunikation zwischen Threads • Synchronisation von Threads Hierbei hängen die beiden letzten Punkte stark miteinander zusammen, da Kommunikation nur schwerlich ohne Synchonisation möglich ist und jede Synchronisation natürlich auch eine Art Kommunikation ist. Wir betrachten folgendes Beispiel in Pseudocode: 1 2 par { while true do w r i t e ( ” a ” ) } { while true do w r i t e ( ”b” ) } wobei Semantik des par-Konstrukts: Spaltet Thread in zwei Unterthreads auf und führt diese nebenläufig aus. ababab ... aaabaaabaabbbaa ... aaaaa ... bbbbbb ... Oft (aber nicht immer) deterministische Ausführung d. h. gleiche Ausgabe bei allen Läufen), die durch den Scheduler geregelt wird. Dies muss aber nicht immer der Fall sein und ein Programmierer darf sich hierauf auf keinen Fall verlassen. Der Schedule teilt den Prozessen die Prozessorzeit zu. Im Wesentlichen werden dabei folgende Arten von Schedulern unterschieden: • kooperatives Multitasking ⇒ ein Thread rechnet solange, bis er die Kontrolle abgibt (z. B. mittels yield). Die anderen warten (suspendiert) auf das Ergebnis (z. B. aaaaa ... und bbbbbb ...) • präemptives Multitasking ⇒ der Scheduler entzieht den Prozessen regelmäßig die Kontrolle. Dies erfordert eine aufwändigere Implementierung, bietet aber mehr Programmierkomfort zur 2 Gewährleistung der Reaktivität des Systems (z. B. alle Ausgaben außer aaaaa ... und bbbbbb ...) Prinzipiell sollten Programmierer nebenläufiger Systeme möglichst für beliebige Scheduler entwickeln. Einzige Ausnahme stellt manchmal die Einschränkung auf präemptives Multitasking dar. Aber selbst dies ist oft nicht notwendig, da durch Synchronisation die Kontrolle häufig abgegeben wird und somit auch kooperatives Multitasking ausreicht. 2.1 Kommunikation ¿¿¿¿¿¿¿ origin/master Wird oft über geteilte Variablen realisiert. Listing 1: Beispiel 1 2 3 int i = 0; par { i := i + 1 } { i := i ∗ 2 } write ( i ) ; Semantik: wie zuvor. Es ergibt sich Nichtdeterminismus mit mehreren Ergebnissen: i = 0; i = i + 1; i = i * 2 i = 0; i = i * 2; i = i + 1 mit dem Ergebnis 2 mit dem Ergebnis 1 Wann ist ein Programm also korrekt? • Ein sequentielles Programm ist korrekt, falls es für alle Eingaben ein korrektes Ergebnis liefert. • Ein nebenläufiges Programm ist korrekt, wenn es für alle Eingaben und alle Schedules das richtige Ergebnis liefert. In der Praxis (leider) oft “für einen (den testbaren) Schedule”, wobei folgende Probleme auftreten können: • Portierung auf andere Architektur ⇒ anderer Schedule • Erweiterung des Systems um zusätzliche Threads/zusätzliche Berechnungen ⇒ verändertes Scheduling • Beeinflussung durch Ein-/Ausgaben ⇒ Deshalb: für alle Schedules !!! (manchmal ist auch eine Einschränkung auf präemptives Multitasking okay) ⇒ Bei Korrektheit für alle Schedules verliert man den Vorteil des präemptiven Multitaskings. Deshalb wird die Korrektheit oft nur “für alle fairen Schedules” verlangt. Der Begriff fair bedeutet hierbei, dass jeder Thread nach endlicher Zeit wieder an die Reihe kommt, also falls möglich einen Schritt ausführt. Als Konsequenz daraus ist das Testen der Programme schwierig, da ggf. unendlich viele Abläufe betrachtet werden müssen und eine Beeinflussung des Schedules fast unmöglich ist. Deshalb: formale Spezifikation, formale Verifikation, Testtools und Debugger mit Einfluss auf den Scheduler. Weiteres Problem im Beispiel: Welche Anweisungen sind atomar? Der Compiler wird ein solches Programm sicherlich in Maschinencode oder Pseudocode für eine abstrakte Maschine übersetzen. Listing 2: Stackmaschinencode des Programms 1 2 3 int i = 0; par { LOAD i ; LIT 1 ; ADD; STORE i } { LOAD i ; LIT 2 ; MULT; STORE i } Aufgrund dieses Codeaufbaus ist dann ebenfalls folgende Ausführung möglich: LOAD i; LIT 2; MULT; LOAD i; LIT 1; ADD; STORE i; STORE i Diese liefert das Ergebnis i = 0, was sicherlich nicht das gewünschte Ergebnis ist. Bei dieser Ausführung ist der 1. Thread in den 2. Thread “eingebettet”. Zuweisungen können aber nicht generell atomar gemacht werden (wegen z. B. großer Berechnungen, Funktionsaufrufe). 3 Lösung: Der Programmierer bekommt die Möglichkeit zur Synchronisation. Im Folgenden werden zunächst einige bekannte Konzepte wiederholt. 2.1.1 Semaphor (Dijkstra ’68) Ziel: atomare (ununterbrochene) Ausführung von Programmabschnitten. Semaphore besitzen eine feste Schnittstelle, bestehen aus einem Integer und einer zugeordneten Prozesswarteschlange und stellen einen abstrakten Datentyp mit zwei atomaren Operationen P und V dar, wobei die Operationen der Veränderung der Variablen dienen. Listing 3: Semaphor-Operationen 1 2 3 4 5 P( s ) : IF ( s >= 1 ) THEN { s := s − 1 ; } ELSE { P r o z e s s wird i n Warteschlange zu s e i n g e t r a g e n und s u s p e n d i e r t ; s := s − 1 ; } 6 7 8 9 10 V( s ) : s := s + 1 ; IF ( Warteschlange zu s n i c h t l e e r ) THEN { Erwecke e r s t e n P r o z e s s d e r Warteschlange zu s ; } Das P steht hierbei für passieren (passeer) und das V für verlassen (verlaat). Das obige Beispiel lässt sich mittels Semaphor also wie folgt implementieren, was das Ergebnis i = 0 verhindert: 1 2 3 int i = 0; semaphor s := 1 ; par { P( s ) ; i := i + 1 ; V( s ) ; } { P( s ) ; i := i ∗ 2 ; V( s ) ; } Der initiale Wert des Semaphors bestimmt die maximale Anzahl der Prozesse im kritischen Bereich. Meist ist dies 1, was zu einem binären Semaphor führt. 2.1.2 Producer-/Consumer Problem (andere Verwendung von Semaphoren) n Producer erzeugen Waren, die dann von m Consumern verbraucht werden. Zunächst gehen wir bei der Modellierung von der Verwendung eines unbeschränkten Puffers aus: Listing 4: Zunächst mit unbeschränktem Puffer 1 semaphor num := 0 ; 2 3 4 5 6 7 8 Producer : while ( true ) { produce p r o d u c t ; push p r o d u c t t o b u f f e r ; V(num ) ; } 9 10 11 12 13 14 15 Consumer : while ( true ) { P(num ) ; p u l l p r o d u c t from b u f f e r ; consume p r o d u c t ; } Bemerkungen: Zeile 1: Füllstand des Puffers Zeile 12: an dieser Stelle findet ggf. eine Suspension statt 4 Der Semaphor fungiert als Produktzähler für den Puffer. Hochzählen mittel V im Producer und Herunterzählen mittels P im Consumer. Wir haben aber zunächst noch keine Synchronisation auf dem Puffer modelliert (kritischer Bereich). Um hier zusätzlich synchronisieren zu können, fügen wir einen zweiten Semaphor ein, welcher wie oben skiziert den kritischen Bereich schützt. Listing 5: Zusätzlich mit Synchronisation 1 2 semaphor num := 0 ; semaphor b a c c e s s := 1 ; 3 4 5 6 7 8 9 10 11 Producer : while ( true ) { produce p r o d u c t ; P( b a c c e s s ) ; push p r o d u c t t o b u f f e r ; V( b a c c e s s ) ; V(num ) ; } 12 13 14 15 16 17 18 19 20 Consumer : while ( true ) { P(num ) ; P( b a c c e s s ) ; p u l l p r o d u c t from b u f f e r ; V( b a c c e s s ) ; consume p r o d u c t ; } Bemerkung: Zeile 1: Füllstand des Puffers Zeile 15: an dieser Stelle findet ggf. eine Suspension statt Es fällt auf, dass die Verwendung von P-/V-Operationen schnell unübersichtlich werden kann. Es gibt keine strukturelle Zuordnung von P/V. 2.1.3 Dinierende Philosophen An einem Tisch mit Essen sitzen 5 Philosophen. Es gibt allerdings nur 5 Stäbchen zum Essen, wobei eine Person immer 2 Stäbchen benötigt, so dass jeweils 1 Stäbchen mit dem Nachbarn geteilt werden muss. Ein Philosoph denkt so lange, bis er hungrig wird und versucht dann nacheinander beide Stäbchen aufzunehmen und zu essen. Es findet keine zusätzliche Kommunikation untereiander statt. Die Implementierung mit Semaphoren sieht wie folgt aus: Listing 6: Ein Philosoph i von n lässt sich dann wie folgt implementieren 1 2 3 4 5 6 7 8 9 while ( true ) { think ( ) ; g e t hungry ( ) ; P( s t i c k ( i ) ) ; P( s t i c k ( ( i +1)%n ) ) ; eat ( ) ; V( s t i c k ( i ) ) ; V( s t i c k ( ( i +1)%n ) ) ; } Die Reihenfolge der V -Operationen ist hierbei egal. Problem der Implementierung: Deadlocks (Verklemmungen), falls alle Philosophen ein Stäbchen nehmen. Eine mögliche Lösung wird durch ein Zurücklegen des ersten Stäbchens aufgelöst, falls kein Zweites verfügbar ist. 5 Listing 7: Implementierung mit Zurücklegen 1 2 3 4 5 6 7 8 9 10 11 while ( true ) { think ( ) ; g e t hungry ( ) ; P( s t i c k ( i ) ) ; IF ( s t i c k ( ( i +1)%n ) == 0 ) THEN { V( s t i c k ( i ELSE { P( s t i c k ( ( eat ( ) ; V( s t i c k ( i V( s t i c k ( ( } } )); } i +1)%n ) ) ; )); i +1)%n ) ) ; An der Stelle P(stick((i+1)%n)) kann ein anderer Philosoph das Stäbchen wegnehmen. Der letzte Philosoph muss sein Stäbchen aber immer wieder weglegen. Livelocks sind aber nicht ausgeschlossen, d. h. ein oder mehrere Philosophen können verhungern (trotz eines fairen Schedules). Nachteile von Semaphoren: P-/V-Operatoren sind schwer zuordenbar (sind eigentlich wie Klammern, aber nur während der Laufzeit; nicht in der Syntax) ⇒ Codestrukturierung ist aus softwaretechnischer Sicht schwer und bietet viele mögliche Fehlerquellen ⇒ lieber: strukturiertes Konzept 2.1.4 Monitore (Dijkstra ’71, Hoare ’74) Monitore entsprechen einer Menge von Prozeduren und Datenstrukturen, die als Betriebsmittel betrachtet mehreren Threads zugänglich sind, aber nur von einem Thread zur Zeit benutzt werden können ⇒ in einem Monitor befindet sich höchstens ein Thread. Als Beispiel betrachten wir die Monitor-Implementierung eines Puffers: Listing 8: Implementierung eines beschränkten Puffers als Monitor 1 2 3 4 5 6 7 8 9 10 11 monitor b u f f e r { [ i n t ] [ 0 . . . ( n −1)] c o n t e n t s ; i n t num , wp , rp = 0 ; queue s e n d e r , r e c e i v e r ; p r o c e d u r e push ( i n t item ) { IF (num == n ) THEN { d e l a y ( s e n d e r ) ; } c o n t e n t s [ wp ] = item ; wp := (wp+1) mod n ; num := num + 1 ; continue ( r e c e i v e r ) ; } 12 function int pull () { IF (num == 0 ) THEN { d e l a y ( r e c e i v e r ) ; } i n t item := c o n t e n t [ rp ] ; rp := ( rp +1) mod n ; num := num − 1 ; continue ( s e n d e r ) ; r e t u r n item ; } 13 14 15 16 17 18 19 20 21 } Bemerkungen: Zeile 4: Prozessqueues Zeile 6: Puffer ist voll Zeile 14: Puffer ist leer Semantik: • num als Anzahl der Elemente im Puffer • wp als Pointer zum Schreiben 6 • rp als Pointer zum Lesen • delay(Q) ⇒ der aufrufende Prozess wird in die Warteschlange Q eingefügt • continue(Q) ⇒ aktiviert den ersten Thread in der Warteschlange Q, falls ein solcher vorhanden ist Vorteile: • innerhalb der Monitorprozesse können Synchronisationsprobleme ignoriert werden • alle kritischen Bereiche/Datenstrukturen befinden sich in einem abstrakten Datentyp Nachteile: • häufig zu viel Synchronisation ⇒ sequentielles Programm oder Deadlocks • wird in Programmiersprachen selten unterstützt (in Java existiert ein monitorähnliches Konzept) 3 Nebenläufige Programmierung in Java 3.1 Java Programmiersprache des Internets ⇒ Reaktivität notwendig ⇒ Nebenläufigkeit. Klasse Thread im Paket java.lang: • eigene Threads als Unterklasse von Thread implementierbar • eigentlicher Code in run()-Methode • Anlegen von Threads mit new und Konstruktor1 • Ausführung des Threads mittels der Methode start()2 Listing 9: Beispiel 1 2 3 4 5 6 7 8 9 public c l a s s C o n c u r r e n t P r i n t e r extends Thread { private S t r i n g s t r ; public C u r r e n t P r i n t ( S t r i n g s t r 1 ) { s t r = s t r 1 ; } public void run ( ) { while ( true ) { System . out . p r i n t ( s t r + ” ” ) ; } } public s t a t i c void main ( S t r i n g [ ] a r g s ) { new C o n c u r r e n t P r i n t e r ( ” a ” ) . s t a r t ( ) ; new C o n c u r r e n t P r i n t e r ( ”b” ) . s t a r t ( ) ; } } Ausgabe: Menge aus a’s und b’s. Bei der Verwendung von green Threads, ggf. auch nur a’s oder nur b’s. Wegen fehlender Mehrfachvererbung in Java ist oft das Erben von der Threadklasse problematisch. Deshalb besteht alternativ die Möglichkeit der Implementierung des Interfaces Runnable: 1 2 3 4 5 6 7 8 9 10 11 public c l a s s C o n c u r r e n t P r i n t implements Runnable { String str ; public C o n c u r r e n t P r i n t ( S t r i n g s t r 1 ) { s t r = s t r 1 ; } public void run ( ) { while ( true ) { System . out . p r i n t ( s t r + ” ” ) ; } } public s t a t i c void main ( S t r i n g [ ] a r g s ) { Runnable aThread = new C o n c u r r e n t P r i n t ( ” a ” ) ; Runnable bThread = new C o n c u r r e n t P r i n t ( ”b” ) ; new Thread ( aThread ) . s t a r t ( ) ; new Thread ( bThread ) . s t a r t ( ) ; } } 1 initialisiert 2 diese nur die Attribute ⇒ ist also sehr klein! führt die run()-Methode aus 7 Achtung this liefert jetzt kein Thread-Objekt mehr, sondern ein Runnable-Objekt. Das aktuelle Thread-Objekt bekommt man mittels Thread.currentThread(). 3.2 Interface der Thread-Objekte Name ⇒ standard main Thread, Thread-0, Thread-1, ... / getName, setName Prioritäten ⇒ Zahlen zwischen MIN_PRIORITY (1) und MAX_PRIORITY (10) mit Standard-Wert NORM_PRIORITY (5) / setPriority, getPriority (⇒ auf die Prioritäten darf man sich aber nicht verlassen!) Threadgruppe ⇒ Zuordnung von Threads zu Gruppen als Strukturierungskonzept Zustände: • erzeugt ⇒ wird durch new Thread() erreicht • aktivierbar ⇒ wird durch start oder yield sowie dem Eingabeende oder dem Ablauf der Sleep-Zeit erreicht • terminiert ⇒ wird erreicht, wenn run beendet wird • nicht aktivierbar ⇒ wird durch das Warten auf eine Eingabe oder ein sleep erreicht Das Thread-Objekt bleibt erhalten, solange es referenziert wird. Während der Ausführung existiert eine “Eigenreferenz”. Durch Aufruf der Methode yield wird dem Scheduler signalisiert, dass der Prozess die Kontrolle abgeben kann. Bei green Threads führt dieser Aufruf in der Regel zum Threadswitch, falls dies möglich ist. Bei präemptivem Scheduling kann yield auch keinen Efekt haben. 3.2.1 Dämonthreads setDaemon(true) von start(), um einen Thread als Dämonthread zu deklarieren. Idee: “unwichtiger” Hintergrundthread ⇒ Java Virtual Machine (JVM) terminiert, falls nur noch Dämonthreads existieren. Beispiel: AWT-Thread, Serverthreads, Garbage Collector Scheduler: Es gibt unterschiedliche Implementierungen • green Threads ⇒ kooperativer Scheduler in JVM (wird in der Regel nicht mehr unterstützt) • native Threads ⇒ Verwendung von Betriebssystemprozessen mit präemptivem Multitasking Vorteil: • Ausnutzung moderner Multicore-Architekturen • Wiederverwendung des Betriebssystem-Schedulers Nachteil: • größerer Verbrauch von Systemressourcen (keine light-weight Threads) 3.2.2 sleep()-Methode: Die Objekt-Methode sleep deaktiviert den ausführenden Thread um die angegebene Zeit in Millisekunden. sleep kann eine InterruptedException werfen. Hierzu später mehr. Listing 10: erste Variante 1 2 try { Thread . c u r r e n t T h r e a d ( ) . s l e e p ( 3 0 0 0 ) ; } catch ( I n t e r r u p t e d E x c e p t i o n s e ) { . . . } Listing 11: zweite Variante 1 this . s l e e p ( 3 0 0 0 ) ; 8 3.3 Synchronisation Kombination aus Lock/Unlock-Konzept und Monitoren. Thread-Methoden können als synchronized deklariert werden: • In allen synchronized-Methoden eines Objektes darf sich maximal ein Thread aufhalten. Hierzu zählen auch Unterbrechungen (auch unsynchronisierte Methoden), die in einer synchronizedMethode aufgerufen werden. • kein Verlassen der synchronized-Methode durch sleep oder yield. andere Sicht • jedes Objekt besitzt einen Lock • beim Versuch der Ausführung einer synchronized-Methode wird wie folgt vorgegangen: – Lock verfügbar ⇒ Lock nehmen und fortfahren – eigener Thread hat Lock ⇒ fortfahren – sonst ⇒ auf Lock suspendieren • Lock wird beim Verlassen der synchronized-Methode wieder freigegeben (auch bei Terminierung oder Exception) Im Vergleich zu Semaphoren sind synchronisierte Methoden: • strukturorientiert • ein “unlock” kann nicht vergessen werden • keine Suspension bei “mehrfachem” Nehmen des Locks Im Vergleich zum Monitorkonzept sind synchronisierte Methoden flexibler, da einzelne Methoden unsynchronisiert bleiben können, aber auch größere Gefahr von Fehlern. Ein einfaches Beispiel soll die Verwendung von synchronized-Methoden veranschaulichen. Wir betrachten eine Implementierung einer Klasse für ein Bankkonto: Listing 12: Beispiel 1 2 3 4 5 6 7 8 c l a s s Account { private double b a l a n c e ; public Account ( double i n i t i a l ) { b a l a n c e = i n i t i a l ; } public synchronized double g e t B a l a n c e ( ) { return b a l a n c e ; } public synchronized void d e p o s i t ( double amount ) { b a l a n c e += amount ; } } 9 10 Account a = new Account ( 3 0 0 ) ; 11 12 ... 13 14 a . deposit (100) . . . a . deposit (100); // n e b e n l a e u f i g e Ausfuehrung 15 16 ... Die möglichen Ergebnisse ohne Synchronisation sind 400, 500 oder richtiger Unsinn (z. B. falls der Double teilweise geschrieben wird, da diese Operation nicht atomar ist). Mit Synchronisation ist nur das Ergebnis 500 möglich. synchronized ist für Konstruktoren nicht erlaubt, aber auch nicht sinnvoll, da diese immer nur von einem Thread ausgeführt werden. Vererbte synchronized-Methoden müssen nicht synchronisiert sein (⇒ verfeinerte Implementierung). Nicht überschriebene Methoden bleiben synchronized. Nicht-synchronisierte Methoden können beim Überschreiben synchronized werden. 9 3.4 Synchronized Klassen-Methoden Auch Klassenmethoden können synchronisiert werden: static synchronized analog wie oben, nur auf Klassenebene. Keine Wechselwirkung mit synchronized Objektmethoden. 3.5 Synchronized Anweisungen Vererbte synchronisierte Methoden müssen nicht zwingend wieder synchronisiert sein. Wenn man solche Methoden überschreibt, so kann man das Schlüsselwort synchronized auch weglassen. Dies bezeichnet man als verfeinerte Implementierung. Die Methode der Oberklasse bleibt dabei synchronized. Andererseits können unsynchronisierte Methoden auch durch synchronisierte überschrieben werden. Klassenmethoden, die als synchronisiert deklariert werden (static synchronized) haben keine Wechselwirkung mit synchronisierten Objektmethoden. Die Klasse hat also ein eigenes Lock. Man kann sogar auf einzelne Anweisungen synchronisieren: synchronized ( expr ) b l o c k 1 Dabei muss expr zu einem Objekt auswerten, dessen Lock dann zur Synchronisation verwendet wird. Streng genommen sind synchronisierte Methoden also nur syntaktischer Zucker: So steht die Methodendeklaration 1 synchronized A m( a r g s ) b l o c k eigentlich für A m( a r g s ) { synchronized ( t h i s ) b l o c k } 1 Einzelne Anweisungen zu synchronisieren ist sinnvoll, um weniger Code synchronisieren bzw. sequentialisieren zu müssen: private double s t a t e ; 1 2 public void c a l c ( ) { double r e s ; 3 4 // do some r e a l l y e x p e n s i v e c o m p u t a t i o n ... 5 6 7 // s a v e t h e r e s u l t t o an i n s t a n c e v a r i a b l e synchronized ( t h i s ) { state = res ; } 8 9 10 11 } 12 Synchronisierung auf einzelne Anweisungen ist auch nützlich, um auf andere Objekte zu synchronisienen. Wir betrachten als Beispiel eine einfache Implementierung einer synchronisierten Collection: 1 2 3 4 class Store { public synchronized boolean hasSpace ( ) { ... } 5 public synchronized void i n s e r t ( int i ) throws N o S p a c e A v a i l a b l e E x c e p t i o n { 6 7 8 ... 9 } 10 11 } Wir möchten diese Collection nun verwenden wie folgt: 10 1 S t o r e s = new S t o r e ( ) ; 2 3 ... 4 5 6 7 i f ( s . hasSpace ( ) ) { s . insert (42); } Dies führt jedoch zu Problemen, da wir nicht ausschließen können, dass zwischen den Aufrufen von hasSpace() und insert(int) ein Re-Schedule geschieht. Da sich das Definieren spezieller Methoden für solche Fälle oft als unpraktikabel herausstellt, verwenden wir die obige Collection also besser folgendermaßen: 1 2 3 4 5 synchronized ( s ) { i f ( s . hasSpace ( ) ) { s . insert (42); } } 3.6 Unterscheidung der Synchronisation im OO-Kontext Wir bezeichnen synchronisierte Methoden und synchronisierte Anweisungen in Objektmethoden als server-side synchronisation. Synchronisation der Aufrufe eines Objektes bezeichnen wir als client-side synchronisation. Aus Effizenzgründen werden Objekte der Java-API, insbesondere Collections, nicht mehr synchronisiert. Für Collections stehen aber synchronisierte Versionen über Wrapper wie synchronizedCollection, synchronizedSet, synchronizedSortedSet, synchronizedList, synchronizedMap oder synchronizedSortedMap zur Verfügung. Sicheres Kopieren einer Liste in ein Array kann nun also auf zwei verschiedene Weisen bewerkstelligt werden: Als erstes legen wir eine Instanz einer synchronisierten Liste an: 1 L i s t <I n t e g e r > u n s y n c L i s t = new L i s t <I n t e g e r > ( ) ; 2 3 4 // f i l l t h e l i s t ... 5 6 L i s t <I n t e g e r > l i s t = C o l l e c t i o n s . s y n c h r o n i z e d L i s t ( u n s y n c L i s t ) ; Nun können wir diese Liste entweder mit der einfachen Zeile 1 I n t e g e r [ ] a = l i s t . toArray (new I n t e g e r [ 0 ] ) ; oder über 1 Integer [ ] b ; 2 3 4 5 6 synchronized ( l i s t ) { b = new I n t e g e r [ l i s t . s i z e ( ) ] ; l i s t . toArray ( b ) ; } in ein Array kopieren. Bei der zweiten, zweizeiligen Variante ist die Synchronisierung auf die Liste unabdingbar: Wir greifen in beiden Zeilen auf die Collection zu, und wir können nicht garantieren, dass nicht ein anderer Thread die Collection zwischenzeitig verändert. Dies ist ein klassisches Beispiel für Client-side synchronisation. 11 3.7 Kommunikation zwischen Threads Komunikation geschieht über geteilte Objekte. Problem: Wann erhält die Variable den Wert? Als Beispiel betrachten wir den einfachen Fall, dass ein Thread einem anderen einen Wert mitteilt und dieser beim Empfänger ausgegeben wird. Den Code können wir in folgender Klasse zusammenfassen: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 class C { private i n t s t a t e = 0 ; private boolean m o d i f i e d = f a l s e ; public synchronized void p r i n t N e w S t a t e ( ) { while ( ! m o d i f i e d ) { } ; System . out . p r i n t l n ( s t a t e ) ; modified = false ; } public synchronized void s e t V a l u e ( i n t v ) { state = v ; m o d i f i e d = true ; System . out . p r i n t l n ( ” v a l u e s e t ” ) ; } } Diese erste Lösung zeigt den erfolgten Versand durch Veränderung eines geteilten Objekts, hier boolesches Flag modified an. Ein großer Nachteil dieser Lösung ist sicherlich das Busy Waiting beim Leser (Wurde modified verändert?). Als Verbesserung können Threads in Java mittels wait()3 und Aufwecken mittels notify(). Es ergibt sich folgender Code: 1 2 3 4 5 6 7 8 9 10 11 12 class C { private i n t s t a t e = 0 ; public synchronized void p r i n t N e w S t a t e ( ) { wait ( ) ; System . out . p r i n t l n ( s t a t e ) ; } public synchronized void s e t V a l u e ( i n t v ) { state = v ; notify (); System . out . p r i n t l n ( ” v a l u e s e t ” ) ; } } Thread 1 führt printNewState() und Thread 2 führt setValue(42) nebenläufig aus. Dies ergibt die Ausgabe “value set”, “42” und nicht “42”, “value set” und auch nicht 0! Bevor wir das Beispiel weiter verfeinern, wollen wir zunächst die Semantik der Methoden wait() und notify() näher beleuchten. 3.7.1 Semantik • wait() ⇒ suspendiert den ausführenden Thread und gibt den Lock des Objektes frei • notif y() ⇒ erweckt einen4 Thead des Objektes und fährt direkt mit der eigenen Berechnung fort. Wenn kein Thread suspendiert ist, dann wird nur fortgefahren (⇒ ein notify() wird nicht gespeichert) Ausgabe von Thread 1 nach der Ausgabe von Thread 2 oder Thread 2 ist fertig und anschließendes Suspendieren von Thread 1 (hier keine Ausgabe). printNewState() kann nur Änderungen ausgeben, die nach seinem wait()-Aufruf passieren. Wie können wir gewährleisten, dass auch vorhergehende “Nachrichten” ausgeben werden? Wie bei der “busy waiting”-Idee verwenden wir zusätzlich noch ein Flag: 1 private boolean m o d i f i e d = f a l s e ; 3 verlässt 4 es die Methode und versucht beim Aufwecken die synchronized Methode wieder neu zu betreten ist nicht genau festgelegt welchen! 12 2 3 4 5 6 7 public synchronized void p r i n t N e w S t a t e ( ) { i f ( ! modified ) { wait ( ) ; } System . out . p r i n t l n ( s t a t e ) ; modified = false ; } In setValue() wird zusätzlich (irgendwo) modified = true; eingefügt. Somit gehen Nachrichten, welche früher gesendet wurden nicht verloren. Was passiert aber bei mehreren schreibenden Threads? Die erste Zustandsänderung kann übersehen werden und nicht ausgegeben werden. Als Lösung, müssen wir also auch die “Sender” synchronisieren. Diese müssen also warten, bis ein alter Wert verarbeitet wurde. Listing 13: Nichtfunktionierende Lösung 1 2 3 4 5 6 7 public synchronized void s e t V a l u e ( i n t v ) { i f ( modified ) { wait ( ) ; } state = v ; notify (); m o d i f i e d = true ; System . out . p r i n t l n ( ” v a l u e s e t ” ) ; } Nun stellt sich aber das Problem, dass notify() anstelle eines Lesers einen Schreiber erweckt und der erste geschriebene Wert immer noch verloren geht. Es ist also notwendig, alle schlafenden Threads zu erwecken einen lesenden Thread erweckt? Möglicherweise wird ein weiterer Schreiber erweckt und die Nachricht geht immer noch verloren. Eine mögliche Lösung ist alle schlafenden Threads zu erwecken und über ein bisschen Busy Waiting, die fälschlicherweise erweckten Threads wieder schlafen legen: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class C { private i n t s t a t e = 0 ; private m o d i f i e d = f a l s e ; public synchronized void p r i n t N e w S t a t e ( ) { while ( ! m o d i f i e d ) { w a i t ( ) } ; modified = false ; notify (); System . out . p r i n t l n ( s t a t e ) ; } public synchronized void s e t V a l u e ( i n t v ) { while ( m o d i f i e d ) { w a i t ( ) } ; state = v ; m o d i f i e d = true ; notify (); System . out . p r i n t l n ( ” v a l u e s e t ” ) ; } } ⇒ Es werden also alle suspendierten Objekte aufgeweckt und das richtige erkennt sich selber. Dies ist der Java-Stil zur synchronisierten Kommunikation: Verwendung von wait() in Kombination mit notifyAll() und einen bisschen Busy Waiting: while(!Nachricht erhalten) { wait(); } und notifyAll(); Dies hat aber folgende Nachteile: • sehr unkontrollierte Kommunikation • Verschwendung von Systemressourcen (Busy Waiting) • selbst bei passender wait-/notify()-Verwendung können von außen mit synchronisierten Anweisungen weitere wait()-/notify-Aufrufe hinzugefügt werden ⇒ falscher Thread wird aufgeweckt 13 3.7.2 Variante von wait() mit Timeout: wait(long) oder wait(long, int), wobei long die Zeit in Millisekunden und int die Zeit in Nanosekunden darstellen. Nach Ablauf der Zeit fordert der Thread eigenständig wieder einen Lock an. Bei notify() ist kein Zeitablauf möglich, wobei der Thread trotzdem aufgeweckt wird. wait(0) = wait(0,0) = wait() 3.7.3 Fallstudie: einelementiger Puffer In dieser Fallstudie wollen wir eine Kommunikationsabstraktion in Java entwickeln: einen einelementigen Puffer, welcher einen synchronisierten Zugriff auf einen Wert wie folgt bietet: • Puffer kann leer oder voll sein • ein Wert kann in einen leeren Puffer geschrieben werden (⇒ put()) • aus einem vollen Puffer kann ein Wert entfernt werden (⇒ take()) • take() suspendiert, falls der Puffer leer ist • put() suspendiert falls der Puffer voll ist Der einelementige Puffer kann auch als veränderbare Variable gesehen werden. Wir nennen ihn deshalb MVar (mutable variable). 1 2 3 public c l a s s MVar <T> { private T c o n t e n t = null ; private boolean empty ; 4 public MVar ( ) { empty = true ; } 5 6 7 8 public MVar(T o ) { empty = f a l s e ; content = o ; } 9 10 11 12 13 public synchronized T t a k e ( ) throws I n t e r r u p t e d E x c e p t i o n { while ( empty ) { w a i t ( ) ; } notifyAll (); empty = true ; return c o n t e n t ; } 14 15 16 17 18 19 20 public synchronized void put (T o ) throws I n t e r r u p t e d E x c e p t i o n { while ( ! empty ) { w a i t ( ) ; } notifyAll (); empty = f a l s e ; content = o ; } 21 22 23 24 25 26 27 } Bemerkungen: Zeile 1: das <T> ist eine Typvariable, d. h. eine Variable über Typen, die mit Objekttypen gefüllt werden kann ⇒ wirkt als Polymorphie Zeile 9: das Argument ist hier ein Objekt o vom Typ T Zeile 16: hier werden die Schreiber aufgeweckt, da der Puffer jetzt leer ist Zeile 23: hier werden die Leser aufgeweckt, da der Puffer jetzt wieder voll ist Dieser Code ist unschön, da er ein wenig zielgerichtetes Erwecken der suspendierten Threads implementiert. 14 1. Verbesserung Gruppierung der Threads in Gruppen, was ein Erwecken von jeweils nur relevanten Threads ermöglicht: (i) take()-Threads (ii) put()-Threads Listing 14: Verwendung von Synchronisationsobjekten zur zielgerichteten Erweckung der Threads 1 2 3 public c l a s s MVar <T> { private T c o n t e n t = null ; private boolean empty ; 4 private Ob ject r = new Obj ect ( ) ; w = new Obj ect ( ) ; 5 6 7 public MVar ( ) { empty = true ; } 8 9 10 11 public MVar(T o ) { empty = f a l s e ; content = o ; } 12 13 14 15 16 public T t a k e ( ) throws I n t e r r u p t e d E x c e p t i o n { synchronized ( r ) { while ( empty ) { r . w a i t ( ) ; } synchronized (w) { empty = true ; w. n o t i f y A l l ( ) ; return c o n t e n t ; } } } 17 18 19 20 21 22 23 24 25 26 27 public void put (T o ) throws I n t e r r u p t e d E x c e p t i o n { synchronized (w) { while ( ! empty ) { w . w a i t ( ) ; } synchronized ( r ) { empty = f a l s e ; content = o ; r . notifyAll (); } } } 28 29 30 31 32 33 34 35 36 37 38 } Bemerkungen: Zeile 5/6: dies sind die beiden Synchronisationsobjekte Zeie 21: r.wait() darf nur ausgeführt werden, wenn man einen Lock auf r hat Zeile 23: hier befindet sich maximal 1 Thread und es gilt empty = false Zeile 26/27 dies stellt einen kritischen Bereich dar, der über r und wait() abgesichert ist Zeile 36: hier wird kein Lock gehalten Zeile 38: hier befindet sich maximal 1 Thread und es gilt empty = true Achtung: take() hält ein Lock auf r und wartet ggf. auf w und put() hält Lock auf w und wartet auf r! (race condition) ⇒ Gefahr eines Deadlocks! Es ist aber kein Deadlock möglich, da an relevanten Programmpunkten unterschiedliche Werte für empty vorliegen müssen ⇒ es können keine zwei Threads gleichzeitig an diesem Punkt sein! 15 2. Verbesserung Da wir jetzt schon spezielle Lockobjekte zur Unterscheidung der Leser und Schreiber eingeführt haben, sollte es doch auch möglich sein gezielt nur einen Leser bzw. Schreiber zu erwecken, d. h. notify() statt notifyAll() zu verwenden. 1 2 3 public c l a s s MVar <T> { private T c o n t e n t = null ; private boolean empty ; 4 private Ob ject r , w ; 5 6 7 8 9 10 public MVar ( ) { empty = true ; r = new Ob ject ( ) ; w = new Ob ject ( ) ; } public MVar(T o ) { empty = f a l s e ; content = o ; r = new Ob ject ( ) ; w = new Ob ject ( ) ; } 11 12 13 14 15 16 17 public T t a k e ( ) throws I n t e r r u p t e d E x c e p t i o n { synchronized ( r ) { while ( empty ) { r . w a i t ( ) ; } synchronized (w) { empty = true ; w. n o t i f y ( ) ; return c o n t e n t ; } } } 18 19 20 21 22 23 24 25 26 27 28 public void put (T o ) throws I n t e r r u p t e d E x c e p t i o n { synchronized (w) { while ( ! empty ) { w . w a i t ( ) ; } synchronized ( r ) { empty = f a l s e ; content = o ; r . notify (); } } } 29 30 31 32 33 34 35 36 37 38 39 } Auf den ersten Blick scheint es auch möglich, dieses Programm weiter zu vereinfachen und anstelle von while nur ein if zu verwenden. Dies ist aber nicht möglich, wenn man sich nochmal die genaue Semantik des wait und notify Mechanismus anschaut. Bei notify wird ein schlafender Thread nicht unmittelbar erweckt und weiterlaufen. Vielmehr bewirbt sich der erweckte Thread um den Lock, welcher zunächst noch vom notify ausführenden Thread gehalten wird. Dieser kann ihm aber von einem gerade neu initierten Leser (oder Schreiber) weggeschnappt werden, wodurch die Semantik der MVar verletzt würde und ggf. zwei Threads einen Wert auslesen oder setzen würden. Nach außen bieten MVars eine gute Abstraktionsstruktur, welche durchaus sinnvoll zur synchronisierten Threadkommunikation eingesetzt werden kannn. [⇒] Verwende in Zukunft besser MVars anstatt synchronized, wait() und notify(). Ähnliche Kommunikationsabstraktionen werden im Paket java.util.concurrent.atomic zur Verfügung gestellt. Allerdings Java-typisch mit sehr vielen Kontrollmöglichkeiten. Als Verwendungsbeispiel für MVars können wir das Producer-/Consumer-Problem betrachten. Auf den ersten Blick scheint ein einelementiger Puffer nicht besonders geeignet zu sein, da Produzenten blockiert werden. Dieses Konzept ist in der Praxis aber oft gut geeignet, da auch “automatisch” eine Lastbalancierung stattfindet. Ein Überangebot führt zu einer Sequentialisierung auf Seiten der Producer, wodurch mehr Rechenzeit für die Consumer zur Verfügung steht (siehe Abbildung 1). 16 Abbildung 1: Verwendung von MVar’s beim Producer-/Consumer-Problem Abbildung 2: 3.8 Beenden von Threadausführungen Die Methode isAlive() liefert true, falls der Thread noch läuft (auch wenn er gerade suspendiert ist). Beenden eines Threads durch: a) normales Beenden der run-Methode (durch Terminieren des Codes) b) Abbruch der run-Methode c) Aufruf der destroy-Methode (deprecated) d) Ende aller Nicht-Dämon-Threads Bei a) und b) werden alle Locks freigegeben. Bei c) wird der Thread wirklich “abgeschossen”, ohne dass Aufräumarbeiten stattfinden (destroy ist allerdings nicht immer implementiert und sollte nicht mehr verwendet werden). Alternative Interrupts (s.u.). d) ist unproblematisch, da wir ja sowieso am Programmende sind. Unterbrechung von Threads • Es stehen folgende Methoden der Klasse Thread zur Verfügung: – public void interrupt() ⇒ sendet einen Interrupt an den Thread, was zu einer InterruptedException in den Zuständen sleep und wait führt – public boolean isInterrupted() ⇒ testet, ob der Thread ein Interrupt erhielt – public static boolean interrupted() ⇒ testet den aktuellen Thread auf einen Interrupt und löscht ggf. den “Interrupt-Zustand” zum Beenden einer langen nebenläufigen Berechnung (diese Methode kann ein Thread nur auf sich selbst anwenden) • InterruptedExceptions können entweder mittels try ... catch ... finally aufgefangen oder weitergeleitet werden (mittels throws InterruptedException) ⇒ in MVar: put / read / take throws InterruptedException • siehe Abbildung 2 3.9 Warten auf Ergebnisse Wenn eine Threadberechnung terminiert, kann sie Ergebnisse anderen Threads zur Verfügung stellen, z. B. als Attribut oder über spezielle Selektor-Methoden. Um dies zu synchronisieren könnte man isAlive() und busy waiting verwenden. D. h. Threads welche auf das Ergebnis eines anderen Threads warten, überprüfen ob der Thread noch lebt und fahren fort, wenn dies nicht mehr der Fall ist. Dies 17 verschwendet aber Systemressourcen. Alternative 1: Kommunikation und Synchronisation über MVars Alternative 2: join-Methode der Thread-Klasse. Ein Thread, der die join-Methode eines anderen Threads aufruft, suspendiert, bis der andere Thread terminiert. Listing 15: Beispiel 1 2 3 4 5 c l a s s Calc extends Thread { private i n t r e s u l t ; public void run ( ) { r e s u l t = . . . ; } public i n t g e t R e s u l t ( ) { return r e s u l t ; } } 6 7 8 9 10 11 12 13 14 15 16 c l a s s showJoin { void run ( ) { Calc c a l c = new Calc ; calc . start (); try { calc . join (); System . out . p r i n t l n ( ” R e s u l t i s ”+ c a l c . g e t R e s u l t ( ) ) ; } catch ( I n t e r r u p t e d E x c e p t i o n e ) { . . . } } } Wie sleep() und wait() kann join() durch einen Interrupt unterbrochen werden. Die Semantik von join() ist ähnlich: while(calc.isAlive()) { wait(); } mit zugehörigem notifyAll(); bei Terminierung. Variante mit Timeout join(long Nanosekunden) join(long Nanosekunden, int Millisekunden) Der Grund für die Wiedererweckung (Terminierung oder Zeitablauf) ist nicht unterscheidbar. Durch ein zusätzliches isAlive() kann dies herausgefunden werden. Weitere Konzepte Lese- und Schreiboperationen auf int und boolean sind atomar. Die Kombination von mehreren aber nicht, genauso wie dies nicht für long und double gilt. Dies kann ggf. zu inkonsistenten Werten führen (z. B. wenn die erste Hälfte geschrieben wurde) Lösung: Synchronisieren oder Werte als volatile deklarieren ⇒ atomare Veränderungen weiteres Problem: Compileroptimierung Wir betrachten einen Thread, der den Code in Listing 16 ausführt. Wenn nun andere Threads den Wert der Variablen currentValue verändern, so sollte sich auch die Ausgabe des Threads ändern. Listing 16: Beispiel 1 2 3 4 5 currentValue = 5; while ( true ) { System . out . p r i n t l n ( c u r r e n t V a l u e ) ; Thread . s l e e p ( 1 0 0 0 ) ; } Dies ist aber nicht unbedingt der Fall und es ist möglich, dass weiterhin nur 5 und nicht der neu gesetzte Wert ausgegeben wird. Dies liegt an einer Compileroptimierung: Konstantenpropagation. Dadurch wird der Wert der Variablen currentValue vom Compiler direkt in das System.out.println() hinein geschrieben, da der Wert ja immer gleich ist. Als Lösung sollte die Variable currentValue als volatile deklariert werden, was eine Compileroptimierung verhindert. 18 3.10 ThreadGroups Threads können in Gruppen zusammengefasst werden, was die Strukturierung von Threadstrukturen ermöglicht. Es handelt sich um eine hierarchische Struktur, was bedeutet, dass ThreadGroups sowohl Threads als auch ThreadGroups enthalten können. Prinzipiell gehört jeder Thread zu einer ThreadGroup. Standardmäßig ist dies die gleiche ThreadGroup wie die des Elternthreads. Die ThreadGroup kann verändert werden mittels public Thread(ThreadGroup g [, String name]), wobei der Name optional ist. Nun ThreadGroups können mittels public ThreadGroup(String name) als Untergruppe zur aktuellen Threadgruppe generiert werden. ThreadGroups bieten folgende Methoden: • getName() • getParent() • setDaemon(boolean daemon) • setMaxPriority(int maxPri) • int activeCount() ⇒ gibt die Anzahl der Threads aus; die Gruppe bekommt man mittels isAlive() • int enumerate(Thread[] threadsInGroup, boolean recursive), wobei das int die Anzahl der Threads, Thread[] die Threads der Gruppe und boolean = true ist (auch bei Untergruppen) Die Semantik ist hier etwas anders als bei Threads: ThreadGroups werden gelöscht, wenn sie leer sind. Außerdem gibt es SecurityHandler auf Gruppen (z. B. Wer darf Thread in ThreadGroups erzeugen?“) ” ThreadPools Fast das gleiche Konzept wie ThreadGroups gibt es im Paket java.util.concurrent mit den ThreadPool noch einmal. Hierbei handelt es sich allerdings um eine Abstraktion zur Verwaltung von laufenden Threads. Sie ermöglichen eine Einsparung von System-Ressourcen durch Wiederverwendung von Threads. Executer Interface Das Executer Interface stellt Möglichkeiten zum Ausführen von Threads im Rahmen von ThreadPools zur Verfügung: • void execute(Runnable o) ⇒ nimmt einen Task (das Runnable o / nicht Thread!) in einen ThreadPool auf; also e.execute(r) mit e als Executer-Objekt und r als Runnable-Objekt statt new(Thread(r)).start() • die Klasse Executer stellt statische Methoden zur Generierug von Executoren zur Verfügung: static ExecuterService newFixedThreadPool(int n) startet den ThreadPool mit n Threads und stellt eine Verfeinerung von Executer dar. • Ein neuer Task (mit execute hinzugefügt) wird von einem dieser Threads ausgeführt, falls ein Thread verfügbar ist. Ansonsten wird er in eine Warteschlange eingefügt und gestartet, falls ein anderer Task beendet wird ⇒ kein Overhead durch Threadstart. Die Threads werden erst beim Shutdown beendet. • Es sind alternative Methoden verfügbar (z. B. zum bedarfsgesteuerten Starten von Threads [mit Obergrenze] oder dynamischen ThreadPools) Debugging Es gibt nur eingeschränkte Infomationen, die für ein printf-Debugging genutzt werden können: • die Klasse Thread stellt einige Methoden zum Debuggen zur Verfügung: – public String toString() ⇒ liefert Stringrepräsentation des Thread inklusive Name, Priorität und ThreadGroup 19 Abbildung 3: – public static void dumpStack() ⇒ liefert einen Stacktrace des aktuellen Threads auf System.out • für ThreadGroups gibt es: – public String toString() – public void list() ⇒ druckt rekursiv die toString()-Ergebnisse aller Threads/ ThreadGroups aus Beurteilung von Java Threads • Kommunikation über geteilten Speicher (Objekte) • gute Kapselung des wechselseitigen Ausschlusses in Obkjekten mittels synchronized • Synchronisation der Kommunikation mittels wait(), notify() und notifyAll(), was allerdings sehr ungerichtet ist • meistens Verwendung von notifyAll() und busy waiting • viele zusätzliche Konzepte: Prioritäten, Dämonen, sleep()/yield(), Server-Side- und ClientSide-Synchronisation mittels synchronized-Ausdrücken, Interrupts, isAlive(), join(), ThreadGroups, ThreadPools, Synchronisation auf Klassenebene, . . . • Vererbung und synchronized-Methoden passen nicht gut zusammen • Kommunikationsabstraktion (MVar, Chan oder entsprechende Konstrukte in java.util.concurrent) ersetzen inzwischen viele andere Konzepte und ermöglichen eine Programmierung auf höherem Niveau (Message Passing). Auch hier gibt es aber wieder viele Konfigurationsmöglichkeiten (z. B. timeouts). Unbeschränkter Puffer (Chan) Ein Chan dient zur Kommunikation zwischen Schreibern und Lesern, so dass die Schreiber nie suspendieren. Implementierung mittels verketteter Liste Leser müssen ggf. suspendieren, falls der Chan leer ist. Leser und Schreiber müssen synchronisiert werden. Zur Implementierung (der Zeiger) wollen wir MVars verwenden (siehe Abbildung 3). 1 2 c l a s s Chan<T> { private MVar<MVar<ChanElem<T>>> read , w r i t e ; 3 4 private c l a s s ChanElem<T> { 5 6 7 private T v a l u e ; private MVar<ChanElem<T>> next ; 8 9 10 11 12 public ChanElem (T v , MVar<ChanElem<T>> n ) { value = v ; next = n ; } 20 Abbildung 4: 13 public T v a l u e ( ) { return v a l u e ; } 14 15 16 17 public MVar<ChanElem<T>> next ( ) { return next ; } 18 19 20 } 21 22 public Chan ( ) throws I n t e r r u p t e d E x c e p t i o n { MVar<ChanElem<T>> h o l e = new MVar<ChanElem<T> >(); r e a d = new MVar<MVar<ChanElem<T>>>(h o l e ) ; w r i t e = new MVar<MVar<ChanElem<T>>>(h o l e ) ; } 23 24 25 26 27 28 public T r e a d ( ) throws I n t e r r u p t e d E x c e p t i o n { MVar<ChanElem<T>> rEnd = r e a d . t a k e ( ) ; ChanElem<T> item = rEnd . r e a d ( ) ; r e a d . put ( item . next ( ) ) ; return item . v a l u e ( ) ; } 29 30 31 32 33 34 35 public void w r i t e (T o ) { MVar<ChanElem<T>> newHole = new MVar<ChanElem<T>>(), oldHole = write . take ( ) ; o l d H o l e . put (new ChanElem<T>(o , newHole ) ) ; w r i t e . put ( newHole ) ; } 36 37 38 39 40 41 42 } Bemerkungen: Zeile 26: hier findet eine Synchronisation der Leser statt, da durch read die MVar kurzzeitig leer wird, was dann andere Leser suspendiert Zeile 27: liest die erste MVar s und liefert das erste Element (v1 ) Zeile 28: hier wird die Blockade der anderen Leser wieder aufgehoben Zeile 34: hier werden die anderen Schreiber suspendiert Siehe Abbildung 4 Beachte: Die Implementierung ermöglicht gleichzeitiges Lesen und Schreiben des nicht-leeren Chans! Als Erweiterung können wir zum Chan noch eine Methode isEmpty hinzufügen, welche den aktuellen Zustand des Chans analysiert. Sicherlich muss bei der Verwendung einer solchen Methode bedacht werden, dass erhaltene Information nur eine sehr begrenzte Gültigkeit hat. Man kann sich aber dennoch Fälle vorstellen, in denen solch eine Methode nützlich sein kann. 1 2 3 4 public boolean isEmpty ( ) { MVar<ChanElem<T>> rEnd = r e a d . t a k e ( ) ; MVar<ChanElem<T>> wEnd = w r i t e . t a k e ( ) ; w r i t e . put (wEnd ) ; 21 r e a d . put ( rEnd ) ; return ( rEnd==wEnd ) ; 5 6 7 } In der Übung wird die MVar-Implementierung um eine read-Methode, welche die Mvar nicht leert erweitert. Mit dieser Methode ist eine Vereinfachung dieser Implementierung möglich. Beachte aber, dass diese Implementierung nicht in allen Situationen das erwartete Verhalten hat. Durch die Synchronisierung mit anderen lesenden Threads, kann es dazu kommen, dass isEmpty blockiert, nämlich genau dann, wenn ein anderer Thread bereits von einem leeren Chan lesen will und hierbei natürlich suspendiert. Lösungsideen für dieses Problem werden in den Übungen besprochen. Eine weitere Lösung werden wir bei abstrakteren Konzepten später in dieser Vorlesung kennen lernen. Noch ein Konzept: Pipes (oder ein alter Schritt in Richtung Message Passing) Pipes werden meist für Datei- oder Netzwerkkommunikation verwendet. Mit ihnen ist aber auch eine Kommunikation zwischen Threads möglich. Eine Pipe besteht aus zwei Strömen: (i) PipedInputStream (ii) PipedOutputStream , welche bei der Konstruktion durch: PipedOutputStream out = new PipedOutputStream(); PipedInputStream in = new PipedInputStream(out); oder mittels in.connect(out) verbunden werden können. Pipes haben in der Regel eine beschränkte Größe. Threads, die aus einer leeren Pipe lesen oder in eine volle Pipe schreiben wollen, werden suspendiert. Das Schreiben erfolgt mittels: • write(int b) • write(byte[], int off, int len) ⇒ schreibt b[of f ] . . . b[of f + len] • flush() ⇒ setzt die gepufferte write-Methode um • close() ⇒ schließt das Schreibende das Lesen mittels: • int read() ⇒ liefert -1, falls die Pipe geschlossen ist • int read(byte[], int off, int len) ⇒ suspendiert bei einer unzureichenden Zahl an Bytes auf der Pipe • int available() ⇒ liefert die Anzahl der Bytes in der Pipe • close() Andere Daten als Bytes können mittels DataOutputStream oder PipeWriter übertragen werden. Ein Vergleich mit Chan und MVar findet in der Übung statt. 4 Nebenläufige Programmierung in Haskell Auch Haskell bietet in Form der Bibliothek Concurrent Haskell die Möglichkeit nebenläufig zu programmieren. Hierbei stellt eine MVar, wie wir sie in Java implementiert haben die grundlegende Datenstruktur dar. 4.1 Concurrent Haskell Erweiterung von Haskell 98 um Konzepte zur nebenläufigen Programmierung. Modul: Control.Concurrent. Zur Generierung neuer Threads steht die Funktion forkIO :: IO () -> IO ThreadId 22 zur Verfügung. Das Argument vom Typ IO() stellt den abzuspaltenden Prozess dar. Die Aktion ist sofort beendet. Die ThreadId identifiziert den neu abgespalteten Thread. Ein Thread kann seine eigene ThreadId mit folgenden Funktion abfragen: myThreadId :: IO ThreadId Threads werden bei Terminierung beendet oder mittels killThread::ThreadId->IO() abgeschos” sen“. Im Gegensatz zu Erlang werden die ThreadIds aber nicht zur Kommunikation verwendet. Listing 17: Beispiel 1 2 3 4 5 6 7 main : : IO ( ) main = do id <− f o r k I O ( l o o p 0 ) s t r <− getLine i f s t r == ” s t o p ” then k i l l T h r e a d id e l s e return ( ) 8 9 10 11 12 l o o p : : Int−>IO ( ) l o o p n = do print n l o o p ( n+1) Das Beispiel ist nicht ganz so schön, da das gesamte Programm ohnehin terminiert, falls der Hauptthread terminiert. 4.2 Kommunikation Haskell bietet Kommunikationsabstraktionen namens MVar und Chan, wie wir sie auch schon in Java kennengelernt (und dort auch implementiert) haben. newEmptyMVar :: IO(MVar a) kreiert eine neue leere MVar newMVar :: a -> IO(MVar a) kreiert neue gefüllte MVar. takeMVar :: MVar a -> IO a putMVar :: Mvar a - > a -> IO() readMVar :: MVar a -> IO a tryPutMVar :: Mvar a -> a -> IO Bool ... Dinierende Philosophen Listing 18: Dinierende Philosophen 1 2 3 4 5 6 7 8 p h i l : : MVar ( ) −> MVar ( ) −> IO ( ) p h i l l r = do takeMVar l takeMVar r −− e a t putMVar l ( ) putMVar r ( ) phil l r 9 10 11 12 13 14 main : : IO ( ) main = do s t i c k s <− c r e a t e S t i c k s 5 startPhils sticks p h i l ( l a s t s t i c k s ) ( head s t i c k s ) 15 16 17 18 19 20 c r e a t e S t i c k s : : I n t −> IO ( [ MVar ( ) ] ) c r e a t e S t i c k s 0 = return [ ] c r e a t e S t i c k s n = do s t i c k s <− c r e a t e S t i c k s ( n−1) s t i c k <− newMVar ( ) 23 21 return ( s t i c k : s t i c k s ) 22 23 24 25 26 27 s t a r t P h i l s : : [ MVar ( ) ] −> IO ( ) s t a r t P h i l s [ ] = return ( ) s t a r t P h i l s ( l : r : s t i c k s ) = do forkIO ( p h i l l r ) startPhils ( r : sticks ) --eat ist ein Kommentar [_] matched eine ein-elementige Liste 4.3 Kanäle in Haskell Da die MVar als grundlegende Datenstruktur auch in Haskell vorhanden ist, ist eine Übertragung der Chan-Implementierung recht einfach möglich: 1 module Chan where 2 3 import C o n t r o l . Concurrent . MVar 4 5 6 data Chan a = Chan (MVar ( Stream a ) ) −− r e a d end (MVar ( Stream a ) ) −− w r i t e end 7 8 ty pe Stream a = MVar ( ChanElem a ) 9 10 data ChanElem a = ChanElem a ( Stream a ) 11 12 13 14 15 16 17 newChan : : IO ( Chan a ) newChan = do h o l e <− newEmptyMVar r e a d <− newMVar h o l e w r i t e <− newMVar h o l e return ( Chan r e a d w r i t e ) 18 19 20 21 22 23 24 writeChan writeChan newHole oldHole putMVar putMVar : : Chan a −> a −> IO ( ) ( Chan w r i t e ) v = do <−newEmptyMVar <− takeMVar w r i t e o l d H o l e ( ChanElem v newHole ) w r i t e newHole 25 26 27 28 29 30 31 readChan : : Chan a −> IO a readChan ( Chan r e a d ) = do rEnd <− takeMVar r e a d ( ChanElem v newReadEnd ) <− readMVar rEnd putMVar r e a d newReadEnd return v 32 33 34 35 36 37 38 39 isEmptyChan : : Chan a −> IO Bool isEmptyChan ( Chan r e a d w r i t e ) = do rEnd <− takeMVar r e a d wEnd <− takeMVar w r i t e putMVar w r i t e wEnd putMVar r e a d rEnd return ( rEnd==wEnd) Die Verwendung von Kanälen ermöglicht ein anderes Programmierparadigma, das so genannte Message Passing. Message Passing wird z. B. in Erlang und Scala zur nebenläufigen und auch verteilten Programmierung eingesetzt. Im folgenden wollen wir uns zunächst intensiver mit Erlang auseinandersetzen. 24 5 Erlang Erlang wurde von der Firma Ericsson entwickelt. Es ist eine funktionale Programmiersprache mit speziellen Erweiterungen zur nebenläufigen und verteilten Programmierung. Sie wurde gezielt zur Entwicklung von Telekommunikationsanwendungen eintwickelt, wird inzwischen aber auch in anderen Bereichen zur Entwicklung insbesondere verteilter Anwendungen eingesetzt. 5.1 Sequentielle Programmierung in Erlang Erlang ist eine funktionale Programmiersprache mit strikter Auswertung, d. h. es werden zuerst die Argumente ausgewertet und erst anschließend die Funktion aufgerufen, wie in imperativen Sprachen auch. Erlang ist ungetypt/(dynamisch) getypt ⇒ Laufzeittypfehler für unpassende Basistypen (z. B. true + 7 ⇒ Widerspruch). Erlang ist verfügbar unter www.erlang.org und auf den SUN’s unter /home/erlang/bin/erl installiert. Listing 19: Fakultätsfunktion (factorial) als Beispiel 1 2 f a c ( 0 ) −> 1 ; f a c (N) when N>0 −> N ∗ f a c (N−1). Erklärungen: • “;” trennt die Regeln der Funktions-Definition • Variablen werden groß geschrieben • der guard when N>0 gehört zur Auswahl der Regel mit hinzu • jede Funktionsdefinition endet mit einem Punkt. Statt let-Ausdrücken bind-once-Variablen: 1 2 3 f a c (N) when N>0 −> N1 = N−1, F1 = f a c (N1 ) , N ∗ F1 . Erklärungen: • “,” als Trennzeichen der Sequenzen • die letzte Zeile ist das Ergebnis der Berechnung, welches automatisch ausgegeben wird Funktionen werden nach ihrer Stelligkeit unterschieden: fac(N,M) -> ... ist eine andere Funktion als fac/1, was für eine einstellige Funktion wie z. B. fac(N) steht. jedes Erlang-Programm benötigt einen Modulkopf: • -modul(math). ⇒ das Modul befindet sich in der Datei math.erl • -export([fac/1]). ⇒ erlaubt den Zugriff von außen Ablauf: > erl ... > c(math). > math:fac(6). 720 Erklärungen: • c(math). compiliert das Modul math in der Datei math.erl. Alternativ kann in der Shell mittels erlc math.erl kompiliert werden. • halt(). oder Crtl + C“und dann a führt zum Verlassen der Erlang-Umgebung. ” Datenstrukturen/Werte: • Atome: a, b, c, 0, 1 ,true, false, ..., ’Hallo’, ’willi@mickey’ ⇒ beginnen mit Kleinbuchstaben oder stehen in Hochkommata. 25 • Tupel: { t1 , . . . , tn } mit beliebigen Werten ti , z. B. Bäume: {node,{node,empty,1,empty},2,empty} • Listen: [e|l], wobei e das Kopfelement der Liste ist. l ist eine Restliste und [] steht für die leere Liste. Als Abkürzung können wir auch [1,2,3] statt [1|[2|[3|[]]]] schreiben. Beispiel: Konkatenation zweier Listen app([], Ys) -> Ys; app([X|Xs], Ys) -> [X|app(Xs, Ys)]. Patern Matching: • Pat = e, wobei Pat eine der folgenden Formen hat: – Variable – Atom – Tupel {P at1 , . . . , P atn } – [ P at1 |P ate ] – [] – [ P at1 , . . . , P atn ] Das Matching eines Pattern bindet ungebundene Variablen an Teilstrukturen der Werte; z.B: – {X,[1|Ys]} = {42,[1,2,3]} das X gebunden wird [X/42, Ys/[2,3]], wobei X/42 bedeutet, dass die 42 an – {X,{1,Y},Y} = {3,{1,[]},42} müsste – {X,{1,Y},Y} = {3,{1,42},42} macht nicht, da Y an [] und 42 gebunden werden [X/3, Y/42] Beachte: Die Substitution wird auf alle vorkommenden Variablen angewendet, auch in Pattern. Ein “Umbinden” ist nicht mehr möglich! Beispiel: {X,[Y|Ys]} = {42,[1,2]} [X|Xs] = [3,4] wobei die zweite Zeile zum Fehler führt, da X bereits an 42 gebunden ist. 5.2 Nebenläufige Programmierung Erlang-Prozesse sind light-weight, so dass davon gleichzeitig sehr viele existieren können. Ein Erlang-Prozess besteht aus folgenden drei Bestandteilen: • Programmterm t, der “ausgewertet” wird • Prozess-Identifier pid • Mailbox (Message Queue), die Nachrichten aufnimmt Starten von Erlang-Prozessen: • spawn(module, func,[a1 , . . . , an ]) ⇒ startet einen nebenläufigen Prozess, dessen Berechnung mit module:func(a1 , . . . , an ) startet • spawn terminiert sofort und liefert den pid des neuen Prozesses zurück • Beachte: func muss in module exportiert werden (auch wenn spawn im selben Modul ausgerufen wird) Alternativ kann man spawn auch eine parameterlose Funktion übergeben, welche dann als separater Prozess gestartet wird: spawn(fun () -> module:fun(a1 ,. . . ,an ) end). Der Funktionsrumpf muss hierbei natürlich keine definierte Funktion sein. Es kann ein beliebiger Erlang-Ausdruck sein. Seinen eigenen pid erhält ein Prozess durch Aufruf von self(). Senden von Nachrichten: • pid!v ⇒ sendet den Wert v an den Prozess pid. Der Wert wird hinten in die Mailbox von pid eingetragen. • pids sind Werte wie Atome oder Listen und können somit auch in Datenstrukturen gespeichert 26 oder auch verschickt werden • es ist gewährleistet, dass Nachrichten nicht verloren gehen Empfangen von Nachrichten: receive-Ausdruck 1 2 3 4 5 6 receive Pat1 −> e1 ; ... Patn −> e n [ a f t e r t −> e ] end • die Pattern werden der Reihe nach gegen die Werte der eigenen Mailbox gematcht (exakte Reihenfolge siehe Übung) • erstes passendes Pattern P ati wird gewählt (→ Substitution σ). Das gesamte receive-Statement wird durch σ(ei ) ersetzt. Die Berechnung macht mit ei weiter. • bei after t -> e findet ein Timeout nach t Millisekunden statt und es geht mit e weiter Spezialfall: t = 0 ⇒ die Pats werden nur genau einmal gegen alle Mailbox-Einträge gematcht 5.3 Ein einfacher Key-Value-Store Listing 20: Server des Key-Value-Stores 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 −module ( dataBase ) . −e x p o r t ( s t a r t / 0 ) . s t a r t ( ) −> dataBase ( [ ] ) . dataBase (KVs) −> receive { a l l o c a t e , Key , P} −> case lo oku p ( Key , KVs) o f n o t h i n g −> P ! f r e e , r e c e i v e { v a l u e , V, P} −> dataBase ( [ { Key ,V} | KVs ] ) end ; { j u s t ,V} −> P ! a l l o c a t e d , dataBase (KVs) end ; { lookup , Key , P} −> P ! lo oku p ( Key , KVs ) , dataBase (KVs) end . 17 18 19 20 l o o k u p (K , [ ] ) −> n o t h i n g ; l o o k u p (K, [ { K,V} | ] ) −> { j u s t ,V} ; l o o k u p (K, [ | KVs ] ) −> lo oku p (K, KVs ) ; Listing 21: Beispiel-Client zum Key-Value-Stores 1 2 3 −module ( c l i e n t ) . −e x p o r t ( [ s t a r t / 0 ] ) . −import ( base , [ g e t L i n e / 1 , p r i n t L n / 1 ] ) 4 5 6 7 s t a r t ( ) −> DB = spawn ( database , s t a r t , [ ] ) , c l i e n t (DB) . 8 9 10 11 12 13 14 15 16 17 c l i e n t (DB) −> I n p u t = g e t L i n e ( ” ( l ) ookup / ( i ) n s e r t > ” ) , case I nput o f ” i ” −> K = g e t L i n e ( ” key > ” ) , DB! { a l l o c a t e , K, s e l f ( ) } , receive f r e e −> V = getLine ( ” value > ” ) , DB! { v a l u e , V, s e l f ( ) } ; 27 18 19 20 21 22 23 24 25 26 27 28 29 a l l o c a t e d −> p r i n t L n ( ” key a l l o c a t e d ” ) end ; ” l ” −> K = g e t L i n e ( ” key > ” ) , DB! { lookup , K, s e l f ( ) } , receive n o t h i n g −> p r i n t L n ( ”Key not a l l o c a t e d ” ) ; { j u s t , V} −> p r i n t L n (V) end ; X −> p r i n t L n (X) end , c l i e n t (DB) . Bemerkungen: Zeile 3: base ist ein Modul mit IO-Funktionen (wird auf der Web-Site bereit gestellt) Zeile 6: DB enthält den zugehörigen Prozess-Identifier Zeile 27: dieser Fall fängt alle “sonst”-Fälle ab. Hier kann auch _ -> 42 stehen. Beachte: Es können auch mehrere Clienten mit einem Server kommunizieren. Dies entspricht einem Nameserver, kann aber auch als geteilter Speicher gesehen werden. Dieser Key-Value-Store stellt zum Einen eine einfache erste Anwendung zur nebenläufigen Programmierung in Erlang dar. Er zeigt aber auch, dass man mit Hilfe eines Server-Prozesses und Message Passing, das Modell eines geteilten Speichers simulieren kann. Der Schlüssel entspricht hierbei der Speicheradresse und der Wert dem abgelegten wert. Im nächsten Schritt sollten wir also untersuchen, ob auch ähnliche Synchronisationsmechanismen, wie sie bei geteiltem Speicher verwendet werden, simuliert werden können. 5.4 Wie können Prozesse in Erlang synchronisiert werden? Als Beispiel: Implementierung der dinierenden Philosophen Idee: Repräsentiere den Zustand eines Sticks innerhalb eines speziellen Stick-Prozesses. Die Funktionen zum Nehmen und Hinlegen eines Essstäbchens können dann wie folgt definiert werden: Erlang-Prozesse sind seiteneffektsfrei, bis auf das Senden von Nachrichten! Listing 22: Essstäbchen 1 2 3 4 5 6 stickDown ( ) −> receive { take , P} −> P ! took , stickUp ( ) . −> stickDown ( ) end . 7 8 9 10 11 s t i c k U p ( ) −> receive put −> stickDown ( ) end . 12 13 14 15 16 t a k e ( St ) −> St ! { take , s e l f ( ) } , receive took−> ok end . 17 18 put ( St ) −> St ! put . Hierbei ist zu beachten, dass ein entsprechender Stabprozess zwischen den Zuständen stickUp() und stickDown() hin- und herwechselt. Die take-Nachricht wird aber nur im stickDown()-Zustand verarbeitet. Bei stickUp() verbleibt sie in der Mailbox. Außerdem wartet der Prozess, der take/2 ausführt im receive, bis eine entsprechende Bestätigungsnachricht (took) verschickt wurde. Nun ist es einfach, die Philosophen zu realisieren: 28 Listing 23: Dinierende Philosophen 1 2 3 4 5 −module ( d i n i n g P h i l s ) . −e x p o r t ( [ s t a r t / 1 , stickDown ( ) / 0 , p h i l / 3 ] ) . s t a r t (N) when N >= 2 −> S t i c k s = c r e a t e S t i c k s (N) , startPhils ([ l i s t s : last ( Sticks )| Sticks ] , 0). 6 7 c r e a t e S t i c k s ( 0 ) −> [ ] ; 8 9 10 11 12 c r e a t e S t i c k s (N) −> S t i c k = spawn ( d i n i n g P h i l s , stickDown , [ ] ) , S t i c k s = c r e a t e S t i c k s (N−1) , [ Stick | Sticks ] . 13 14 s t a r t P h i l s ( [ ] , N) −> ok ; 15 16 17 18 s t a r t P h i l s ( [ SL , SR | S t i c k s ] , N) −> spawn ( d i n i n g P h i l s , p h i l , [ SL , SR , N] ) , s t a r t P h i l s ( [ SR | S t i c k s ] , N+1). 19 20 21 22 23 24 25 26 27 28 29 30 p h i l ( SL , SR , N) −> b a s e : p r i n t (N) , base : printLn ( ” i s thinking ” ) , t i m e r : s l e p (10+N) , // #4 t a k e ( SL ) , t a k e (SR ) , b a s e : p r i n t (N) , base : printLn ( ” i s e a t i n g ” ) , put ( SL ) , put (SR ) , p h i l ( SL , SR , N ) . Bemerkungen: Zeile 2: stickDown()/0 und phil/3 müssen exportiert werden, damit diese gestartet werden können Zeile 23: die Zeile ist zur direkten “Deadlockvermeidung” notwendig An diesem Beispiel erkennt man gut das Prinzip der Synchronisation in Erlang. Als weiteres Beispiel betrachten wir noch die Implementierung einer MVar in Erlang. 5.5 MVar Als weiteres Beispiel wollen wir eine MVar in Erlang implementieren. Hierbei können wir genau wie bei der Stick-Implementierung vorgehen. Wir müssen nur zusätzlich einen Wert als Zustand des MVarProzesses vorsehen. Listing 24: MVar in Erlang 1 −module (mVar ) . 2 3 −e x p o r t ( [ new/ 0 ,new/ 1 ,mVar/ 1 , t a k e / 1 , put / 2 ] ) . 4 5 new( ) −> spawn (mVar , mVar , [ empty ] ) . 6 7 new(V)−> spawn (mVar , mVar , [ { f u l l ,V } ] ) . 8 9 10 11 12 13 14 mVar( empty ) −> r e c e i v e { put , V, P} −> P ! put , mVar( { f u l l ,V} ) end ; mVar( { f u l l ,V} ) −> r e c e i v e { take , P} −> P ! { took ,V} , mVar( empty ) end . 15 16 17 t a k e (MVar) −> MVar ! { take , s e l f ( ) } , 29 18 19 20 receive { took ,V} −> V end . 21 22 23 24 25 26 put (MVar ,V) −> MVar ! { put , V, s e l f ( ) } , receive put −> ok end . Im Gegensatz zur Implementierung der Stäbchen, verwenden wir hier keine zwei sich abwechselnd aufrufenden Funktionen, sondern unterschieden die Zustände durch zwei unterschiedliche Argumente (empty und {full,...}). 5.6 Nebenläufige Programmierung in Erlang Wir haben nun gesehen, wie man geteilten Speicher und lockbasierte Synchronisation in Erlang simulieren kann. In der Praxis greift man allerdings selten auf diesen Ansatz zurück. Vielmehr verwendet man ausgezeichnete Server-Prozesse, welche die Anfragen an ein Objekt sequentialisieren und hierdurch den geteilten Zustand schützen. 5.7 Verteilte Programmierung in Erlang Nebenläufige Prozesse haben keinen gemeinsamen Speicher und können deshalb problemlos auf mehreren Rechnern verteilt werden. Nachrichten werden automatisch in TCP-Nachrichten umgewandelt. (siehe Abbildung 5). Es ist auch möglich, dass mehrere Knoten auf einem Rechner laufen. Starten eines Erlang-Knotens: erl -name willi ⇒ da ansonsten keine veteilte Kommunikation möglich ist, da der Prozess nicht bekannt ist Alternative: erl -sname willi ⇒ analog, funktioniert aber nur im gleichen Subnetz Starten von Prozessen auf anderen Knoten: • Voraussetzung: Erlang-Shell muss auf dem anderen Knoten/Rechner laufen • z. B. DB = spawn(’[email protected]’, database, start,[]) in obigem Beispiel • allgemein: spawn(Knoten, module, func, args) wobei das funktionale Ergebnis wieder eine PID ist, welche jetzt auch IP-Adresse und Knotennamen mitkodiert. Dies eignet sich zum Auslagern von Berechnungen (z. B. zur Lastverteilung). 5.8 Verbinden unabhängiger Erlang-Prozesse In offenen Systemen (z. B. Telefon, Chat) ist es nötig, Verbindungen zur Laufzeit herzustellen. Dazu ist das globale Registrieren eines Prozesses auf einem Knoten notwendig: register(name, pid). Senden an restliche Prozesse: {name, knoten}! msg, was meistens nur für den “Erstkontakt” verwendet wird. Danach findet ein Austausch der PID’s statt (z. B. Database-Server läuft auf Knoten “servermickey” ⇒ erl -sname server auf mickey). 5.8.1 Veränderungen des Clients Listing 25: Veränderung des Clients 1 2 3 4 5 s t a r t −> { dbServer , ’ s e rv e r @m i c ke y ’ } ! { connect , s e l f ( ) } , receive { c onnected , DB} −> c l i e n t (DB) end . 30 Abbildung 5: 5.8.2 Veränderungen des Servers Listing 26: Veränderung des Servers 1 2 3 s t a r t ( ) −> r e g i s t e r ( dbServer , s e l f ( ) ) , database ( [ ] ) . 4 5 6 7 8 9 10 11 d a t a b a s e (L) −> receive { a l l o c a t e , . . . } −> . . . ; { lookup , . . . } −> . . . ; { connect , P} −> P ! { connected , s e l f ( ) } , d a t a b a s e (L) end . Es sind also nur wenige Veränderungen beim Übergang von nebenläufiger zu verteileter Programmierung notwendig. Als weiteres Beispiel wollen wir einen verteilten Chat implementieren, der aus einem (registrierten) Server und beliebig vielen passenden Clients besteht. Listing 27: Veränderung des Clients 1 2 −module ( c h a t S e r v e r ) . −e x p o r t ( [ s t a r t / 0 ] ) . 3 4 5 s t a r t ( ) −> r e g i s t e r ( chat , s e l f ( ) ) , run ( [ ] ) . 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 run ( C l i e n t s ) −> receive { connect , Pid , Name} −> case l oo k up 2 (Name , C l i e n t s ) o f n o t h i n g −> Pid ! { connected , names ( C l i e n t s ) , s e l f ( ) } , b r o a d c a s t ( C l i e n t s , { l o g i n , Name } ) , run ( [ { Pid , Name } | C l i e n t s ] ) ; { j u s t , } −> Pid ! nameOccupied , run ( C l i e n t s ) end ; { message , Nonsens , Pid } −> case lo oku p ( Pid , C l i e n t s ) o f { j u s t , Name} −> b r o a d c a s t ( C l i e n t s , { msg , Name , Nonsens } ) , run ( C l i e n t s ) ; n o t h i n g −> run ( C l i e n t s ) end ; { l o g o u t , Pid } −> 31 case lo oku p ( Pid , C l i e n t s ) o f { j u s t , Name} −> R e m a i n i n g C l i e n t s = remove ( Pid , C l i e n t s ) , b r o a d c a s t ( R e m a i n i n g C l i e n t s , { l o g o u t , Name } ) , run ( R e m a i n i n g C l i e n t s ) ; n o t h i n g −> run ( C l i e n t s ) end 26 27 28 29 30 31 32 end . 33 34 35 36 names ( [ ] ) −> [ ] ; names ( [ { , Name } | C l i e n t s ] ) −> [ Name | names ( C l i e n t s ) ] . %names ( C l i e n t s ) −> l i s t s : map( fun ( { ,X})−>X end , C l i e n t s ) . 37 38 39 40 b r o a d c a s t ( [ ] , ) −> ok ; b r o a d c a s t ( [ { Pid , } | C l i e n t s ] , Msg ) −> Pid ! Msg , b r o a d c a s t ( C l i e n t s , Msg ) . 41 42 43 44 l o o k u p ( , [ ] ) −> n o t h i n g ; l o o k u p (K, [ { K,V} | ] ) −> { j u s t ,V} ; l o o k u p (K, [ | KVs ] ) −> lo oku p (K, KVs ) . 45 46 47 48 l o o k u p 2 ( , [ ] ) −> n o t h i n g ; l o o k u p 2 (V, [ { K,V} | ] ) −> { j u s t ,K} ; l o o k u p 2 (V , [ | KVs ] ) −> l oo k up 2 (V, KVs ) . 49 50 51 52 remove ( , [ ] ) −> [ ] ; remove (K, [ { K, } | KVs ] ) −> remove (K, KVs ) ; remove (K, [ KV| KVs ] ) −> [KV| remove (K, KVs ) ] . Listing 28: Veränderung des Clients 1 2 −module ( c h a t C l i e n t ) . −e x p o r t ( [ c o n n e c t / 2 , keyboard / 2 ] ) . 3 4 5 6 7 8 9 10 11 12 c o n n e c t (Name , Node ) −> { chat , Node } ! { connect , s e l f ( ) , Name} , receive { c onnected , Names , S e r v e r P i d } −> b a s e : p r i n t ( Names ) , spawn ( c h a t C l i e n t , keyboard , [ S e r v e r P i d , Name ] ) , run ( S e r v e r P i d ) ; nameOccupied −> 42 end . 13 14 15 16 17 run ( S e r v e r P i d ) −> receive ??? −> . . . end . 18 19 20 21 22 23 24 25 26 keyboard ( S e r v e r P i d , Name) −> S t r = b a s e : g e t L i n e (Name ) , case S t r o f ” bye ” −> S e r v e r P i d ! { l o g o u t , s e l f ( ) } ; −> S e r v e r P i d ! { message , Str , s e l f ( ) } , keyboard ( S e r v e r P i d , Name) end . 5.9 Robuste Programmierung In Erlang sterben Prozesse normalerweise unbemerkt; Nachrichten an terminierte Prozesse gehen ohne Fehlermeldungen verloren. Prozesse können aber auch mittels link(pid) verbunden werden. Durch einen Link wird eine bidirektionale Verbindung zwischen dem ausführendem Prozess und dem Prozess pid aufgebaut. Stirbt 32 einer der gelinkten Prozesse (auch bei Terminierung), so wird der andere ebenfalls terminiert. Dies ist zum Aufräumen bei Prozessende praktisch. Alternativ ist es aber auch möglich, anstelle kooperativ mit zu sterben, eine Nachricht zu empfangen, falls der andere gelinkte Prozesse abstürzt, ist auch der Empfang einer Benachrichtigung möglich. Wurde mit folgendem Funktionsaufruf process_flag(trap_exit, true) ein Prozessflag gelöscht, so erhält man anstelle des “Mitsterbens” eine Nachricht der Form: {’EXIT’, reason, pid}, wobei reason den Grund für die Prozessterminierung und pid den abgestürzten Prozess angibt. 5.10 Systemabstraktionen Bei der Entwicklung von Erlang-Anwendungen fällt auf, dass in verteilten Systemen immer wieder ähnliche Prozess-Muster auftreten. Diese wurden als generische Varianten heraus gearbeitet und stehen als Bibliothek in Erlang zur Verfügung. In der Vorlesung haben wir uns den gen_server etwas genauer angeschaut. Er eigent sich zur Entwicklung von Client-Server-Systemen. Es werden insbesondere zwei unterschiedliche Arten der Anfrage von Clienten beim Server unterstützt: synchrone Kommunikation (wartet auf Antwort; gen_server:call) und asynchrone Kommunikation (kein Warten auf Antwort; gen_server:cast). Außerdem werden natürlich auch die anderen Erlang-Mechanismen, wie Linking und Code-Austausch zur Laufzeit unterstützt. Der Vorteil der Verwendendung des Frameworks ist, dass bei der Implementierung des Protokolls weniger Fehler gemacht werden können und aufbauend noch weitere Abstraktionen, wie z.B. der Supervisiontree, definiert werden können. Als Beispiel für die Verwendung eines gen_servers realisieren wir nun unseren Chat in diesem Framework: Listing 29: Chat unter Verwendung des generischen Servers 1 2 −module ( genChat ) . −b e h a v i o u r ( g e n s e r v e r ) . 3 4 5 6 7 −e x p o r t ( [ s t a r t / 0 ] ) . −e x p o r t ( [ l o g i n / 2 , msg / 2 , l o g o u t / 1 , who / 1 ] ) . −e x p o r t ( [ i n i t / 1 , h a n d l e c a l l / 3 , h a n d l e c a s t / 2 , h a n d l e i n f o / 2 , terminate /2 , code change / 3 ] ) . 8 9 10 s t a r t ( ) −> b a s e : putStrLn ( ” S e r v e r s t a r t e d ” ) , g e n s e r v e r : s t a r t l i n k ( { l o c a l , c h a t } , genChat , [ ] , [ ] ) . 11 12 13 i n i t ( ) −> b a s e : putStrLn ( ” S e r v e r i n i t i a l i z e d ” ) , {ok , [ ] } . 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 % Asynchronous i n t e r f a c e h a n d l e c a s t ( { message , Text , P} , C l i e n t s ) −> case l o oku p (P , C l i e n t s ) o f nothing −> ok ; { j u s t , Name} −> b r o a d c a s t ( { message , Text , Name} , C l i e n t s ) end , { noreply , C l i e n t s } ; h a n d l e c a s t ( { l o g o u t , P} , C l i e n t s ) −> case l o oku p (P , C l i e n t s ) o f nothing −> { n o r e p l y , C l i e n t s } ; { j u s t , Name} −> NewClients=remove (P , C l i e n t s ) , b r o a d c a s t ( { l o g o u t , Name} , NewClients ) , { n o r e p l y , NewClients } end . 29 30 31 32 33 34 35 36 % Synchronous i n t e r f a c e h a n d l e c a l l ( { l o g i n , Name} , { ClientP , } , C l i e n t s ) −> base : p r i n t ( ClientP ) , b r o a d c a s t ( { l o g i n , Name} , C l i e n t s ) , { reply , { l o g g e d I n , s e l f ( ) , l i s t s : map( fun ( { ,N} ) −> N end , C l i e n t s ) } , [ { ClientP , Name } | C l i e n t s ] } ; 33 37 38 39 40 h a n d l e c a l l ( who , C l i e n t P , C l i e n t s ) −> { reply , { o n l i n e , l i s t s : map( fun ( { ,N} ) −> N end , C l i e n t s ) } , Clients }. 41 42 43 44 h a n d l e i n f o ( Msg , C l i e n t s ) −> b a s e : putStrLn ( ” Unexpected Message : ”++b a s e : show ( Msg ) ) , { noreply , C l i e n t s } . 45 46 t e r m i n a t e ( shutdown , C l i e n t s ) −> ok . 47 48 49 50 c o d e c h a n g e ( OldVsn , S t a t e , E x t r a ) −> % Code t o change S t a t e {ok , S t a t e } . 51 52 53 %C l i e n t i n t e r f a c e l o g i n ( Node , Name) −> g e n s e r v e r : c a l l ( { chat , Node } , { l o g i n , Name } ) . 54 55 msg ( Pid , Msg ) −> g e n s e r v e r : c a s t ( Pid , { message , Msg , s e l f ( ) } ) . 56 57 l o g o u t ( Pid ) −> g e n s e r v e r : c a s t ( Pid , { l o g o u t , s e l f ( ) } ) . 58 59 who ( Pid ) −> g e n s e r v e r : c a l l ( Pid , who ) . 60 61 62 63 64 %h e l p e r f u n c t i o n s b r o a d c a s t ( , [ ] ) −> ok ; b r o a d c a s t ( Message , [ { P , } | C l i e n t s ] ) −> P ! Message , b r o a d c a s t ( Message , C l i e n t s ) . 65 66 67 68 remove ( , [ ] ) −> [ ] ; remove (K, [ { K, } | KVs ] ) −> remove (K, KVs ) ; remove (K, [ KV| KVs ] ) −> [KV| remove (K, KVs ) ] . 69 70 71 72 l o o k u p ( , [ ] ) −> n o t h i n g ; l o o k u p (K, [ { K,V} | ] ) −> { j u s t ,V} ; l o o k u p (K, [ | KVs ] ) −> lo oku p (K, KVs ) . Ähnlich funktionieren die generischen Abstraktionen einer Finite-State-Maschine (gen_fsm)und des Event Handlings (gen_event). Die Prozesse der generische Prozessabstraktionen können dann auch noch mit Hilfe der Supervisiontree-Abstraktion struturiert werden. Hier können dann Strategien, wie die Anwendung auf den Ausfall einzelner Komponenten reagieren soll, definiert werden. Hierdurch wird die Entwicklung robuster Anwendungen zusätzlich unterstützt. 6 Grundlagen der Verteilte Programmierung Computer bilden Knoten in Netzwerken und können miteinander kommunizieren. Wegen der Komplexität der Netzwerktheorie wurden Schichten (Layer) zur Aufgabenverteilung eingeführt. Gängigste Ansatz: Open Systems Interconnection (OSI)-Modell der International Standards Organisation (ISO) 6.1 7 Schichten des ISO-OSI-Referenzmodells 1. Physical Layer Stellt den Strom auf der Leitung bzw. die 0 und 1 dar 2. Data Link Layer Gruppierung von Bits/Bytes in Frames (Startmarken, Endmarken, Checksummen) Versand zwischen benachbarten Knoten defekte Frames werden verworfen 3. Network Layer Bildung von Datenpaketen statt Frames (Netzwerkadresse, Routing) 34 4. Transport Layer automatische Fehlererkennung und Fehlerkorrektur Flusskontrolle zur Begrenzung der Datenmenge (⇒ Vermeidung von Überlast) 5. Session Layer Anwendung-zu-Anwendung-Verbindung Kommunikationssitzung, aber auch verbindungslose Kommunikation möglich 6. Presentation Layer Representation von Daten und ihrer Konvertierung (z. B. Kompression, Verschlüsselung) 7. Application Layer Anwendungen (z. B. Java-Anwendung) 6.2 Protokolle des Internets 1. Internet Protocol (IP): • befindet sich auf Schicht 3 (network layer) • wird sowohl in local area networks (LANs) als auch in wide area networks (WANs) verwendet • Informationsaustausch zwischen Endknoten (Hosts) mit IP-Paketen oder IP-Datagrammen • keine Abhängigkeit zwischen Paketen • verbindungslos • Versionen: IPv4 (32 Bit-Adressen), IPv6 (128 Bit-Adressen) • IP-Adresse: z. B. 134.245.248.202 für falbala“ ” • Die IP-Adresse ist für uns eine eindeutige Adresse im Internet. Sie wird beim Routing verwendet. • Alternativ: Hostnames, Domain Name System (DNS) “falbala.informatik.uni-kiel.de” wobei “falbala” der Rechnername und “informatik.uni-kiel.de” die (Sub-) Domain sowie “uni-kiel.de” die Domain sind 2. Transmisson Control Protocol (TCP): • befindet sich auf Schicht 4 (transport layer) • garantiert die Zustellung (durch Wiederholung) und richtige Reihenfolge der gesendeten Bytes • verwendet IP • sendet Byte-Sequenz • sorgt für faire Lastverteilung • arbeitet mit Ports (in der Regel) von 0 bis 65535 • hält die Verbindung zwischen Hosts 3. User Datagram Protocol (UDP): • befindet sich auf Schicht 4 (transport layer) • sendet nur Datenpakete • arbeitet mit Ports • keine garantierte Zustellung • keine garantierte richtige Reihenfolge • verbindungslos • schneller als TCP • ist fast nur ein Wrapper für IP 35 6.3 Darstellung von IP-Adressen/Hostnames in Java Die Klasse InetAddress im Paket java.net behandelt sowohl IPv4 als auch IPv6. Es stehen zwei Klassenmethoden zur Konstruktion zur Verfügung: • static InetAddress getByName(String hostname) throws java.net.UnknownHostException mit folgenden Möglichkeiten für den hostname: – IP-Adresse ⇒ das Objekt wird angelegt, falls die IP-Adresse bekannt ist – Domain-Name ⇒ DNS-Lookup; das Objekt wird angelegt, falls die IP-Adresse bekannt ist – “localhost” ⇒ das Objekt wird mit der IP-Adresse 127.0.0.1 angelegt • getAllByName(String hostname) ⇒ liefert alle IP-Adressen eines Hosts Zur Ausgabe stehen weiterhin folgende Methoden zur Verfügung: • String getHostAddress() ⇒ liefert eine IP-Adresse des Hosts • String getHostName() ⇒ liefert den bei der Konstruktion angegebenen hostname • boolean equals(InetAddress i) ⇒ vergleicht zwei IP-Adressen 6.4 Netzwerkkommunikation Der Rechner kommuniziert über sogenannte Ports (durchnummeriert von 0 bis 65535) mit dem Netzwerk. Auf Programmiersprachenseite werden die Ports über Sockets angesprochen, wobei pro Port nur eine Socketverbindung möglich ist. 6.4.1 UDP in Java Z. B. nützlich bei DNS-Anfragen, Audio-/Videodaten, Zeitsignalen, bei zeitkritischen Anwendungen. Eigentlich nur Wrapper für IP-Pakete. Datagram Packet Klasse: • repräsentiert UDP-Pakete • Verwendung beim Senden und Empfangen ⇒ unterschiedliche Bedeutung der Adresse: – Senden ⇒ Zieladresse – Empfangen ⇒ Sourceadresse (günstig für Antwort) • Aufbau: siehe Abbildung 6 Konstruktoren: • DatagramPacket(byte[] buffer, int length) ⇒ zum Empfang von Paketen • DatagramPacket(byte[] buffer, int length, InetAdress dest_addr, int dest_port) ⇒ zum Senden Methoden zum Modifizieren: getAddress, setAddress, getData, setData, getLength, setLength, getPort, setPort zur Anbindung an Ports: Datagram Socket Klasse: • zum Senden und Empfangen von Datagram Paketen • Aufbau siehe Abbildung 7 • Konstruktoren: ? DatagramSocket() throws SocketException ⇒ nur zum Senden von UDP-Paketen (freier Port wird verwendet und belegt); eine Exception wird geworfen, falls kein freier Port mehr vorhanden ist ? DatagramSocket(int port) throws SocketException ⇒ zum Senden und Empfangen geeignet (ist aber eher für Server gedacht); eine Exception wird geworfen, falls der Port belegt ist • wichtigste Methoden: ? close() ⇒ Schließen des Sockets und Freigeben des Ports ? getLocalPort, setLocalPort 36 Abbildung 6: Abbildung 7: ? getLocalAddress ? getReceiveBuffersize, setReceiveBuffersize ⇒ Abfragen bzw. Setzen der maximalen Größe des Puffers ? void receive(DatagrammPacket p) throws IOException ⇒ liest UDP-Paket (gepuffert) und schreibt dieses in p 1. IP- und PortAdresse werden mit Senderadresse und Senderport überschrieben 2. das length-Attribut enthält die tatsächliche Länge, die kleiner oder gleich der Größe des Pakets ist 3. blockiert, bis das Paket empfangen wurde Programmierung mit UDP verwendete Klassen: • DatagramPacket mit receive und send ⇒ das, was übermittelt werden soll • DatagramSocket Für receive gibt es auch einen Timeout, welcher mit der Methode int getSoTimeout() gelesen und mit setSoTimeout(int t) gesetzt werden kann. Die Zeit wird hierbei in Millisekunden angegeben und definiert, wie lange receive maximal blockiert; falls keine Nachricht in dieser Zeit eintrifft, so gibt es eine java.io.InterruptedIOException. Die möglichen Verbindungen können auch eingeschränkt werden: • connect(InetAddress remoteAddr, int remotePort) ⇒ es sind nur noch Verbindungen mit dem angegebenen Rechner möglich (Filter für ein- und ausgehende Pakete) • disconnect • getInetAddr • getPort Statt byte[] können auch Streams angebunden werden. Als Beispiel betrachten wir eine einfache Client-/Server-Architektur, die einen Inkrementserver zur Verfügung stellt. Der Client schickt einen Wert hin und erhält den inkrementierten Wert zurück. 1 import j a v a . n e t . ∗ ; 37 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public c l a s s Sender { public s t a t i c void main ( S t r i n g [ ] a r g s ) { byte [ ] b = new byte [ 1 ] ; b [ 0 ] = Byte . p a r s e B y t e ( a r g s [ 0 ] ) ; DatagramPacket p a c k e t = new DatagramPacket ( b , 1 ) ; try { p a c k e t . s e t A d d e s s ( I n e t A d d r e s s . getByName ( ” mickey . i n f o r m a t i k . uni−k i e l . de ” ) ) ; packet . setPort ( 6 0 0 0 1 ) ; DatagramSocket s o c k e t = new DatagramSocket ( ) ; s o c k e t . send ( p a c k e t ) ; socket . r e c e i v e ( packet ) ; System . out . p r i n t l n ( ” R e s u l t : ”+b [ 0 ] ) ; } catch ( E x c e p t i o n e ) { System . out . p r i n t l n ( ”Bad I n t e r n e t c o n n e c t i o n ” ) ; } } } 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public c l a s s R e c e i v e r { public s t a t i c void main ( S t r i n g [ ] a r g s ) { byte [ ] b = new byte [ 1 ] ; b [ 0 ] = 42; try { DatagramPacket p a c k e t = new DatagramPacket ( b , 1 ) ; DatagramSocket s o c k e t = new DatagramSocket ( 6 0 0 0 1 ) ; while ( b [ 0 ] ! = 0 ) { socket . r e c e i v e ( packet ) ; System . out . p r i n t l n ( ” R e c e i v e d : ”+b [ 0 ] ) ; b[0]++; s o c k e t . send ( p a c k e t ) ; } } catch ( E x c e p t i o n e ) { System . out . p r i n t l n ( ”Bad I n t e r n e t c o n n e c t i o n ” ) ; } } } Bemerkungen: Zeile 5: Byte-Array mit einem Eintrag Zeile 6: parseByte(...) übersetzt das Hauptargument als Byte (-128 bis 127) Zeile 7: das erste Argument ist das Byte-Array; das zweite Argument die Packetlänge Zeile 13: Anfrage an den Server Zeile 14: Abwarten der Antwort (blockiert bis irgendein Packet ankommt) Zeile 23: der Inhalt ist hier egal, da als Erstes etwas empfangen und der Inhalt dabei überschrieben wird Zeile 26: dient der Definition des zu hörenden Ports (“Server-Port” des Dienstes) Zeile 27: Beenden des Receivers über java Sender -1 möglich Zeile 28: blockiert, bis eine Anfrage reinkommt Zeile 30: Inkrement-Service Zeile 31: Absenden der Antwort (Adresse und Port sind durch den Paketeingang bereits definiert; Verwendung der Adresse des Senders für die Antwort) Problem: falls der Sender keine Antwort bekommt, so blockiert dieser dauerhaft ⇒ eine Lösungsmöglichkeit wäre ein Timeout ggf. mit erneuter Anfrage (unsichere Kommunikation). Transmission Control Protocol (TCP) Eigenschaften: • verbindungsorientiertes Protokoll • sichere Übertragung ist garantiert (es gehen keine Pakete verloren) 38 • Versenden von Datenströmen ⇒ werden automatisch in TCP-Pakete aufgebrochen • richtige Reihenfolge ist garantiert • die Bandbreite wird zwischen mehreren TCP-Verbindungen fair geteilt • TCP unterscheidet zwischen Client und Server einer Verbindung • der TCP-Client initiiert eine Verbindung • der TCP-Server antwortet auf diese Verbindung ⇒ bidirektionale Verbindung ist etabliert • TCP-Sockets in java.net ⇒ verbinden zwei Ports auf (eventuell) unterschiedlichen Rechnern (kann auch für UDP genutzt werden) Konstruktoren: • Socket(InetAddr addr, int port) thorws java.net.IOException ⇒ stellt Verbindung zu addr und port her, wobei als lokaler Port dieser Verbindung ein freier Port verwendet wird • Socket(InetAddr addr, int port, InetAddr localAddr, int localPort) throws java.net.IOException ⇒ analog wie oben, aber mit festem lokalem Port (→ besser nicht verwenden, da ein Fehler auftritt, falls Port besetzt ist) • Socket(String host, int port) throws java.net.IOException Beachte: Socket blockiert, bis die Verbindung aufgebaut ist. Methoden: • close() ⇒ schließt die Verbindung (die Daten sollten vorher “geflushed” werden, d. h. der Puffer sollte geleert und die Daten übertragen werden) • InetAddr getInetAddress() ⇒ liefert remoteAddress • int getPort() ⇒ liefert den remotePort • getLocalInetAddress • getLocalPort • InputStream getInputStream() • OutputStream getOutputStream() • es ist auch eine Konfiguration der Socket-Verbindung (z. B. Timeout, etc.) möglich Lesen und Schreiben geschieht über die Streams. Listing 30: Im Beispiel verbindet sich der Client mit dem Server und empfängt eine Nachricht 1 2 import j a v a . n e t . ∗ ; import j a v a . i o . ∗ ; 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public c l a s s D a y t i m e C l i e n t { public s t a t i c void main ( S t r i n g [ ] a r g s ) { try { S o c k e t s o c k e t = new S o c k e t ( ” mickey ” , 6 0 0 0 1 ) ; System . out . p r i n t l n ( ” Connection e s t a b l i s h e d ” ) ; B u f f e r e d R e a d e r r e a d e r = new B u f f e r e d R e a d e r ( new InputStreamReader ( s o c k e t . g e t I n p u t S t r e a m ( ) ) ); System . out . p r i n t l n ( ” R e s u l t : ”+r e a d e r . r e a d L i n e ( ) ) ; socket . close ( ) ; } catch ( IOException i o e ) { System . out . p r i n t l n ( ” E r r o r : ”+i o e ) ; } } } TCP-Server: Klasse ServerSocket Konstruktoren: • ServerSocket(int port) throws java.io.IOException ⇒ “horcht” auf dem Port • ServerSocket(int port, int NumberOfClients) throws java.io.IOException, wobei numberOfClients die Größe der Backlog-Queue (Anzahl der wartenden Clients; default: 50) darstellt Zum Akzeptieren von Verbindungen: 39 • Socket accept() throws java.io.IOException ⇒ wartet auf den Client und akzeptiert diesen → Verbindung • close() ⇒ schließt den ServerSocket • getSoTimeout() • setSoTimeout(int ms) → Setzen eines Timeout für das accept Listing 31: Server des Beispiels 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public c l a s s DayTimeServer { public s t a t i c void main ( S t r i n g [ ] a r g s ) { try { S e r v e r S o c k e t s e r v e r = new S e r v e r S o c k e t ( 6 0 0 0 1 ) ; while ( true ) { Socket nextClient = s e r v e r . accept ( ) ; OutputStream out = n e x t C l i e n t . getOutputStream ( ) ; new P r i n t S t r e a m ( out ) . p r i n t (new j a v a . u t i l . Date ( ) ) ; out . c l o s e ( ) ; nextClient . close ( ) ; // Ende d e r S o c k e t −Verbindung } } catch ( BindException be ) { System . out . p r i n t l n ( ” S e r v i c e a l r e a d y r u n n i n g on p o r t 60001 ” ) ; } catch ( IOException i o e ) { System . out . p r i n t l n ( ” I /O e r r o r : ”+i o e ) ; } } } 6.5 Socket-Optionen • Zugriff/Einstellung über Socket-Methoden, d. h. Methoden der Klasse Socket • SO_KEEPALIVE: setKeepAlive(true) ⇒ regelmäßiges Pollen (Anfragen) zum Verbindungstest • SO_RCVBUF: setReceiveBuffersize(size) ⇒ Betriebssystempuffer wird in seiner Größe beeinflusst (size ist in Byte) • SO_SNDBUF: setSendBuffersize(size) ⇒ Betriebssystempuffer wird in seiner Größe beeinflusst (size ist in Byte) • SO_LINGER: setSoLinger(true,50) ⇒ auch beim Schließen des sockets wird noch für 50 Millisekunden versucht, die Daten zu senden • TCP_NODELAY ⇒ Veränderung des TCP-Protokolls, so dass keine Transmissionskontrolle (Warten, bis ein Paket voll ist, bevor es gesendet wird) mehr stattfindet ACHTUNG: die Performance kann sinken • SO_TIMEOUT: setSoTimeout(int time) ⇒ Leseoperation versucht time Millisekunden zu lesen. Eventuell tritt eine InterruptedIOException auf, falls keine Antwort kommt (time=0 entspricht keinem Timeout). Listing 32: Beispiel 1 2 3 4 5 6 7 try { s . setSoTimeout ( 2 0 0 0 ) ; . . . l e s e aus S o c k e t s . . . } catch ( I n t e r r u p t e d I O E x c e p t i o n i o e ) { System . e r r . p r i n t l n ( ” S e r v e r a n t w o r t e t n i c h t ! ” ) ; } catch ( IOException i o ) { . . . } 6.6 Sockets in Erlang In der Vorlesung haben wir die TCP-Kommunikation über Sockets in Erlang kennen gelernt. Die Prinzipien sind natürlich identisch, mit denen in Java. Als Beispielanwendung haben wir einen einfachen Tupelserver entwickelt, welchen wir später auch als Registry verwenden können: 40 Listing 33: TCP-Version eines Key-Value-Servers in Erlang 1 2 −module ( t c p ) . −e x p o r t ( [ s e r v e r / 0 , c l i e n t / 0 , r e q u e s t H a n d l e r / 3 ] ) . 3 4 5 6 7 8 s e r v e r ( ) −> {ok , LSock } = g e n t c p : l i s t e n ( 5 0 4 2 , [ l i s t , { packet , l i n e } , { r e u s e a d d r , true } ] ) , DB = spawn ( k e y V a l u e S t o r e , s t a r t , [ ] ) , acceptLoop ( LSock ,DB) . 9 10 11 12 13 14 acceptLoop ( LSock ,DB) −> spawn ( tcp , r e q u e s t H a n d l e r , [ LSock ,DB, s e l f ( ) ] ) , receive nex t −> acceptLoop ( LSock ,DB) end . 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 r e q u e s t H a n d l e r ( LSock ,DB, Parent ) −> {ok , Sock } = g e n t c p : a c c e p t ( LSock ) , Parent ! next , receive { tcp , Sock , S t r } −> case l i s t s : s p l i t w i t h ( fun (C) −> C/=44 end , l i s t s : d e l e t e ( 1 0 , S t r ) ) o f { ” s t o r e ” , [ | Arg ] } −> {Key , [ | Value ] } = l i s t s : s p l i t w i t h ( fun (C) −> C/=44 end , Arg ) , DB! { s t o r e , Key , Value } ; { ” loo kup ” , [ | Key ] } −> DB! { lookup , Key , s e l f ( ) } , receive Ans −> g e n t c p : send ( Sock , b a s e : show ( Ans)++” \n” ) end end , g e n t c p : c l o s e ( Sock ) ; Other −> b a s e : p r i n t ( Other ) end . 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 c l i e n t ( ) −> case b a s e : g e t L i n e ( ” ( s ) t o r e , ( l ) ookup : ” ) o f ” s ” −> Key = b a s e : g e t L i n e ( ”Key : ” ) , Value = b a s e : g e t L i n e ( ” Val : ” ) , {ok , Sock } = g e n t c p : c o n n e c t ( ” l o c a l h o s t ” , 5 0 4 2 , [ l i s t , { packet , l i n e } ] ) , g e n t c p : send ( Sock , ” s t o r e , ”++Key++” , ”++Value++” \n” ) , g e n t c p : c l o s e ( Sock ) ; ” l ” −> Key = b a s e : g e t L i n e ( ”Key : ” ) , {ok , Sock } = g e n t c p : c o n n e c t ( ” l o c a l h o s t ” , 5 0 4 2 , [ l i s t , { packet , l i n e } , { active , false }]) , g e n t c p : send ( Sock , ” lookup , ”++Key++” \n” ) , {ok , Res } = g e n t c p : r e c v ( Sock , 0 ) , b a s e : putStrLn ( Res ) , g e n t c p : c l o s e ( Sock ) end . 6.7 Verteilung von Kommunikationsabstraktionen Nachdem wir nun verstanden haben, wie eine verteilte Kommunikation über TCP funktioniert, können wir uns überlegen, wie Kommunikationsabstraktionen transparent in ein verteiltes System eingebettet werden können. Hierbei ist es egal, ob es sich um Pids in Erlang, Remoteobjekte bei RMI oder Ähnliches handelt. Man benötigt eine externe Darstellung des entsprechenden Objekts im Netzwerk, welche über IP-Adresse des Rechners, einen Port und ggf. noch eine weitere Identifikationsnummer, einen Zugriff auf das entsprechende Objekt von Remote ermöglicht. Wird ein entsprechendes (lokales) Objekt serialisiert, wandelt man es in seine Remote-String-Darstellung um und verschickt diese. Alle Kommunikation auf ein Remote-Objekt wird dann über TCP serialisiert, wodurch entsprechend auch wieder andere Kommunikationsabstraktionen verteilt werden 41 können. Als Beispiel hierfür haben wir in Vorlesung und Übung Channels in Java, Erlang bzw. Haskell für einen Zugriff von Remote erweitert. Hierbei haben wir insbesondere die folgenden Aspekte diskutiert und realisiert: • Verwendung des Key-Value-Stores als Registry • Schreiben eines Channels von Remote • Optimierung der Kommunikation durch Zusammenlegen der Kommunikation auf einen Port • Garbage Collection von nicht mehr verwendeten Channels • Lesen aus einem remote Channel • Realisierung von Linking 7 Synchronisation durch Tupelräume Ansatz ohne Kommunikation zwischen zwei Partnern (Server/Client) Linda: Kommunikation durch Bekanntmachen und Abfragen von Tupeln. Das grundlegende Modell sieht wie folgt aus: • zentraler Speicher (ggf. auch mehrere) Tupelraum als Multimenge von Tupeln • viele unabhängige Prozesse, die Tupel in den Tupelraum hinzufügen oder Tupel aus diesem lesen (entfernen) können • sprachunabhängig: Linda ist ein Kommunikationsmodell, das zu verschiedenen Sprachen hinzugefügt werden kann (z. B. C-Linda, Fortran-Linda, Scheme-Linda, Java-Spaces) Hierbei basiert Linda auf folgenden Grundoperationen: • out(t) ⇒ fügt das Tupel t in den Tupelraum ein. Berechnungen in t werden vor dem Einfügen ausgewertet. Beispiel: out(("hallo",1+3,6.0/4.0)) fügt das Tupel ("hallo",4,1.5) in den Tupelraum ein. • in(t) ⇒ entfernt das Tupel t aus dem Tupelraum, falls t dort vorhanden ist. Ansonsten suspendiert es, bis t im Tupelraum eingefügt wurde. t kann auch Variablen enthalten (Pattern Matching). Diese parsen immer und werden an entsprechende Werte aus dem Tupelraum gebunden. Beispiel: in(("hallo",?i,?f)) entfernt obig eingefügtes Tupel und bindet i/4 und f/1.5 • rd(t) ⇒ wie in(t), aber das Tupel wird nicht entfernt • eval(stmt) ⇒ erzeugt einen neuen Prozess, der stmt auswertet • inp(t), rdp(t) ⇒ wie in, rd, aber ohne Blockade, falls das Tupel nicht vorhanden ist. Als Rückgabewert erhält man einen boolschen Wert. Dieser ist false, falls t nicht im Tupelraum vorhanden ist. • Beispiel: zwei Prozesse, die abwechselnd aktiv sind (ping-pong) in Linda 1 2 3 4 5 6 1 2 3 4 5 6 ping ( ) { while ( true ) { out ( ” p i n g ” ) ; i n ( ” pong ” ) ; } } pong ( ) { while ( true ) { in ( ” ping ” ) ; out ( ” pong ” ) ; } } 42 Listing 34: Hauptprogramm 1 2 3 4 main ( ) { eval ( ping ( ) ) ; e v a l ( pong ( ) ) ; } Es ist aber auch möglich, Datenstrukturen durch Tupel darzustellen: Interpretiere bestimmte Werte als Referenzen im Speicher (Tupelraum), z. B. durch Integer-Werte. Dann ist z. B. ein Array kodierbar als: {("A",1,r1),("A",2,r2),...,("A",n,rn);...} Der Zugriff auf einzelne Arraykomponenten kann dann ermöglicht werden. Z. B. können wir den 4. Index im Array wie folgt mit drei multiplizieren: in("A",4,?e); out("A",4,3*e); Anwendung: verteilte und parallele Bearbeitung komplexer Datenstrukturen Listing 35: Rechenoperation auf jedem Element 1 2 3 4 5 f o r ( i =1; i<=n ; i ++) { e v a l ( f ( i ) ) ; } f ( int i ) { i n ( ”A” , i , ? r ) ; out ( ”A” , i , compute ( r ) ) ; } Dies ist nur sinnvoll, wenn compute aufwendig ist und die Verteilung auf mehrere Prozessoren möglich ist. Formulierung von Synchronisationsproblemen: Kodierung der dinierenden Philosophen 1 2 3 4 5 6 7 8 9 10 p h i l ( int i ) { while ( true ) { think ( ) ; in ( ” stab ” , i ) ; i n ( ” s t a b ” , ( i +1)% j ) ; eat ( ) ; out ( ” s t a b ” , i ) ; out ( ” s t a b ” , ( i +1)% j ) ; } } Listing 36: Initialisierung 1 2 3 4 f o r ( i =0; i <s ; i ++) { out ( ” s t a b ” , i ) ; eval ( phil ( i ) ) ; } Eine Client/Server-Kommunikation ist auch modellierbar Idee: Zur Zuordnung der Antworten zu Anfragen werden von dem Server eindeutige ID’s verwendet: 1 2 3 4 5 6 7 8 9 10 server () { i n t i =1; eval ( idserver ( 1 ) ) ; while ( true ) { in ( ” request ” , i , ? req ) ; ... out ( ” r e s p o n s e ” , i , r e s ) ; i ++; } } 43 1 2 3 4 5 6 7 1 2 3 4 5 6 i d s e r v e r ( int i ) { while ( true ) { i n ( ”newID” ) ; out ( ”newID” , i ) ; i ++; } } Client () { int id ; out ( ”newID” ) ; i n ( ”newID” , ? i d ) ; out ( ” r e q u e s t ” , ? id , 1 ) ; i n ( ” r e s p o n s e ” , id , ? r e s 1 ) ; 7.1 Java-Spaces Linda in Java out t space.write(Entry e, Transaction tr, long lease) mit: • Entry e als Interface für Einträge in den Tupelspace • Transaction tr als Transaktionskonzept zur atomaren Ausführung mehrerer Modelle von Tupelräumen, sonst null • long lease gibt die Zeit an, wie lange ein Tupelraum verbleibt in t space.take(Entry e, Transaction tr, long lease) mit: • Entry e als Template (Pattern) • long lease als maximale Suspensionszeit das Template kann auch Variablen/Wildcards in Form von null enthalten. Das Template matcht den Eintrag im Space, falls: 1. das Template den gleichen Typ oder Obertyp wie der Eintrag hat 2. alle Felder des Templates matchen: – Wildcard (null) matcht immer – Werte matchen nur gleiche Werte Das Matchen der Variablen erfolgt durch das Ersetzen von null durch den korrekten Wert. rd(t) space.read(...) eval(stmt) Java Threads inp(t), rdp(t) über lease=0 Beachte: Die Objektidentität bleibt in Space nicht erhalten. Objekte werden (de-)serialisiert, da es sich häufig um verteilte Anwendungen handelt. Spaces können lokal oder im Netzwerk gestartet werden. Sie können persistent sein. Sie können über Jini-lookup-Sevice oder RMI-Registrierung oder andere gestartet werden. 7.2 Implementierung von Tupelräumen in Erlang Um zu verstehen, wie Tupelräume realisiert werden können, betrachten wir eine prototypische Implementierung in Erlang. Erlang verfügt zwar auch über Pattern Matching, Pattern sind aber keine Bürger erster Klasse und können nicht als Nachrichten verschickt werden. Es ist in Erlang aber möglich, Funktionen zu verschicken. Somit können wir recht einfach partielle Funktionen verschicken, welche das Pattern Matching im Tupelraum realisieren. Als Beispiel können wir das Pattern {hallo, X, Y} durch die Funktionsdefinition 1 fun ( { h a l l o , X,Y) −> {X,Y} end realisieren. Hierbei liefern wir die Bindungen als Paar der gebundenen Variablen zurück. Die Partialität der Funktion verwenden wir um das Nicht-Matchen des Patterns auszudrücken. Im Server können wir diesen Fall mit Hilfe eines catch-Konstrukts erkennen und entsprechend verfahren. 44 Im Tupel-Server benötigen wir zwei Strukturen (Listen) zur Speicherung von Anfragen. In der einen Liste werden alle Werte im Tupelraum gespeichert. Neue in- bzw. rd-Requests werden zunächst gegen alle Werte gemacht und ggf. direkt beantwortet. Sollte kein passender Wert gefunden werden, speichern wir die Requests in einer zweiten Liste. Wird später dann ein neuer Wert eingefügt, wird dieser entsprechend für alle offenen Requests überprüft und ggf. eine Antwort zurück geschickt. Die konkrete Implementierung ergibt sich wie folgt: Listing 37: Erlang Implementierung des Linda-Modells 1 2 −module( l i n d a ) . −export ( [ s t a r t / 0 , out / 2 , i n / 2 , rd /2 ] ) . 3 4 5 s t a r t ( ) −> r e g i s t e r ( l i n d a , s e l f ( ) ) , lindaLoop ( [ ] , [ ] ) . 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 l i n d a L o o p ( Ts , Reqs ) −> receive {out , T} −> {Reqs1 , KeepT} = findMatchingReq (T, Reqs ) , case KeepT of true −> l i n d a L o o p ( [T | Ts ] , Reqs1 ) ; f a l s e −> l i n d a L o o p ( Ts , Reqs1 ) end ; {InRd , F , P} −> case f i n d M a t c h i n g T u p l e (F , Ts , InRd ) of nothing −> l i n d a L o o p ( Ts , [ {F , P , InRd} | Reqs ] ) ; { j u s t , {Match , Ts1}} −> P! {tupleMatch , Match} , l i n d a L o o p ( Ts1 , Reqs ) end end . 21 22 23 24 25 26 27 28 29 30 31 32 33 findMatchingReq ( , [ ] ) −> { [ ] , true} ; findMatchingReq (T, [ Req | Reqs ] ) −> {F , P , InRd} = Req , case catch F(T) of { ’EXIT ’ , } −> {Reqs1 , KeepT} = findMatchingReq (T, Reqs ) , { [ Req | Reqs1 ] , KeepT} ; Match −> P! {tupleMatch , Match} , case InRd of i n −> {Reqs , f a l s e } ; rd −> findMatchingReq (T, Reqs ) end end . 34 35 36 37 38 39 40 41 42 43 44 45 46 f i n d M a t c h i n g T u p l e ( , [ ] , ) −> n o t h i n g ; f i n d M a t c h i n g T u p l e (F , [T | Ts ] , InRd ) −> case catch F(T) of { ’EXIT ’ , } −> case f i n d M a t c h i n g T u p l e (F , Ts , InRd ) of nothing −> n o t h i n g ; { j u s t , {M1, Ts1}} −> { j u s t , {M1, [T | Ts1 ] }} end ; Match −> case InRd of i n −> { j u s t , {Match , Ts}} ; rd −> { j u s t , {Match , [T | Ts ] }} end end . 47 48 out ( Space , T) −> Space ! {out , T} . 49 50 51 52 53 i n ( Space , F) −> Space ! { in , F , s e l f ( ) } , receive {tupleMatch , Match} −> Match end . 54 55 56 rd ( Space , F) −> Space ! {rd , F , s e l f ( ) } , receive 45 57 58 {tupleMatch , Match} −> Match end . 8 Spezifikation und Testen von Systemeigenschaften Das Testen von Systemeigentschaften von nebenläufigen Systemen geht über Ansätze, wie Unit-Tests oder Zusicherungen im Quellcode hinaus. Man ist in der Regel daran interessiert, globale Systemeigenschaften zu analysieren, die sich aus den Zuständen der einzelnen Prozesse zusammen setzen. Als Beispiel können wir noch einmal auf das nebenläufige Verändern eines geteilten Zustands zurückkommen, hier als Client-Server-Struktur in Erlang: Listing 38: Kritischer Bereich 1 2 −module ( c r i t i c a l ) . −e x p o r t ( [ s t a r t / 0 ] ) . 3 4 5 6 s t a r t ( ) −> S = spawn ( fun ( ) −> s t o r e ( 4 2 ) end ) , spawn ( fun ( ) −> i n c ( S ) end ) , dec ( S ) . 7 8 9 10 11 s t o r e (V) −> r e c e i v e { lookup , P} −> P ! V, s t o r e (V ) ; { s e t , V1} −> s t o r e (V1) end . 12 13 14 15 16 17 i n c ( S ) −> S ! { lookup , s e l f ( ) } , receive V −> S ! { s e t ,V+1} end , inc (S ) . 18 19 20 21 22 23 dec ( S ) −> S ! { lookup , s e l f ( ) } , receive V −> S ! { s e t , V−1} end , dec ( S ) . Hierbei stellen die Zustände, bevor der Zustand neu geschireben wird sicherlich kritische Zustände da. Eigentlich sollten beide Client-Prozesse nie gleichzeitig in diesen Zuständen sein, da dann der Speicher in Abhängigkeit eines veralteten Wertes verändert wird. Solche Eigenschaften können elegant mit Hilfe von Temporallogiken ausgedrückt werden. 9 Linear Time Logic (LTL) Die Lineare temporalale Logik (LTL) ermöglicht es, Eigenschaften von Pfaden eines Systems auszudrücken. Ihre Syntax ist wie folgt definiert: Syntax der Linear Time Logic (LTL) Sei Props eine (endliche) Menge von Zustandspropositionen. Die Menge der LTL-Formeln ist definiert als kleinste Menge mit: • Props ⊆ LT L • ϕ, ψ ∈ LT L =⇒ − − − − Zustandspropositionen ¬ϕ ∈ LT L ϕ ∧ ψ ∈ LT L Xϕ ∈ LT L ϕ U ψ ∈ LT L 46 Negation Konjunktion Im nächsten Zustand gilt ϕ ϕ gilt bis ψ gilt Eine LTL-Formel wird bzgl eines Pfades interpretiert. Propositionen gelten, wenn der erste Zustand des Pfades die Propositionen erfüllt. Der Modaloperator Next Xϕ ist erfüllt, wenn ϕ auf dem Pfad ab dem nächsten Zustand gilt. Außerdem enthält LTL den Until-Operator, welcher erfüllt ist, wenn ϕ solange gilt, bis ψ gilt. Hierbei mus ψ aber tatsächlich irgendwann gelten. Formal ist die Semantik wie folgt definiert: Pfadsemantik von LTL Ein unendliches Wort über Propositionsmengen π = p0 p1 p2 . . . ∈ P(Props)ω heißt Pfad. Ein Pfad π erfüllt eine LTL-Formel ϕ (π |= ϕ) wie folgt: p0 π π π p0 π p0 p1 . . . |= |= |= |= |= : ⇐⇒ : ⇐⇒ : ⇐⇒ : ⇐⇒ : ⇐⇒ P ¬ϕ ϕ∧ψ Xϕ ϕU ψ P ∈ p0 π 6|= ϕ π |= ϕ ∧ π |= ψ π |= ϕ ∃i ≥ 0 : pi pi+1 . . . |= ψ ∧ ∀j < i : pj pj+1 . . . |= ϕ Ausgehend vom Kern von LTL können wir praktische Abkürzungen defnieren, welche es häufig einfacher machen, Eigentschaften zu spezifizieren. Abkürzungen in LTL f alse true ϕ∨ψ ϕ→ψ F ϕ Gϕ F ∞ϕ G∞ ϕ ϕWψ ϕ Rψ := := := := := := := := := := ¬P ∧ P 5 ¬f alse ¬(¬ϕ ∧ ¬ψ) ¬ϕ ∨ ψ true U ϕ ¬F ¬ϕ GF ϕ F Gϕ (ϕ U ψ) ∨ (G ϕ) ¬(¬ϕ U ¬ψ) der Boolesche Wert false der Boolesche Wert true Disjunktion Implikation Irgendwann (Finally) gilt ϕ Immer (Globaly) gilt ϕ Unendlich oft gilt ϕ Nur endlich oft gilt ϕ nicht Schwaches Until (weak until) ϕ lässt ψ frei (release) Das Release ist hierbei sicherlich nicht ganz so verständlich, wird aber bei der Implementierung recht nützlich sein. Ausgehend von der Pfadsemantik, erfüllt ein System eine LTL-Formel, wenn jeder Pfad des Systems die LTL-Formel erfüllt. Formal werden Systeme hierbei als Kripke-Strukturen repräsentiert: Kripke-Struktur K = (S, Props, −→, τ, s0 ), wobei • S eine Menge von Zuständen, • Props eine Menge von Propositionen, • −→⊆ S × S die Transitionsrelation, • τ : S −→ P(Props) eine Beschriftungsfunktion für die Zustände und • s0 ∈ S der Startzustand sind, heißt Kripke-Struktur. Anstatt (s, s0 ) ∈−→ schreibt man in der Regel s −→ s0 . Ein Zustandspfad von K ist ein unendliches Wort s0 s1 . . . ∈ S ω mit si −→ si+1 und s0 der Startzustand von K. Wenn s0 s1 . . . ein Zustandspfad von K und pi = τ (si ) für alle i ≥ 0, dann ist das unendliche Wort p0 p1 . . . ∈ P(Props)ω ein Pfad von K. Kripke-Struktur-Semantik von LTL Sei K = (S, →, τ, s0 ) eine Kripke-Struktur. K erfüllt eine LTL-Formel ϕ (K |= ϕ) genau dann, wenn für alle Pfade π von K: π |= ϕ. 47 Mit Hilfe von LTL können im Wesentlichen drei unterschiedliche Arten von Eigenschaften definiert werden: • Sicherheitseigenschaften (Safety): Solche Eigenschaften haben in der Regel die Form: G ¬ ϕ Ein mögliches Beispiel für ϕ kann für zwei Prozesse, für welche der wechselseitige Ausschluss gewährleistet sein soll cs1 ∧ cs2 sein, wenn der Prozess i ∈ {1, 2} seinen kritischen Bereich durch die Proposition csi anzeigt. • Lebendigkeitseigenschaften (Liveness): Diese Eigenschaften haben in der Regel die Form: F ϕ, wobei sie oft auch nur bedingt gelten sollen, z.B. in einer Implikation. Ein Beispiel ist, die Antwort (answ) auf einen Request (req) eines Servers: G (req −→ F answ). Hierbei ist die eigentlich Lebendigkeitseigenschaft nur erwünscht, wenn eine Anfrage erfolgte. Mit Hilfe von G fordern wir, dass diese Eigenschaft überall im System gelten soll. • Fairness: Fairness kann als F ∞ ϕ beschrieben werden. Hiermit können wir ausdrücken, dass jeder beteiligte Prozess unendlich oft dran kommt und vom Scheduler nicht ausgeschlossen wird. Man verwendet Fairnesseingenschaften in der Regel als Vorbedingung einer Implikation, um “unfaire” Pfade nicht zu betrachten. Dies entspricht der Beschränkung auf präemptive Scheduler, welche wir einleitend in Betracht gezogen haben. 9.1 Implementierung von LTL zum Testen LTL wurde ursprünglich als Spezifikationslogik entwickelt, welche dann mit Hilfe von Model Checking gegenüber einer endlichen Kripke-Struktur (Systembeschreibung) überprüft werden kann. Das Model Checking ist aber für reale Systeme im Allgemeinen unentscheidbar, wie folgendes Erlang Beispiel zeigt: Listing 39: Unentscheidbarkeit von LTL 1 s t a r t ( ) −> f ( ) , prop ( t e r m i n a t e d ) . Hierbei soll prop(terminated) bedeuten, dass das Programm nachdem der Funktionsaufruf f() beendet wurde, in einen Zustand überführt wird, in dem die Proposition terminated gilt. Will man für dieses Programm die LTL-Formel F terminated entscheiden, so müsste man ja entscheiden, ob die Funktion f() terminiert. Da Erlang ja eine universelle Programmiersprache ist, ist dies aber direkt ein Widerspruch zum Halteproblem und somit im Allgemeinen unentscheidbar. Auch wenn die formale Verifikation also nicht möglich ist, kann es aber dennoch sinnvoll sein, LTLEigenschaften zu testen. Inwieweit die sich ergebenden Aussagen zu interpretieren sind, werden wir später noch besprechen. Zunächst wollen wir den Ansatz zum LTL-Testen implementieren. Als ersten Schritt müssen wir uns eine geeignete Darstellung von LTL-Formeln in Erlang überlegen. Wir beschänken und hier auf einen Teil der LTL-Operatoren. Die restlichen Operatoren sollen als Übung ergänzt werden. Bei der Negation des Untils ist die Verwendung des Release hilfreich, da dies gerade als “Negation” des Until definiert ist. Listing 40: LTL-Implementierung in Erlang 1 −module ( l t l ) . 2 3 −e x p o r t ( [ prop / 1 , d i s j / 2 , c o n j / 2 , neg / 1 , x / 1 , f / 1 , g / 1 ] ) . 4 5 6 7 8 9 10 11 prop ( Phi ) −> { prop , Phi } . neg ( Phi ) −> { neg , Phi } . d i s j ( Phi , P s i ) −> { d i s j , Phi , P s i } . c o n j ( Phi , P s i ) −> { c o n j , Phi , P s i } . x ( Phi ) −> {x , Phi } . g ( Phi ) −> {g , Phi } . f ( Phi ) −> { f , Phi } . 12 48 13 14 15 16 17 18 19 20 showLTL ( { prop , P} ) −> b a s e : show (P ) ; showLTL ( { d i s j , Phi , P s i } ) −> ” ( ”++showLTL ( Phi)++” o r ”++showLTL ( P s i)++” ) ” ; showLTL ( { c o n j , Phi , P s i } ) −> ” ( ”++showLTL ( Phi)++”and”++showLTL ( P s i)++” ) ” ; showLTL ( { neg , Phi } ) −> ” ( neg ”++showLTL ( Phi)++” ) ” ; showLTL ( { x , Phi } ) −> ”X”++showLTL ( Phi ) ; showLTL ( { f , Phi } ) −> ”F”++showLTL ( Phi ) ; showLTL ( { g , Phi } ) −> ”G”++showLTL ( Phi ) ; showLTL ( Phi ) −> b a s e : show ( Phi ) . Das erste Problem bei der Implementierung stellt die Negation dar. Diese ist über die Negation auf der mathematischen Meta-Ebene definiert und so nur schwer zu implementieren. Es ist aber möglich, LTL-Formeln so äquivalent umzuformen, dass die Negation nur noch direkt vor Propositionen auftritt. Die Negation wird vom Prinzip nach Innen geschoben. Listing 41: LTL-Implementierung in Erlang 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 n o r m a l i z e ( true ) −> true ; n o r m a l i z e ( f a l s e ) −> f a l s e ; n o r m a l i z e ( {prop , P} ) −> prop (P ) ; n o r m a l i z e ( { d i s j , Phi , P s i } ) −> d i s j ( n o r m a l i z e ( Phi ) , n o r m a l i z e ( P s i ) ) ; n o r m a l i z e ( { c o n j , Phi , P s i } ) −> c o n j ( n o r m a l i z e ( Phi ) , n o r m a l i z e ( P s i ) ) ; n o r m a l i z e ( {x , Phi} ) −> x ( n o r m a l i z e ( Phi ) ) ; n o r m a l i z e ( { f , Phi} ) −> f ( n o r m a l i z e ( Phi ) ) ; n o r m a l i z e ( {g , Phi} ) −> g ( n o r m a l i z e ( Phi ) ) ; n o r m a l i z e ( {neg , true} ) −> f a l s e ; n o r m a l i z e ( {neg , f a l s e } ) −> true ; n o r m a l i z e ( {neg , {prop , P}} ) −> neg ( prop (P ) ) ; n o r m a l i z e ( {neg , { d i s j , Phi , P s i }} ) −> c o n j ( n o r m a l i z e ( neg ( Phi ) ) , n o r m a l i z e ( neg ( P s i ) ) ) ; n o r m a l i z e ( {neg , { c o n j , Phi , P s i }} ) −> d i s j ( n o r m a l i z e ( neg ( Phi ) ) , n o r m a l i z e ( neg ( P s i ) ) ) ; n o r m a l i z e ( {neg , {x , Phi}} ) −> x ( n o r m a l i z e ( neg ( Phi ) ) ) ; n o r m a l i z e ( {neg , { f , Phi}} ) −> g ( n o r m a l i z e ( neg ( Phi ) ) ) ; n o r m a l i z e ( {neg , {g , Phi}} ) −> f ( n o r m a l i z e ( neg ( Phi} ) ) . Als Nächstes stellen wir eine Funktion zur Verfügung, welche anhand der gültigen Propositionen Formeln auf ihre Gültigkeit im aktuellen Zustand überprüft. Mögliche Ergebnisse sind hierbei true, f alse und noch nicht entschieden. Im letzteren Fall erhalten wir eine Formel, welche wir auf dem weiteren Pfad überprüfen müssen. Listing 42: LTL-Implementierung in Erlang 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 c h e ck ( true , P r o p s ) −> true ; c h e ck ( f a l s e , P r o p s ) −> f a l s e ; c h e ck ( {prop , P} , Props ) −> l i s t s : member (P , Props ) ; c h e ck ( {neg , {prop , P}} , Props ) −> not ( check ( prop (P) , Props ) ) ; c h e ck ( { c o n j , Phi , P s i } , Props ) −> case check ( Phi , Props ) of true −> check ( Psi , Props ) ; f a l s e −> f a l s e ; Phi1 −> case check ( Psi , Props ) of true −> Phi1 ; f a l s e −> f a l s e ; P s i 1 −> c o n j ( Phi1 , P s i 1 ) end end ; c h e ck ( { d i s j , Phi , P s i } , Props ) −> case check ( Phi , Props ) of true −> true ; f a l s e −> check ( Psi , Props ) ; Phi1 −> case check ( Psi , Props ) of true −> true ; f a l s e −> Phi1 ; P s i 1 −> d i s j ( Phi1 , P s i 1 ) end end ; c h e ck ( {x , Phi} , P r o p s ) −> x ( Phi ) ; 49 27 28 29 30 c h e ck ( { f , Phi} , Props ) −> check ( d i s j ( Phi , x ( f ( Phi ) ) ) , Props ) ; c h e ck ( {g , Phi} , Props ) −> check ( c o n j ( Phi , x ( g ( Phi ) ) ) , Props ) ; c h e ck ( Phi , P r o p s ) −> b a s e : putStrLn ( ” Unexpected f o r m u l a i n check : ”++showLTL ( Phi ) ) . Auf Grund der hier verwendeten dreiwertigen Logik, können wir Konjunktion und Disjunktion nicht auf die entsprecheden Booleschen Operationen zurückführen, sondern müssen sie selbst implementieren. Die Formel X ϕ können wir nicht entscheiden und geben sie unverändert zurück. Bei den Formeln F ϕ und G ϕ wickeln wir den Fixpunkt, über den diese Formeln definiert sind ab. Dies ist auf Grund der folgenden Äquivalenzen möglich: π |= F ϕ gdw. π |= ϕ ∨ X F ϕ π |= G ϕ gdw. π |= ϕ ∧ X G ϕ Beachte, dass die sich ergebenden Formeln nur true, f alse, X ϕ, Disjunktionen oder Konjunktionen sein können. F , G und prop sind nur unterhalb eines X möglich. Für den Fall, dass das System einen Schritt macht, können wir nun in den Formeln das X realisieren: Listing 43: LTL-Implementierung in Erlang 1 2 3 4 s t e p ( {x , Phi} ) −> Phi ; s t e p ( { c o n j , Phi , P s i } ) −> c o n j ( s t e p ( Phi ) , s t e p ( P s i ) ) ; s t e p ( { d i s j , Phi , P s i } ) −> d i s j ( s t e p ( Phi ) , s t e p ( P s i ) ) ; s t e p ( Phi ) −> b a s e : putStrLn ( ” Unexpected f o r m u l a i n s t e p : ”++showLTL ( Phi ) ) . Die letzte Regel sollte eigentlich nicht auftreten und dient nur dem Erkennen von Programm-Fehlern. Nun können wir der Server, welcher für das Verwalten und Überprüfen der LTL-Formeln zuständig ist, implementieren. Er wird global unter dem Atom ltl registriert, damit seine Pid nicht in jeden Prozess propagiert werden muss und Anwendungen somit einfacher erweitert werden können. Als Zustand speichern wir in diesem Prozess eine Liste von zur Zeit gültigen Propositionen, sowie die Formeln, welche in den weiteren Zuständen noch überprüft werden müssen. Außerdem speichern wir die ursprünglichen mit assert hinzugefügten Formeln, um diese im Fall einer gefundenen Verletzung ausgeben zu können. Alternaitv könnte man hier sicherlich auch einen String, der die Eigenschaft geeignet benennt hinterlegen. Bei den Propositionen ermöglichen wir es den Prozessen beliebige Propositionen zu setzen bzw. zu löschen. Zur Vereinfachung vermeiden wir doppelte Vorkommen, da wir die Propositionsmenge als einfache Liste implementieren. Zur einfacheren Kommunikation mit dem LTL-Server, stellen wir geeignete Schnittstellen-Funktionen zur Verfügung. Erkennen wir die Verletzung einer Assertion, geben wir eine Warnmeldung aus. Diese sollte besser in eine spezielle Datei geschrieben werden, da sie sonst in anderen Ein-/Ausgaben des eigentlichen Programms untergeht. Listing 44: LTL-Implementierung in Erlang 1 2 s t a r t ( ) −> LTL = spawn( fun ( ) −> l t l ( [ ] , [ ] , [ ] ) end ) , r e g i s t e r ( l t l , LTL ) . 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 l t l ( Phis , A s s e r t s , Props ) −> receive { a s s e r t , Phi} −> case check ( n o r m a l i z e ( Phi ) , Props ) of true −> l t l ( Phis , A s s e r t s , Props ) ; f a l s e −> ptong ( Phi ) , l t l ( Phis , A s s e r t s , Props ) ; Phi1 −> l t l ( [ Phi1 | P h i s ] , [ Phi | A s s e r t s ] , Props ) end ; {newProp , P} −> NewProps=[ P | Props ] , P h i s 1 = l i s t s : map( fun ( Phi ) −> check ( s t e p ( Phi ) , NewProps ) end , P h i s ) , {Phis2 , A s s e r t s 2 } = a n a l y z e ( Phis1 , A s s e r t s ) , l t l ( Phis2 , A s s e r t s 2 , NewProps ) ; { r e l e a s e P r o p , P} −> 50 18 19 20 21 22 23 24 25 26 27 28 29 NewProps= l i s t s : d e l e t e (P , Props ) , P h i s 1 = l i s t s : map( fun ( Phi ) −> check ( s t e p ( Phi ) , NewProps ) end , P h i s ) , {Phis2 , A s s e r t s 2 } = a n a l y z e ( Phis1 , A s s e r t s ) , l t l ( Phis2 , A s s e r t s 2 , NewProps ) ; s t a t u s −> b a s e : putStrLn ( ” Unevaluated A s s e r t i o n s : ” ) , l i s t s : z i p w i t h ( fun ( Phi , A s s e r t ) −> b a s e : putStrLn ( showLTL ( A s s e r t ) ) , b a s e : putStrLn ( ” ”++showLTL ( Phi ) ) end , Phis , A s s e r t s ) , l t l ( Phis , A s s e r t s , Props ) end . 30 31 32 33 34 a s s e r t ( Phi ) −> l t l ! { a s s e r t , Phi} . newProp (P) −> l t l ! {newProp , P} . r e l e a s e P r o p (P) −> l t l ! { r e l e a s e P r o p , P} . s t a t u s ( ) −> l t l ! s t a t u s . 35 36 37 38 39 40 41 42 43 44 a n a l y z e ( [ ] , [ ] ) −> { [ ] , [ ] } ; a n a l y z e ( [ true | P h i s ] , [ A s s e r t | A s s e r t s ] ) −> a n a l y z e ( Phis , A s s e r t s ) ; a n a l y z e ( [ f a l s e | P h i s ] , [ A s s e r t | A s s e r t s ] ) −> potong ( A s s e r t ) , a n a l y z e ( Phis , A s s e r t s ) ; a n a l y z e ( [ Phi | P h i s ] , [ A s s e r t | A s s e r t s ] ) −> {Phis1 , A s s e r t s 1 } = a n a l y z e ( Phis , A s s e r t s ) , { [ Phi | P h i s 1 ] , [ A s s e r t | A s s e r t s 1 ] } . 45 46 47 ptong ( Phi ) −> b a s e : putStrLn ( ” A s s e r t i o n v i o l a t e d : ”++showLTL ( Phi ) ) , e x i t ( −1). Die Funktion analyze löscht erfüllte und wiederlegte Formeln aus unseren Listenstrukturen. Im Fehlerfall gibt sie außerdem eine entsprechende Fehlermeldung mittels ptong/1 aus. Abschließend geben wir dem Benutzer mit der Funktion status noch die Möglichkeit alle noch nicht entschiedenen Formeln auszugeben. Als Beispiel für die Benutzung der LTL-Bibliothek betrachten wir den wechselseitigen Ausschluss zweier Prozesse. Auf Grund des Kommunikationsmodells von Erlang ist es nicht so einfach, überhaupt einen kritischen Bereich zu definieren, in dem sich zwei Prozesse befinden. Wir verwenden hierzu einen Speicher(-Prozess), auf dem zwei Prozesse nebenläufig Veränderungen vornehmen: Listing 45: LTL-Implementierung in Erlang 1 2 −module( c r i t i c a l ) . −export ( [ s t a r t /0 ] ) . 3 4 5 6 7 8 s t a r t ( ) −> t r y l t l : s t a r t ( ) catch −> 42 end , \% F a l l s LTL−S e r v e r schon l a e u f t l t l : a s s e r t ( l t l : g ( l t l : neg ( l t l : c o n j ( l t l : prop ( c s 1 ) , l t l : prop ( c s 2 ) ) ) ) ) , S = spawn( fun ( ) −> s t o r e ( 4 2 ) end ) , spawn( fun ( ) −> i n c ( S ) end ) , dec ( S ) . 9 10 11 12 13 s t o r e (V) −> receive { lookup , P} −> P!V, s t o r e (V ) ; { s e t , V1} −> s t o r e (V1) end . 14 15 16 17 18 19 20 i n c ( S ) −> S ! { lookup , s e l f ( ) } , receive V −> l t l : newProp ( c s 1 ) , S ! { s e t ,V+1} end , l t l : releaseProp ( cs1 ) , inc (S ) . 21 22 23 dec ( S ) −> S ! { lookup , s e l f ( ) } , receive 51 24 25 26 27 V −> l t l : newProp ( c s 2 ) , S ! { s e t , V−1} end , l t l : releaseProp ( cs2 ) , dec ( S ) . In Abhängigkeit des Schedulings wird die angegebene Eigenschaft nach ein paar Schritten widerlegt. Bei der bisherigen Implementierung haben wir LTL zum Testen von Systemen eingesetzt. Hierbei ergibt sich allerdings das Problem, dass einige Formeln nie als falsch (z. B. F ϕ) bzw. andere nie als korrekt (z. B. Gϕ) beweisen werden können. Besonders problematisch sind somit Eigenschaften, welche F ∞ oder G∞ verwenden. Sie können durch Testen weder widerlegt noch gezeigt werden. Möglichkeiten hiermit umzugehen, ist z. B. obere Schranken einzubauen, nach wie vielen Schritten (Abwicklungen) bzw. für wie viele Schritte ein F oder G gelten soll. Außerdem kann es sinnvoll sein, zu protokollieren, wie oft F bzw. G bereits abgewicklet wurden, um einen Eindruck zu bekommen, wie häufig die Bedingung einer Formel schon getestet wurde. Entsprechende Implementierung werden in den Übungen besprochen. 9.2 Verifikation Ursprünglich wurde LTL aber zur formalen Verifikation entwickelt. Betrachtet man endliche Kripke Strukturen ist es nämlich sehr wohl möglich, Eigenschaften zu beweisen bzw. zu wiederlegen: LTL ist für endliche Kripke Strukturen entscheidbar. Das Verfahren, mit dem dieses entschieden wird, nennt man Model Checking, näheres hierzu findet man entsprechenden Theorievorlesungen. Beim Model Checking geht man in der Regel davon aus, dasss man ein System (insb. sein Protokoll) mit einer (wenig ausdrucksstarken) Spezifikationsprache beschreibt. Deren Semantik definiert eine endliche Kripke Struktur, welche dann zur Überprüfung von Eigenschaften (LTL) verwendet werden kann. Ein entsprechendes Tool zur Spezifikation/Verifikation mittels Model Checking ist z.B. Spin für die Sprache Promela und LTL. Ähnliche Tools existieren für Prozessalgebren, wie den Calculus of Communicating Systems (CCS) und modale Logiken (CTL, CTL∗ oder den modalen µ-Kalkül). Hierbei ist die Idee, dass man zunächst die Spezifikation erstellt und gegen spezifizierte Eigenschaften verifiziert. Danach wird diese dann schrittweise zum tatsächlichen System verfeinert. Als Konsequenz wird die Kommunikation dann aber in der Regel nur wenig von konkreten Werten abhängen dürfen, da dies auf Ebende der Spezifikation nicht ausgedrückt werden kann. Spezifikationssprachen bieten in der Regel nur über einen sehr eingeschränkten Wertebereich, Datenstrukturen stehen gar nicht zur Verfügung, da sonst die Semantik keine endliche/kleine Kripke Struktur bilden würde. Ein andere möglicher Ansatz zur formalen Verifikation ist der umgekehrte Weg. Ausgehend von einem existierenden System, versucht man durch Abstraktion eine endliche Kripke Struktur zu generieren. Die Idee ist hierbei, dass sämtliche Pfade des echten Systems auch in dem abstrahierten endlichen System enthalten sind. Hierzu kann man das Programm anstatt über konkreten Weten über abstrakten Werten ausführen. Bei Verzweigungen, welche dann über den abstrakten Werten nicht mehr entschieden werden können, muss man dann um tatsächlich alle möglichen Pfade zu repräsentieren zusätzlichen Nichtdeterminismus verwenden. Da LTL ja auf allen Pfaden eines Systems gelten muss und die Abstraktion eine endliche Kripke Struktur liefern kann, welche alle Pfade der tatsächlichen Semantik enthält, können wir LTL-Formeln mittels Model Checking entscheiden. Hierbei müssen wir aber beachten, dass für den Fall, dass eine LTL-Eigenschaft nicht gilt, zwei mögliche Fälle berücksichtigt werden müssen: • Der Pfad, welcher ein Gegenbeispiel darstellt, existiert auch in der konkreten Semantik und zeigt tatsächlich einen Fehler an. • Der Pfad existiert in der tatsächlichen Semantik nicht, da eine Programm-Verzweigung auf Grund der Abstraktion nicht korrekt entschieden wurde. In diesem Fall, kann man durch ein Verfeinerung der Abstraktion versuchen, diese Verzweigung korrekt zu entscheiden, was aber in der Regel zu einer Vergrößerung des Zustandsraums der abstrakten, endlichen Kripke-Struktur führen wird. In der Regel wird dieser Ansatz auch genau in den Fällen erfolgreich sein, in denen auch eine formale Spezifikation möglich gewesen wäre, bei der genau die entsprechenden abstrakten Werte verwendet worden wären, d.h. in Fällen, in denen das Systemverhalten, insbesondere die Kommunikation, wenig von konkreten Werten abhängen. 52 9.3 Simulation einer Turing-Maschine Die formale Verifikation von Programmen ist schwierig, da fast alle interessanten Programmeigenschaften unentscheidbar sind. Dies gilt bereits für sequentielle Programme, welche mit mehr als zwei Zahlenwerten rechnen können oder dynamische Datenstrukturen verwenden können. In diesem Kapitel wollen wir untersuchen, in wie weit auch die Erweiterung einer Programmiersprache um nebenläufige Konzepte die Entscheidung interessanter eigenschaften, wie z.B. der Deadlockfreiheit, unentscheidbar macht. Wir simulieren hierzu allein unter Verwendung einer endlichen Anzahl von Atomen und nur drei Prozessen eine Turingmaschine. Die Idee der Implementierung beruht auf der Darstellung des Bandes der Turing Maschine mit Hilfe zweier Stacks. Beim bewegen des Kontrollkopfes, wird der oberste Wert des einen Stacks auf den anderen Stack verschoben. Die Stacks werden jeweils durch einen Thread implementiert. Ein Thread, welcher die Datenstruktur eines Stacks implementiert, kann wie folgt konstruiert werden: Listing 46: Stack 1 2 3 4 5 6 s t a c k (P) −> r e c e i v e pop −> pop ; X −> s t a c k (P) , P ! X, s t a c k (P) end . Falls der Stack einen pop Nachricht erhält, tterminiertër, d.h. der darunter liegende Wert wird als Nachricht an einen Kontrollprozess verschickt. Alle anderen Nachrichten werden als push interpretiert und mit Hilfe des Laufzeitkellers gespeichert. Die Funktione push und pop können dann einfach wie folgt definiert werden: Listing 47: Stackoperationen 1 push ( St ,V) −> St !V. 2 3 4 5 6 pop ( St ) −> St ! pop , receive X −> X end . Bei der Verwendung eines Stacks zur Simulation einer Turing Maschine ist aber noch wichtig, dass auch über einen leeren Keller hinaus gelesen werden kann. Das Band enthält hier ja beliebig viele Blanks. Diese Blanks können aber einfach beim pop auf einem leeren Stack geliefert werden: Listing 48: Blank Stack 1 2 3 b l a n k S t a c k (P) −> s t a c k (P) , P ! blank , b l a n k S t a c k (P ) . Nun kann eine Turing Maschine einfach wie folgt definiert werden: Sei M = hQ, Γ, δ, q0 , F i eine deterministische Turing Maschine mit der Übergangsfunktin δ : Q \ F × Γ −→ Q × Γ × {l, r}. Dann ist kann der Kontrollprozess wie folgt als Erlangfunktion ausgedrückt werden: Für all q ∈ F , verwende Regel der Form: delta(SL,SR,q,_) -> pop. Für alle q ∈ Q\F mit δ(q, a) = (p, b, r): delta(SL,SR,q,a) -> push(SL,b), A = pop(SR), delta(SL,SR,p,A). und für alle q ∈ Q \ F mit δ(q, a) = (p, b, l): 53 delta(SL,SR,q,a) -> push(SR,b), A = pop(SL), delta(SL,SR,p,A). Die Turing Maschine kann dann wie folgt gestartet werden: start() -> SL=spawn(blankStack,[self()]), SR=spawn(blankStack,[self()]), writeInputToStack(SR), A = pop(SR), delta(SL,SR,q0 ,A), outputStack(SR). Man beachte, dass diese Turing-Maschinen-Simulation nur sehr wenige Werte verwendet. Außer dem Bandalphabet Γ und der Zustandsmenge Q, sind dies das Atom pop und die Pid des Hauptprozesses. Außerdem werden drei Threads verwendet. Es ist sogar möglich, auf noch einen weiteren Thread zu verzichten. Dies ist jedoch technisch aufwändiger (Übung). Entsprechend kann man auch mit endrekursiven Erlangprogrammen, die nur endlich viele Werte verwenden, aber beliebig viele Prozesse erlauben, eine Turingmaschine simulieren (Übung). Eine weitere Möglichkeit ist die Verwendung eines einzelnen Prozesses und beliebig vielen Werten in der Mailbox, bzw. dynamischen Datenstrukturen. Durch die Simulation der Turingmaschine können zeigen, dass das Model Checking Problem für die jeweils betrachtenen Systeme im allgemeinen unentscheidbar ist. 10 Transaktionsbasierte Kommunikation In diesem Kapitel werden wir uns noch einmal mit den Nachteilen der lockkbasierten Kommunikation beschäftigen und einen alternativen lockfreien Ansatz kennen lernen. Da der Ansatz besonders vielversprechend in Haskell implementiert wurde, betrachten wir in diesem Kapitel zunächst insbesondere Haskell. 10.1 Ein Bankbeispiel in Concurrent Haskell Aufgabe: Modellierung von Konten und zugehörigen Operationen auf den Konten. Idee: Repräsentation eines Kontos als MVar Int mit folgenden Operationen: 1 2 3 4 withdraw : : MVar I n t −> I n t −> IO ( ) withdraw a c c amount = do b a l a n c e <− takeMVar a c c putMVar a c c ( b a l a n c e − amount ) takeMVar lockt quasi die MVar und putMvar gibt sie wieder frei. 1 2 3 1 2 1 2 3 4 5 6 7 8 d e p o s i t : : MVar I n t −> I n t −> IO ( ) d e p o s i t a c c amount = withdraw a c c (−amount ) b a l a n c e : : MVar I n t −> IO I n t b a l a n c e a c c = readMVar a c c limitedWithdraw : : MVar I n t −> I n t −> IO Bool limitedWithdraw a c c amount = do b <− b a l a n c e a c c i f b >= amount then do withdraw a c c amount return True e l s e return F a l s e 54 Problem: Die atomare Ausführung ist nicht garantiert. Nach der balance-Abfrage kann ein anderer Thread Geld abheben ⇒ Kontostand ist zwar immer richtig, kann aber negativ werden. Lösung: Lock müsste über gesamte limitedWithdraw Aktion erhalten werden ⇒ keine Wiederverwendung von withdraw möglich. In Java werden alle Account-Methoden als synchronized gekennzeichnet, so dass eine Wiederverwendung möglich ist (mehrfaches Nehmen des gleichen Locks ist möglich). Weitere Operation: sicherer Transfer 1 2 3 4 5 6 7 8 9 10 11 12 l i m i t e d T r a n s f e r : : MVar I n t −> Mvar I n t −> I n t −> IO Bool l i m i t e d T r a n s f e r from t o amount = do b <− takeMVar from i f b >= amount then do b <− takeMVar t o putMVar from ( b − amount ) putMVar t o ( b + amount ) return True e l s e do putMVar from b return F a l s e Problem: Falls gleichzeitig von A nach B und von B nach A überwiesen wird, landen wir im Deadlock. Hier hilft auch in Java das synchronized nicht, da unterschiedliche Locks/Objekte beteiligt sind. Listing 49: Lösung 1 2 3 4 5 6 7 8 9 10 11 l i m i t e d T r a n s f e r from t o amount = do b <− takeMVar from i f b >= amount then do putMVar from ( b − amount ) b <− takeMVar t o putMVar t o ( b + amount ) return True e l s e do putMVar from b return F a l s e Wobei dies zu einem inkonsistenten Zwischenzustand führt. Eine andere Lösung wäre alle Locks zu Beginn der Aktion gemäß einer globalen Ordnung zu nehmen (zuerst kleine und dann große Locks nehmen, also immer erst A locken und dann B). 1 2 3 4 5 6 7 i f from < then do b1 <− b2 <− e l s e do b2 <− b1 <− to takeMVar from takeMvar t o takeMVar t o takeMVar from Während der gesamten Aktion sollten dann keine weiteren Locks erlaubt sein ⇒ keine Kompositionalität. 10.1.1 Gefahren bei der Programmierung mit Locks • Verwendung von zu wenig Locks ⇒ Inkonsistenzen • Verwendung von zu vielen Locks ⇒ übermäßige Sequentialisierung (im besten Fall) und Deadlocks (im schlechtesten Fall) • Verwendung falscher Locks, da oft der Zusammenhang zwischen Lock und zu sichernden Daten fehlt (bei Verwendung zusätzlicher Locks/Lockobjekte in Java) ⇒ Konsequenzen wie zuvor • Nehmen von Locks in falscher Reihenfolge (Race Condition) ⇒ Deadlock 55 • Robustheit, da beim Absturz von Teilkomponenten Locks eventuell nicht mehr freigegeben werden und inkonsistente Systemzustände zurück bleiben (z. B. Geld wird vernichtet) • Vergessen der Lockfreigabe ⇒ Deadlock 10.2 Transaktionsbasierte Kommunikation (in Concurrent Haskell als anderen Ansatz) Transaktionen sind ein bekanntes Konzept bei Datenbanken zur atomaren Ausführung komplexer Datenbankanfragen und -modifikationen. Diese Idee kann auch zur nebenläufigen Programmierung verwendet werden. Die modifizierbaren Variablen entsprechen der Datenbank und die Operationen entsprechen der Transaktion. Transaktionen beeinflussen die Welt nur, wenn sie erfolgreich waren. Sie werden in einer eigenen Monade ST M (Software Transactional Memory) definiert. Das atomare Ausführen einer Transaktion in der Welt geschieht mittels einer Funktion atomically :: STM a -> IO a. Im Gegensatz zu Datenbanken wird eine ST M immer wieder ausgeführt, bis sie erfolgreich war. Zur Kommunikation (anstelle der MVars) dienen spezielle Transaktionsvariablen: 1 data TVar a −− abstract 2 3 4 5 newTVar :: a −> STM ( TVar a ) readTVar : : TVar a −> STM a writeTVar : : TVar a −> a −> STM ( ) Beachte: Im Gegensatz zu MVars sind TVars niemals leer. Es gibt keinen Lock! Listing 50: Bankbeispiel 1 ty pe Account = TVar I n t 2 3 4 b a l a n c e : : Account −> STM I n t b a l a n c e a c c = readTVar a c c 5 6 7 8 9 1 2 3 1 2 3 4 5 6 7 8 withdraw : : Account −> I n t −> STM ( ) withdraw a c c amount = do b <− b a l a n c e a c c writeTVar a c c ( b − amount ) d e p o s i t : : Account −> I n t −> STM( ) d e p o s i t a c c amount = withdraw a c c (−amount ) limitedWithdraw : : Account −> I n t −> STM Bool limitedWithdraw a c c amount = do b <− b a l a n c e a c c i f b >= amount then do withdraw a c c amount return True e l s e return F a l s e Fassen wir nun noch einmal das STM-Interface von Haskell zusammen: Atomare Ausführung einer Transaktion: atomically :: STM a -> IO a TVars als Kommunikationsabstraktion, welche im Rahmen von Transaktionen verändert werden können: data TVar a ist abstrakt Erstellen, Lesen und Verändern von TVar-Inhalten: newTVar :: a -> STM (TVar a) 56 readTVar :: TVar a -> STM a writeTVar :: TVar a -> a -> STM () 10.2.1 Beispielprogramm Listing 51: Beispielprogramm 1 2 3 4 5 6 7 8 9 10 11 12 main : : IO ( ) main = do a c c 1 <− a t o m i c a l l y ( newTVar 1 0 0 ) a c c 2 <− a t o m i c a l l y ( newTVar 1 0 0 ) f o r k I O ( do b <− a t o m i c a l l y ( limitedWithdraw a c c 1 6 0 ) i f b then return ( ) e l s e putStrLn ”Du b i s t p l e i t e ! ” ) a t o m i c a l l y ( t r a n s f e r acc1 acc2 50) b <− a t o m i c a l l y ( b a l a n c e a c c 1 ) print b 10.3 Synchronisation mit Transaktionen Als Nächstes wollen wir untersuchen, in wie weit Transaktionen auch geeignet sind, Synchronisationsprobleme zu lösen. Hierzu betrachten wir wieder die dinierenden Philosophen und versuchen eine STM-basierte Implementierung zu entwickeln. Zunächst müssen wir uns überlegen, wie die Stäbchen repräsentiert werden können. Da TVars (im Gegensatz zu MVars) nicht leer sein können, benötigen wir eine TVar, welche boolesche Werte aufnehmen kann. Hierbei bedeutet der Wert True, dasss der Stab verfügbar ist und False, dass er bereits vergeben ist. Listing 52: Dinierende Philosophen 1 ty pe S t i c k = TVar Bool // True = l i e g t a u f dem T i s c h ( i n i t i a l ) Als Nächstes können wir versuchen Funktionen zum Aufnehmen bzw. Zurücklegen des Stäbchens zu definieren. Während putStick sehr einfach definiert werden kann, erreichen wir bei der Definition von takeStick einen Punkt, an dem wir nicht weiter wissen: Listing 53: Dinierende Philosophen 1 2 3 p u t S t i c k : : S t i c k −> STM ( ) p u t S t i c k s = do writeTVar s True 4 5 6 7 8 9 10 t a k e S t i c k : : S t i c k −> STM ( ) t a k e S t i c k s = do b <− readTVar s i f b then do writeTVar s F a l s e e l s e ??? // T r a n s a k t i o n e r n e u t v e r s u c h e n Falls takeStick ausgeführt wird, wenn die TVar den Wert False enthält, kann die gesamte Transaktion nicht mehr erfolgreich werden. An diesem Punkt hätte der lockbasierte Ansatz gewartet (suspendiert), bis ein anderer Prozess den Stab zurück legt. Denkt man in Transaktionen erkennen wir als Anwender der STM-Bibliothek an dieser Stelle, dass die Transaktion so insgesamt nicht erfolgreich ist, was wir durch Aufruf der Funktion retry :: STM () realisieren können. Bei Aufruf von retry wird die Transaktion erfolglos abgebrochen und erneut gestartet. Inwieweit hierbei tatsächlich Busy-Waiting notwendig ist, werden wir später klären. Zunächst besagt die Ausführung von retry, dass die Transaktion an dieser Stelle fehlgeschlagen ist: 57 Listing 54: Dinierende Philosophen 1 2 3 4 5 6 t a k e S t i c k : : S t i c k −> STM( ) t a k e S t i c k s = do b <− readTVar s i f b then do writeTVar s F a l s e else retry // T r a n s a k t i o n e r n e u t v e r s u c h e n Nun müssen wir nur noch die Philosophen hinzufügen: Listing 55: Dinierende Philosophen 1 2 3 4 5 6 7 8 9 10 p h i l : : S t i c k −> S t i c k −> IO ( ) p h i l l r = do // denken atomically ( takeStick l ) atomically ( takeStick r ) // E a t i n g a t o m i c a l l y ( do putStick l putStick r ) phil l r Die einzelnen Transaktionen werden mittels atomically ausgeführt. Hierbei fügen wir beide putStick Aktionen zusammen aus. Unter Hinzunahme einer geeigneten Startfunktion für Sticks und Philosophen, läuft das Programm schon sehr schnell nach seinem Start in einen Deadlock. Diesesmal können wir den Deadlock aber sehr viel einfacher vermeiden, als zuvor. Wir müssen einfach nur fordern, dass beide Stäbchen atomar, also innerhalb einer Transaktion, genommen werden. Hierzu komponieren wir beide sequentiell auf STM-Ebene und verwenden nur noch einen atomically Aufruf: // Think atomically(do takeStick l takeStick r) Nun läuft das Programm nicht mehr in die Deadlock-Situation. Ein Philosoph versucht, atomar beide Stäbchen zu nehmen. Ist dies nicht möglich, wird die gesamte Transaktion neu gestartet. Der Zustand, in dem also das linke Stäbchen aufgenommen wurde, aber das zweit nicht verfügbar ist, wird außerhalb der Transition nicht mehr sichtbar. 58