Skript zur Vorlesung Nebenläufige und verteilte

Werbung
Skript zur Vorlesung
Nebenläufige und verteilte Programmierung
SS 2016
Priv. -Doz. Dr. Frank Huch
19. Mai 2016
Version: 1.1
Dieses Skript basiert auf einer studentischen Vorlesungsmitschrift von Lutz Seemann, welche er dem
Dozenten dankenswerter Weise zur Verfügung gestellt hat.
Inhaltsverzeichnis
1 Einleitung
1.1 Sequentielle Programmierung . . . . . . . . . . . . . . . . . .
1.2 Begriffe . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
1.2.1 Nebenläufigkeit (Concurrency) . . . . . . . . . . . . .
1.2.2 Parallelität . . . . . . . . . . . . . . . . . . . . . . . .
1.2.3 Verteilte Programmierung (Distributed Programming)
1.3 Inhalt der Vorlesung . . . . . . . . . . . . . . . . . . . . . . .
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
1
1
1
1
1
2
2
2 Nebenläufige Systeme
2.1 Interprozesskommunikation und Synchronisation
2.1.1 Synchronisation mit Semaphoren . . . . .
2.1.2 Dinierende Philosophen . . . . . . . . . .
2.1.3 Monitore (Dijkstra ’71, Hoare ’74) . . . .
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
2
3
4
5
6
3 Nebenläufige Programmierung in Java
3.1 Java . . . . . . . . . . . . . . . . . . . . . . . . . . .
3.1.1 Die Klasse Thread . . . . . . . . . . . . . . .
3.1.2 Das Interface Runnable . . . . . . . . . . . .
3.2 Eigenschaften von Thread-Objekten . . . . . . . . .
3.3 Synchronisation von Threads . . . . . . . . . . . . .
3.4 Die Beispielklasse Account . . . . . . . . . . . . . . .
3.5 Genauere Betrachtung von synchronized . . . . . .
3.6 Unterscheidung der Synchronisation im OO-Kontext
3.7 Kommunikation zwischen Threads . . . . . . . . . .
3.7.1 Fallstudie: Einelementiger Puffer . . . . . . .
3.8 Beenden von Threads . . . . . . . . . . . . . . . . .
3.9 Warten auf Ergebnisse . . . . . . . . . . . . . . . . .
3.10 ThreadGroups . . . . . . . . . . . . . . . . . . . . .
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
7
7
7
8
8
9
10
10
11
12
15
18
19
20
in Haskell
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
24
24
25
25
4 Nebenläufige Programmierung
4.1 Concurrent Haskell . . . .
4.2 Kommunikation . . . . . .
4.3 Kanäle in Haskell . . . . .
5 Erlang
5.1 Sequentielle Programmierung in Erlang . . .
5.2 Nebenläufige Programmierung . . . . . . . . .
5.3 Ein einfacher Key-Value-Store . . . . . . . . .
5.4 Wie können Prozesse in Erlang synchronisiert
5.5 MVar . . . . . . . . . . . . . . . . . . . . . .
5.6 Nebenläufige Programmierung in Erlang . . .
5.7 Verteilte Programmierung in Erlang . . . . .
5.8 Verbinden unabhängiger Erlang-Prozesse . . .
5.8.1 Veränderungen des Clients . . . . . . .
5.8.2 Veränderungen des Servers . . . . . .
5.9 Robuste Programmierung . . . . . . . . . . .
5.10 Systemabstraktionen . . . . . . . . . . . . . .
. . . . .
. . . . .
. . . . .
werden?
. . . . .
. . . . .
. . . . .
. . . . .
. . . . .
. . . . .
. . . . .
. . . . .
6 Grundlagen der Verteilten Programmierung
6.1 7 Schichten des ISO-OSI-Referenzmodells . . . .
6.2 Protokolle des Internets . . . . . . . . . . . . . .
6.3 Darstellung von IP-Adressen/Hostnames in Java
6.4 Netzwerkkommunikation . . . . . . . . . . . . . .
6.4.1 UDP in Java . . . . . . . . . . . . . . . .
6.5 Socket-Optionen . . . . . . . . . . . . . . . . . .
6.6 Sockets in Erlang . . . . . . . . . . . . . . . . . .
6.7 Verteilung von Kommunikationsabstraktionen . .
i
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
26
26
28
28
30
31
31
31
32
32
32
34
34
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
36
36
36
37
37
37
41
42
43
7 Web-Applikationen
7.1 HTTP-Client . . . . . . . . . . . . . . . . . . . . . . . .
7.2 Client-Request-Header . . . . . . . . . . . . . . . . . . .
7.3 Webserver-Antwort . . . . . . . . . . . . . . . . . . . . .
7.4 Common Gateway Interface (CGI) . . . . . . . . . . . .
7.4.1 GET-Methode . . . . . . . . . . . . . . . . . . .
7.4.2 POST-Methode . . . . . . . . . . . . . . . . . . .
7.4.3 Wann sollte welche Methode verwendet werden?
7.5 Sessionmanagement . . . . . . . . . . . . . . . . . . . .
7.6 Beurteilung von CGI . . . . . . . . . . . . . . . . . . . .
.
.
.
.
.
.
.
.
.
43
44
44
44
44
45
46
46
47
47
8 Synchronisation durch Tupelräume
8.1 Java-Spaces . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
8.2 Implementierung von Tupelräumen in Erlang . . . . . . . . . . . . . . . . . . . . . . .
47
49
50
9 Spezifikation und Testen von Systemeigenschaften
51
10 Linear Time Logic (LTL)
10.1 Implementierung von LTL zum Testen . . . . . . . . . . . . . . . . . . . . . . . . . . .
10.2 Verifikation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
10.3 Simulation einer Turingmaschine . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
52
54
57
58
11 Transaktionsbasierte Kommunikation
11.1 Ein Bankbeispiel in Concurrent Haskell . . . . . . .
11.1.1 Gefahren bei der Programmierung mit Locks
11.2 Transaktionsbasierte Kommunikation (in Concurrent
11.2.1 Beispielprogramm . . . . . . . . . . . . . . .
11.3 Synchronisation mit Transaktionen . . . . . . . . . .
11.4 MVars . . . . . . . . . . . . . . . . . . . . . . . . . .
11.5 Alternative Komposition . . . . . . . . . . . . . . . .
11.6 Implementierung von STM . . . . . . . . . . . . . .
60
60
61
61
62
63
64
65
66
ii
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
. . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . .
Haskell als anderen Ansatz)
. . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . .
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
1 Einleitung
1.1 Sequentielle Programmierung
In der sequentiellen Programmierung führen Programme Berechnungen Schritt für Schritt aus. Hierdurch ergeben sich insbesondere Probleme bei der Entwicklung reaktiver Systeme, wie z.B. GUIs,
Betriebssystemprogrammen oder verteilten/Web-Applikationen.
Hier sollen alle möglichen Anfragen/Eingaben quasi “gleichzeitig” behandelt werden können.
Ein erster Ansatz hierzu war Polling. Alle möglichen Anfragen werden in einer großen Schleife bearbeitet.
Ein reaktives System läuft potentiell unendlich lange und reagiert auf seine Umgebung (z. B. GUI,
Textverarbeitung, Webserver, Betriebssystemprozesse). Eine Besonderheit ist hierbei dass meist mehrere Anfragen/Eingaben möglichst gleichzeitig bearbeitet werden sollen. Wie ist es möglich Reaktivität
zu gewährleisten?
Ein erster Ansatz wäre mittels Polling möglich, d. h. der Abfrage aller möglichen Anfragen in einer
großen Schleife, welcher jedoch folgende Nachteile hat:
• Busy Waiting verbraucht Systemressourcen.
• Code ist schlecht strukturiert, da alle Anfragen in die Schleife integriert werden müssen. Modularität ist nur eingeschränkt über Unterprozeduren möglich.
• Bearbeitung einer Anfrage blockiert andere Anfragen, da Rückkehr in Schleife erst nach Beendigung vorheriger Anfragen. Somit sequentielles Bearbeiten der Anfragen und keine Reaktivität
während der Bearbeitung einer Anfrage.
Lösung:Nebenläufigkeit (Concurrency)
Mehrere Threads (Fäden) können nebenläufig ausgeführt werden und die einzelnen Dienste eines
reaktiven Systems zur Verfügung stellen. Hierdurch ergeben sich folgende Vorteile:
• kein Busy Waiting (auf Interruptebene verschoben)
• Bessere Strukturierung: Dienst =
b (ggf. mehreren) Threads
• eventuell können mehrere Anfragen gleichzeitig bearbeitet werden
• System ist immer reaktiv
Beachte hierbei, dass die letzten beiden Punkte von der Implementierung der Threads auf der (Betriebs-)Systemebene abhängen.
1.2 Begriffe
1.2.1 Nebenläufigkeit (Concurrency)
Durch die Verwendung von Threads/Prozessen können einzelne Aufgaben/Dienste eines Programms
unabhängig von anderen Aufgaben/Diensten programmiert werden.
1.2.2 Parallelität
Meist in Verbindung mit High-Performance-Computing. Durch die parallele Ausführung mehrerer
Prozesse (auf in der Regel mehreren Prozessoren) wird eine schnellere Ausführung angestrebt. Beachte
aber, dass manchmal auch auf Ein-Prozessor-Systemen durch Nebenläufigkeit Performance-Gewinne
erreicht werden können, z. B. bei geringerer Performance von Netzwerkkarten.
Früher wurden insbesondere Shared-Memory-Systeme betrachtet, welche heute aber wieder in Form
der Multicore-Prozessoren an Relevanz gewinnen. Allerdings werden oft auch Rechner-Cluster zur
parallelen Programmierung eingesetzt (Distributed-Memory-Systeme) bis hin zu Netzwerken im Internet. Der Vorteil ist die Unabhängigkeit von Spezialhardware, sowie eine einfache Steigerung der
zur Verfügung stehenden Prozessoren.
Das größte Problem bei der parallelen Programmierung ist in der Regel eine möglichst gleichmäßige
Ausnutzung der Prozessoren (Vermeidung von Sequentialisierung) und eine Minimierung der Kommunikation.
1
Wir werden in dieser Vorlesung nicht weiter speziell auf die parallele Programmierung eingehen. Die
präsentierten Techniken finden hierbei aber durchaus auch Anwendung, wenngleich der Fokus sicherlich meist auf andere Aspekte gerichtet werden muss.
1.2.3 Verteilte Programmierung (Distributed Programming)
Ein verteiltes System besteht aus mehreren physisch getrennten Prozessoren, welche über ein Netzwerk
verbunden sind. Zum einen werden solche Systeme wie zuvor erwähnt zur parallelen Programmierung
eingesetzt. Zum anderen sind viele Anwendungen von ihrer Aufgabenstellung schon verteilt, so dass sie
auch entsprechend implementiert werden müssen. Beispiele sind Telefonsysteme oder Chats. Manchmal
stehen auch bestimmte Ressourcen nur an bestimmten Stellen zur Verfügung, so dass ebenfalls eine
Verteilung der Anwendung notwendig ist, z. B. Geldautomaten, spezielle Datenserver.
Eine Verteilung wird manchmal auch zur Performancesteigerung, z. B. Lastverteilung eingesetzt und
kann auch zur Fehlertoleranz durch redundante Informationsvorhaltung eingesetzt werden.
Verteilte Systeme werden in der Regel mit lokaler Nebenläufigkeit kombiniert, um Reaktivität gegenüber der Netzwerkkommunikation zu ermöglichen. Durch mehrere verteilte Prozesse kann es aber
auch ohne Nebenläufigkeit zu den gleichen Problemen wie bei rein nebenläufigen Systemen kommen.
1.3 Inhalt der Vorlesung
• Vergleich unterschiedlicher Konzepte zur nebenläufigen und verteilten Programmierung am Beispiel von Java, Erlang, Concurrent Haskell und weiteren Konzepten.
• Ausgewählte verteilte Algorithmen.
2 Nebenläufige Systeme
Generell sind immer drei Aspekte zu berücksichtigen:
• Definition und Start von Threads (ggf. auch deren Terminierung)
• Kommunikation zwischen Threads
• Synchronisation von Threads
Hierbei hängen die beiden letzten Punkte stark miteinander zusammen, da Kommunikation nur
schwerlich ohne Synchonisation möglich ist und jede Synchronisation natürlich auch eine Art Kommunikation ist.
Wir betrachten folgendes Beispiel in Pseudocode:
1
2
par { while true do w r i t e ( ” a ” ) }
{ while true do w r i t e ( ”b” ) }
wobei Semantik des par-Konstrukts: Spaltet Thread in zwei Unterthreads auf und führt diese nebenläufig aus.
ababab ...
aaabaaabaabbbaa ...
aaaaa ...
bbbbbb ...
Oft (aber nicht immer) deterministische Ausführung d. h. gleiche Ausgabe bei allen Läufen), die durch
den Scheduler geregelt wird. Dies muss aber nicht immer der Fall sein und ein Programmierer darf
sich hierauf auf keinen Fall verlassen. Der Schedule teilt den Prozessen die Prozessorzeit zu. Im Wesentlichen werden dabei folgende Arten von Schedulern unterschieden:
• kooperatives Multitasking ⇒ ein Thread rechnet solange, bis er die Kontrolle abgibt (z. B. mittels
yield()) oder auf Nachrichten wartet (suspend()) (z. B. aaaaa ... und bbbbbb ...). In Java
finden wir dies bei den sogenannte green threads.
• präemptives Multitasking ⇒ der Scheduler entzieht den Prozessen regelmäßig die Kontrolle.
Dies erfordert eine aufwändigere Implementierung, bietet aber mehr Programmierkomfort zur
2
Gewährleistung der Reaktivität des Systems, da man sich nicht so viele Gedanken machen muss,
wo überall die Kontrolle wieder abgegeben werden sollte (z. B. alle Ausgaben außer aaaaa ...
und bbbbbb ...)
Prinzipiell sollten Programmierer nebenläufiger Systeme möglichst für beliebige Scheduler entwickeln.
Einzige Ausnahme stellt manchmal die Einschränkung auf präemptives Multitasking dar. Aber selbst
dies ist oft nicht notwendig, da durch Synchronisation die Kontrolle häufig abgegeben wird und somit
auch kooperatives Multitasking ausreicht.
2.1 Interprozesskommunikation und Synchronisation
Neben der Generierung von Threads bzw. Prozessen ist auch die Kommunikation zwischen diesen
wichtig. Sie geschieht meist über geteilten Speicher bzw. Variablen.
Wir betrachten folgendes Beispiel in Pseudocode:
Listing 1: Beispiel für Race-Condition
1
2
3
int i = 0;
par { i := i + 1 } { i := i ∗ 2 }
write ( i ) ;
Semantik: wie zuvor.
Es ergibt sich Nichtdeterminismus mit mehreren Ergebnissen:
i = 0; i = i + 1; i = i * 2
i = 0; i = i * 2; i = i + 1
mit dem Ergebnis 2
mit dem Ergebnis 1
Wann ist ein Programm also korrekt?
• Ein sequentielles Programm ist korrekt, falls es für alle Eingaben ein korrektes Ergebnis liefert.
• Ein nebenläufiges Programm ist korrekt, wenn es für alle Eingaben und alle Schedules das richtige
Ergebnis liefert.
In der Praxis (leider) oft “für einen (den testbaren) Schedule”, wobei folgende Probleme auftreten
können:
• Portierung auf andere Architektur ⇒ anderer Schedule
• Erweiterung des Systems um zusätzliche Threads/zusätzliche Berechnungen ⇒ verändertes Scheduling
• Beeinflussung durch Ein-/Ausgaben
⇒ Deshalb: für alle Schedules !!! (manchmal ist auch eine Einschränkung auf präemptives Multitasking okay)
⇒ Bei Korrektheit für alle Schedules verliert man den Vorteil des präemptiven Multitaskings. Deshalb wird die Korrektheit oft nur “für alle fairen Schedules” verlangt.
Der Begriff fair bedeutet hierbei, dass jeder Thread nach endlicher Zeit wieder an die Reihe kommt,
also falls möglich einen Schritt ausführt.
Als Konsequenz daraus ist das Testen der Programme schwierig, da ggf. unendlich viele Abläufe
betrachtet werden müssen und eine Beeinflussung des Schedules fast unmöglich ist. Deshalb: formale
Spezifikation, formale Verifikation, Testtools und Debugger mit Einfluss auf den Scheduler.
Nebenläufigkeit macht Programme also nicht-deterministisch, d.h. es können je nach Scheduling unterschiedliche Ergebnisse herauskommen. So kann das obige Programm die Ausgaben 1 oder 2 erzeugen,
je nachdem, wie der Scheduler die beiden nebenläufigen Prozesse ausführt. Hängt das Ergebnis eines
Programmablaufs von der Reihenfolge des Schedulings ab, so nennt man dies eine Race-Condition.
Neben den beiden Ergebnissen 1 und 2 ist aber auch noch ein weiteres Ergebnis, nämlich 0, möglich.
Dies liegt daran, dass noch nicht klar spezifiziert wurde, welche Aktionen wirklich atomar ausgeführt
werden. Durch Übersetzung des Programms in Byte- oder Maschinencode können sich folgende Instruktionen ergeben:
Listing 2: Stackmaschinencode des Programms
3
1
2
3
int i = 0;
par { LOAD i ; LIT 1 ; ADD; STORE i }
{ LOAD i ; LIT 2 ; MULT; STORE i }
Aufgrund dieses Codeaufbaus ist dann ebenfalls folgende Ausführung möglich:
LOAD i; LIT 2; MULT;
LOAD i; LIT 1; ADD; STORE i;
STORE i
Diese liefert das Ergebnis i = 0, was sicherlich nicht das gewünschte Ergebnis ist. Bei dieser Ausführung
ist der 1. Thread in den 2. Thread “eingebettet”. Zuweisungen können aber nicht generell atomar
gemacht werden (wegen z. B. großer Berechnungen, Funktionsaufrufe).
Wir benötigen also Synchronisation zur Gewährleistung der atomaren Ausführung bestimmter Codeabschnitte, welche nebenläufig auf gleichen Ressourcen arbeiten. Solche Codeabschnitte nennen wir
kritische Bereiche.
2.1.1 Synchronisation mit Semaphoren
Ein bekanntes Konzept zur Synchronisation nebenläufiger Threads oder Prozesse geht auf Dijkstra
aus dem Jahre 1968 zurück. Dijkstra entwickelte einen abstrakten Datentyp mit dem Ziel, die atomare (ununterbrochene) Ausführung bestimmter Programmabschnitte zu garantieren. Diese Semaphore
stellen zwei atomare Operationen zur Verfügung:
Listing 3: Semaphor-Operationen
1
2
3
4
5
P( s ) :
IF ( s >= 1 ) THEN { s := s − 1 ; }
ELSE { P r o z e s s wird i n Warteschlange zu s e i n g e t r a g e n
und s u s p e n d i e r t ;
s := s − 1 ; }
6
7
8
9
10
V( s ) :
s := s + 1 ;
IF ( Warteschlange zu s n i c h t l e e r ) THEN { Erwecke e r s t e n P r o z e s s
d e r Warteschlange zu s ; }
Dabei steht P(s) für passieren oder passeer, V(s) steht für verlassen oder verlaat. Nun können wir
das obige Programm mittels eines Semaphor implementieren und so die Ausgabe 0 verhindern:
1
2
3
int i = 0;
semaphor s := 1 ;
par { P( s ) ; i := i + 1 ; V( s ) ; } { P( s ) ; i := i ∗ 2 ; V( s ) ; }
Der Initialwert des Semaphors bestimmt dabei die maximale Anzahl der Prozesse im kritischen Bereich. Meist finden wir hier den Wert 1, solche Semaphore nennen wir binäre Semaphore.
Eine andere Anwendung von Semaphoren ist das Producer-Consumer-Problem: n Producer erzeugen
Waren, die von m Consumern verbraucht werden. Eine einfache Lösung für dieses Problem verwendet
einen unbeschränkten Puffer:
Listing 4: Producer-Consumer-Problem mit unbeschränktem Puffer ohne Synchronisation
1
semaphor num := 0 ;
2
3
4
5
6
7
8
Producer :
while ( true ) {
produce p r o d u c t ;
push p r o d u c t t o b u f f e r ;
V(num ) ;
}
9
10
11
Consumer :
while ( true ) {
4
P(num ) ;
p u l l p r o d u c t from b u f f e r ;
consume p r o d u c t ;
12
13
14
15
}
Bemerkungen:
Zeile 1: Füllstand des Puffers
Zeile 12: an dieser Stelle findet ggf. eine Suspension statt
Der Semaphor fungiert als Produktzähler für den Puffer. Hochzählen mittel V im Producer und
Herunterzählen mittels P im Consumer.
Wir haben aber zunächst noch keine Synchronisation auf dem Puffer modelliert (kritischer Bereich).
Um hier zusätzlich synchronisieren zu können, fügen wir einen zweiten Semaphor ein, welcher wie
oben skiziert den kritischen Bereich schützt.
Listing 5: Producer-Consumer-Problem mit Synchronisation
1
2
semaphor num := 0 ;
semaphor b a c c e s s := 1 ;
3
4
5
6
7
8
9
10
11
Producer :
while ( true ) {
produce p r o d u c t ;
P( b a c c e s s ) ;
push p r o d u c t t o b u f f e r ;
V( b a c c e s s ) ;
V(num ) ;
}
12
13
14
15
16
17
18
19
20
Consumer :
while ( true ) {
P(num ) ;
P( b a c c e s s ) ;
p u l l p r o d u c t from b u f f e r ;
V( b a c c e s s ) ;
consume p r o d u c t ;
}
Bemerkung:
Zeile 1: Füllstand des Puffers
Zeile 15: an dieser Stelle findet ggf. eine Suspension statt
Die Semaphore bringen jedoch auch einige Nachteile mit sich. Der Code mit Semaphoren wirkt schnell
unstrukturiert und unübersichtlich. Außerdem können wir Semaphore nicht kompositionell verwenden: So kann der einfache Code P(s); P(s); auf einem binären Semaphor s bereits einen Deadlock
erzeugen.
2.1.2 Dinierende Philosophen
An einem Tisch mit Essen sitzen 5 Philosophen. Es gibt allerdings nur 5 Stäbchen zum Essen, wobei
eine Person immer 2 Stäbchen benötigt, so dass jeweils 1 Stäbchen mit dem Nachbarn geteilt werden muss. Ein Philosoph denkt so lange, bis er hungrig wird und versucht dann nacheinander beide
Stäbchen aufzunehmen und zu essen. Es findet keine zusätzliche Kommunikation untereiander statt.
Die Implementierung dieses Problems mit Semaphoren sieht wie folgt aus:
Listing 6: Code für Philosoph i von n
1
2
3
while ( true ) {
think ( ) ;
g e t hungry ( ) ;
5
P( s t i c k ( i
P( s t i c k ( (
eat ( ) ;
V( s t i c k ( i
V( s t i c k ( (
4
5
6
7
8
9
));
i +1)%n ) ) ;
));
i +1)%n ) ) ;
}
Die Reihenfolge der V -Operationen ist hierbei egal. Es kann jedoch ein Deadlock (Verklemmung)
auftreten, falls alle Philosophen gleichzeitig ihr linkes Stäbchen nehmen. Diesen Deadlock können wir
durch Zurücklegen vermeiden:
Ein Philosoph legt das erste Stäbchen zurück, falls das Zweite nicht verfügbar ist.
Listing 7: Dinierende Philosophen mit Zurücklegen
1
2
3
4
5
6
7
8
9
10
11
while ( true ) {
think ( ) ;
g e t hungry ( ) ;
P( s t i c k ( i ) ) ;
IF ( s t i c k ( ( i +1)%n ) == 0 ) THEN { V( s t i c k ( i
ELSE { P( s t i c k ( (
eat ( ) ;
V( s t i c k ( i
V( s t i c k ( (
}
}
)); }
i +1)%n ) ) ;
));
i +1)%n ) ) ;
An der Stelle P(stick((i+1)%n)) kann ein anderer Philosoph das Stäbchen wegnehmen. Der letzte
Philosoph muss sein Stäbchen aber immer wieder weglegen. Livelocks sind aber nicht ausgeschlossen, d. h. ein oder mehrere Philosophen können verhungern (trotz eines fairen Schedules). Nachteile
von Semaphoren: P-/V-Operatoren sind schwer zuordenbar (sind eigentlich wie Klammern, aber nur
während der Laufzeit; nicht in der Syntax) ⇒ Codestrukturierung ist aus softwaretechnischer Sicht
schwer und bietet viele mögliche Fehlerquellen ⇒ besser: strukturiertes Konzept
2.1.3 Monitore (Dijkstra ’71, Hoare ’74)
Monitore entsprechen einer Menge von Prozeduren und Datenstrukturen, die als Betriebsmittel betrachtet mehreren Threads zugänglich sind, aber nur von einem Thread zur Zeit benutzt werden
können ⇒ in einem Monitor befindet sich höchstens ein Thread.
Als Beispiel betrachten wir die Monitor-Implementierung eines Puffers:
Listing 8: Implementierung eines beschränkten Puffers als Monitor
1
2
3
4
5
6
7
8
9
10
11
monitor b u f f e r {
[ i n t ] [ 0 . . . ( n −1)] c o n t e n t s ;
i n t num , wp , rp = 0 ;
queue s e n d e r , r e c e i v e r ;
p r o c e d u r e push ( i n t item ) {
IF (num == n ) THEN { d e l a y ( s e n d e r ) ; }
c o n t e n t s [ wp ] = item ;
wp := (wp+1) mod n ;
num := num + 1 ;
continue ( r e c e i v e r ) ;
}
12
13
14
15
16
17
18
function int pull () {
IF (num == 0 ) THEN { d e l a y ( r e c e i v e r ) ; }
i n t item := c o n t e n t [ rp ] ;
rp := ( rp +1) mod n ;
num := num − 1 ;
continue ( s e n d e r ) ;
6
r e t u r n item ;
19
}
20
21
}
Bemerkungen:
Zeile 4: Prozessqueues
Zeile 6: Puffer ist voll
Zeile 14: Puffer ist leer
Semantik:
• num Anzahl der Elemente im Puffer
• wp Pointer zum Schreiben
• rp Pointer zum Lesen
• delay(Q) ⇒ der aufrufende Prozess wird in die Warteschlange Q eingefügt
• continue(Q) ⇒ aktiviert den ersten Thread in der Warteschlange Q, falls ein solcher vorhanden
ist
Vorteile:
• innerhalb der Monitorprozesse können Synchronisationsprobleme ignoriert werden
• alle kritischen Bereiche/Datenstrukturen befinden sich in einem abstrakten Datentyp
Nachteile:
• häufig zu viel Synchronisation ⇒ sequentielles Programm oder Deadlocks
• wird in Programmiersprachen selten unterstützt (in Java existiert ein monitorähnliches Konzept)
3 Nebenläufige Programmierung in Java
3.1 Java
Programmiersprache des Internets ⇒ Reaktivität notwendig ⇒ Nebenläufigkeit.
3.1.1 Die Klasse Thread
Die API von Java bietet im Package java.lang eine Klasse Thread an. Eigene Threads können von
dieser abgeleitet werden. Der Code, der dann nebenläufig ausgeführt werden soll, wird in die Methode
run() geschrieben. Nachdem wir einen neuen Thread einfach mit Hilfe seines Konstruktors erzeugt
haben, können wir ihn zur nebenläufigen Ausführung mit der Methode start() starten.
Wir betrachten als Beispiel folgenden einfachen Thread:
1
2
public c l a s s C o n c u r r e n t P r i n t extends Thread {
private S t r i n g s t r ;
3
4
5
6
public C o n c u r r e n t P r i n t ( S t r i n g s t r 1 ) {
str = str1 ;
}
7
8
9
10
11
12
public void run ( ) {
while ( true ) {
System . out . p r i n t ( s t r + ” ” ) ;
}
}
13
14
15
16
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
new C o n c u r r e n t P r i n t ( ” a ” ) . s t a r t ( ) ;
new C o n c u r r e n t P r i n t ( ”b” ) . s t a r t ( ) ;
7
}
17
18
}
Ausgabe: Menge aus a’s und b’s. Bei der Verwendung von green Threads, ggf. auch nur a’s oder nur
b’s.
3.1.2 Das Interface Runnable
Wegen fehlender Mehrfachvererbung in Java ist oft das Erben von der Threadklasse problematisch.
Deshalb besteht alternativ die Möglichkeit der Implementierung des Interfaces Runnable:
1
2
public c l a s s C o n c u r r e n t P r i n t implements Runnable {
String str ;
3
public C o n c u r r e n t P r i n t ( S t r i n g s t r 1 ) {
str = str1 ;
}
4
5
6
7
public void run ( ) {
while ( true ) {
System . out . p r i n t ( s t r + ” ” ) ;
}
}
8
9
10
11
12
13
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
Runnable aThread = new C o n c u r r e n t P r i n t ( ” a ” ) ;
Runnable bThread = new C o n c u r r e n t P r i n t ( ”b” ) ;
new Thread ( aThread ) . s t a r t ( ) ;
new Thread ( bThread ) . s t a r t ( ) ;
}
14
15
16
17
18
19
20
}
Beachte: Innerhalb der obigen Implementierung von ConcurrentPrint liefert this kein Objekt vom
Typ Thread mehr. Das aktuelle Thread-Objekt erreicht man dann über die statische Methode
Thread.currentThread().
3.2 Eigenschaften von Thread-Objekten
Jedes Thread-Objekt in Java hat eine Reihe von Eigenschaften:
Name Jeder Thread besitzt eine Namen, Beispiele sind main-Thread“, Thread-0“ oder Thread-1“.
”
”
”
Der Zugriff auf den Namen eines Threads erfolgt über die Methoden getName() und
setName(String). Man verwendet sie in der Regel zum Debuggen.
Zustand Jeder Thread befindet sich stets in einem bestimmten Zustand. Eine Übersicht dieser Zustände
und der Zustandsübergänge ist in Abbildung 1 dargestellt. Ein Thread-Objekt bleibt auch im
Zustand terminiert noch solange erhalten, bis alle Referenzen auf ihn verworfen wurden.
Dämon Ein Thread kann mit Hilfe des Aufrufs setDaemon(true) vor Aufruf der start()-Methode
als Hintergrundthread deklariert werden. Die JVM terminiert, sobald nur noch Dämonenthreads
laufen. Beispiele für solche Threads sind AWT-Threads oder der Garbage Collector.
Priorität Jeder Thread in Java hat eine bestimmte Priorität. Die genaue Staffelung ist plattformspezifisch.
Threadgruppe Threads können zur gleichzeitigen Behandlung auch in Gruppen eingeteilt werden.
Methode sleep(long) Lässt den Thread die angegebene Zeit schlafen. Ein Aufruf dieser Methode
kann eine InterruptedException werfen, welche aufgefangen werden muss.
Das Thread-Objekt bleibt erhalten, solange es referenziert wird. Während der Ausführung existiert
eine “Eigenreferenz”.
8
Abbildung 1: Zustände von Threads
Durch Aufruf der Methode yield wird dem Scheduler signalisiert, dass der Prozess die Kontrolle
abgeben kann. Bei green Threads führt dieser Aufruf in der Regel zum Threadswitch, falls dies möglich
ist. Bei präemptivem Scheduling kann yield auch keinen Efekt haben.
Scheduler: Es gibt unterschiedliche Implementierungen
• green Threads ⇒ kooperativer Scheduler in JVM (wird in der Regel nicht mehr unterstützt)
• native Threads ⇒ Verwendung von Betriebssystemprozessen mit präemptivem Multitasking
Vorteil:
• Ausnutzung moderner Multicore-Architekturen
• Wiederverwendung des Betriebssystem-Schedulers
Nachteil:
• größerer Verbrauch von Systemressourcen (keine light-weight Threads)
3.3 Synchronisation von Threads
Zur Synchronisation von Threads bietet Java ein Monitor-ähnliches Konzept, das es erlaubt, Locks
auf Objekten zu setzen und wieder freizugeben.
Die Methoden eines Threads können in Java als synchronized deklariert werden. In allen synchronisierten Methoden eines Objektes darf sich dann maximal ein Thread zur Zeit befinden. Hierzu zählen
auch Berechnungen, die in einer synchronisierten Methode aufgerufen werden und auch unsynchronisierte Methoden des gleichen Objektes. Dabei wird eine synchronisierte Methode nicht durch einen
Aufruf von sleep(long) oder yield() verlassen.
Ferner besitzt jedes Objekt ein eigenes Lock. Beim Versuch der Ausführung einer Methode, die als
synchronized deklariert ist, unterscheiden wir drei Fälle:
1. Ist das Lock freigegeben, so nimmt der Thread es sich.
2. Besitzt der Thread das Lock bereits, so macht er weiter.
3. Ansonsten wird der Thread suspendiert.
Das Lock wird wieder freigegeben, falls die Methode verlassen wird, in der es genommen wurde. Im
Vergleich zu Semaphoren wirkt der Ansatz von Java strukturierter, man kann kein Unlock vergessen
und es kommt nicht zur Suspension beim mehrfachen Nehmes des Locks. Dennoch ist er weniger
flexibel.
9
3.4 Die Beispielklasse Account
Ein einfaches Beispiel soll die Verwendung von synchronized-Methoden veranschaulichen. Wir betrachten eine Implementierung einer Klasse für ein Bankkonto:
Listing 9: Implementierung eines Bankkontos
1
2
c l a s s Account {
private double b a l a n c e ;
3
public Account ( double i n i t i a l ) {
balance = i n i t i a l ;
}
4
5
6
7
public synchronized double g e t B a l a n c e ( ) {
return b a l a n c e ;
}
8
9
10
11
public synchronized void d e p o s i t ( double amount ) {
b a l a n c e += amount ;
}
12
13
14
15
}
Wir möchten die Klasse nun wie folgt verwenden:
1
2
3
4
5
6
7
Account a = new Account ( 3 0 0 ) ;
...
a . d e p o s i t ( 1 0 0 ) ; // n e b e n l a e u f i g , z w e i t e r Thread
...
a . d e p o s i t ( 1 0 0 ) ; // n e b e n l a e u f i g , z w e i t e r Thread
...
System . out . p r i n t l n ( a . g e t B a l a n c e ( ) ) ;
Die Aufrufe der Methode deposit(double) sollen dabei nebenläufig von verschiedenen Threads aus
erfolgen. Ohne das Schlüsselwort synchronized wäre die Ausgabe 500, aber auch die Ausgabe 400
denkbar (vgl. dazu Abschnitt Interprozesskommunikation und Synchronisation). Mit Anwendung des
Schlüsselwortes ist eine Ausgabe von 500 garantiert.
3.5 Genauere Betrachtung von synchronized
Vererbte synchronisierte Methoden müssen nicht zwingend wieder synchronisiert sein. Wenn man
solche Methoden überschreibt, so kann man das Schlüsselwort synchronized auch weglassen. Dies bezeichnet man als verfeinerte Implementierung. Die Methode der Oberklasse bleibt dabei synchronized.
Andererseits können unsynchronisierte Methoden auch durch synchronisierte überschrieben werden.
Klassenmethoden, die als synchronisiert deklariert werden (static synchronized), haben keine Wechselwirkung mit synchronisierten Objektmethoden. Die Klasse hat also ein eigenes Lock.
Man kann sogar auf einzelne Anweisungen synchronisieren:
1
synchronized ( expr ) b l o c k
Dabei muss expr zu einem Objekt auswerten, dessen Lock dann zur Synchronisation verwendet wird.
Streng genommen sind synchronisierte Methoden also nur syntaktischer Zucker: So steht die Methodendeklaration
1
synchronized A m( a r g s ) b l o c k
eigentlich für
1
A m( a r g s ) { synchronized ( t h i s ) b l o c k }
Einzelne Anweisungen zu synchronisieren ist sinnvoll, um weniger Code synchronisieren bzw. sequentialisieren zu müssen:
10
1
private double s t a t e ;
2
3
4
public void c a l c ( ) {
double r e s ;
5
// do some r e a l l y e x p e n s i v e c o m p u t a t i o n
...
// s a v e t h e r e s u l t t o an i n s t a n c e v a r i a b l e
synchronized ( t h i s ) {
state = res ;
}
6
7
8
9
10
11
12
}
Synchronisierung auf einzelne Anweisungen ist auch nützlich, um auf andere Objekte zu synchronisieren. Wir betrachten als Beispiel eine einfache Implementierung einer synchronisierten Collection:
1
2
3
4
class Store {
public synchronized boolean hasSpace ( ) {
...
}
5
public synchronized void i n s e r t ( int i )
throws N o S p a c e A v a i l a b l e E x c e p t i o n {
6
7
8
...
9
}
10
11
}
Wir möchten diese Collection nun wie folgt verwenden:
1
2
3
4
5
S t o r e s = new S t o r e ( ) ;
...
i f ( s . hasSpace ( ) ) {
s . insert (42);
}
Dies führt jedoch zu Problemen, da wir nicht ausschließen können, dass zwischen den Aufrufen von
hasSpace() und insert(int) ein Re-Schedule geschieht. Da sich das Definieren spezieller Methoden
für solche Fälle oft als unpraktikabel herausstellt, verwenden wir die obige Collection also besser
folgendermaßen:
1
2
3
4
5
synchronized ( s ) {
i f ( s . hasSpace ( ) ) {
s . insert (42);
}
}
3.6 Unterscheidung der Synchronisation im OO-Kontext
Wir bezeichnen synchronisierte Methoden und synchronisierte Anweisungen in Objektmethoden als
server-side synchronisation. Synchronisation der Aufrufe eines Objektes bezeichnen wir als client-side
synchronisation.
Aus Effizenzgründen werden Objekte der Java-API, insbesondere Collections, nicht mehr synchronisiert. Für Collections stehen aber synchronisierte Versionen über Wrapper wie synchronizedCollection, synchronizedSet, synchronizedSortedSet, synchronizedList, synchronizedMap oder synchronizedSortedMap zur Verfügung.
Sicheres Kopieren einer Liste in ein Array kann nun also auf zwei verschiedene Weisen bewerkstelligt
werden: Als erstes legen wir eine Instanz einer synchronisierten Liste an:
11
1
L i s t <I n t e g e r > u n s y n c L i s t = new L i s t <I n t e g e r > ( ) ;
2
3
4
// f i l l t h e l i s t
...
5
6
L i s t <I n t e g e r > l i s t = C o l l e c t i o n s . s y n c h r o n i z e d L i s t ( u n s y n c L i s t ) ;
Nun können wir diese Liste entweder mit der einfachen Zeile
1
I n t e g e r [ ] a = l i s t . toArray (new I n t e g e r [ 0 ] ) ;
oder über
1
Integer [ ] b ;
2
3
4
5
6
synchronized ( l i s t ) {
b = new I n t e g e r [ l i s t . s i z e ( ) ] ;
l i s t . toArray ( b ) ;
}
in ein Array kopieren. Bei der zweiten, zweizeiligen Variante ist die Synchronisierung auf die Liste
unabdingbar: Wir greifen in beiden Zeilen auf die Collection zu, und wir können nicht garantieren,
dass nicht ein anderer Thread die Collection zwischenzeitig verändert. Dies ist ein klassisches Beispiel
für client-side synchronisation.
3.7 Kommunikation zwischen Threads
Threads kommunizieren über geteilte Objekte miteinander. Wie finden wir nun aber heraus, wann
eine Variable einen Wert enthält? Dafür gibt es mehrere Lösungsmöglichkeiten.
Die erste Möglichkeit ist die Anzeige des Veränderns einer Komponente des Objektes, beispielsweise
durch Setzen eines Flags (boolean).
1
2
3
class C {
private i n t s t a t e = 0 ;
private boolean m o d i f i e d = f a l s e ;
4
public synchronized void p r i n t N e w S t a t e ( ) {
while ( ! m o d i f i e d ) { } ;
System . out . p r i n t l n ( s t a t e ) ;
modified = false ;
}
5
6
7
8
9
10
public synchronized void s e t V a l u e ( i n t v ) {
state = v ;
m o d i f i e d = true ;
System . out . p r i n t l n ( ” v a l u e s e t ” ) ;
}
11
12
13
14
15
16
}
Diese erste Lösung zeigt den erfolgten Versand durch Veränderung eines geteilten Objekts, hier boolesches Flag modified an. Dies hat jedoch den Nachteil, dass das Prüfen auf das Flag zu busy waiting
führt. Busy waiting bedeutet, dass ein Thread auf das Eintreffen eines Ereignisses wartet, dabei
aber weiterrechnet und somit Ressourcen wie Prozessorzeit verbraucht. Deshalb suspendiert man
einen Thread mittels der Methode wait() des Objektes, und weckt ihn später mittels notify() oder
notifyAll() wieder auf.
1
2
class C {
private i n t s t a t e = 0 ;
3
4
5
public synchronized void p r i n t N e w S t a t e ( )
throws I n t e r r u p t e d E x c e p t i o n {
12
6
wait ( ) ;
System . out . p r i n t l n ( s t a t e ) ;
7
8
}
9
10
public synchronized void s e t V a l u e ( i n t v ) {
state = v ;
notify ();
System . out . p r i n t l n ( ” v a l u e s e t ” ) ;
}
11
12
13
14
15
16
}
Zwei Threads führen nun die Methodenaufrufe printNewState() und setValue(42) nebenläufig aus.
Nun ist die einzig mögliche Ausgabe “value set”, “42”.
Falls der Aufruf von wait() erst kommt, nachdem die Methode setValue(int) vom ersten Thread
schon verlassen wurde, so führt dies nur zur Ausgabe value set.
Die Methoden wait(), notify() und notifyAll() dürfen nur innerhalb von synchronized-Methoden
oder -Blöcken benutzt werden und sind Methoden für das gesperrte Objekt. Sie haben dabei folgende
Semantik:
wait() suspendiert den ausführenden Thread und gibt das Lock des Objektes wieder frei.
notify() erweckt einen 1 schlafenden Thread des Objekts und fährt mit der eigenen Berechnung fort.
Der erweckte Thread bewirbt sich nun um das Lock. Wenn kein Thread schläft, dann geht das
notify() verloren.
notifyAll() tut das gleiche wie notify(), nur für alle Threads, die für dieses Objekt mit wait()
schlafen gelegt wurden.
Dabei ist zu beachten, dass diese drei Methoden nur auf Objekten aufgerufen werden dürfen, deren
Lock man vorher erhalten hat. Der Aufruf muss daher in einer synchronized-Methode bzw. in einem
synchronized-Block erfolgen, ansonsten wird zur Laufzeit eine IllegalMonitorStateException geworfen.
Wir möchten nun ein Programm schreiben, das alle Veränderungen des Zustands ausgibt:
Wie bei der “busy waiting”-Idee verwenden wir dazu zusätzlich noch ein Flag:
1
private boolean m o d i f i e d = f a l s e ; // z u r A n z e i g e d e r Z u s t a n d s a e n d e r u n g e n
2
3
4
5
6
public synchronized void p r i n t N e w S t a t e ( ) {
i f ( ! modified ) {
wait ( ) ;
}
7
System . out . p r i n t l n ( s t a t e ) ;
modified = false ;
8
9
10
}
11
12
13
14
15
16
17
public synchronized void s e t V a l u e ( i n t v ) {
state = v ;
notify ();
m o d i f i e d = true ;
System . out . p r i n t l n ( ” v a l u e s e t ” ) ;
}
Ein Thread führt nun printNewState() aus, andere Threads verändern den Zustand mittels
setValue(int). Dies führt zu einem Problem: Bei mehreren setzenden Threads kann die Ausgabe
einzelner Zwischenzustände verloren gehen. Also muss auch setValue(int) ggf. warten und wieder
aufgeweckt werden:
1
2
3
public synchronized void s e t V a l u e ( i n t v ) {
i f ( modified ) {
wait ( ) ;
1 es
ist nicht genau festgelegt welchen
13
}
4
5
state = v ;
notify ();
m o d i f i e d = true ;
System . out . p r i n t l n ( ” v a l u e s e t ” ) ;
6
7
8
9
10
}
Nun ist es aber nicht gewährleistet, dass der Aufruf von notify() in der Methode setValue(int)
den printNewState-Thread aufweckt! In Java lösen wir dieses Probem mit Hilfe von notifyAll()
und nehmen dabei ein wenig busy waiting in Kauf:
1
2
3
class C {
private i n t s t a t e = 0 ;
private m o d i f i e d = f a l s e ;
4
public synchronized void p r i n t N e w S t a t e ( ) {
while ( ! m o d i f i e d ) {
wait ( ) ;
}
5
6
7
8
9
System . out . p r i n t l n ( s t a t e ) ;
modified = false ;
notify ();
10
11
12
}
13
14
public synchronized void s e t V a l u e ( i n t v ) {
while ( m o d i f i e d ) {
wait ( ) ;
}
15
16
17
18
19
state = v ;
notifyAll ();
m o d i f i e d = true ;
System . out . p r i n t l n ( ” v a l u e s e t ” ) ;
20
21
22
23
}
24
25
}
⇒ Es werden also alle suspendierten Objekte aufgeweckt und das richtige erkennt sich selber.
Dies ist der Java-Stil zur synchronisierten Kommunikation: Verwendung von wait() in Kombination
mit notifyAll() und einen bisschen Busy Waiting:
while(!Nachricht erhalten)
wait(); und notifyAll();
Dies hat aber folgende Nachteile:
• sehr unkontrollierte Kommunikation
• Verschwendung von Systemressourcen (Busy Waiting)
• selbst bei passender wait-/notify()-Verwendung können von außen mit synchronisierten Anweisungen weitere wait()-/notify-Aufrufe hinzugefügt werden ⇒ falscher Thread wird aufgeweckt
Die Methode wait() ist in Java außerdem mehrmals überladen:
wait(long) unterbricht die Ausführung für die angegebene Anzahl an Millisekunden.
wait(long, int) unterbricht die Ausführung für die angegebende Anzahl an Milli- und Nanosekunden.
Anmerkung: Es ist dringend davon abzuraten, die Korrektheit des Programms auf diese Überladungen
zu stützen!
Die Aufrufe wait(0), wait(0, 0) und wait() führen alle dazu, dass der Thread solange wartet, bis
er wieder aufgeweckt wird.
14
3.7.1 Fallstudie: Einelementiger Puffer
Ein einelementiger Puffer ist günstig zur Kommunikation zwischen Threads. Da der Puffer einelementig ist, kann er nur leer oder voll sein. In einen leeren Puffer kann über eine Methode put ein
Wert geschrieben werden, aus einem vollen Puffer kann mittels take der Wert entfernt werden. take
suspendiert auf einem leeren Puffer, put suspendiert auf einem vollen Puffer.
Der einelementige Puffer kann auch als veränderbare Variable gesehen werden. Wir nennen ihn deshalb
MVar (mutable variable).
Listing 10: Implementierung einer veränderbaren Variable
1
2
3
public c l a s s MVar <T> {
private T c o n t e n t ;
private boolean empty ;
4
public MVar ( ) {
empty = true ;
}
5
6
7
8
public MVar(T o ) {
empty = f a l s e ;
content = o ;
}
9
10
11
12
13
public synchronized T t a k e ( ) throws I n t e r r u p t e d E x c e p t i o n {
while ( empty ) {
wait ( ) ;
}
14
15
16
17
18
empty = true ;
notifyAll ();
19
20
21
return c o n t e n t ;
22
}
23
24
public synchronized void put (T o ) throws I n t e r r u p t e d E x c e p t i o n {
while ( ! empty ) {
wait ( ) ;
}
25
26
27
28
29
empty = f a l s e ;
notifyAll ();
content = o ;
30
31
32
}
33
34
}
Bemerkungen:
Zeile 1: das <T> ist eine Typvariable, d. h. eine Variable über Typen, die mit Objekttypen gefüllt
werden kann ⇒ wirkt als Polymorphie
Zeile 9: das Argument ist hier ein Objekt o vom Typ T
Zeile 20: hier werden die Schreiber aufgeweckt, da der Puffer jetzt leer ist
Zeile 31: hier werden die Leser aufgeweckt, da der Puffer jetzt wieder voll ist
1. Verbesserung
Unschön an der obigen Lösung ist, dass zuviele Threads erweckt werden, d. h. es werden durch
notifyAll() immer sowohl alle lesenden als auch alle schreibenden Threads erweckt, von denen dann
die meisten sofort wieder schlafen gelegt werden. Können wir Threads denn auch gezielt erwecken?
Ja! Dazu verwenden wir spezielle Objekte zur Synchronisation der take- und put-Threads.
Listing 11: Verwendung von Synchronisationsobjekten zur zielgerichteten Erweckung der Threads
15
1
2
3
public c l a s s MVar <T> {
private T c o n t e n t ;
private boolean empty ;
4
private Ob ject r = new Obj ect ( ) ;
private Ob ject w = new Obj ect ( ) ;
5
6
7
public MVar ( ) {
empty = true ;
}
8
9
10
11
public MVar(T o ) {
empty = f a l s e ;
content = o ;
}
12
13
14
15
16
public T t a k e ( ) throws I n t e r r u p t e d E x c e p t i o n {
synchronized ( r ) {
while ( empty ) {
r . wait ( ) ;
}
17
18
19
20
21
22
synchronized (w) {
empty = true ;
w. n o t i f y A l l ( ) ;
23
24
25
26
return c o n t e n t ;
27
}
28
}
29
}
30
31
public void put (T o ) throws I n t e r r u p t e d E x c e p t i o n {
synchronized (w) {
while ( ! empty ) {
w. wait ( ) ;
}
32
33
34
35
36
37
synchronized ( r ) {
empty = f a l s e ;
r . notifyAll ();
content = o ;
}
38
39
40
41
42
}
43
}
44
45
}
Bemerkungen:
Zeile 5/6: dies sind die beiden Synchronisationsobjekte
Zeie 20: r.wait() darf nur ausgeführt werden, wenn man einen Lock auf r hat
Zeile 23: hier befindet sich maximal 1 Thread und es gilt empty = false
Zeile 23-26 dies stellt einen kritischen Bereich dar, der über r und wait() abgesichert ist
Zeile 38: hier befindet sich maximal 1 Thread und es gilt empty = true
Achtung: take() hält ein Lock auf r und wartet ggf. auf w und put() hält Lock auf w und wartet auf
r (race condition!) ⇒ Gefahr eines Deadlocks! Es ist aber kein Deadlock möglich, da an relevanten
Programmpunkten unterschiedliche Werte für empty vorliegen müssen ⇒ es können keine zwei Threads
gleichzeitig an diesem Punkt sein!
2. Verbesserung
Da wir jetzt schon spezielle Lockobjekte zur Unterscheidung der Leser und Schreiber eingeführt haben,
sollte es doch auch möglich sein, gezielt nur einen Leser bzw. Schreiber zu erwecken, d. h. notify()
16
statt notifyAll() zu verwenden.
1
2
3
public c l a s s MVar <T> {
private T c o n t e n t ;
private boolean empty ;
4
private Ob ject r , w ;
5
6
public MVar ( ) {
empty = true ;
r = new Ob ject ( ) ;
w = new Ob ject ( ) ;
}
7
8
9
10
11
12
public MVar(T o ) {
empty = f a l s e ;
content = o ;
r = new Ob ject ( ) ;
w = new Ob ject ( ) ;
}
13
14
15
16
17
18
19
public T t a k e ( ) throws I n t e r r u p t e d E x c e p t i o n {
synchronized ( r ) {
while ( empty ) {
r . wait ( ) ;
}
20
21
22
23
24
25
synchronized (w) {
empty = true ;
w. n o t i f y ( ) ;
26
27
28
29
return c o n t e n t ;
30
}
31
}
32
}
33
34
public void put (T o ) throws I n t e r r u p t e d E x c e p t i o n {
synchronized (w) {
while ( ! empty ) {
w. wait ( ) ;
}
35
36
37
38
39
40
synchronized ( r ) {
empty = f a l s e ;
r . notify ();
content = o ;
}
41
42
43
44
45
}
46
}
47
48
}
Auf den ersten Blick scheint es auch möglich, dieses Programm weiter zu vereinfachen und anstelle
von while nur ein if zu verwenden. Dies ist aber nicht möglich, wenn man sich nochmal die genaue
Semantik des wait- und notify-Mechanismus anschaut. Bei notify wird ein schlafender Thread
nicht unmittelbar erweckt und weiterlaufen. Vielmehr bewirbt sich der erweckte Thread um den Lock,
welcher zunächst noch vom notify-ausführenden Thread gehalten wird. Der Lock kann ihm aber von
einem gerade neu initiierten Leser (oder Schreiber) weggeschnappt werden, wodurch die Semantik der
MVar verletzt würde und ggf. zwei Threads einen Wert auslesen oder setzen würden.
Nach außen bieten MVars eine gute Abstraktionsstruktur, welche durchaus sinnvoll zur synchronisierten Threadkommunikation eingesetzt werden kannn. [⇒] Verwende in Zukunft besser MVars anstatt synchronized, wait() und notify(). Ähnliche Kommunikationsabstraktionen werden im Paket
java.util.concurrent.atomic zur Verfügung gestellt. Allerdings Java-typisch mit sehr vielen Kontrollmöglichkeiten.
17
Abbildung 2: Verwendung von MVars beim Producer-/Consumer-Problem
Als Anwendungsbeispiel für MVars können wir erneut das Producer-Consumer-Problem betrachten.
Auf den ersten Blick scheint ein einelementiger Puffer nicht besonders geeignet zu sein, da Produzenten
blockiert werden. Dieses Konzept ist in der Praxis aber oft gut geeignet, da auch “automatisch”
eine Lastbalancierung stattfindet. Ein Überangebot führt zu einer Sequentialisierung auf Seiten der
Producer, wodurch mehr Rechenzeit für die Consumer zur Verfügung steht (siehe Abbildung 3).
3.8 Beenden von Threads
Die Methode isAlive() liefert true, falls der Thread noch läuft (auch wenn er gerade suspendiert
ist). Java bietet mehrere Möglichkeiten, Threads zu beenden:
1. Beenden der run()-Methode
2. Abbruch der run()-Methode
3. Aufruf der destroy()-Methode (deprecated, z. T. nicht mehr implementiert)
4. Dämonthread und Programmende
Bei 1. und 2. werden alle Locks freigegeben. Bei 3. werden Locks nicht freigegeben, was diese Methode
unkontrollierbar macht. Aus diesem Grund sollte diese Methode auch nicht benutzt werden. Bei 4.
sind die Locks egal.
Unterbrechung von Threads
Java sieht zusätzlich eine Möglichkeit zum Unterbrechen von Threads über Interrupts vor. Jeder
Thread hat ein Flag, welches Interrupts anzeigt.
Die Methode interrupt() sendet einen Interrupt an einen Thread, das Flag wird gesetzt. Falls
der Thread aufgrund eines Aufrufs von sleep() oder wait() schläft, wird er erweckt und eine
InterruptedException geworfen.
1
2
3
4
5
6
7
8
9
10
synchronized ( o ) {
...
try {
...
o . wait ( ) ;
...
} catch ( I n t e r r u p t e d E x c e p t i o n e ) {
...
}
}
Bei einem Interrupt nach dem Aufruf von wait() wird der catch-Block erst betreten, wenn der Thread
das Lock auf das Objekt o des umschließenden synchronized-Blocks wieder erlangt hat!
Im Gegensatz dazu wird bei der Suspension durch synchronized der Thread nicht erweckt, sondern
nur das Flag gesetzt.
Die Methode public boolean isInterrupted() testet, ob ein Thread Interrupts erhalten hat. public
static boolean interrupted() testet den aktuellen Thread auf einen Interrupt und löscht das
Interrupted-Flag.
Falls man also in einer synchronized-Methode auf Interrupts reagieren möchte, ist dies wie folgt
möglich:
18
Abbildung 3:
1
2
3
4
5
6
synchronized void m ( . . . ) {
...
i f ( Thread . c u r r e n t T h r e a d ( ) . i s I n t e r r u p t e d ( ) ) {
throw new I n t e r r u p t e d E x c e p t i o n ( ) ;
}
}
Falls eine InterruptedException aufgefangen wird, wird das Flag ebenfalls gelöscht. Dann muss man
das Flag erneut setzen!
3.9 Warten auf Ergebnisse
Wenn eine Threadberechnung terminiert, kann sie Ergebnisse anderen Threads zur Verfügung stellen,
z. B. als Attribut oder über spezielle Selektor-Methoden. Um dies zu synchronisieren, könnte man
isAlive() und busy waiting verwenden. D. h. Threads, welche auf das Ergebnis eines anderen Threads
warten, überprüfen, ob der Thread noch lebt, und fahren fort, wenn dies nicht mehr der Fall ist. Dies
verschwendet aber Systemressourcen.
Alternative 1: Kommunikation und Synchronisation über MVars
Alternative 2: join-Methode der Thread-Klasse. Ein Thread, der die join-Methode eines anderen
Threads aufruft, suspendiert, bis der andere Thread terminiert.
1
2
c l a s s Calc extends Thread {
private i n t r e s u l t ;
3
public void run ( ) {
result = . . . ;
}
4
5
6
7
public i n t g e t R e s u l t ( ) {
return r e s u l t ;
}
8
9
10
11
}
12
13
14
15
16
c l a s s showJoin {
void run ( ) {
Calc c a l c = new Calc ;
calc . start ();
17
try {
calc . join ();
System . out . p r i n t l n ( ” R e s u l t i s ”+ c a l c . g e t R e s u l t ( ) ) ;
} catch ( I n t e r r u p t e d E x c e p t i o n e ) { . . . }
18
19
20
21
}
22
23
}
Wie sleep() und wait() kann join() durch einen Interrupt unterbrochen werden. Die Semantik von
join() ist ähnlich:
while(calc.isAlive()) wait(); mit zugehörigem notifyAll(); bei Terminierung.
19
Variante mit Timeout
join(long Nanosekunden)
join(long Nanosekunden, int Millisekunden)
Der Grund für die Wiedererweckung (Terminierung oder Zeitablauf) ist nicht unterscheidbar. Durch
ein zusätzliches isAlive() kann dies herausgefunden werden.
Weitere Konzepte
Lese- und Schreiboperationen auf int und boolean sind atomar. Die Kombination von mehreren aber
nicht, genauso wie dies nicht für long und double gilt. Dies kann ggf. zu inkonsistenten Werten führen
(z. B. wenn die erste Hälfte geschrieben wurde)
Lösung: Synchronisieren oder Werte als volatile deklarieren ⇒ atomare Veränderungen
weiteres Problem: Compileroptimierung
Wir betrachten einen Thread, der den Code in Listing 3.9 ausführt. Wenn nun andere Threads den
Wert der Variablen currentValue verändern, so sollte sich auch die Ausgabe des Threads ändern.
1
2
3
4
5
currentValue = 5;
while ( true ) {
System . out . p r i n t l n ( c u r r e n t V a l u e ) ;
Thread . s l e e p ( 1 0 0 0 ) ;
}
Dies ist aber nicht unbedingt der Fall und es ist möglich, dass weiterhin nur 5 und nicht der neu
gesetzte Wert ausgegeben wird.
Dies liegt an einer Compileroptimierung: Konstantenpropagation. Dadurch wird der Wert der Variablen currentValue vom Compiler direkt in das System.out.println() hinein geschrieben, da der
Wert ja immer gleich ist. Als Lösung sollte die Variable currentValue als volatile deklariert werden,
was eine Compileroptimierung verhindert.
3.10 ThreadGroups
Threads können in Gruppen zusammengefasst werden, was die Strukturierung von Threadstrukturen
ermöglicht. Es handelt sich um eine hierarchische Struktur, was bedeutet, dass ThreadGroups sowohl
Threads als auch ThreadGroups enthalten können.
Prinzipiell gehört jeder Thread zu einer ThreadGroup. Standardmäßig ist dies die gleiche ThreadGroup
wie die des Elternthreads. Die ThreadGroup kann verändert werden mittels
public Thread(ThreadGroup g [, String name]), wobei der Name optional ist.
ThreadGroups können mittels public ThreadGroup(String name) als Untergruppe zur aktuellen
Threadgruppe generiert werden. Sie bieten folgende Methoden:
• getName()
• getParent()
• setDaemon(boolean daemon)
• setMaxPriority(int maxPri)
• int activeCount() ⇒ gibt die Anzahl der Threads aus; die Gruppe bekommt man mittels
isAlive()
• int enumerate(Thread[] threadsInGroup, boolean recursive), wobei das int die
Anzahl der Threads, Thread[] die Threads der Gruppe und boolean = true ist (auch bei
Untergruppen)
Die Semantik ist hier etwas anders als bei Threads: ThreadGroups werden gelöscht, wenn sie leer sind.
Außerdem gibt es SecurityHandler auf Gruppen (z. B. Wer darf Thread in ThreadGroups erzeugen?“)
”
20
ThreadPools
Fast das gleiche Konzept wie ThreadGroups gibt es im Paket java.util.concurrent mit dem ThreadPool noch einmal. Hierbei handelt es sich allerdings um eine Abstraktion zur Verwaltung von laufenden Threads. Sie ermöglichen eine Einsparung von System-Ressourcen durch Wiederverwendung von
Threads.
Executer Interface
Das Executer Interface stellt Möglichkeiten zum Ausführen von Threads im Rahmen von ThreadPools
zur Verfügung:
• void execute(Runnable o) ⇒ nimmt einen Task (das Runnable o / nicht Thread!) in einen
ThreadPool auf; also e.execute(r) mit e als Executer-Objekt und r als Runnable-Objekt statt
new(Thread(r)).start()
• die Klasse Executer stellt statische Methoden zur Generierung von Executoren zur Verfügung:
static ExecuterService newFixedThreadPool(int n) startet den ThreadPool mit n
Threads und stellt eine Verfeinerung von Executer dar.
• Ein neuer Task (mit execute hinzugefügt) wird von einem dieser Threads ausgeführt, falls ein
Thread verfügbar ist. Ansonsten wird er in eine Warteschlange eingefügt und gestartet, falls ein
anderer Task beendet wird ⇒ kein Overhead durch Threadstart. Die Threads werden erst beim
Shutdown beendet.
• Es sind alternative Methoden verfügbar (z. B. zum bedarfsgesteuerten Starten von Threads [mit
Obergrenze] oder dynamischen ThreadPools)
Debugging
Es gibt nur eingeschränkte Infomationen, die für ein printf-Debugging genutzt werden können:
• die Klasse Thread stellt einige Methoden zum Debuggen zur Verfügung:
– public String toString() ⇒ liefert Stringrepräsentation des Thread inklusive Name,
Priorität und ThreadGroup
– public static void dumpStack() ⇒ liefert einen Stacktrace des aktuellen Threads auf
System.out
• für ThreadGroups gibt es:
– public String toString()
– public void list() ⇒ druckt rekursiv die toString()-Ergebnisse aller Threads/ThreadGroups aus
Beurteilung von Java Threads
• Kommunikation über geteilten Speicher (Objekte)
• gute Kapselung des wechselseitigen Ausschlusses in Objekten mittels synchronized
• Synchronisation der Kommunikation mittels wait(), notify() und notifyAll(), was allerdings
sehr ungerichtet ist
• meistens Verwendung von notifyAll() und busy waiting
• viele zusätzliche Konzepte: Prioritäten, Dämonen, sleep()/yield(), Server-Side- und ClientSide-Synchronisation mittels synchronized-Ausdrücken, Interrupts, isAlive(), join(), ThreadGroups, ThreadPools, Synchronisation auf Klassenebene, . . .
• Vererbung und synchronized-Methoden passen nicht gut zusammen
• Kommunikationsabstraktion (MVar, Chan oder entsprechende Konstrukte in
java.util.concurrent) ersetzen inzwischen viele andere Konzepte und ermöglichen eine Programmierung auf höherem Niveau (Message Passing). Auch hier gibt es aber wieder viele Konfigurationsmöglichkeiten (z. B. timeouts).
21
Abbildung 4:
Unbeschränkter Puffer (Chan)
Ein Chan dient zur Kommunikation zwischen Schreibern und Lesern, so dass die Schreiber nie suspendieren.
Implementierung mittels verketteter Liste
Leser müssen ggf. suspendieren, falls der Chan leer ist. Leser und Schreiber müssen synchronisiert
werden. Zur Implementierung (der Zeiger) wollen wir MVars verwenden (siehe Abbildung 4).
Listing 12: Chan-Implementierung
1
2
c l a s s Chan<T> {
private MVar<MVar<ChanElem<T>>> read , w r i t e ;
3
4
private c l a s s ChanElem<T> {
5
private T v a l u e ;
private MVar<ChanElem<T>> next ;
6
7
8
public ChanElem (T v , MVar<ChanElem<T>> n ) {
value = v ;
next = n ;
}
9
10
11
12
13
public T v a l u e ( ) {
return v a l u e ;
}
14
15
16
17
public MVar<ChanElem<T>> next ( ) {
return next ;
}
18
19
20
21
}
22
23
24
25
26
27
public Chan ( ) throws I n t e r r u p t e d E x c e p t i o n {
MVar<ChanElem<T>> h o l e = new MVar<ChanElem<T> >();
r e a d = new MVar<MVar<ChanElem<T>>>(h o l e ) ;
w r i t e = new MVar<MVar<ChanElem<T>>>(h o l e ) ;
}
28
29
30
31
32
33
34
public T r e a d ( ) throws I n t e r r u p t e d E x c e p t i o n {
MVar<ChanElem<T>> rEnd = r e a d . t a k e ( ) ;
ChanElem<T> item = rEnd . r e a d ( ) ;
r e a d . put ( item . next ( ) ) ;
return item . v a l u e ( ) ;
}
35
36
37
38
39
40
41
public void w r i t e (T o ) {
MVar<ChanElem<T>> newHole = new MVar<ChanElem<T>>(),
oldHole = write . take ( ) ;
o l d H o l e . put (new ChanElem<T>(o , newHole ) ) ;
w r i t e . put ( newHole ) ;
}
22
Abbildung 5: Chan-Operationen
42
}
Bemerkungen:
Zeile 30: hier findet eine Synchronisation der Leser statt, da durch read die MVar kurzzeitig leer
wird, was dann andere Leser suspendiert
Zeile 31: liest die erste MVar s und liefert das erste Element (v1 )
Zeile 32: hier wird die Blockade der anderen Leser wieder aufgehoben
Zeile 38: hier werden die anderen Schreiber suspendiert
Siehe Abbildung 4
Beachte: Die Implementierung ermöglicht gleichzeitiges Lesen und Schreiben des nicht-leeren Chans!
Als Erweiterung können wir zum Chan noch eine Methode isEmpty hinzufügen, welche den aktuellen
Zustand des Chans analysiert. Sicherlich muss bei der Verwendung einer solchen Methode bedacht
werden, dass erhaltene Information nur eine sehr begrenzte Gültigkeit hat. Man kann sich aber dennoch
Fälle vorstellen, in denen solch eine Methode nützlich sein kann.
1
2
3
4
5
6
7
public boolean isEmpty ( ) {
MVar<ChanElem<T>> rEnd = r e a d . t a k e ( ) ;
MVar<ChanElem<T>> wEnd = w r i t e . t a k e ( ) ;
w r i t e . put (wEnd ) ;
r e a d . put ( rEnd ) ;
return ( rEnd==wEnd ) ;
}
In der Übung wird die MVar-Implementierung um eine read-Methode, welche die Mvar nicht leert
erweitert. Mit dieser Methode ist eine Vereinfachung dieser Implementierung möglich.
Beachte aber, dass diese Implementierung nicht in allen Situationen das erwartete Verhalten hat.
Durch die Synchronisierung mit anderen lesenden Threads, kann es dazu kommen, dass isEmpty
blockiert, nämlich genau dann, wenn ein anderer Thread bereits von einem leeren Chan lesen will und
hierbei natürlich suspendiert.
Lösungsideen für dieses Problem werden in den Übungen besprochen. Eine weitere Lösung werden wir
bei abstrakteren Konzepten später in dieser Vorlesung kennen lernen.
Noch ein Konzept: Pipes (oder ein alter Schritt in Richtung Message Passing)
Pipes werden meist für Datei- oder Netzwerkkommunikation verwendet. Mit ihnen ist aber auch eine
Kommunikation zwischen Threads möglich. Eine Pipe besteht aus zwei Strömen:
(i) PipedInputStream
(ii) PipedOutputStream
, welche bei der Konstruktion durch:
PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);
oder mittels in.connect(out) verbunden werden können.
23
Pipes haben in der Regel eine beschränkte Größe. Threads, die aus einer leeren Pipe lesen oder in eine
volle Pipe schreiben wollen, werden suspendiert. Das Schreiben erfolgt mittels:
• write(int b)
• write(byte[], int off, int len) ⇒ schreibt b[of f ] . . . b[of f + len]
• flush() ⇒ setzt die gepufferte write-Methode um
• close() ⇒ schließt das Schreibende
das Lesen mittels:
• int read() ⇒ liefert -1, falls die Pipe geschlossen ist
• int read(byte[], int off, int len) ⇒ suspendiert bei einer unzureichenden Zahl an Bytes
auf der Pipe
• int available() ⇒ liefert die Anzahl der Bytes in der Pipe
• close()
Andere Daten als Bytes können mittels DataOutputStream oder PipeWriter übertragen werden. Ein
Vergleich mit Chan und MVar findet in der Übung statt.
4 Nebenläufige Programmierung in Haskell
Auch Haskell bietet in Form der Bibliothek Concurrent Haskell die Möglichkeit nebenläufig zu
programmieren. Hierbei stellt eine MVar, wie wir sie in Java implementiert haben die grundlegende
Datenstruktur dar.
4.1 Concurrent Haskell
Erweiterung von Haskell 98 um Konzepte zur nebenläufigen Programmierung.
Modul: Control.Concurrent. Zur Generierung neuer Threads steht die Funktion
forkIO :: IO () -> IO ThreadId
zur Verfügung. Das Argument vom Typ IO() stellt den abzuspaltenden Prozess dar. Die Aktion ist
sofort beendet. Die ThreadId identifiziert den neu abgespalteten Thread. Ein Thread kann seine eigene
ThreadId mit folgenden Funktion abfragen:
myThreadId :: IO ThreadId
Threads werden bei Terminierung beendet oder mittels killThread::ThreadId->IO() abgeschos”
sen“. Im Gegensatz zu Erlang werden die ThreadIds aber nicht zur Kommunikation verwendet.
Listing 13: Beispiel
1
2
3
4
5
6
7
main : : IO ( )
main = do
id <− f o r k I O ( l o o p 0 )
s t r <− getLine
i f s t r == ” s t o p ”
then k i l l T h r e a d id
e l s e return ( )
8
9
10
11
12
l o o p : : Int−>IO ( )
l o o p n = do
print n
l o o p ( n+1)
Das Beispiel ist nicht ganz so schön, da das gesamte Programm ohnehin terminiert, falls der Hauptthread terminiert.
24
4.2 Kommunikation
Haskell bietet Kommunikationsabstraktionen namens MVar und Chan, wie wir sie auch schon in Java
kennengelernt (und dort auch implementiert) haben.
newEmptyMVar :: IO(MVar a) kreiert eine neue leere MVar
newMVar :: a -> IO(MVar a) kreiert neue gefüllte MVar.
takeMVar :: MVar a -> IO a
putMVar :: Mvar a - > a -> IO()
readMVar :: MVar a -> IO a
tryPutMVar :: Mvar a -> a -> IO Bool
...
Dinierende Philosophen
Listing 14: Dinierende Philosophen
1
2
3
4
5
6
7
8
p h i l : : MVar ( ) −> MVar ( ) −> IO ( )
p h i l l r = do
takeMVar l
takeMVar r
−− e a t
putMVar l ( )
putMVar r ( )
phil l r
9
10
11
12
13
14
main : : IO ( )
main = do
s t i c k s <− c r e a t e S t i c k s 5
startPhils sticks
p h i l ( l a s t s t i c k s ) ( head s t i c k s )
15
16
17
18
19
20
21
c r e a t e S t i c k s : : I n t −> IO ( [ MVar ( ) ] )
c r e a t e S t i c k s 0 = return [ ]
c r e a t e S t i c k s n = do
s t i c k s <− c r e a t e S t i c k s ( n−1)
s t i c k <− newMVar ( )
return ( s t i c k : s t i c k s )
22
23
24
25
26
27
s t a r t P h i l s : : [ MVar ( ) ] −> IO ( )
s t a r t P h i l s [ ] = return ( )
s t a r t P h i l s ( l : r : s t i c k s ) = do
forkIO ( p h i l l r )
startPhils ( r : sticks )
--eat ist ein Kommentar
[_] matched eine ein-elementige Liste
4.3 Kanäle in Haskell
Da die MVar als grundlegende Datenstruktur auch in Haskell vorhanden ist, ist eine Übertragung der
Chan-Implementierung recht einfach möglich:
1
module Chan where
2
3
import C o n t r o l . Concurrent . MVar
4
5
6
data Chan a = Chan (MVar ( Stream a ) ) −− r e a d end
(MVar ( Stream a ) ) −− w r i t e end
7
8
ty pe Stream a = MVar ( ChanElem a )
9
10
data ChanElem a = ChanElem a ( Stream a )
11
25
12
13
14
15
16
17
newChan : : IO ( Chan a )
newChan = do
h o l e <− newEmptyMVar
r e a d <− newMVar h o l e
w r i t e <− newMVar h o l e
return ( Chan r e a d w r i t e )
18
19
20
21
22
23
24
writeChan
writeChan
newHole
oldHole
putMVar
putMVar
: : Chan a −> a −> IO ( )
( Chan
w r i t e ) v = do
<−newEmptyMVar
<− takeMVar w r i t e
o l d H o l e ( ChanElem v newHole )
w r i t e newHole
25
26
27
28
29
30
31
readChan : : Chan a −> IO a
readChan ( Chan r e a d ) = do
rEnd <− takeMVar r e a d
( ChanElem v newReadEnd ) <− readMVar rEnd
putMVar r e a d newReadEnd
return v
32
33
34
35
36
37
38
39
isEmptyChan : : Chan a −> IO Bool
isEmptyChan ( Chan r e a d w r i t e ) = do
rEnd <− takeMVar r e a d
wEnd <− takeMVar w r i t e
putMVar w r i t e wEnd
putMVar r e a d rEnd
return ( rEnd==wEnd)
Die Verwendung von Kanälen ermöglicht ein anderes Programmierparadigma, das so genannte Message Passing. Message Passing wird z. B. in Erlang und Scala zur nebenläufigen und auch verteilten
Programmierung eingesetzt. Im folgenden wollen wir uns zunächst intensiver mit Erlang auseinandersetzen.
5 Erlang
Erlang wurde von der Firma Ericsson entwickelt. Es ist eine funktionale Programmiersprache mit
speziellen Erweiterungen zur nebenläufigen und verteilten Programmierung. Sie wurde gezielt zur
Entwicklung von Telekommunikationsanwendungen entwickelt, wird inzwischen aber auch in anderen
Bereichen zur Entwicklung insbesondere verteilter Anwendungen eingesetzt.
5.1 Sequentielle Programmierung in Erlang
Erlang ist eine funktionale Programmiersprache mit strikter Auswertung, d. h. es werden zuerst die
Argumente ausgewertet und erst anschließend die Funktion aufgerufen, wie in imperativen Sprachen
auch. Erlang ist ungetypt/(dynamisch) getypt ⇒ Laufzeittypfehler für unpassende Basistypen (z. B.
true + 7 ⇒ Widerspruch).
Erlang ist verfügbar unter www.erlang.org und auf den SUN’s unter /home/erlang/bin/erl installiert.
Listing 15: Fakultätsfunktion (factorial) als Beispiel
1
2
f a c ( 0 ) −> 1 ;
f a c (N) when N>0 −> N ∗ f a c (N−1).
Erklärungen:
• “;” trennt die Regeln der Funktionsdefinition
• Variablen werden groß geschrieben
• der guard when N>0 gehört zur Auswahl der Regel mit hinzu
• jede Funktionsdefinition endet mit einem Punkt.
26
Statt let-Ausdrücken bind-once-Variablen:
1
2
3
f a c (N) when N>0 −> N1 = N−1,
F1 = f a c (N1 ) ,
N ∗ F1 .
Erklärungen:
• “,” als Trennzeichen der Sequenzen
• die letzte Zeile ist das Ergebnis der Berechnung, welches automatisch ausgegeben wird
Funktionen werden nach ihrer Stelligkeit unterschieden:
fac(N,M) -> ... ist eine andere Funktion als fac/1, was für eine einstellige Funktion wie z. B. fac(N)
steht. Jedes Erlang-Programm benötigt einen Modulkopf:
• -modul(math). ⇒ das Modul befindet sich in der Datei math.erl
• -export([fac/1]). ⇒ erlaubt den Zugriff von außen
Ablauf:
> erl
...
> c(math).
> math:fac(6).
720
Erklärungen:
• c(math). compiliert das Modul math in der Datei math.erl.
Alternativ kann in der Shell mittels erlc math.erl kompiliert werden.
• halt(). oder Crtl + C“und dann a führt zum Verlassen der Erlang-Umgebung.
”
Datenstrukturen/Werte:
• Atome: a, b, c, 0, 1 ,true, false, ..., ’Hallo’, ’willi@mickey’
⇒ beginnen mit Kleinbuchstaben oder stehen in Hochkommata.
• Tupel: { t1 , . . . , tn } mit beliebigen Werten ti , z. B. Bäume:
{node,{node,empty,1,empty},2,empty}
• Listen: [e|l], wobei e das Kopfelement der Liste ist. l ist eine Restliste und [] steht für die
leere Liste. Als Abkürzung können wir auch [1,2,3] statt [1|[2|[3|[]]]] schreiben.
Beispiel: Konkatenation zweier Listen
app([], Ys) -> Ys;
app([X|Xs], Ys) -> [X|app(Xs, Ys)].
Patern Matching:
• Pat = e, wobei Pat eine der folgenden Formen hat:
– Variable
– Atom
– Tupel {P at1 , . . . , P atn }
– [ P at1 |P ate ]
– []
– [ P at1 , . . . , P atn ]
Das Matching eines Pattern bindet ungebundene Variablen an Teilstrukturen der Werte; z.B:
– {X,[1|Ys]} = {42,[1,2,3]}
das X gebunden wird
[X/42, Ys/[2,3]], wobei X/42 bedeutet, dass die 42 an
– {X,{1,Y},Y} = {3,{1,[]},42}
müsste
matcht nicht, da Y an [] und 42 gebunden werden
– {X,{1,Y},Y} = {3,{1,42},42}
[X/3, Y/42]
Beachte: Die Substitution wird auf alle vorkommenden Variablen angewendet, auch in Pattern.
Ein “Umbinden” ist nicht mehr möglich!
27
Beispiel:
{X,[Y|Ys]} = {42,[1,2]}
[X|Xs] = [3,4]
wobei die zweite Zeile zum Fehler führt, da X bereits an 42 gebunden ist.
5.2 Nebenläufige Programmierung
Erlang-Prozesse sind light-weight, so dass davon gleichzeitig sehr viele existieren können.
Ein Erlang-Prozess besteht aus folgenden drei Bestandteilen:
• Programmterm t, der “ausgewertet” wird
• Prozess-Identifier pid
• Mailbox (Message Queue), die Nachrichten aufnimmt
Starten von Erlang-Prozessen:
• spawn(module, func,[a1 , . . . , an ]) ⇒ startet einen nebenläufigen Prozess, dessen Berechnung
mit module:func(a1 , . . . , an ) startet
• spawn terminiert sofort und liefert den pid des neuen Prozesses zurück
• Beachte: func muss in module exportiert werden (auch wenn spawn im selben Modul aufgerufen
wird)
Alternativ kann man spawn auch eine parameterlose Funktion übergeben, welche dann als separater
Prozess gestartet wird: spawn(fun () -> module:fun(a1 ,. . . ,an ) end). Der Funktionsrumpf muss
hierbei natürlich keine definierte Funktion sein. Es kann ein beliebiger Erlang-Ausdruck sein.
Seinen eigenen pid erhält ein Prozess durch Aufruf von self().
Senden von Nachrichten:
• pid!v ⇒ sendet den Wert v an den Prozess pid. Der Wert wird hinten in die Mailbox von pid
eingetragen.
• pids sind Werte wie Atome oder Listen und können somit auch in Datenstrukturen gespeichert
oder auch verschickt werden
• es ist gewährleistet, dass Nachrichten nicht verloren gehen
Empfangen von Nachrichten: receive-Ausdruck
1
2
3
4
5
6
receive
Pat1 −> e1 ;
...
Patn −> e n
[ a f t e r t −> e ]
end
• die Pattern werden der Reihe nach gegen die Werte der eigenen Mailbox gematcht (exakte
Reihenfolge siehe Übung)
• erstes passendes Pattern P ati wird gewählt (→ Substitution σ). Das gesamte receive-Statement
wird durch σ(ei ) ersetzt. Die Berechnung macht mit ei weiter.
• bei after t -> e findet ein Timeout nach t Millisekunden statt und es geht mit e weiter
Spezialfall: t = 0 ⇒ die Pats werden nur genau einmal gegen alle Mailbox-Einträge gematcht
5.3 Ein einfacher Key-Value-Store
Listing 16: Server des Key-Value-Stores
1
2
3
4
5
6
−module ( dataBase ) .
−e x p o r t ( s t a r t / 0 ) .
s t a r t ( ) −> dataBase ( [ ] ) .
dataBase (KVs) −>
receive
{ a l l o c a t e , Key , P} −>
28
7
8
9
10
11
12
13
14
15
16
case lo oku p ( Key , KVs) o f
n o t h i n g −> P ! f r e e ,
r e c e i v e { v a l u e , V, P} −> dataBase ( [ { Key ,V} | KVs ] )
end ;
{ j u s t ,V} −> P ! a l l o c a t e d ,
dataBase (KVs)
end ;
{ lookup , Key , P} −> P ! lo oku p ( Key , KVs ) ,
dataBase (KVs)
end .
17
18
19
20
l o o k u p (K , [ ] ) −> n o t h i n g ;
l o o k u p (K, [ { K,V} | ] ) −> { j u s t ,V} ;
l o o k u p (K, [ | KVs ] ) −> lo oku p (K, KVs ) ;
Listing 17: Beispiel-Client zum Key-Value-Stores
1
2
3
−module ( c l i e n t ) .
−e x p o r t ( [ s t a r t / 0 ] ) .
−import ( base , [ g e t L i n e / 1 , p r i n t L n / 1 ] )
4
5
6
7
s t a r t ( ) −>
DB = spawn ( database , s t a r t , [ ] ) ,
c l i e n t (DB) .
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
c l i e n t (DB) −>
I n p u t = g e t L i n e ( ” ( l ) ookup / ( i ) n s e r t > ” ) ,
case I nput o f
” i ” −> K = g e t L i n e ( ” key > ” ) ,
DB! { a l l o c a t e , K, s e l f ( ) } ,
receive
f r e e −>
V = getLine ( ” value > ” ) ,
DB! { v a l u e , V, s e l f ( ) } ;
a l l o c a t e d −>
p r i n t L n ( ” key a l l o c a t e d ” )
end ;
” l ” −> K = g e t L i n e ( ” key > ” ) ,
DB! { lookup , K, s e l f ( ) } ,
receive
n o t h i n g −> p r i n t L n ( ”Key not a l l o c a t e d ” ) ;
{ j u s t , V} −> p r i n t L n (V)
end ;
X −> p r i n t L n (X)
end ,
c l i e n t (DB) .
Bemerkungen:
Zeile 3: base ist ein Modul mit IO-Funktionen (wird auf der Web-Site bereit gestellt)
Zeile 6: DB enthält den zugehörigen Prozess-Identifier
Zeile 27: dieser Fall fängt alle “sonst”-Fälle ab. Hier kann auch _ -> 42 stehen.
Beachte: Es können auch mehrere Clienten mit einem Server kommunizieren. Dies entspricht einem
Nameserver, kann aber auch als geteilter Speicher gesehen werden.
Dieser Key-Value-Store stellt zum Einen eine einfache erste Anwendung zur nebenläufigen Programmierung in Erlang dar. Er zeigt aber auch, dass man mit Hilfe eines Server-Prozesses und Message
Passing, das Modell eines geteilten Speichers simulieren kann. Der Schlüssel entspricht hierbei der
Speicheradresse und der Wert dem abgelegten Wert. Im nächsten Schritt sollten wir also untersuchen,
ob auch ähnliche Synchronisationsmechanismen, wie sie bei geteiltem Speicher verwendet werden,
simuliert werden können.
29
5.4 Wie können Prozesse in Erlang synchronisiert werden?
Als Beispiel: Implementierung der dinierenden Philosophen
Idee: Repräsentiere den Zustand eines Sticks innerhalb eines speziellen Stick-Prozesses. Die Funktionen
zum Nehmen und Hinlegen eines Essstäbchens können dann wie folgt definiert werden:
Erlang-Prozesse sind seiteneffektsfrei, bis auf das Senden von Nachrichten!
Listing 18: Essstäbchen
1
2
3
4
5
6
stickDown ( ) −>
receive
{ take , P} −> P ! took ,
stickUp ( ) .
−> stickDown ( )
end .
7
8
9
10
11
s t i c k U p ( ) −>
receive
put −> stickDown ( )
end .
12
13
14
15
16
t a k e ( St ) −> St ! { take , s e l f ( ) } ,
receive
took−> ok
end .
17
18
put ( St ) −> St ! put .
Hierbei ist zu beachten, dass ein entsprechender Stabprozess zwischen den Zuständen stickUp()
und stickDown() hin- und herwechselt. Die take-Nachricht wird aber nur im stickDown()-Zustand
verarbeitet. Bei stickUp() verbleibt sie in der Mailbox. Außerdem wartet der Prozess, der take/2
ausführt, im receive, bis eine entsprechende Bestätigungsnachricht (took) verschickt wurde.
Nun ist es einfach, die Philosophen zu realisieren:
Listing 19: Dinierende Philosophen
1
2
3
4
5
−module ( d i n i n g P h i l s ) .
−e x p o r t ( [ s t a r t / 1 , stickDown / 0 , p h i l / 3 ] ) .
s t a r t (N) when N >= 2 −>
S t i c k s = c r e a t e S t i c k s (N) ,
startPhils ([ l i s t s : last ( Sticks )| Sticks ] , 0).
6
7
c r e a t e S t i c k s ( 0 ) −> [ ] ;
8
9
10
11
12
c r e a t e S t i c k s (N) −>
S t i c k = spawn ( d i n i n g P h i l s , stickDown , [ ] ) ,
S t i c k s = c r e a t e S t i c k s (N−1) ,
[ Stick | Sticks ] .
13
14
s t a r t P h i l s ( [ ] , N) −> ok ;
15
16
17
18
s t a r t P h i l s ( [ SL , SR | S t i c k s ] , N) −>
spawn ( d i n i n g P h i l s , p h i l , [ SL , SR , N] ) ,
s t a r t P h i l s ( [ SR | S t i c k s ] , N+1).
19
20
21
22
23
24
25
26
27
28
29
p h i l ( SL , SR , N) −>
b a s e : p r i n t (N) ,
base : printLn ( ” i s thinking ” ) ,
t i m e r : s l e p (10+N) ,
// #4
t a k e ( SL ) ,
t a k e (SR ) ,
b a s e : p r i n t (N) ,
base : printLn ( ” i s e a t i n g ” ) ,
put ( SL ) ,
put (SR ) ,
30
30
p h i l ( SL , SR , N ) .
Bemerkungen:
Zeile 2: stickDown/0 und phil/3 müssen exportiert werden, damit diese gestartet werden können
Zeile 23: die Zeile ist zur direkten “Deadlockvermeidung” notwendig
An diesem Beispiel erkennt man gut das Prinzip der Synchronisation in Erlang. Als weiteres Beispiel
betrachten wir noch die Implementierung einer MVar in Erlang.
5.5 MVar
Als weiteres Beispiel wollen wir eine MVar in Erlang implementieren. Hierbei können wir genau wie
bei der Stick-Implementierung vorgehen. Wir müssen nur zusätzlich einen Wert als Zustand des MVarProzesses vorsehen.
Listing 20: MVar in Erlang
1
−module (mVar ) .
2
3
−e x p o r t ( [ new/ 0 ,new/ 1 ,mVar/ 1 , t a k e / 1 , put / 2 ] ) .
4
5
new( ) −> spawn (mVar , mVar , [ empty ] ) .
6
7
new(V)−> spawn (mVar , mVar , [ { f u l l ,V } ] ) .
8
9
10
11
12
13
14
mVar( empty ) −> r e c e i v e
{ put , V, P} −> P ! put , mVar( { f u l l ,V} )
end ;
mVar( { f u l l ,V} ) −> r e c e i v e
{ take , P} −> P ! { took ,V} , mVar( empty )
end .
15
16
17
18
19
20
t a k e (MVar) −>
MVar ! { take , s e l f ( ) } ,
receive
{ took ,V} −> V
end .
21
22
23
24
25
26
put (MVar ,V) −>
MVar ! { put , V, s e l f ( ) } ,
receive
put −> ok
end .
Im Gegensatz zur Implementierung der Stäbchen verwenden wir hier keine zwei sich abwechselnd
aufrufenden Funktionen, sondern unterscheiden die Zustände durch zwei unterschiedliche Argumente
(empty und {full,...}).
5.6 Nebenläufige Programmierung in Erlang
Wir haben nun gesehen, wie man geteilten Speicher und Lock-basierte Synchronisation in Erlang
simulieren kann. In der Praxis greift man allerdings selten auf diesen Ansatz zurück. Vielmehr verwendet man ausgezeichnete Server-Prozesse, welche die Anfragen an ein Objekt sequentialisieren und
hierdurch den geteilten Zustand schützen.
5.7 Verteilte Programmierung in Erlang
Nebenläufige Prozesse haben keinen gemeinsamen Speicher und können deshalb problemlos auf mehreren Rechnern verteilt werden. Nachrichten werden automatisch in TCP-Nachrichten umgewandelt
(siehe Abbildung 6). Es ist auch möglich, dass mehrere Knoten auf einem Rechner laufen.
Starten eines Erlang-Knotens: erl -name willi ⇒ da ansonsten keine veteilte Kommunikation
31
möglich ist, da der Prozess nicht bekannt ist
Alternative: erl -sname willi ⇒ analog, funktioniert aber nur im gleichen Subnetz
Starten von Prozessen auf anderen Knoten:
• Voraussetzung: Erlang-Shell muss auf dem anderen Knoten/Rechner laufen
• z. B. DB = spawn(’[email protected]’, database, start,[]) in obigem Beispiel
• allgemein: spawn(Knoten, module, func, args) wobei das funktionale Ergebnis wieder eine PID ist, welche jetzt auch IP-Adresse und Knotennamen mitkodiert. Dies eignet sich zum
Auslagern von Berechnungen (z. B. zur Lastverteilung).
5.8 Verbinden unabhängiger Erlang-Prozesse
In offenen Systemen (z. B. Telefon, Chat) ist es nötig, Verbindungen zur Laufzeit herzustellen. Dazu
ist das globale Registrieren eines Prozesses auf einem Knoten notwendig:
register(name, pid).
Senden an restliche Prozesse: {name, knoten}! msg, was meistens nur für den “Erstkontakt” verwendet wird. Danach findet ein Austausch der PID’s statt (z. B. Database-Server läuft auf Knoten
“servermickey” ⇒ erl -sname server auf mickey).
5.8.1 Veränderungen des Clients
Listing 21: Veränderung des Clients
1
2
3
4
5
s t a r t −>
{ dbServer , ’ s e rv e r @m i c ke y ’ } ! { connect , s e l f ( ) } ,
receive
{ c onnected , DB} −> c l i e n t (DB)
end .
5.8.2 Veränderungen des Servers
Listing 22: Veränderung des Servers
1
2
3
s t a r t ( ) −>
r e g i s t e r ( dbServer , s e l f ( ) ) ,
database ( [ ] ) .
4
5
6
7
8
9
10
11
d a t a b a s e (L) −>
receive
{ a l l o c a t e , . . . } −> . . . ;
{ lookup , . . . } −> . . . ;
{ connect , P} −> P ! { connected , s e l f ( ) } ,
d a t a b a s e (L)
end .
Es sind also nur wenige Veränderungen beim Übergang von nebenläufiger zu verteilter Programmierung notwendig.
Als weiteres Beispiel wollen wir einen verteilten Chat implementieren, der aus einem (registrierten)
Server und beliebig vielen passenden Clients besteht.
Listing 23: Chat-Server
1
2
−module ( c h a t S e r v e r ) .
−e x p o r t ( [ s t a r t / 0 ] ) .
3
4
5
s t a r t ( ) −> r e g i s t e r ( chat , s e l f ( ) ) ,
run ( [ ] ) .
6
7
8
run ( C l i e n t s ) −>
receive
32
Abbildung 6:
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{ connect , Pid , Name} −>
case l oo k up 2 (Name , C l i e n t s ) o f
n o t h i n g −>
Pid ! { connected , names ( C l i e n t s ) , s e l f ( ) } ,
b r o a d c a s t ( C l i e n t s , { l o g i n , Name } ) ,
run ( [ { Pid , Name } | C l i e n t s ] ) ;
{ j u s t , } −>
Pid ! nameOccupied ,
run ( C l i e n t s )
end ;
{ message , Nonsens , Pid } −>
case lo oku p ( Pid , C l i e n t s ) o f
{ j u s t , Name} −> b r o a d c a s t ( C l i e n t s , { msg , Name , Nonsens } ) ,
run ( C l i e n t s ) ;
n o t h i n g −> run ( C l i e n t s )
end ;
{ l o g o u t , Pid } −>
case lo oku p ( Pid , C l i e n t s ) o f
{ j u s t , Name} −> R e m a i n i n g C l i e n t s = remove ( Pid , C l i e n t s ) ,
b r o a d c a s t ( R e m a i n i n g C l i e n t s , { l o g o u t , Name } ) ,
run ( R e m a i n i n g C l i e n t s ) ;
n o t h i n g −> run ( C l i e n t s )
end
end .
33
34
35
36
names ( [ ] ) −> [ ] ;
names ( [ { , Name } | C l i e n t s ] ) −> [ Name | names ( C l i e n t s ) ] .
%names ( C l i e n t s ) −> l i s t s : map( fun ( { ,X})−>X end , C l i e n t s ) .
37
38
39
40
b r o a d c a s t ( [ ] , ) −> ok ;
b r o a d c a s t ( [ { Pid , } | C l i e n t s ] , Msg ) −> Pid ! Msg ,
b r o a d c a s t ( C l i e n t s , Msg ) .
41
42
43
44
l o o k u p ( , [ ] ) −> n o t h i n g ;
l o o k u p (K, [ { K,V} | ] ) −> { j u s t ,V} ;
l o o k u p (K, [ | KVs ] ) −> lo oku p (K, KVs ) .
45
46
47
48
l o o k u p 2 ( , [ ] ) −> n o t h i n g ;
l o o k u p 2 (V, [ { K,V} | ] ) −> { j u s t ,K} ;
l o o k u p 2 (V , [ | KVs ] ) −> l oo k up 2 (V, KVs ) .
49
50
51
52
remove ( , [ ] ) −> [ ] ;
remove (K, [ { K, } | KVs ] ) −> remove (K, KVs ) ;
remove (K, [ KV| KVs ] ) −> [KV| remove (K, KVs ) ] .
Listing 24: Chat-Client
1
2
−module ( c h a t C l i e n t ) .
−e x p o r t ( [ c o n n e c t / 2 , keyboard / 2 ] ) .
33
3
4
5
6
7
8
9
10
11
12
c o n n e c t (Name , Node ) −>
{ chat , Node } ! { connect , s e l f ( ) , Name} ,
receive
{ c onnected , Names , S e r v e r P i d } −>
b a s e : p r i n t ( Names ) ,
spawn ( c h a t C l i e n t , keyboard , [ S e r v e r P i d , Name ] ) ,
run ( S e r v e r P i d ) ;
nameOccupied −> 42
end .
13
14
15
16
17
run ( S e r v e r P i d ) −>
receive
??? −> . . .
end .
18
19
20
21
22
23
24
25
26
keyboard ( S e r v e r P i d , Name) −>
S t r = b a s e : g e t L i n e (Name ) ,
case S t r o f
” bye ” −> S e r v e r P i d ! { l o g o u t , s e l f ( ) } ;
−> S e r v e r P i d ! { message , Str , s e l f ( ) } ,
keyboard ( S e r v e r P i d , Name)
end .
5.9 Robuste Programmierung
In Erlang sterben Prozesse normalerweise unbemerkt; Nachrichten an terminierte Prozesse gehen ohne
Fehlermeldungen verloren.
Prozesse können aber auch mittels link(pid) verbunden werden. Durch einen Link wird eine bidirektionale Verbindung zwischen dem ausführendem Prozess und dem Prozess pid aufgebaut. Stirbt
einer der gelinkten Prozesse (auch bei Terminierung), so wird der andere ebenfalls terminiert. Dies ist
zum Aufräumen bei Prozessende praktisch.
Alternativ ist aber auch der Empfang einer Benachrichtigung anstelle des “kooperativen Mitsterbens”
möglich, falls der andere gelinkte Prozess abstürzt.
Wurde mit folgendem Funktionsaufruf process_flag(trap_exit, true) ein Prozessflag gelöscht,
so erhält man anstelle des “Mitsterbens” eine Nachricht der Form: {’EXIT’, reason, pid}, wobei
reason den Grund für die Prozessterminierung und pid den abgestürzten Prozess angibt.
5.10 Systemabstraktionen
Bei der Entwicklung von Erlang-Anwendungen fällt auf, dass in verteilten Systemen immer wieder
ähnliche Prozessmuster auftreten. Diese wurden als generische Varianten heraus gearbeitet und stehen
als Bibliothek in Erlang zur Verfügung. In der Vorlesung haben wir uns den gen_server etwas genauer
angeschaut. Er eignet sich zur Entwicklung von Client-Server-Systemen. Es werden insbesondere zwei
unterschiedliche Arten der Anfrage von Clienten beim Server unterstützt: synchrone Kommunikation
(wartet auf Antwort; gen_server:call) und asynchrone Kommunikation (kein Warten auf Antwort;
gen_server:cast). Außerdem werden natürlich auch die anderen Erlang-Mechanismen, wie Linking
und Code-Austausch zur Laufzeit unterstützt.
Der Vorteil der Verwendung des Frameworks ist, dass bei der Implementierung des Protokolls weniger
Fehler gemacht werden können und aufbauend noch weitere Abstraktionen, wie z.B. der Supervisiontree, definiert werden können.
Als Beispiel für die Verwendung eines gen_servers realisieren wir nun unseren Chat in diesem Framework:
Listing 25: Chat unter Verwendung des generischen Servers
1
2
−module ( genChat ) .
−b e h a v i o u r ( g e n s e r v e r ) .
34
3
4
5
6
7
−e x p o r t ( [ s t a r t / 0 ] ) .
−e x p o r t ( [ l o g i n / 2 , msg / 2 , l o g o u t / 1 , who / 1 ] ) .
−e x p o r t ( [ i n i t / 1 , h a n d l e c a l l / 3 , h a n d l e c a s t / 2 , h a n d l e i n f o / 2 ,
terminate /2 , code change / 3 ] ) .
8
9
10
s t a r t ( ) −> b a s e : putStrLn ( ” S e r v e r s t a r t e d ” ) ,
g e n s e r v e r : s t a r t l i n k ( { l o c a l , c h a t } , genChat , [ ] , [ ] ) .
11
12
13
i n i t ( ) −> b a s e : putStrLn ( ” S e r v e r i n i t i a l i z e d ” ) ,
{ok , [ ] } .
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
% Asynchronous i n t e r f a c e
h a n d l e c a s t ( { message , Text , P} , C l i e n t s ) −>
case l o oku p (P , C l i e n t s ) o f
nothing
−> ok ;
{ j u s t , Name} −> b r o a d c a s t ( { message , Text , Name} , C l i e n t s )
end ,
{ noreply , C l i e n t s } ;
h a n d l e c a s t ( { l o g o u t , P} , C l i e n t s ) −>
case l o oku p (P , C l i e n t s ) o f
nothing
−> { n o r e p l y , C l i e n t s } ;
{ j u s t , Name} −> NewClients=remove (P , C l i e n t s ) ,
b r o a d c a s t ( { l o g o u t , Name} , NewClients ) ,
{ n o r e p l y , NewClients }
end .
29
30
31
32
33
34
35
36
37
38
39
40
% Synchronous i n t e r f a c e
h a n d l e c a l l ( { l o g i n , Name} , { ClientP , } , C l i e n t s ) −>
base : p r i n t ( ClientP ) ,
b r o a d c a s t ( { l o g i n , Name} , C l i e n t s ) ,
{ reply ,
{ l o g g e d I n , s e l f ( ) , l i s t s : map( fun ( { ,N} ) −> N end , C l i e n t s ) } ,
[ { ClientP , Name } | C l i e n t s ] } ;
h a n d l e c a l l ( who , C l i e n t P , C l i e n t s ) −>
{ reply ,
{ o n l i n e , l i s t s : map( fun ( { ,N} ) −> N end , C l i e n t s ) } ,
Clients }.
41
42
43
44
h a n d l e i n f o ( Msg , C l i e n t s ) −>
b a s e : putStrLn ( ” Unexpected Message : ”++b a s e : show ( Msg ) ) ,
{ noreply , C l i e n t s } .
45
46
t e r m i n a t e ( shutdown , C l i e n t s ) −> ok .
47
48
49
50
c o d e c h a n g e ( OldVsn , S t a t e , E x t r a ) −>
% Code t o change S t a t e
{ok , S t a t e } .
51
52
53
%C l i e n t i n t e r f a c e
l o g i n ( Node , Name) −> g e n s e r v e r : c a l l ( { chat , Node } , { l o g i n , Name } ) .
54
55
msg ( Pid , Msg ) −> g e n s e r v e r : c a s t ( Pid , { message , Msg , s e l f ( ) } ) .
56
57
l o g o u t ( Pid ) −> g e n s e r v e r : c a s t ( Pid , { l o g o u t , s e l f ( ) } ) .
58
59
who ( Pid ) −> g e n s e r v e r : c a l l ( Pid , who ) .
60
61
62
63
64
%h e l p e r f u n c t i o n s
b r o a d c a s t ( , [ ] ) −> ok ;
b r o a d c a s t ( Message , [ { P , } | C l i e n t s ] ) −> P ! Message ,
b r o a d c a s t ( Message , C l i e n t s ) .
65
66
67
remove ( , [ ] ) −> [ ] ;
remove (K, [ { K, } | KVs ] ) −> remove (K, KVs ) ;
35
68
remove (K, [ KV| KVs ] ) −> [KV| remove (K, KVs ) ] .
69
70
71
72
l o o k u p ( , [ ] ) −> n o t h i n g ;
l o o k u p (K, [ { K,V} | ] ) −> { j u s t ,V} ;
l o o k u p (K, [ | KVs ] ) −> lo oku p (K, KVs ) .
Ähnlich funktionieren die generischen Abstraktionen einer Finite-State-Maschine (gen_fsm) und des
Event Handlings (gen_event). Die Prozesse der generischen Prozessabstraktionen können dann auch
noch mit Hilfe der Supervisiontree-Abstraktion strukturiert werden. Hier können dann Strategien, wie
die Anwendung auf den Ausfall einzelner Komponenten reagieren soll, definiert werden. Hierdurch wird
die Entwicklung robuster Anwendungen zusätzlich unterstützt.
6 Grundlagen der Verteilten Programmierung
Computer bilden Knoten in Netzwerken und können miteinander kommunizieren. Wegen der Komplexität der Netzwerktheorie wurden Schichten (Layer) zur Aufgabenverteilung eingeführt. Gängigster
Ansatz: Open Systems Interconnection (OSI)-Modell der International Standards Organisation (ISO)
6.1 7 Schichten des ISO-OSI-Referenzmodells
1. Physical Layer
Stellt den Strom auf der Leitung bzw. die 0 und 1 dar
2. Data Link Layer
Gruppierung von Bits/Bytes in Frames (Startmarken, Endmarken, Checksummen)
Versand zwischen benachbarten Knoten
defekte Frames werden verworfen
3. Network Layer
Bildung von Datenpaketen statt Frames (Netzwerkadresse, Routing)
4. Transport Layer
automatische Fehlererkennung und Fehlerkorrektur
Flusskontrolle zur Begrenzung der Datenmenge (⇒ Vermeidung von Überlast)
5. Session Layer
Anwendung-zu-Anwendung-Verbindung
Kommunikationssitzung, aber auch verbindungslose Kommunikation möglich
6. Presentation Layer
Representation von Daten und ihrer Konvertierung (z. B. Kompression, Verschlüsselung)
7. Application Layer
Anwendungen (z. B. Java-Anwendung)
6.2 Protokolle des Internets
1. Internet Protocol (IP):
• befindet sich auf Schicht 3 (network layer)
• wird sowohl in local area networks (LANs) als auch in wide area networks (WANs) verwendet
• Informationsaustausch zwischen Endknoten (Hosts) mit IP-Paketen oder IP-Datagrammen
• keine Abhängigkeit zwischen Paketen
• verbindungslos
• Versionen: IPv4 (32 Bit-Adressen), IPv6 (128 Bit-Adressen)
• IP-Adresse: z. B. 134.245.248.202 für falbala“
”
• Die IP-Adresse ist für uns eine eindeutige Adresse im Internet. Sie wird beim Routing
verwendet.
36
• Alternativ: Hostnames, Domain Name System (DNS)
“falbala.informatik.uni-kiel.de” wobei “falbala” der Rechnername und
“informatik.uni-kiel.de” die (Sub-) Domain sowie “uni-kiel.de” die Domain sind
2. Transmisson Control Protocol (TCP):
• befindet sich auf Schicht 4 (transport layer)
• garantiert die Zustellung (durch Wiederholung) und richtige Reihenfolge der gesendeten
Bytes
• verwendet IP
• sendet Byte-Sequenz
• sorgt für faire Lastverteilung
• arbeitet mit Ports (in der Regel) von 0 bis 65535
• hält die Verbindung zwischen Hosts
3. User Datagram Protocol (UDP):
• befindet sich auf Schicht 4 (transport layer)
• sendet nur Datenpakete
• arbeitet mit Ports
• keine garantierte Zustellung
• keine garantierte richtige Reihenfolge
• verbindungslos
• schneller als TCP
• ist fast nur ein Wrapper für IP
6.3 Darstellung von IP-Adressen/Hostnames in Java
Die Klasse InetAddress im Paket java.net behandelt sowohl IPv4 als auch IPv6. Es stehen zwei
Klassenmethoden zur Konstruktion zur Verfügung:
• static InetAddress getByName(String hostname) throws
java.net.UnknownHostException mit folgenden Möglichkeiten für den hostname:
– IP-Adresse ⇒ das Objekt wird angelegt, falls die IP-Adresse bekannt ist
– Domain-Name ⇒ DNS-Lookup; das Objekt wird angelegt, falls die IP-Adresse bekannt ist
– “localhost” ⇒ das Objekt wird mit der IP-Adresse 127.0.0.1 angelegt
• getAllByName(String hostname) ⇒ liefert alle IP-Adressen eines Hosts
Zur Ausgabe stehen weiterhin folgende Methoden zur Verfügung:
• String getHostAddress() ⇒ liefert eine IP-Adresse des Hosts
• String getHostName() ⇒ liefert den bei der Konstruktion angegebenen hostname
• boolean equals(InetAddress i) ⇒ vergleicht zwei IP-Adressen
6.4 Netzwerkkommunikation
Der Rechner kommuniziert über sogenannte Ports (durchnummeriert von 0 bis 65535) mit dem Netzwerk. Auf Programmiersprachenseite werden die Ports über Sockets angesprochen, wobei pro Port nur
eine Socketverbindung möglich ist.
6.4.1 UDP in Java
Z. B. nützlich bei DNS-Anfragen, Audio-/Videodaten, Zeitsignalen, bei zeitkritischen Anwendungen.
Eigentlich nur Wrapper für IP-Pakete.
Datagram Packet Klasse:
• repräsentiert UDP-Pakete
37
Abbildung 7:
• Verwendung beim Senden und Empfangen ⇒ unterschiedliche Bedeutung der Adresse:
– Senden ⇒ Zieladresse
– Empfangen ⇒ Sourceadresse (günstig für Antwort)
• Aufbau: siehe Abbildung 7
Konstruktoren:
• DatagramPacket(byte[] buffer, int length) ⇒ zum Empfang von Paketen
• DatagramPacket(byte[] buffer, int length, InetAdress dest_addr,
int dest_port) ⇒ zum Senden
Methoden zum Modifizieren:
getAddress, setAddress, getData, setData, getLength, setLength, getPort, setPort
zur Anbindung an Ports: Datagram Socket Klasse:
• zum Senden und Empfangen von Datagram Paketen
• Aufbau siehe Abbildung 8
• Konstruktoren:
? DatagramSocket() throws SocketException ⇒ nur zum Senden von UDP-Paketen (freier Port wird verwendet und belegt); eine Exception wird geworfen, falls kein freier Port
mehr vorhanden ist
? DatagramSocket(int port) throws SocketException ⇒ zum Senden und Empfangen
geeignet (ist aber eher für Server gedacht); eine Exception wird geworfen, falls der Port
belegt ist
• wichtigste Methoden:
? close() ⇒ Schließen des Sockets und Freigeben des Ports
? getLocalPort, setLocalPort
? getLocalAddress
? getReceiveBuffersize, setReceiveBuffersize ⇒ Abfragen bzw. Setzen der maximalen
Größe des Puffers
? void receive(DatagrammPacket p) throws IOException ⇒ liest UDP-Paket
(gepuffert) und schreibt dieses in p
1. IP- und PortAdresse werden mit Senderadresse und Senderport überschrieben
2. das length-Attribut enthält die tatsächliche Länge, die kleiner oder gleich der Größe
des Pakets ist
3. blockiert, bis das Paket empfangen wurde
Programmierung mit UDP
verwendete Klassen:
38
Abbildung 8:
• DatagramPacket mit receive und send ⇒ das, was übermittelt werden soll
• DatagramSocket
Für receive gibt es auch einen Timeout, welcher mit der Methode int getSoTimeout() gelesen und
mit setSoTimeout(int t) gesetzt werden kann. Die Zeit wird hierbei in Millisekunden angegeben
und definiert, wie lange receive maximal blockiert; falls keine Nachricht in dieser Zeit eintrifft, so
gibt es eine java.io.InterruptedIOException.
Die möglichen Verbindungen können auch eingeschränkt werden:
• connect(InetAddress remoteAddr, int remotePort) ⇒ es sind nur noch Verbindungen mit
dem angegebenen Rechner möglich (Filter für ein- und ausgehende Pakete)
• disconnect
• getInetAddr
• getPort
Statt byte[] können auch Streams angebunden werden. Als Beispiel betrachten wir eine einfache
Client-/Server-Architektur, die einen Inkrementserver zur Verfügung stellt. Der Client schickt einen
Wert hin und erhält den inkrementierten Wert zurück.
1
import j a v a . n e t . ∗ ;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public c l a s s Sender {
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
byte [ ] b = new byte [ 1 ] ;
b [ 0 ] = Byte . p a r s e B y t e ( a r g s [ 0 ] ) ;
DatagramPacket p a c k e t = new DatagramPacket ( b , 1 ) ;
try {
p a c k e t . s e t A d d e s s ( I n e t A d d r e s s . getByName ( ” mickey .
i n f o r m a t i k . uni−k i e l . de ” ) ) ;
packet . setPort ( 6 0 0 0 1 ) ;
DatagramSocket s o c k e t = new DatagramSocket ( ) ;
s o c k e t . send ( p a c k e t ) ;
socket . r e c e i v e ( packet ) ;
System . out . p r i n t l n ( ” R e s u l t : ”+b [ 0 ] ) ;
} catch ( E x c e p t i o n e ) { System . out . p r i n t l n ( ”Bad I n t e r n e t c o n n e c t i o n ” ) ; }
}
}
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public c l a s s R e c e i v e r {
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
byte [ ] b = new byte [ 1 ] ;
b [ 0 ] = 42;
try {
DatagramPacket p a c k e t = new DatagramPacket ( b , 1 ) ;
DatagramSocket s o c k e t = new DatagramSocket ( 6 0 0 0 1 ) ;
while ( b [ 0 ] ! = 0 ) {
socket . r e c e i v e ( packet ) ;
System . out . p r i n t l n ( ” R e c e i v e d : ”+b [ 0 ] ) ;
b[0]++;
s o c k e t . send ( p a c k e t ) ;
}
} catch ( E x c e p t i o n e ) { System . out . p r i n t l n ( ”Bad I n t e r n e t c o n n e c t i o n ” ) ; }
39
}
34
35
}
Bemerkungen:
Zeile 5: Byte-Array mit einem Eintrag
Zeile 6: parseByte(...) übersetzt das Hauptargument als Byte (-128 bis 127)
Zeile 7: das erste Argument ist das Byte-Array; das zweite Argument die Packetlänge
Zeile 13: Anfrage an den Server
Zeile 14: Abwarten der Antwort (blockiert bis irgendein Packet ankommt)
Zeile 23: der Inhalt ist hier egal, da als Erstes etwas empfangen und der Inhalt dabei überschrieben
wird
Zeile 26: dient der Definition des zu hörenden Ports (“Server-Port” des Dienstes)
Zeile 27: Beenden des Receivers über java Sender -1 möglich
Zeile 28: blockiert, bis eine Anfrage reinkommt
Zeile 30: Inkrement-Service
Zeile 31: Absenden der Antwort (Adresse und Port sind durch den Paketeingang bereits definiert;
Verwendung der Adresse des Senders für die Antwort)
Problem: falls der Sender keine Antwort bekommt, so blockiert dieser dauerhaft ⇒ eine Lösungsmöglichkeit wäre ein Timeout ggf. mit erneuter Anfrage (unsichere Kommunikation).
Transmission Control Protocol (TCP)
Eigenschaften:
• verbindungsorientiertes Protokoll
• sichere Übertragung ist garantiert (es gehen keine Pakete verloren)
• Versenden von Datenströmen ⇒ werden automatisch in TCP-Pakete aufgebrochen
• richtige Reihenfolge ist garantiert
• die Bandbreite wird zwischen mehreren TCP-Verbindungen fair geteilt
• TCP unterscheidet zwischen Client und Server einer Verbindung
• der TCP-Client initiiert eine Verbindung
• der TCP-Server antwortet auf diese Verbindung ⇒ bidirektionale Verbindung ist etabliert
• TCP-Sockets in java.net ⇒ verbinden zwei Ports auf (eventuell) unterschiedlichen Rechnern
(kann auch für UDP genutzt werden)
Konstruktoren:
• Socket(InetAddr addr, int port) throws java.net.IOException ⇒ stellt Verbindung zu
addr und port her, wobei als lokaler Port dieser Verbindung ein freier Port verwendet wird
• Socket(InetAddr addr, int port, InetAddr localAddr, int localPort) throws
java.net.IOException ⇒ analog wie oben, aber mit festem lokalem Port (→ besser nicht
verwenden, da ein Fehler auftritt, falls Port besetzt ist)
• Socket(String host, int port) throws java.net.IOException
Beachte: Socket blockiert, bis die Verbindung aufgebaut ist.
Methoden:
• close() ⇒ schließt die Verbindung (die Daten sollten vorher “geflushed” werden, d. h. der Puffer
sollte geleert und die Daten übertragen werden)
• InetAddr getInetAddress() ⇒ liefert remoteAddress
• int getPort() ⇒ liefert den remotePort
• getLocalInetAddress
• getLocalPort
• InputStream getInputStream()
40
• OutputStream getOutputStream()
• es ist auch eine Konfiguration der Socket-Verbindung (z. B. Timeout, etc.) möglich
Lesen und Schreiben geschieht über die Streams.
Listing 26: Im Beispiel verbindet sich der Client mit dem Server und empfängt eine Nachricht
1
2
import j a v a . n e t . ∗ ;
import j a v a . i o . ∗ ;
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public c l a s s D a y t i m e C l i e n t {
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
try {
S o c k e t s o c k e t = new S o c k e t ( ” mickey ” , 6 0 0 0 1 ) ;
System . out . p r i n t l n ( ” Connection e s t a b l i s h e d ” ) ;
B u f f e r e d R e a d e r r e a d e r = new B u f f e r e d R e a d e r (
new InputStreamReader ( s o c k e t . g e t I n p u t S t r e a m ( ) )
);
System . out . p r i n t l n ( ” R e s u l t : ”+r e a d e r . r e a d L i n e ( ) ) ;
socket . close ( ) ;
} catch ( IOException i o e ) { System . out . p r i n t l n ( ” E r r o r : ”+i o e ) ; }
}
}
TCP-Server: Klasse ServerSocket
Konstruktoren:
• ServerSocket(int port) throws java.io.IOException ⇒ “horcht” auf dem Port
• ServerSocket(int port, int NumberOfClients) throws java.io.IOException,
wobei numberOfClients die Größe der Backlog-Queue (Anzahl der wartenden Clients; default:
50) darstellt
Zum Akzeptieren von Verbindungen:
• Socket accept() throws java.io.IOException ⇒ wartet auf den Client und akzeptiert diesen → Verbindung
• close() ⇒ schließt den ServerSocket
• getSoTimeout()
• setSoTimeout(int ms) → Setzen eines Timeout für das accept
Listing 27: Server des Beispiels
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public c l a s s DayTimeServer {
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
try {
S e r v e r S o c k e t s e r v e r = new S e r v e r S o c k e t ( 6 0 0 0 1 ) ;
while ( true ) {
Socket nextClient = s e r v e r . accept ( ) ;
OutputStream out = n e x t C l i e n t . getOutputStream ( ) ;
new P r i n t S t r e a m ( out ) . p r i n t (new j a v a . u t i l . Date ( ) ) ;
out . c l o s e ( ) ;
nextClient . close ( ) ;
// Ende d e r S o c k e t −Verbindung
}
} catch ( BindException be ) {
System . out . p r i n t l n ( ” S e r v i c e a l r e a d y r u n n i n g on p o r t 60001 ” ) ;
}
catch ( IOException i o e ) { System . out . p r i n t l n ( ” I /O e r r o r : ”+i o e ) ; }
}
}
6.5 Socket-Optionen
• Zugriff/Einstellung über Socket-Methoden, d. h. Methoden der Klasse Socket
• SO_KEEPALIVE: setKeepAlive(true) ⇒ regelmäßiges Pollen (Anfragen) zum Verbindungstest
41
• SO_RCVBUF: setReceiveBuffersize(size) ⇒ Betriebssystempuffer wird in seiner Größe beeinflusst (size ist in Byte)
• SO_SNDBUF: setSendBuffersize(size) ⇒ Betriebssystempuffer wird in seiner Größe beeinflusst
(size ist in Byte)
• SO_LINGER: setSoLinger(true,50) ⇒ auch beim Schließen des sockets wird noch für 50 Millisekunden versucht, die Daten zu senden
• TCP_NODELAY ⇒ Veränderung des TCP-Protokolls, so dass keine Transmissionskontrolle (Warten, bis ein Paket voll ist, bevor es gesendet wird) mehr stattfindet
ACHTUNG: die Performance kann sinken
• SO_TIMEOUT: setSoTimeout(int time) ⇒ Leseoperation versucht time Millisekunden zu lesen.
Eventuell tritt eine InterruptedIOException auf, falls keine Antwort kommt (time=0 entspricht
keinem Timeout).
Listing 28: Beispiel
1
2
3
4
5
6
7
try {
s . setSoTimeout ( 2 0 0 0 ) ;
. . . l e s e aus S o c k e t s . . .
} catch ( I n t e r r u p t e d I O E x c e p t i o n i o e ) {
System . e r r . p r i n t l n ( ” S e r v e r a n t w o r t e t n i c h t ! ” ) ;
}
catch ( IOException i o ) { . . . }
6.6 Sockets in Erlang
In der Vorlesung haben wir die TCP-Kommunikation über Sockets in Erlang kennen gelernt. Die Prinzipien sind natürlich identisch, mit denen in Java. Als Beispielanwendung haben wir einen einfachen
Tupelserver entwickelt, welchen wir später auch als Registry verwenden können:
Listing 29: TCP-Version eines Key-Value-Servers in Erlang
1
2
−module ( t c p ) .
−e x p o r t ( [ s e r v e r / 0 , c l i e n t / 0 , r e q u e s t H a n d l e r / 3 ] ) .
3
4
5
6
7
8
s e r v e r ( ) −>
{ok , LSock } = g e n t c p : l i s t e n ( 5 0 4 2 , [ l i s t , { packet , l i n e } ,
{ r e u s e a d d r , true } ] ) ,
DB = spawn ( k e y V a l u e S t o r e , s t a r t , [ ] ) ,
acceptLoop ( LSock ,DB) .
9
10
11
12
13
14
acceptLoop ( LSock ,DB) −>
spawn ( tcp , r e q u e s t H a n d l e r , [ LSock ,DB, s e l f ( ) ] ) ,
receive
nex t −> acceptLoop ( LSock ,DB)
end .
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
r e q u e s t H a n d l e r ( LSock ,DB, Parent ) −>
{ok , Sock } = g e n t c p : a c c e p t ( LSock ) ,
Parent ! next ,
receive
{ tcp , Sock , S t r } −>
case l i s t s : s p l i t w i t h ( fun (C) −> C/=44 end , l i s t s : d e l e t e ( 1 0 , S t r ) ) o f
{ ” s t o r e ” , [ | Arg ] } −>
{Key , [ | Value ] } = l i s t s : s p l i t w i t h ( fun (C) −> C/=44 end , Arg ) ,
DB! { s t o r e , Key , Value } ;
{ ” loo kup ” , [ | Key ] } −> DB! { lookup , Key , s e l f ( ) } ,
receive
Ans −> g e n t c p : send ( Sock , b a s e : show ( Ans)++” \n” )
end
end ,
g e n t c p : c l o s e ( Sock ) ;
Other −> b a s e : p r i n t ( Other )
42
Abbildung 9:
32
end .
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
c l i e n t ( ) −>
case b a s e : g e t L i n e ( ” ( s ) t o r e , ( l ) ookup : ” ) o f
” s ” −> Key = b a s e : g e t L i n e ( ”Key : ” ) ,
Value = b a s e : g e t L i n e ( ” Val : ” ) ,
{ok , Sock } = g e n t c p : c o n n e c t ( ” l o c a l h o s t ” , 5 0 4 2 , [ l i s t , { packet , l i n e } ] ) ,
g e n t c p : send ( Sock , ” s t o r e , ”++Key++” , ”++Value++” \n” ) ,
g e n t c p : c l o s e ( Sock ) ;
” l ” −> Key = b a s e : g e t L i n e ( ”Key : ” ) ,
{ok , Sock } = g e n t c p : c o n n e c t ( ” l o c a l h o s t ” , 5 0 4 2 , [ l i s t , { packet , l i n e } ,
{ active , false }]) ,
g e n t c p : send ( Sock , ” lookup , ”++Key++” \n” ) ,
{ok , Res } = g e n t c p : r e c v ( Sock , 0 ) ,
b a s e : putStrLn ( Res ) ,
g e n t c p : c l o s e ( Sock )
end .
6.7 Verteilung von Kommunikationsabstraktionen
Nachdem wir nun verstanden haben, wie eine verteilte Kommunikation über TCP funktioniert, können
wir uns überlegen, wie Kommunikationsabstraktionen transparent in ein verteiltes System eingebettet
werden können. Hierbei ist es egal, ob es sich um Pids in Erlang, Remoteobjekte bei RMI oder
Ähnliches handelt. Man benötigt eine externe Darstellung des entsprechenden Objekts im Netzwerk,
welche über IP-Adresse des Rechners, einen Port und ggf. noch eine weitere Identifikationsnummer,
einen Zugriff auf das entsprechende Objekt von Remote ermöglicht.
Wird ein entsprechendes (lokales) Objekt serialisiert, wandelt man es in seine Remote-String-Darstellung um und verschickt diese. Alle Kommunikation auf ein Remote-Objekt wird dann über TCP
serialisiert, wodurch entsprechend auch wieder andere Kommunikationsabstraktionen verteilt werden
können.
Als Beispiel hierfür haben wir in Vorlesung und Übung Channels in Java, Erlang bzw. Haskell für
einen Zugriff von Remote erweitert. Hierbei haben wir insbesondere die folgenden Aspekte diskutiert
und realisiert:
• Verwendung des Key-Value-Stores als Registry
• Schreiben eines Channels von Remote
• Optimierung der Kommunikation durch Zusammenlegen der Kommunikation auf einen Port
• Garbage Collection von nicht mehr verwendeten Channels
• Lesen aus einem remote Channel
• Realisierung von Linking
7 Web-Applikationen
• siehe Abbildung 9
• Hyper Text Tranfer Protocoll (HTTP) auf der Schicht des Application Layers
• HTTP 1.0 und HTTP 1.1 sind Client/Server wechselseitig kompatibel
43
Abbildung 10:
7.1 HTTP-Client
• es gibt drei Anfragetypen:
– GET file ⇒ fordert file an
– HEAD file ⇒ fordert nur den HTTP-Kopf von file an; z. B. zum Vergleich mit Datum
einer gecachten Version oder zur Ermittlung der Größe
– POST file content ⇒ wie GET, aber zusätzlich wird der content (url-encoded) übermittelt
7.2 Client-Request-Header
• Cookie-Feld: später
• From-Feld: E-Mail-Adresse (meistens nicht implementiert)
• If-Modified-Since-Feld: meistens mit Datum der gecachten Version wie GET/HEAD entsprechend
• Referrer-Feld: URL, welche den Link auf den Request enthielt
• User-Agent-Feld: gibt an, welcher Browser verwendet wird
• Host-Feld: angefragter Rechner (IP-Adresse oder Domain)
7.3 Webserver-Antwort
• Statuszeile: Fehlercodes (z. B. 200 erfolgreich)
• Antwortfelder:
– location: location des Servers
– server: Servertyp
– content-length: Größe des Inhalts
– content-type: Typ des Inhalts als MIME-Typ (z. B. text/html)
– Expires: Gültigkeit des gecachten Inhalts
– last-modified: Datum der letzten Änderung für Proxies/Caches
– Pragma: Verhinderung von jeglichem Caching (z. B. bei dynamischen Web-Siten)
– Set-Cookie: später
– Entity-Body: Bytefolge der Größe Content-Length; Interpretation gemäß Content-Type
7.4 Common Gateway Interface (CGI)
Idee:
• dynamisch Webseiten durch Programme generieren
• Benutzereingaben im Browser können im Server verarbeitet werden.
• das CGI-Skript kann in unterschiedlichen Sprachen geschrieben werden:
z. B.: PHP, Perl, Erlang, ...
44
Abbildung 11:
7.4.1 GET-Methode
Der Anfrage-String wird an die URL angehängt, z. B. http://www.server.de/cgi-bin/inc.cgi?
41. Hierbei ist der Teil hinter dem Fragezeichen ein beliebiger URL-encodeter String, dessen Länge
beschränkt ist. Die Anfrage wird gestartet durch das Eintippen der URL in den Browser oder durch
Forms im HTML-Code:
1
2
3
4
5
<form a c t i o n=” u r l ” method=get>
Value :
<i n p u t name=” v a l u e ” v a l u e=” 0 ”>
<i n p u t type=” submit ” v a l u e=” I n c ”>
</form>
method=get ist hierbei optional und kann ggf. durch post ersetzt werden.
Der Value des Submit-Buttons wird übergeben, falls dieses einen Namen hat.
Beim Drücken des Inc-Buttons übermittelt der Browser: url?value=0. Bei mehreren Eingabefeldern
mit name="value": url?value1=0&value2=1&....
& trennt die unterschiedlichen Felder voneinander. Das URL-Skript unter url wird gestartet. Die Parameter befinden sich in der Umgebungsvariablen QUERY_STRING(unix) und die Länge in QUERY_LENGTH.
Listing 30: passendes CGI-Skript in Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public c l a s s Counter {
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
S t r i n g parameter = System . g e t P r o p e r t y ( ”CGI . QUERY STRING” ) ;
int value = I n t e g e r . p a r s e I n t (
p a r a m e t e r s u b s t r i n g ( 6 , parameter . l e n g t h ( ) )
);
System . out . p r i n t l n ( ” Content−type : t e x t / html \n \n”+
”<html><head></head><t i t l e ></ t i t l e >\n”+
”<body> \n”+
”<form a c t i o n =\” h t t p : / /www. s e r v e r . de / c g i −b i n / i n c . c g i \”
method=get >\n”+
” Value : <i n p u t name=\” v a l u e \” v a l u e =\””+( v a l u e +1)+”\”> \n”+
” i n p u t type =\”submit \” v a l u e =\” I n c \”> \n” ) +
”</form> \n” +
”</body> \n” +
”</html> \n”
);
}
}
Bemerkungen:
Zeile 3: Java hat eigene Umgebungsvariablen, die später von außerhalb noch gesetzt werden müssen
Es wäre besser, zuerst eine Datenstruktur aufzubauen und dann mit ToString umzuwandeln. Bei
jedem Drücken des Inc-Buttons soll der Zähler erhöht werden.
Java-Programme sind nicht direkt ausführbar ⇒ kleines Startskript:
1
2
3
inc . cgi :
#!/ b i n / bash
j a v a B i n P l a t z / j a v a −DCGI . QUERY STRING= ’$QUERY STRING ’ Counter
Bemerkungen:
Zeile 3: vor Counter muss ggf. noch -classpath filepath angegeben werden.
Bei BinPlatz kommt hin, was bei which java ausgegeben wird. Die Hochkommata bewirken,
dass der Inhalt nur als Zeichenfolge interpretiert und nicht ausgeführt wird.
45
7.4.2 POST-Methode
Übersenden beliebig langer Werte:
1
<form a c t i o n=” u r l ” method=post >
Der Browser schickt zunächst nur einen Request ohne Parameter. Danach werden die Parameter separat übermittelt. Das CGI-Skript erhält die Parameter über die Standardausgabe. Die Länge bekommt
man (falls benötigt) über CONTENT_LENGTH.
Listing 31: Counter-Beispiel in Java
1
2
3
4
5
6
7
8
int value = 0 ;
try {
i n t l e n g t h = I n t e g e r . p a r s e I n t ( System . g e t P r o p e r t y ( ”CGI .CONTENT LENGTH” ) ) ;
byte [ ] i n = new byte [ l e n g t h ] ;
i n t n = System . i n . r e a d ( i n ) ;
S t r i n g parameter = new S t r i n g ( i n ) ;
v a l u e = I n t e g e r . p a r s e I n t ( parameter . s u b s t r i n g ( 6 , l e n g t h ) ) ;
} catch ( E x c e p t i o n e ) {}
7.4.3 Wann sollte welche Methode verwendet werden?
POST:
• bei beliebig großen Parametern
• sind nur per Button in Form erzeugbar
GET:
• auch Parameterweitergabe per Link möglich
Es sind auch Zwitter-CGI-Skripte möglich. Die Umgebungsvariable: REQUEST_METHOD ist entweder GET
oder POST und gibt an, welche Übergabemöglichkeit verwendet wurde.
weitere Elemente in Forms sind: Radiobuttons, Checklisten, Textfelder, Auswahllisten, Hidden Fields
bisher: FORM-Parameter ⇒ HTML-Dokument
Es gibt keinen Zustand, da jedes CGI-Skript immer terminiert. Daten zwischen mehreren Programmläufen
können über das Filesystem ausgetauscht werden.
Listing 32: Ein globaler Zähler als Beispiel
1
2
3
4
5
6
7
8
9
int value = 0 ;
try {
F i l e R e a d e r i n F i l e = new F i l e R e a d e r ( ” v a l u e . dat ” ) ;
value = i n F i l e . read ( ) ;
inFile . close ();
F i l e W r i t e r o u t F i l e = new F i l e W r i t e r ( ” v a l u e . dat ” ) ;
o u t F i l e . w r i t e ( v a l u e +1);
outFile . close ( ) ;
} catch ( E x c e p t i o n e ) {}
Dann ist ein Zählen ohne Eingabefeld im Browser möglich. Mehrere CGI-Skripte können gleichzeitig
ausgeführt werden ⇒ Synchronisation ist notwendig, da die Datei geteilte Ressource ist. Unix bietet
hierfür folgende Möglichkeit: File-Semaphoren
lockfile filename
Semantik: Falls die Datei filename existiert, dann suspendiere bis die Datei nicht mehr existiert.
Ansonsten lege die Datei an. Dies ist in Kombination mit rm wie einer Semaphore mit P als lockfile
und V als rm möglich. Hierbei entstehen alle Gefahren der lockbasierten nebenläufigen Synchronisation
(z. B. Deadlocks). Gefährlich ist insbesondere auch ein Programmabsturz ohne Lockfreigabe.
Bei größeren Systemen gibt es weitere Probleme: nebenläufige Datenmanipulation (siehe Abbildung
12) Beachte: Locks lösen dieses Problem nicht. Zustandsinkonsistenz muss spätestens beim Schreiben
erkannt werden.
46
Abbildung 12:
7.5 Sessionmanagement
Z. B. für Warenkörbe, Benutzerinteraktion, aber auch zum Erstellen von Benutzerprofilen
Zunächst gab es nur Formulare (hidden-fields) für interaktive Webseiten.
Browserhersteller entwickelten dann den Cookie-Mechanismus: der Server kann Informationen im
Browser als Cookie speichern und erhält diese später zurück
Setzen eines Cookies:
Server Responce-Header-Field:
Set-Cookie:user = Frank; {mehrere möglich} vor dem Content-Type
Zu jeder weiteren Anfrage fügt der Browser alle gültigen Cookies mit Werten hinzu. Ein CGI-Skript
kann die Cookies aus der Umgebungsvariable HTTP_COOKIE auslesen.
Cookie-Parameter:
• expires = DATE ⇒ falls dies nicht gesetzt ist, läuft das Cookie nach Ablauf des Sessionendes
ab
• domain = DOMAINNAME ⇒ normal ist nur der Versand an den HOST, der das Cookie gesetzt hat.
Hiermit ist eine Erweiterung möglich, nicht aber auf de.
• path = PATH ⇒ normalerweise werden Cookies nur an den gleichen Pfad geschickt. Eine Verallgemeinerung ist bis auf "/" möglich.
• secure ⇒ versende nur über sichere Verbindung (zur Zeit nur https)
Cookies können auch verwendet werden, um Zustände beim Clienten zu speichern. Aber im Gegensatz
zu Forms (mit hidden Fields) sind nur lineare Zustandsfolgen möglich. Ebenfalls sind nicht mehrere
Fenster mit dem gleichen Skript möglich.
7.6 Beurteilung von CGI
CGI ist ein einfacher und sprachunabhängiger Ansatz.
Nachteile:
• stdout-Ausgabe verführt zu hässlichem Programmierstil
• hohe Latenzzeit durch immer wiederkehrende Programmstarts (insbesondere bei Interpretern,
JVM)
Lösung:
• Fast CGI ⇒ Server für CGI-Skript, der alle Anfragen bearbeitet
• PHP ⇒ Interpreter, der direkt in den Webserver eingebunden ist
• Java Servlets
8 Synchronisation durch Tupelräume
Ansatz ohne Kommunikation zwischen zwei Partnern (Server/Client)
Linda: Kommunikation durch Bekanntmachen und Abfragen von Tupeln. Das grundlegende Modell
sieht wie folgt aus:
• zentraler Speicher (ggf. auch mehrere)
Tupelraum als Multimenge von Tupeln
• viele unabhängige Prozesse, die Tupel in den Tupelraum hinzufügen oder Tupel aus diesem lesen
(entfernen) können
47
• sprachunabhängig: Linda ist ein Kommunikationsmodell, das zu verschiedenen Sprachen hinzugefügt werden kann (z. B. C-Linda, Fortran-Linda, Scheme-Linda, Java-Spaces)
Hierbei basiert Linda auf folgenden Grundoperationen:
• out(t) ⇒ fügt das Tupel t in den Tupelraum ein. Berechnungen in t werden vor dem Einfügen
ausgewertet.
Beispiel: out(("hallo",1+3,6.0/4.0)) fügt das Tupel ("hallo",4,1.5) in den Tupelraum
ein.
• in(t) ⇒ entfernt das Tupel t aus dem Tupelraum, falls t dort vorhanden ist. Ansonsten suspendiert es, bis t im Tupelraum eingefügt wurde. t kann auch Variablen enthalten (Pattern
Matching). Diese parsen immer und werden an entsprechende Werte aus dem Tupelraum gebunden.
Beispiel: in(("hallo",?i,?f)) entfernt obig eingefügtes Tupel und bindet i/4 und f/1.5
• rd(t) ⇒ wie in(t), aber das Tupel wird nicht entfernt
• eval(stmt) ⇒ erzeugt einen neuen Prozess, der stmt auswertet
• inp(t), rdp(t) ⇒ wie in, rd, aber ohne Blockade, falls das Tupel nicht vorhanden ist. Als
Rückgabewert erhält man einen boolschen Wert. Dieser ist false, falls t nicht im Tupelraum
vorhanden ist.
• Beispiel: zwei Prozesse, die abwechselnd aktiv sind (ping-pong) in Linda
1
2
3
4
5
6
1
2
3
4
5
6
ping ( ) {
while ( true ) {
out ( ” p i n g ” ) ;
i n ( ” pong ” ) ;
}
}
pong ( ) {
while ( true ) {
in ( ” ping ” ) ;
out ( ” pong ” ) ;
}
}
Listing 33: Hauptprogramm
1
2
3
4
main ( ) {
eval ( ping ( ) ) ;
e v a l ( pong ( ) ) ;
}
Es ist aber auch möglich, Datenstrukturen durch Tupel darzustellen:
Interpretiere bestimmte Werte als Referenzen im Speicher (Tupelraum), z. B. durch Integer-Werte.
Dann ist z. B. ein Array kodierbar als:
{("A",1,r1),("A",2,r2),...,("A",n,rn);...}
Der Zugriff auf einzelne Arraykomponenten kann dann ermöglicht werden. Z. B. können wir den 4.
Index im Array wie folgt mit drei multiplizieren:
in("A",4,?e);
out("A",4,3*e);
Anwendung: verteilte und parallele Bearbeitung komplexer Datenstrukturen
Listing 34: Rechenoperation auf jedem Element
1
2
3
4
5
f o r ( i =1; i<=n ; i ++) { e v a l ( f ( i ) ) ; }
f ( int i ) {
i n ( ”A” , i , ? r ) ;
out ( ”A” , i , compute ( r ) ) ;
}
48
Dies ist nur sinnvoll, wenn compute aufwendig ist und die Verteilung auf mehrere Prozessoren möglich
ist.
Formulierung von Synchronisationsproblemen: Kodierung der dinierenden Philosophen
1
2
3
4
5
6
7
8
9
10
p h i l ( int i ) {
while ( true ) {
think ( ) ;
in ( ” stab ” , i ) ;
i n ( ” s t a b ” , ( i +1)% j ) ;
eat ( ) ;
out ( ” s t a b ” , i ) ;
out ( ” s t a b ” , ( i +1)% j ) ;
}
}
Listing 35: Initialisierung
1
2
3
4
f o r ( i =0; i <s ; i ++) {
out ( ” s t a b ” , i ) ;
eval ( phil ( i ) ) ;
}
Eine Client/Server-Kommunikation ist auch modellierbar
Idee: Zur Zuordnung der Antworten zu Anfragen werden von dem Server eindeutige ID’s verwendet:
1
2
3
4
5
6
7
8
9
10
1
2
3
4
5
6
7
1
2
3
4
5
6
server () {
i n t i =1;
eval ( idserver ( 1 ) ) ;
while ( true ) {
in ( ” request ” , i , ? req ) ;
...
out ( ” r e s p o n s e ” , i , r e s ) ;
i ++;
}
}
i d s e r v e r ( int i ) {
while ( true ) {
i n ( ”newID” ) ;
out ( ”newID” , i ) ;
i ++;
}
}
Client () {
int id ;
out ( ”newID” ) ;
i n ( ”newID” , ? i d ) ;
out ( ” r e q u e s t ” , ? id , 1 ) ;
i n ( ” r e s p o n s e ” , id , ? r e s 1 ) ;
8.1 Java-Spaces
Linda in Java
out t
space.write(Entry e, Transaction tr, long lease) mit:
• Entry e als Interface für Einträge in den Tupelspace
• Transaction tr als Transaktionskonzept zur atomaren Ausführung mehrerer Modelle von Tupelräumen, sonst null
• long lease gibt die Zeit an, wie lange ein Tupelraum verbleibt
in t
space.take(Entry e, Transaction tr, long lease) mit:
49
• Entry e als Template (Pattern)
• long lease als maximale Suspensionszeit
das Template kann auch Variablen/Wildcards in Form von null enthalten. Das Template matcht den
Eintrag im Space, falls:
1. das Template den gleichen Typ oder Obertyp wie der Eintrag hat
2. alle Felder des Templates matchen:
– Wildcard (null) matcht immer
– Werte matchen nur gleiche Werte
Das Matchen der Variablen erfolgt durch das Ersetzen von null durch den korrekten Wert.
rd(t)
space.read(...)
eval(stmt)
Java Threads
inp(t), rdp(t)
über lease=0
Beachte: Die Objektidentität bleibt in Space nicht erhalten. Objekte werden (de-)serialisiert, da es sich
häufig um verteilte Anwendungen handelt. Spaces können lokal oder im Netzwerk gestartet werden.
Sie können persistent sein. Sie können über Jini-lookup-Sevice oder RMI-Registrierung oder andere
gestartet werden.
8.2 Implementierung von Tupelräumen in Erlang
Um zu verstehen, wie Tupelräume realisiert werden können, betrachten wir eine prototypische Implementierung in Erlang. Erlang verfügt zwar auch über Pattern Matching, Pattern sind aber keine
Bürger erster Klasse und können nicht als Nachrichten verschickt werden. Es ist in Erlang aber möglich,
Funktionen zu verschicken. Somit können wir recht einfach partielle Funktionen verschicken, welche
das Pattern Matching im Tupelraum realisieren. Als Beispiel können wir das Pattern {hallo, X, Y}
durch die Funktionsdefinition
1
fun ( { h a l l o , X,Y) −> {X,Y} end
realisieren. Hierbei liefern wir die Bindungen als Paar der gebundenen Variablen zurück. Die Partialität
der Funktion verwenden wir um das Nicht-Matchen des Patterns auszudrücken. Im Server können wir
diesen Fall mit Hilfe eines catch-Konstrukts erkennen und entsprechend verfahren.
Im Tupel-Server benötigen wir zwei Strukturen (Listen) zur Speicherung von Anfragen. In der einen
Liste werden alle Werte im Tupelraum gespeichert. Neue in- bzw. rd-Requests werden zunächst gegen
alle Werte gemacht und ggf. direkt beantwortet. Sollte kein passender Wert gefunden werden, speichern
wir die Requests in einer zweiten Liste. Wird später dann ein neuer Wert eingefügt, wird dieser
entsprechend für alle offenen Requests überprüft und ggf. eine Antwort zurück geschickt.
Die konkrete Implementierung ergibt sich wie folgt:
Listing 36: Erlang Implementierung des Linda-Modells
1
2
−module( l i n d a ) .
−export ( [ s t a r t / 0 , out / 2 , i n / 2 , rd /2 ] ) .
3
4
5
s t a r t ( ) −> r e g i s t e r ( l i n d a , s e l f ( ) ) ,
lindaLoop ( [ ] , [ ] ) .
6
7
8
9
10
11
12
13
14
15
16
17
18
19
l i n d a L o o p ( Ts , Reqs ) −>
receive
{out , T} −> {Reqs1 , KeepT} = findMatchingReq (T, Reqs ) ,
case KeepT of
true −> l i n d a L o o p ( [T | Ts ] , Reqs1 ) ;
f a l s e −> l i n d a L o o p ( Ts , Reqs1 )
end ;
{InRd , F , P} −>
case f i n d M a t c h i n g T u p l e (F , Ts , InRd ) of
nothing
−> l i n d a L o o p ( Ts , [ {F , P , InRd} | Reqs ] ) ;
{ j u s t , {Match , Ts1}} −> P! {tupleMatch , Match} ,
l i n d a L o o p ( Ts1 , Reqs )
end
50
20
end .
21
22
23
24
25
26
27
28
29
30
31
32
33
findMatchingReq ( , [ ] ) −> { [ ] , true} ;
findMatchingReq (T, [ Req | Reqs ] ) −>
{F , P , InRd} = Req ,
case catch F(T) of
{ ’EXIT ’ , } −> {Reqs1 , KeepT} = findMatchingReq (T, Reqs ) ,
{ [ Req | Reqs1 ] , KeepT} ;
Match
−> P! {tupleMatch , Match} ,
case InRd of
i n −> {Reqs , f a l s e } ;
rd −> findMatchingReq (T, Reqs )
end
end .
34
35
36
37
38
39
40
41
42
43
44
45
46
f i n d M a t c h i n g T u p l e ( , [ ] , ) −> n o t h i n g ;
f i n d M a t c h i n g T u p l e (F , [T | Ts ] , InRd ) −>
case catch F(T) of
{ ’EXIT ’ , } −> case f i n d M a t c h i n g T u p l e (F , Ts , InRd ) of
nothing
−> n o t h i n g ;
{ j u s t , {M1, Ts1}} −> { j u s t , {M1, [T | Ts1 ] }}
end ;
Match
−> case InRd of
i n −> { j u s t , {Match , Ts}} ;
rd −> { j u s t , {Match , [T | Ts ] }}
end
end .
47
48
out ( Space , T) −> Space ! {out , T} .
49
50
51
52
53
i n ( Space , F) −> Space ! { in , F , s e l f ( ) } ,
receive
{tupleMatch , Match} −> Match
end .
54
55
56
57
58
rd ( Space , F) −> Space ! {rd , F , s e l f ( ) } ,
receive
{tupleMatch , Match} −> Match
end .
9 Spezifikation und Testen von Systemeigenschaften
Das Testen von Systemeigentschaften von nebenläufigen Systemen geht über Ansätze, wie Unit-Tests
oder Zusicherungen im Quellcode hinaus. Man ist in der Regel daran interessiert, globale Systemeigenschaften zu analysieren, die sich aus den Zuständen der einzelnen Prozesse zusammensetzen. Als
Beispiel können wir noch einmal auf das nebenläufige Verändern eines geteilten Zustands zurückkommen, hier als Client-Server-Struktur in Erlang:
Listing 37: Kritischer Bereich
1
2
−module ( c r i t i c a l ) .
−e x p o r t ( [ s t a r t / 0 ] ) .
3
4
5
6
s t a r t ( ) −> S = spawn ( fun ( ) −> s t o r e ( 4 2 ) end ) ,
spawn ( fun ( ) −> i n c ( S ) end ) ,
dec ( S ) .
7
8
9
10
11
s t o r e (V) −> r e c e i v e
{ lookup , P} −> P ! V, s t o r e (V ) ;
{ s e t , V1}
−> s t o r e (V1)
end .
12
13
i n c ( S ) −> S ! { lookup , s e l f ( ) } ,
51
14
15
16
17
receive
V −> S ! { s e t ,V+1}
end ,
inc (S ) .
18
19
20
21
22
23
dec ( S ) −> S ! { lookup , s e l f ( ) } ,
receive
V −> S ! { s e t , V−1}
end ,
dec ( S ) .
Hierbei stellen die Zustände, bevor der Zustand neu geschrieben wird, sicherlich kritische Zustände dar.
Eigentlich sollten beide Client-Prozesse nie gleichzeitig in diesen Zuständen sein, da dann der Speicher
in Abhängigkeit eines veralteten Wertes verändert wird. Solche Eigenschaften können elegant mit Hilfe
von Temporallogiken ausgedrückt werden.
10 Linear Time Logic (LTL)
Die Lineare temporale Logik (LTL) ermöglicht es, Eigenschaften von Pfaden eines Systems auszudrücken. Ihre Syntax ist wie folgt definiert:
Syntax der Linear Time Logic (LTL)
Sei Props eine (endliche) Menge von Zustandspropositionen. Die Menge der LTL-Formeln ist definiert
als kleinste Menge mit:
• Props ⊆ LT L
Zustandspropositionen
• ϕ, ψ ∈ LT L =⇒ −
−
−
−
¬ϕ ∈ LT L
ϕ ∧ ψ ∈ LT L
Xϕ ∈ LT L
ϕ U ψ ∈ LT L
Negation
Konjunktion
Im nächsten Zustand gilt ϕ
ϕ gilt bis ψ gilt
Eine LTL-Formel wird bzgl. eines Pfades interpretiert. Propositionen gelten, wenn der erste Zustand
des Pfades die Propositionen erfüllt. Der Modaloperator Next Xϕ ist erfüllt, wenn ϕ auf dem Pfad
ab dem nächsten Zustand gilt. Außerdem enthält LTL den Until-Operator, welcher erfüllt ist, wenn
ϕ solange gilt, bis ψ gilt. Hierbei mus ψ aber tatsächlich irgendwann gelten.
Formal ist die Semantik wie folgt definiert:
Pfadsemantik von LTL
Ein unendliches Wort über Propositionsmengen π = p0 p1 p2 . . . ∈ P(Props)ω heißt Pfad. Ein Pfad π
erfüllt eine LTL-Formel ϕ (π |= ϕ) wie folgt:
p0 π
π
π
p0 π
p0 p1 . . .
|=
|=
|=
|=
|=
P
¬ϕ
ϕ∧ψ
Xϕ
ϕU ψ
: ⇐⇒
: ⇐⇒
: ⇐⇒
: ⇐⇒
: ⇐⇒
P ∈ p0
π 6|= ϕ
π |= ϕ ∧ π |= ψ
π |= ϕ
∃i ≥ 0 : pi pi+1 . . . |= ψ ∧ ∀j < i : pj pj+1 . . . |= ϕ
Ausgehend vom Kern von LTL können wir praktische Abkürzungen definieren, welche es häufig einfacher machen, Eigenschaften zu spezifizieren.
Abkürzungen in LTL
52
f alse
true
ϕ∨ψ
ϕ→ψ
F ϕ
Gϕ
F ∞ϕ
G∞ ϕ
ϕWψ
ϕ Rψ
:=
:=
:=
:=
:=
:=
:=
:=
:=
:=
¬P ∧ P 2
¬f alse
¬(¬ϕ ∧ ¬ψ)
¬ϕ ∨ ψ
true U ϕ
¬F ¬ϕ
GF ϕ
F Gϕ
(ϕ U ψ) ∨ (G ϕ)
¬(¬ϕ U ¬ψ)
der Boolesche Wert false
der Boolesche Wert true
Disjunktion
Implikation
Irgendwann (Finally) gilt ϕ
Immer (Globaly) gilt ϕ
Unendlich oft gilt ϕ
Nur endlich oft gilt ϕ nicht
Schwaches Until (weak until)
ϕ lässt ψ frei (release)
Das Release ist hierbei sicherlich nicht ganz so verständlich, wird aber bei der Implementierung recht
nützlich sein.
Ausgehend von der Pfadsemantik, erfüllt ein System eine LTL-Formel, wenn jeder Pfad des Systems
die LTL-Formel erfüllt. Formal werden Systeme hierbei als Kripke-Strukturen repräsentiert:
Kripke-Struktur
K = (S, Props, −→, τ, s0 ), wobei
• S eine Menge von Zuständen,
• Props eine Menge von Propositionen,
• −→⊆ S × S die Transitionsrelation,
• τ : S −→ P(Props) eine Beschriftungsfunktion für die Zustände und
• s0 ∈ S der Startzustand sind,
heißt Kripke-Struktur. Anstatt (s, s0 ) ∈−→ schreibt man in der Regel s −→ s0 .
Ein Zustandspfad von K ist ein unendliches Wort s0 s1 . . . ∈ S ω mit si −→ si+1 und s0 der Startzustand
von K. Wenn s0 s1 . . . ein Zustandspfad von K und pi = τ (si ) für alle i ≥ 0, dann ist das unendliche
Wort p0 p1 . . . ∈ P(Props)ω ein Pfad von K.
Kripke-Struktur-Semantik von LTL
Sei K = (S, →, τ, s0 ) eine Kripke-Struktur. K erfüllt eine LTL-Formel ϕ (K |= ϕ) genau dann, wenn
für alle Pfade π von K: π |= ϕ.
Mit Hilfe von LTL können im Wesentlichen drei unterschiedliche Arten von Eigenschaften definiert
werden:
• Sicherheitseigenschaften (Safety):
Solche Eigenschaften haben in der Regel die Form: G ¬ ϕ
Ein mögliches Beispiel für ϕ kann für zwei Prozesse, für welche der wechselseitige Ausschluss
gewährleistet sein soll, cs1 ∧ cs2 sein, wenn der Prozess i ∈ {1, 2} seinen kritischen Bereich durch
die Proposition csi anzeigt.
• Lebendigkeitseigenschaften (Liveness):
Diese Eigenschaften haben in der Regel die Form: F ϕ, wobei sie oft auch nur bedingt gelten
sollen, z.B. in einer Implikation.
Ein Beispiel ist die Antwort (answ) auf einen Request (req) eines Servers: G (req −→ F answ).
Hierbei ist die eigentlich Lebendigkeitseigenschaft nur erwünscht, wenn eine Anfrage erfolgte.
Mit Hilfe von G fordern wir, dass diese Eigenschaft überall im System gelten soll.
• Fairness:
Fairness kann als F ∞ ϕ beschrieben werden. Hiermit können wir ausdrücken, dass jeder beteiligte
Prozess unendlich oft dran kommt und vom Scheduler nicht ausgeschlossen wird.
Man verwendet Fairnesseigenschaften in der Regel als Vorbedingung einer Implikation, um “unfaire” Pfade nicht zu betrachten. Dies entspricht der Beschränkung auf präemptive Scheduler,
welche wir einleitend in Betracht gezogen haben.
53
10.1 Implementierung von LTL zum Testen
LTL wurde ursprünglich als Spezifikationslogik entwickelt, welche dann mit Hilfe von Model Checking
gegenüber einer endlichen Kripke-Struktur (Systembeschreibung) überprüft werden kann. Das Model
Checking ist aber für reale Systeme im Allgemeinen unentscheidbar, wie folgendes Erlang Beispiel
zeigt:
Listing 38: Unentscheidbarkeit von LTL
1
s t a r t ( ) −> f ( ) , prop ( t e r m i n a t e d ) .
Hierbei soll prop(terminated) bedeuten, dass das Programm nachdem der Funktionsaufruf f() beendet wurde, in einen Zustand überführt wird, in dem die Proposition terminated gilt. Will man für
dieses Programm die LTL-Formel
F terminated
entscheiden, so müsste man ja entscheiden, ob die Funktion f() terminiert. Da Erlang ja eine universelle Programmiersprache ist, ist dies aber direkt ein Widerspruch zum Halteproblem und somit im
Allgemeinen unentscheidbar.
Auch wenn die formale Verifikation also nicht möglich ist, kann es aber dennoch sinnvoll sein, LTLEigenschaften zu testen. Inwieweit die sich ergebenden Aussagen zu interpretieren sind, werden wir
später noch besprechen. Zunächst wollen wir den Ansatz zum LTL-Testen implementieren.
Als ersten Schritt müssen wir uns eine geeignete Darstellung von LTL-Formeln in Erlang überlegen.
Wir beschränken uns hier auf einen Teil der LTL-Operatoren. Die restlichen Operatoren sollen als
Übung ergänzt werden. Bei der Negation des Untils ist die Verwendung des Release hilfreich, da dies
gerade als “Negation” des Until definiert ist.
Listing 39: LTL-Implementierung in Erlang
1
−module ( l t l ) .
2
3
−e x p o r t ( [ prop / 1 , d i s j / 2 , c o n j / 2 , neg / 1 , x / 1 , f / 1 , g / 1 ] ) .
4
5
6
7
8
9
10
11
prop ( Phi ) −> { prop , Phi } .
neg ( Phi ) −> { neg , Phi } .
d i s j ( Phi , P s i ) −> { d i s j , Phi , P s i } .
c o n j ( Phi , P s i ) −> { c o n j , Phi , P s i } .
x ( Phi ) −> {x , Phi } .
g ( Phi ) −> {g , Phi } .
f ( Phi ) −> { f , Phi } .
12
13
14
15
16
17
18
19
20
showLTL ( { prop , P} ) −> b a s e : show (P ) ;
showLTL ( { d i s j , Phi , P s i } ) −> ” ( ”++showLTL ( Phi)++” o r ”++showLTL ( P s i)++” ) ” ;
showLTL ( { c o n j , Phi , P s i } ) −> ” ( ”++showLTL ( Phi)++”and”++showLTL ( P s i)++” ) ” ;
showLTL ( { neg , Phi } ) −> ” ( neg ”++showLTL ( Phi)++” ) ” ;
showLTL ( { x , Phi } ) −> ”X”++showLTL ( Phi ) ;
showLTL ( { f , Phi } ) −> ”F”++showLTL ( Phi ) ;
showLTL ( { g , Phi } ) −> ”G”++showLTL ( Phi ) ;
showLTL ( Phi ) −> b a s e : show ( Phi ) .
Das erste Problem bei der Implementierung stellt die Negation dar. Diese ist über die Negation auf
der mathematischen Meta-Ebene definiert und so nur schwer zu implementieren. Es ist aber möglich,
LTL-Formeln so äquivalent umzuformen, dass die Negation nur noch direkt vor Propositionen auftritt.
Die Negation wird vom Prinzip nach Innen geschoben.
Listing 40: LTL-Implementierung in Erlang
1
2
3
4
5
6
7
8
n o r m a l i z e ( true ) −> true ;
n o r m a l i z e ( f a l s e ) −> f a l s e ;
n o r m a l i z e ( {prop , P} ) −> prop (P ) ;
n o r m a l i z e ( { d i s j , Phi , P s i } ) −> d i s j ( n o r m a l i z e ( Phi ) , n o r m a l i z e ( P s i ) ) ;
n o r m a l i z e ( { c o n j , Phi , P s i } ) −> c o n j ( n o r m a l i z e ( Phi ) , n o r m a l i z e ( P s i ) ) ;
n o r m a l i z e ( {x , Phi} ) −> x ( n o r m a l i z e ( Phi ) ) ;
n o r m a l i z e ( { f , Phi} ) −> f ( n o r m a l i z e ( Phi ) ) ;
n o r m a l i z e ( {g , Phi} ) −> g ( n o r m a l i z e ( Phi ) ) ;
54
9
10
11
12
13
14
15
16
n o r m a l i z e ( {neg , true} ) −> f a l s e ;
n o r m a l i z e ( {neg , f a l s e } ) −> true ;
n o r m a l i z e ( {neg , {prop , P}} ) −> neg ( prop (P ) ) ;
n o r m a l i z e ( {neg , { d i s j , Phi , P s i }} ) −> c o n j ( n o r m a l i z e ( neg ( Phi ) ) , n o r m a l i z e ( neg ( P s i ) ) ) ;
n o r m a l i z e ( {neg , { c o n j , Phi , P s i }} ) −> d i s j ( n o r m a l i z e ( neg ( Phi ) ) , n o r m a l i z e ( neg ( P s i ) ) ) ;
n o r m a l i z e ( {neg , {x , Phi}} ) −> x ( n o r m a l i z e ( neg ( Phi ) ) ) ;
n o r m a l i z e ( {neg , { f , Phi}} ) −> g ( n o r m a l i z e ( neg ( Phi ) ) ) ;
n o r m a l i z e ( {neg , {g , Phi}} ) −> f ( n o r m a l i z e ( neg ( Phi} ) ) .
Als Nächstes stellen wir eine Funktion zur Verfügung, welche anhand der gültigen Propositionen
Formeln auf ihre Gültigkeit im aktuellen Zustand überprüft. Mögliche Ergebnisse sind hierbei true,
f alse und noch nicht entschieden. Im letzten Fall erhalten wir eine Formel, welche wir auf dem weiteren
Pfad überprüfen müssen.
Listing 41: LTL-Implementierung in Erlang
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
c h e ck ( true , P r o p s ) −> true ;
c h e ck ( f a l s e , P r o p s ) −> f a l s e ;
c h e ck ( {prop , P} , Props ) −>
l i s t s : member (P , Props ) ;
c h e ck ( {neg , {prop , P}} , Props ) −> not ( check ( prop (P) , Props ) ) ;
c h e ck ( { c o n j , Phi , P s i } , Props ) −>
case check ( Phi , Props ) of
true −> check ( Psi , Props ) ;
f a l s e −> f a l s e ;
Phi1 −> case check ( Psi , Props ) of
true −> Phi1 ;
f a l s e −> f a l s e ;
P s i 1 −> c o n j ( Phi1 , P s i 1 )
end
end ;
c h e ck ( { d i s j , Phi , P s i } , Props ) −>
case check ( Phi , Props ) of
true −> true ;
f a l s e −> check ( Psi , Props ) ;
Phi1 −> case check ( Psi , Props ) of
true −> true ;
f a l s e −> Phi1 ;
P s i 1 −> d i s j ( Phi1 , P s i 1 )
end
end ;
c h e ck ( {x , Phi} , P r o p s ) −> x ( Phi ) ;
c h e ck ( { f , Phi} , Props ) −> check ( d i s j ( Phi , x ( f ( Phi ) ) ) , Props ) ;
c h e ck ( {g , Phi} , Props ) −> check ( c o n j ( Phi , x ( g ( Phi ) ) ) , Props ) ;
c h e ck ( Phi , P r o p s ) −>
b a s e : putStrLn ( ” Unexpected f o r m u l a i n check : ”++showLTL ( Phi ) ) .
Auf Grund der hier verwendeten dreiwertigen Logik, können wir Konjunktion und Disjunktion nicht
auf die entsprechenden Booleschen Operationen zurückführen, sondern müssen sie selbst implementieren. Die Formel X ϕ können wir nicht entscheiden und geben sie unverändert zurück.
Bei den Formeln F ϕ und G ϕ wickeln wir den Fixpunkt, über den diese Formeln definiert sind, ab.
Dies ist auf Grund der folgenden Äquivalenzen möglich:
π |= F ϕ gdw. π |= ϕ ∨ X F ϕ
π |= G ϕ gdw. π |= ϕ ∧ X G ϕ
Beachte, dass die sich ergebenden Formeln nur true, f alse, X ϕ, Disjunktionen oder Konjunktionen
sein können. F , G und prop sind nur unterhalb eines X möglich.
Für den Fall, dass das System einen Schritt macht, können wir nun in den Formeln das X realisieren:
Listing 42: LTL-Implementierung in Erlang
1
s t e p ( {x , Phi} ) −> Phi ;
55
2
3
4
s t e p ( { c o n j , Phi , P s i } ) −> c o n j ( s t e p ( Phi ) , s t e p ( P s i ) ) ;
s t e p ( { d i s j , Phi , P s i } ) −> d i s j ( s t e p ( Phi ) , s t e p ( P s i ) ) ;
s t e p ( Phi ) −> b a s e : putStrLn ( ” Unexpected f o r m u l a i n s t e p : ”++showLTL ( Phi ) ) .
Die letzte Regel sollte eigentlich nicht auftreten und dient nur dem Erkennen von Programmfehlern.
Nun können wir den Server, welcher für das Verwalten und Überprüfen der LTL-Formeln zuständig
ist, implementieren. Er wird global unter dem Atom ltl registriert, damit seine Pid nicht in jeden
Prozess propagiert werden muss und Anwendungen somit einfacher erweitert werden können.
Als Zustand speichern wir in diesem Prozess eine Liste von zur Zeit gültigen Propositionen, sowie die
Formeln, welche in den weiteren Zuständen noch überprüft werden müssen. Außerdem speichern wir
die ursprünglichen mit assert hinzugefügten Formeln, um diese im Fall einer gefundenen Verletzung
ausgeben zu können. Alternativ könnte man hier sicherlich auch einen String, der die Eigenschaft
geeignet benennt hinterlegen.
Bei den Propositionen ermöglichen wir es den Prozessen beliebige Propositionen zu setzen bzw. zu
löschen. Zur Vereinfachung vermeiden wir doppelte Vorkommen, da wir die Propositionsmenge als
einfache Liste implementieren. Zur einfacheren Kommunikation mit dem LTL-Server, stellen wir geeignete Schnittstellen-Funktionen zur Verfügung.
Erkennen wir die Verletzung einer Assertion, geben wir eine Warnmeldung aus. Diese sollte besser
in eine spezielle Datei geschrieben werden, da sie sonst in anderen Ein-/Ausgaben des eigentlichen
Programms untergeht.
Listing 43: LTL-Implementierung in Erlang
1
2
s t a r t ( ) −> LTL = spawn( fun ( ) −> l t l ( [ ] , [ ] , [ ] ) end ) ,
r e g i s t e r ( l t l , LTL ) .
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
l t l ( Phis , A s s e r t s , Props ) −>
receive
{ a s s e r t , Phi} −> case check ( n o r m a l i z e ( Phi ) , Props ) of
true −> l t l ( Phis , A s s e r t s , Props ) ;
f a l s e −> ptong ( Phi ) ,
l t l ( Phis , A s s e r t s , Props ) ;
Phi1 −> l t l ( [ Phi1 | P h i s ] , [ Phi | A s s e r t s ] , Props )
end ;
{newProp , P} −>
NewProps=[ P | Props ] ,
P h i s 1 = l i s t s : map( fun ( Phi ) −> check ( s t e p ( Phi ) , NewProps ) end , P h i s ) ,
{Phis2 , A s s e r t s 2 } = a n a l y z e ( Phis1 , A s s e r t s ) ,
l t l ( Phis2 , A s s e r t s 2 , NewProps ) ;
{ r e l e a s e P r o p , P} −>
NewProps= l i s t s : d e l e t e (P , Props ) ,
P h i s 1 = l i s t s : map( fun ( Phi ) −> check ( s t e p ( Phi ) , NewProps ) end , P h i s ) ,
{Phis2 , A s s e r t s 2 } = a n a l y z e ( Phis1 , A s s e r t s ) ,
l t l ( Phis2 , A s s e r t s 2 , NewProps ) ;
s t a t u s −>
b a s e : putStrLn ( ” Unevaluated A s s e r t i o n s : ” ) ,
l i s t s : z i p w i t h ( fun ( Phi , A s s e r t ) −>
b a s e : putStrLn ( showLTL ( A s s e r t ) ) ,
b a s e : putStrLn ( ”
”++showLTL ( Phi ) ) end ,
Phis , A s s e r t s ) ,
l t l ( Phis , A s s e r t s , Props )
end .
30
31
32
33
34
a s s e r t ( Phi ) −> l t l ! { a s s e r t , Phi} .
newProp (P) −> l t l ! {newProp , P} .
r e l e a s e P r o p (P) −> l t l ! { r e l e a s e P r o p , P} .
s t a t u s ( ) −> l t l ! s t a t u s .
35
36
37
38
39
40
a n a l y z e ( [ ] , [ ] ) −> { [ ] , [ ] } ;
a n a l y z e ( [ true | P h i s ] , [ A s s e r t | A s s e r t s ] ) −>
a n a l y z e ( Phis , A s s e r t s ) ;
a n a l y z e ( [ f a l s e | P h i s ] , [ A s s e r t | A s s e r t s ] ) −>
ptong ( A s s e r t ) ,
56
41
42
43
44
a n a l y z e ( Phis , A s s e r t s ) ;
a n a l y z e ( [ Phi | P h i s ] , [ A s s e r t | A s s e r t s ] ) −>
{Phis1 , A s s e r t s 1 } = a n a l y z e ( Phis , A s s e r t s ) ,
{ [ Phi | P h i s 1 ] , [ A s s e r t | A s s e r t s 1 ] } .
45
46
47
ptong ( Phi ) −> b a s e : putStrLn ( ” A s s e r t i o n v i o l a t e d : ”++showLTL ( Phi ) ) ,
e x i t ( −1).
Die Funktion analyze löscht erfüllte und widerlegte Formeln aus unseren Listenstrukturen. Im Fehlerfall gibt sie außerdem eine entsprechende Fehlermeldung mittels ptong/1 aus. Abschließend geben wir
dem Benutzer mit der Funktion status noch die Möglichkeit alle noch nicht entschiedenen Formeln
auszugeben.
Als Beispiel für die Benutzung der LTL-Bibliothek betrachten wir den wechselseitigen Ausschluss
zweier Prozesse. Auf Grund des Kommunikationsmodells von Erlang ist es nicht so einfach, überhaupt
einen kritischen Bereich zu definieren, in dem sich zwei Prozesse befinden. Wir verwenden hierzu einen
Speicher(-Prozess), auf dem zwei Prozesse nebenläufig Veränderungen vornehmen:
Listing 44: Beispiel zur Benutzung der LTL-Bibliothek
1
2
−module( c r i t i c a l ) .
−export ( [ s t a r t /0 ] ) .
3
4
5
6
7
8
s t a r t ( ) −> t r y l t l : s t a r t ( ) catch
−> 42 end , \% F a l l s LTL−S e r v e r schon l a e u f t
l t l : a s s e r t ( l t l : g ( l t l : neg ( l t l : c o n j ( l t l : prop ( c s 1 ) , l t l : prop ( c s 2 ) ) ) ) ) ,
S = spawn( fun ( ) −> s t o r e ( 4 2 ) end ) ,
spawn( fun ( ) −> i n c ( S ) end ) ,
dec ( S ) .
9
10
11
12
13
s t o r e (V) −> receive
{ lookup , P} −> P!V, s t o r e (V ) ;
{ s e t , V1}
−> s t o r e (V1)
end .
14
15
16
17
18
19
20
i n c ( S ) −> S ! { lookup , s e l f ( ) } ,
receive
V −> l t l : newProp ( c s 1 ) , S ! { s e t ,V+1}
end ,
l t l : releaseProp ( cs1 ) ,
inc (S ) .
21
22
23
24
25
26
27
dec ( S ) −> S ! { lookup , s e l f ( ) } ,
receive
V −> l t l : newProp ( c s 2 ) , S ! { s e t , V−1}
end ,
l t l : releaseProp ( cs2 ) ,
dec ( S ) .
In Abhängigkeit des Schedulings wird die angegebene Eigenschaft nach ein paar Schritten widerlegt.
Bei der bisherigen Implementierung haben wir LTL zum Testen von Systemen eingesetzt. Hierbei
ergibt sich allerdings das Problem, dass einige Formeln nie als falsch (z. B. F ϕ) bzw. andere nie
als korrekt (z. B. Gϕ) bewiesen werden können. Besonders problematisch sind somit Eigenschaften,
welche F ∞ oder G∞ verwenden. Sie können durch Testen weder widerlegt noch gezeigt werden.
Eine Möglichkeit hiermit umzugehen, ist es z. B. obere Schranken einzubauen, nach wie vielen Schritten
(Abwicklungen) bzw. für wie viele Schritte ein F oder G gelten soll. Außerdem kann es sinnvoll sein,
zu protokollieren, wie oft F bzw. G bereits abgewickelt wurden, um einen Eindruck zu bekommen, wie
häufig die Bedingung einer Formel schon getestet wurde. Entsprechende Implementierungen werden
in den Übungen besprochen.
10.2 Verifikation
Ursprünglich wurde LTL aber zur formalen Verifikation entwickelt. Betrachtet man endliche KripkeStrukturen ist es nämlich sehr wohl möglich, Eigenschaften zu beweisen bzw. zu widerlegen: LTL ist
57
für endliche Kripke-Strukturen entscheidbar. Das Verfahren, mit dem dieses entschieden wird, nennt
man Model Checking, näheres hierzu findet man in entsprechenden Theorievorlesungen.
Beim Model Checking geht man in der Regel davon aus, dasss man ein System (insb. sein Protokoll)
mit einer (wenig ausdrucksstarken) Spezifikationsprache beschreibt. Deren Semantik definiert eine
endliche Kripke-Struktur, welche dann zur Überprüfung von Eigenschaften (LTL) verwendet werden
kann. Ein entsprechendes Tool zur Spezifikation/Verifikation mittels Model Checking ist z.B. Spin für
die Sprache Promela und LTL. Ähnliche Tools existieren für Prozessalgebren, wie den Calculus of
Communicating Systems (CCS) und modale Logiken (CTL, CTL∗ oder den modalen µ-Kalkül).
Hierbei ist die Idee, dass man zunächst die Spezifikation erstellt und gegen spezifizierte Eigenschaften
verifiziert. Danach wird diese dann schrittweise zum tatsächlichen System verfeinert. Als Konsequenz
wird die Kommunikation dann aber in der Regel nur wenig von konkreten Werten abhängen dürfen, da
dies auf Ebene der Spezifikation nicht ausgedrückt werden kann. Spezifikationssprachen bieten in der
Regel nur einen sehr eingeschränkten Wertebereich. Datenstrukturen stehen gar nicht zur Verfügung,
da sonst die Semantik keine endliche/kleine Kripke-Struktur bilden würde.
Ein andere möglicher Ansatz zur formalen Verifikation ist der umgekehrte Weg. Ausgehend von einem
existierenden System, versucht man durch Abstraktion eine endliche Kripke-Struktur zu generieren.
Die Idee ist hierbei, dass sämtliche Pfade des echten Systems auch in dem abstrahierten endlichen
System enthalten sind. Hierzu kann man das Programm anstatt über konkreten Werten über abstrakten Werten ausführen. Bei Verzweigungen, welche dann über den abstrakten Werten nicht mehr
entschieden werden können, muss man dann, um tatsächlich alle möglichen Pfade zu repräsentieren,
zusätzlichen Nichtdeterminismus verwenden.
Da LTL ja auf allen Pfaden eines Systems gelten muss und die Abstraktion eine endliche KripkeStruktur liefern kann, welche alle Pfade der tatsächlichen Semantik enthält, können wir LTL-Formeln
mittels Model Checking entscheiden. Hierbei müssen wir aber beachten, dass für den Fall, dass eine
LTL-Eigenschaft nicht gilt, zwei mögliche Fälle berücksichtigt werden müssen:
• Der Pfad, welcher ein Gegenbeispiel darstellt, existiert auch in der konkreten Semantik und zeigt
tatsächlich einen Fehler an.
• Der Pfad existiert in der tatsächlichen Semantik nicht, da eine Programmverzweigung auf Grund
der Abstraktion nicht korrekt entschieden wurde. In diesem Fall kann man durch ein Verfeinerung
der Abstraktion versuchen, diese Verzweigung korrekt zu entscheiden, was aber in der Regel zu
einer Vergrößerung des Zustandsraums der abstrakten, endlichen Kripke-Struktur führen wird.
In der Regel wird dieser Ansatz auch genau in den Fällen erfolgreich sein, in denen auch eine formale
Spezifikation möglich gewesen wäre, bei der genau die entsprechenden abstrakten Werte verwendet
worden wären, d.h. in Fällen, in denen das Systemverhalten, insbesondere die Kommunikation, wenig
von konkreten Werten abhängen.
10.3 Simulation einer Turingmaschine
Die formale Verifikation von Programmen ist schwierig, da fast alle interessanten Programmeigenschaften unentscheidbar sind. Dies gilt bereits für sequentielle Programme, welche mit mehr als zwei
Zahlenwerten rechnen können oder dynamische Datenstrukturen verwenden können.
In diesem Kapitel wollen wir untersuchen, in wie weit auch die Erweiterung einer Programmiersprache um nebenläufige Konzepte die Entscheidung interessanter Eigenschaften, wie z.B. der DeadlockFreiheit, unentscheidbar macht. Wir simulieren hierzu allein unter Verwendung einer endlichen Anzahl
von Atomen und nur drei Prozessen eine Turingmaschine.
Die Idee der Implementierung beruht auf der Darstellung des Bandes der Turingmaschine mit Hilfe
zweier Stacks. Beim bewegen des Kontrollkopfes, wird der oberste Wert des einen Stacks auf den
anderen Stack verschoben. Die Stacks werden jeweils durch einen Thread implementiert.
Ein Thread, welcher die Datenstruktur eines Stacks implementiert, kann wie folgt konstruiert werden:
Listing 45: Stack
1
2
3
4
5
s t a c k (P) −> r e c e i v e
pop −> pop ;
X
−> s t a c k (P) ,
P ! X,
s t a c k (P)
58
6
end .
Falls der Stack einen pop Nachricht erhält, “terminiert” er, d.h. der darunter liegende Wert wird als
Nachricht an einen Kontrollprozess verschickt. Alle anderen Nachrichten werden als push interpretiert
und mit Hilfe des Laufzeitkellers gespeichert.
Die Funktione push und pop können dann einfach wie folgt definiert werden:
Listing 46: Stackoperationen
1
push ( St ,V) −> St !V.
2
3
4
5
6
pop ( St ) −> St ! pop ,
receive
X −> X
end .
Bei der Verwendung eines Stacks zur Simulation einer Turingmaschine ist aber noch wichtig, dass auch
über einen leeren Keller hinaus gelesen werden kann. Das Band enthält hier ja beliebig viele Blanks.
Diese Blanks können aber einfach beim pop auf einem leeren Stack geliefert werden:
Listing 47: Blank Stack
1
2
3
b l a n k S t a c k (P) −> s t a c k (P) ,
P ! blank ,
b l a n k S t a c k (P ) .
Nun kann eine Turingmaschine einfach wie folgt definiert werden: Sei M = hQ, Γ, δ, q0 , F i eine deterministische Turingmaschine mit der Übergangsfunktion δ : Q \ F × Γ −→ Q × Γ × {l, r}. Dann kann
der Kontrollprozess wie folgt als Erlangfunktion ausgedrückt werden:
Für alle q ∈ F , verwende Regeln der Form:
delta(SL,SR,q,_) -> pop.
Für alle q ∈ Q\F mit δ(q, a) = (p, b, r):
delta(SL,SR,q,a) -> push(SL,b),
A = pop(SR),
delta(SL,SR,p,A).
und für alle q ∈ Q \ F mit δ(q, a) = (p, b, l):
delta(SL,SR,q,a) -> push(SR,b),
A = pop(SL),
delta(SL,SR,p,A).
Die Turingmaschine kann dann wie folgt gestartet werden:
start() -> SL=spawn(blankStack,[self()]),
SR=spawn(blankStack,[self()]),
writeInputToStack(SR),
A = pop(SR),
delta(SL,SR,q0 ,A),
outputStack(SR).
Man beachte, dass diese Turingmaschinen-Simulation nur sehr wenige Werte verwendet. Außer dem
Bandalphabet Γ und der Zustandsmenge Q, sind dies das Atom pop und die Pid des Hauptprozesses.
Außerdem werden drei Threads verwendet. Es ist sogar möglich, auf noch einen weiteren Thread zu
verzichten. Dies ist jedoch technisch aufwändiger (Übung).
Entsprechend kann man auch mit endrekursiven Erlangprogrammen, die nur endlich viele Werte verwenden, aber beliebig viele Prozesse erlauben, eine Turingmaschine simulieren (Übung). Eine weitere
Möglichkeit ist die Verwendung eines einzelnen Prozesses und beliebig vieler Werte in der Mailbox,
bzw. dynamischen Datenstrukturen.
Durch die Simulation der Turingmaschine können wir zeigen, dass das Model Checking Problem für
die jeweils betrachteten Systeme im Allgemeinen unentscheidbar ist.
59
11 Transaktionsbasierte Kommunikation
In diesem Kapitel werden wir uns noch einmal mit den Nachteilen der lockkbasierten Kommunikation
beschäftigen und einen alternativen lockfreien Ansatz kennen lernen. Da der Ansatz besonders vielversprechend in Haskell implementiert wurde, betrachten wir in diesem Kapitel zunächst insbesondere
Haskell.
11.1 Ein Bankbeispiel in Concurrent Haskell
Aufgabe: Modellierung von Konten und zugehörigen Operationen auf den Konten.
Idee: Repräsentation eines Kontos als MVar Int mit folgenden Operationen:
1
2
3
4
withdraw : : MVar I n t −> I n t −> IO ( )
withdraw a c c amount = do
b a l a n c e <− takeMVar a c c
putMVar a c c ( b a l a n c e − amount )
takeMVar lockt quasi die MVar und putMvar gibt sie wieder frei.
1
2
3
1
2
1
2
3
4
5
6
7
8
d e p o s i t : : MVar I n t −> I n t −> IO ( )
d e p o s i t a c c amount =
withdraw a c c (−amount )
b a l a n c e : : MVar I n t −> IO I n t
b a l a n c e a c c = readMVar a c c
limitedWithdraw : : MVar I n t −> I n t −> IO Bool
limitedWithdraw a c c amount = do
b <− b a l a n c e a c c
i f b >= amount
then do
withdraw a c c amount
return True
e l s e return F a l s e
Problem: Die atomare Ausführung ist nicht garantiert. Nach der balance-Abfrage kann ein anderer
Thread Geld abheben ⇒ Kontostand ist zwar immer richtig, kann aber negativ werden.
Lösung: Lock müsste über gesamte limitedWithdraw Aktion erhalten werden ⇒ keine Wiederverwendung von withdraw möglich.
In Java werden alle Account-Methoden als synchronized gekennzeichnet, so dass eine Wiederverwendung möglich ist (mehrfaches Nehmen des gleichen Locks ist möglich).
Weitere Operation: sicherer Transfer
1
2
3
4
5
6
7
8
9
10
11
12
l i m i t e d T r a n s f e r : : MVar I n t −> Mvar I n t −> I n t −> IO Bool
l i m i t e d T r a n s f e r from t o amount = do
b <− takeMVar from
i f b >= amount
then do
b <− takeMVar t o
putMVar from ( b − amount )
putMVar t o ( b + amount )
return True
e l s e do
putMVar from b
return F a l s e
Problem: Falls gleichzeitig von A nach B und von B nach A überwiesen wird, landen wir im Deadlock.
Hier hilft auch in Java das synchronized nicht, da unterschiedliche Locks/Objekte beteiligt sind.
Listing 48: Lösung
1
l i m i t e d T r a n s f e r from t o amount = do
60
2
3
4
5
6
7
8
9
10
11
b <− takeMVar from
i f b >= amount
then do
putMVar from ( b − amount )
b <− takeMVar t o
putMVar t o ( b + amount )
return True
e l s e do
putMVar from b
return F a l s e
Wobei dies zu einem inkonsistenten Zwischenzustand führt.
Eine andere Lösung wäre alle Locks zu Beginn der Aktion gemäß einer globalen Ordnung zu nehmen
(zuerst kleine und dann große Locks nehmen, also immer erst A locken und dann B).
1
2
3
4
5
6
7
i f from <
then do
b1 <−
b2 <−
e l s e do
b2 <−
b1 <−
to
takeMVar from
takeMvar t o
takeMVar t o
takeMVar from
Während der gesamten Aktion sollten dann keine weiteren Locks erlaubt sein ⇒ keine Kompositionalität.
11.1.1 Gefahren bei der Programmierung mit Locks
• Verwendung von zu wenig Locks ⇒ Inkonsistenzen
• Verwendung von zu vielen Locks ⇒ übermäßige Sequentialisierung (im besten Fall) und Deadlocks (im schlechtesten Fall)
• Verwendung falscher Locks, da oft der Zusammenhang zwischen Lock und zu sichernden Daten
fehlt (bei Verwendung zusätzlicher Locks/Lockobjekte in Java) ⇒ Konsequenzen wie zuvor
• Nehmen von Locks in falscher Reihenfolge (Race Condition) ⇒ Deadlock
• Robustheit, da beim Absturz von Teilkomponenten Locks eventuell nicht mehr freigegeben werden und inkonsistente Systemzustände zurück bleiben (z. B. Geld wird vernichtet)
• Vergessen der Lockfreigabe ⇒ Deadlock
11.2 Transaktionsbasierte Kommunikation (in Concurrent Haskell als anderen
Ansatz)
Transaktionen sind ein bekanntes Konzept bei Datenbanken zur atomaren Ausführung komplexer
Datenbankanfragen und -modifikationen. Diese Idee kann auch zur nebenläufigen Programmierung
verwendet werden.
Die modifizierbaren Variablen entsprechen der Datenbank und die Operationen entsprechen der Transaktion.
Transaktionen beeinflussen die Welt nur, wenn sie erfolgreich waren. Sie werden in einer eigenen Monade ST M (Software Transactional Memory) definiert.
Das atomare Ausführen einer Transaktion in der Welt geschieht mittels einer Funktion
atomically :: STM a -> IO a. Im Gegensatz zu Datenbanken wird eine ST M immer wieder ausgeführt, bis sie erfolgreich war.
Zur Kommunikation (anstelle der MVars) dienen spezielle Transaktionsvariablen:
1
data TVar a −− abstract
2
3
4
newTVar
readTVar
:: a
: : TVar a
−> STM ( TVar a )
−> STM a
61
5
writeTVar : : TVar a −> a −> STM ( )
Beachte: Im Gegensatz zu MVars sind TVars niemals leer. Es gibt keinen Lock!
Listing 49: Bankbeispiel
1
ty pe Account = TVar I n t
2
3
4
b a l a n c e : : Account −> STM I n t
b a l a n c e a c c = readTVar a c c
5
6
7
8
9
1
2
3
1
2
3
4
5
6
7
8
withdraw : : Account −> I n t −> STM ( )
withdraw a c c amount = do
b <− b a l a n c e a c c
writeTVar a c c ( b − amount )
d e p o s i t : : Account −> I n t −> STM( )
d e p o s i t a c c amount =
withdraw a c c (−amount )
limitedWithdraw : : Account −> I n t −> STM Bool
limitedWithdraw a c c amount = do
b <− b a l a n c e a c c
i f b >= amount
then do
withdraw a c c amount
return True
e l s e return F a l s e
Fassen wir nun noch einmal das STM-Interface von Haskell zusammen:
Atomare Ausführung einer Transaktion: atomically :: STM a -> IO a
TVars als Kommunikationsabstraktion, welche im Rahmen von Transaktionen verändert werden können:
data TVar a ist abstrakt
Erstellen, Lesen und Verändern von TVar-Inhalten:
newTVar
:: a
-> STM (TVar a)
readTVar :: TVar a
-> STM a
writeTVar :: TVar a -> a -> STM ()
11.2.1 Beispielprogramm
Listing 50: Beispielprogramm
1
2
3
4
5
6
7
8
9
10
11
12
main : : IO ( )
main = do
a c c 1 <− a t o m i c a l l y ( newTVar 1 0 0 )
a c c 2 <− a t o m i c a l l y ( newTVar 1 0 0 )
f o r k I O ( do
b <− a t o m i c a l l y ( limitedWithdraw a c c 1 6 0 )
i f b then return ( )
e l s e putStrLn ”Du b i s t p l e i t e ! ”
)
a t o m i c a l l y ( t r a n s f e r acc1 acc2 50)
b <− a t o m i c a l l y ( b a l a n c e a c c 1 )
print b
62
11.3 Synchronisation mit Transaktionen
Als Nächstes wollen wir untersuchen, in wie weit Transaktionen auch geeignet sind, Synchronisationsprobleme zu lösen. Hierzu betrachten wir wieder die dinierenden Philosophen und versuchen eine
STM-basierte Implementierung zu entwickeln.
Zunächst müssen wir uns überlegen, wie die Stäbchen repräsentiert werden können. Da TVars (im
Gegensatz zu MVars) nicht leer sein können, benötigen wir eine TVar, welche boolesche Werte aufnehmen kann. Hierbei bedeutet der Wert True, dasss der Stab verfügbar ist und False, dass er bereits
vergeben ist.
Listing 51: Dinierende Philosophen
1
ty pe S t i c k = TVar Bool
// True = l i e g t a u f dem T i s c h ( i n i t i a l )
Als Nächstes können wir versuchen Funktionen zum Aufnehmen bzw. Zurücklegen des Stäbchens zu
definieren. Während putStick sehr einfach definiert werden kann, erreichen wir bei der Definition von
takeStick einen Punkt, an dem wir nicht weiter wissen:
Listing 52: Dinierende Philosophen
1
2
3
p u t S t i c k : : S t i c k −> STM ( )
p u t S t i c k s = do
writeTVar s True
4
5
6
7
8
9
10
t a k e S t i c k : : S t i c k −> STM ( )
t a k e S t i c k s = do
b <− readTVar s
i f b then do
writeTVar s F a l s e
e l s e ???
// T r a n s a k t i o n e r n e u t v e r s u c h e n
Falls takeStick ausgeführt wird, wenn die TVar den Wert False enthält, kann die gesamte Transaktion
nicht mehr erfolgreich werden. An diesem Punkt hätte der lockbasierte Ansatz gewartet (suspendiert),
bis ein anderer Prozess den Stab zurück legt.
Denkt man in Transaktionen erkennen wir als Anwender der STM-Bibliothek an dieser Stelle, dass
die Transaktion so insgesamt nicht erfolgreich ist, was wir durch Aufruf der Funktion
retry ::
STM ()
realisieren können. Bei Aufruf von retry wird die Transaktion erfolglos abgebrochen und erneut gestartet. Inwieweit hierbei tatsächlich Busy-Waiting notwendig ist, werden wir später klären. Zunächst
besagt die Ausführung von retry, dass die Transaktion an dieser Stelle fehlgeschlagen ist:
Listing 53: Dinierende Philosophen
1
2
3
4
5
6
t a k e S t i c k : : S t i c k −> STM( )
t a k e S t i c k s = do
b <− readTVar s
i f b then do
writeTVar s F a l s e
else retry
// T r a n s a k t i o n e r n e u t v e r s u c h e n
Nun müssen wir nur noch die Philosophen hinzufügen:
Listing 54: Dinierende Philosophen
1
2
3
4
5
6
7
8
9
10
p h i l : : S t i c k −> S t i c k −> IO ( )
p h i l l r = do
// denken
atomically ( takeStick l )
atomically ( takeStick r )
// E a t i n g
a t o m i c a l l y ( do
putStick l
putStick r )
phil l r
63
Die einzelnen Transaktionen werden mittels atomically ausgeführt. Hierbei fügen wir beide putStick
Aktionen zusammen aus. Unter Hinzunahme einer geeigneten Startfunktion für Sticks und Philosophen, läuft das Programm schon sehr schnell nach seinem Start in einen Deadlock. Diesesmal können
wir den Deadlock aber sehr viel einfacher vermeiden, als zuvor. Wir müssen einfach nur fordern, dass
beide Stäbchen atomar, also innerhalb einer Transaktion, genommen werden. Hierzu komponieren wir
beide sequentiell auf STM-Ebene und verwenden nur noch einen atomically Aufruf:
// Think
atomically(do
takeStick l
takeStick r)
Nun läuft das Programm nicht mehr in die Deadlock-Situation. Ein Philosoph versucht, atomar beide
Stäbchen zu nehmen. Ist dies nicht möglich, wird die gesamte Transaktion neu gestartet. Der Zustand,
in dem also das linke Stäbchen aufgenommen wurde, aber das zweit nicht verfügbar ist, wird außerhalb
der Transition nicht mehr sichtbar.
11.4 MVars
Bei der praktischen Programmierung, ist es aber oft doch recht praktisch, wenn unterschiedliche
Prozesse aufeinander warten können. Um dies auch in Transaktionen zu ermöglichen, können wir als
nächstes eine MVar auch auf der Ebene der STM zur Verfügung stellen:
Listing 55: MVar in STM
1
module MVar where
2
3
4
import C o n t r o l . Concurrent .STM
import C o n t r o l . Concurrent ( f o r k I O )
5
6
7
data MVar a = MVar ( TVar ( Maybe a ) )
d e r i v i n g Eq −− H i e r d u r c h koennen auch d i e s e MVars v e r g l i c h e n werden (==,/=)
8
9
10
11
newEmptyMVar : : STM (MVar a )
newEmptyMVar = do t <− newTVar Nothing
return (MVar t )
12
13
14
15
newMVar : : a −> STM (MVar a )
newMVar v = do t <− newTVar ( J u s t v )
return (MVar t )
16
17
18
19
20
21
22
takeMVar : : MVar a −> STM a
takeMVar (MVar t ) = do mv <− readTVar t
case mv o f
Nothing −> r e t r y
J u s t x −> do writeTVar t Nothing
return x
23
24
25
26
27
28
putMVar : : MVar a −> a −> STM ( )
putMVar (MVar t ) v = do mv <− readTVar t
case mv o f
Just
−> r e t r y
Nothing −> writeTVar t ( J u s t v )
29
30
31
32
33
readMVar : : MVar a −> STM a
readMVar m = do v <− takeMVar m
putMVar m v
return v
Wir verwenden eine ähnliche Idee, wie bei der Realisierung der Stäbchen. Hierbei müssen wir allerdings
von Bool nach Maybe für die gefüllte MVar verallgemeinern.
Alle von der MVar bekannten Funktionen sind nun als STM Anweisungen verfügbar (nicht mehr IO).
Die IO-Varianten können einfach mittels atomically definiert werden. Dennoch ist die Definition als
64
atomicallygggggggggg
ggreadTVargt1gggggggggggg
ggwriteTVargt2g42
ggreadTVargt3
ggwriteTVargt4g43
writeSet
readSet
∅
∅
{?t1,gv22)}
{?t2,g42)}
{?t1,gv42),g?t3,gv17)}
{?t2,g42),g?t4,g43)}
GemäßgglobalergOrdnung
lockg?writeSetg∪ readSet)
validate
gVersionsnummerg
allerggelesenengTVarsg
nochgaktuell?
rollback
unlockg?writeSetg∪ readSet)
nein
ja
commit
unlockg?writeSetg∪ readSet)
gSchreibegwriteSetgingechtegTVars
Abbildung 13: Ablauf einer erfolgreichen Transaktion (dank an Nils Ehmke)
STM Anweisungen sinnvoll, da sie so auch elegant mit beliebigen anderen STM Aktionen komponiert
werden können.
Eine leere MVar modellieren wir also durch eine TVar, welche den Wert Nothing enthält. Sowohl bei
takeMVar als auch bei putMVar kann die Transaktion abgebrochen werden.
11.5 Alternative Komposition
Haskell bietet eine weitere Abstraktion, welche es ermöglicht, zu fehlgeschlagenen Transaktionen,
Alternativtransaktionen zu definieren.
orElse :: STM a -> STM a -> STM a
Die Idee ist, dass sich orElse t1 t2 (= t1 ‘orElse‘ t2) genau so verhält, wie t1, falls t1 erfolgreich
ist (d.h. kein retry ausführt) und wie t2, falls t1 nicht erfolgreich war und retry ausführte. Als
einfache Beispiele für die Verwendeung von orElse können wir die MVar-Implementierung um tryTake
und tryPut erweitern:
Listing 56: tryTakeMVar und tryPutMVar
1
2
3
4
5
6
tryTakeMVar : : MVar a −> STM ( Maybe a )
tryTakeMVar tVar = ( do
v <− takeMVar tVar
return ( J u s t v ) )
‘ orElse ‘
return Nothing
7
8
9
10
11
12
13
tryPutMVar : : MVar a −> a −> STM Boolean
tryPutMVar tVar v = ( do
putMVar tVar v
return True )
‘ orElse ‘
return F a l s e
Mit orElse haben wir also, neben der sequentiellen Koposition, eine zweite Möglichkeit, Transaktionen
zu komponieren. Beide können geschachtelt verwendet werden, so dass z.B. tryPutMVar auch wieder
in Sequenzen oder anderen orElses verwendet werden kann.
ToDo: LChan
65
11.6 Implementierung von STM
Es gibt eine Vielzahl von Implementierungen von Transaktionskonzepten. Hierbei geht man häufig
von unterschiedlichen Grundvoraussetzungen aus. Als Beispiel seien hier Datenbank-Transaktionen
genannt, bei welchen bereits vor der Ausführung alle beteiligten Resourcen bekannt sind. Dies können
wir für unsere Transaktionen nicht fordern, da sie dynamisch während der Ausführung zusammengesetzt werden. Insbesondere können einzelne gelesene Werte den weiteren Programmablauf beeiflussen,
was über eine Programmverzweigung bis hin zu völlig anderen STM-Aktionen führt, aus denen die
restliche Transaktion besteht.
In Haskell ist eine optimistische Ausführungsstrategie für die STMs realisiert. Während der Ausführung
gehen wir zunächst davon aus, dass wir erfolgreich sein werden und die Transaktion unabhängig von
anderen nebenläufigen Threads abschließen können. Während der Ausführung werden also zunächst
gar keine Resourcen gelockt.
Dies bedeutet aber auch, dass wir zunächst noch keine TVars persistent (also für andere Threads
sichtbar) verändern dürfen. Ersatzweise protokollieren wir während der Ausführung alle writeTVarAktionen mit und können sie dann später am Ende der Transaktion in einer speziellen Commitphase
realisieren. Falls wir ein retry erreichen, können wir sie dann entsprechend auch verwerfen.
Bevor eine Transaktion in der Commitphase global realisiert werden kann, müssen wir uns aber noch
überzeugen, dass sie auf einem konsistenten Zustand des Gesamtsystems basierte. So könnte es sein,
dass während der Ausführung der Transaktion T1 einige TVars gelesen wurden, dann eine andere
Transaktion T2 einige TVars (darunter auch die bereits gelesenen ändert) und danach T1 noch von
T2 geänderte TVars liest. Dann wäre T1 ja auf Basis einer inkonsistenten Sicht durchgelaufen, was
bedeutet, dass kein Commit erfolgen darf. In diesem Fall sollte ein Rollback ausgeführt, die Änderungen
der Transaktion verworfen und die Transaktion erneut gestartet werden.
Um feststellen zu können, ob gelesene TVars noch aktuell sind, erweitern wir die TVars um Versionsnummern, welche bei jedem Verändern einer TVar im Commit hochgezählt werden. Zur Konstitenzüberprüfung können wir dann dem eigentlichen Commit eine Validierungsphase vorschieben, in der
für alle gelesenen TVars überprüft wird, ob die Versionsnummer beim Lesen während der Ausführung
der Transaktion noch aktuell ist. Um dies durchführen zu können, protokollieren wir während der
Transaktionsausführung neben den geschriebenen TVars auch die gelesenen TVars und ihre Versionsnummer mit.
War die Validierung erfolgreich, führen wir das Commit aus, sonst machen wir ein Rollback. Um aber
zu verhindern, dass mehrere Commits/Validierungen gleichzeitig ausgeführt werden, ist es notwendig
diesen kritischen Teil der Transaktion durch Locks zu sichern. Mögliche Lösungen sind hier das setzen
eines globalen Locks oder das Locken aller gelesenen und geschriebenen TVars. Da zu diesem Zeitpunkt
alle gelesenen/geschriebenen TVars bekannt sind, könen wir Sie gemäß einer globalen Ordnung auf den
TVars locken und vermeiden Verklemmungen. Bezogen auf die dinnierenden Philosophen entspricht
dies der Deadlockvermeidung dadurch, dass ein Philosoph seine Stäbchen in einer anderen Reihenfolge
nimmt, als die anderen.
Der gesamte Ansatz ist noch einmal in Abbildung 13 zusammengefasst.
In der Vorlesung haben wir den Ansatz in Erlang implementiert. Der Code steht auf der Web-Seite
der Vorlesung zur Verfügung.
Eine Besonderheit bei der Implementierung wurde bei der Umsetzung des Chan-Beispiels deutlich.
Wie wir ja bereits erläutert haben, sollte der isEmpty-Bug nicht mehr vorhanden sein. Es zeigt sich
aber, dass readChan in diesem Beispiel doch blockiert. Auch bei anderen Anwendungen von readChan
blockiert der ausführende Thread.
Analysiert man stufenweise einmal, welche MVar-Aktionen bei einem readChan ausgeführt werden
ergibt sich folgende Liste:
readChan ch
takeMVar read
takeMVar rEnd
putMVar read ...
Splittet man dies nun in die hierbei ausgeführten TVar-Aktionen auf, ergibt sich folgendes Bild:
takeMVar read
readTVar read
writeTVar read Nothing
66
readChan ch
takeMVar rEnd
putMVar
readTVar
writeTVar
readTVar
writeTVar
read ...
rEnd
rEnd Nothing
read
read (Just ...)
Die TVar read wird aso zwei Mal gelesen und dazwischen einmal geschrieben. Dies bedeutet, dass
die ausgeführte Transaktion beim zweiten Lesen eigentlich den bereits lokal veränderten Zustand
verwenden muss. Da dies aber in der ersten Implementierung von readTVar nicht geschieht, sondern
erneut der globale, noch unveränderte (gefüllte) MVar-Zustand gelesen wird, suspendiert putMVar
auf der noch gefüllten MVar read, anstatt diese wieder mit dem Zeiger auf den neuen Listenanfang
zu aktualisieren. Es ist also wichtig, beim lesen von TVars die lokalen Änderungen innerhalb der
Transaktion zu berücksichtigen, was in der Erlang Implementierung recht einfach realisiert werden
kann.
Eigentlich erscheint es als überflüssig, in Transaktionen einzelne TVars mehr als einmal zu lesen, da
man sich alternativ ja auch den alten Wert merken könnte. Auf Grund der Kompositionalität von
STMs, tritt dieser Fall in der Praxis aber doch öfter auf und muss für eine korrekte Implementierung
berücksichtigt werden.
Dann kann retry suspendieren, bis eine in der zum retry geführten Transaktion eine gelesene TVar
verändert wurde. Ansonsten würde die Transaktion wieder das gleiche Verhalten zeigen und wieder in
das gleiche retry laufen.
Beachte aber: Validierung und Commit müssen atomar ausgeführt werden. Hierzu lock in jeder TVar.
Diese werden immer bezüglich globaler Ordnung genommen und auch freigegenen ⇒ es ist kein Deadlock mehr möglich (Alternative: globaler lock, aber dann weniger Nebenläufigkeit).
Beachte: Es ist nicht möglich, dass sich alle Aktionen wechselseitg blockieren. Ein Restart findet nur
nach commit durch andere Transaktionen statt. Es gibt aber Probleme bei großen Transaktionen (z. B.
Inventur eines Bücherbestades), da diese durch viele kleine Transaktionen immer wieder unterbrochen
werden und somit verhungern“ würden.
”
Andere Implementierung sind möglich, welche noch optimistischer oder auch pessimistischer sein
können.
67
Herunterladen