Skript zur Vorlesung Nebenläufige und verteilte Programmierung

Werbung
Skript zur Vorlesung
Nebenläufige und verteilte Programmierung
WS 2012/2013
Priv. -Doz. Dr. Frank Huch
17. Juni 2014
Version: 1.1
Dieses Skript basiert auf einer studentischen Vorlesungsmitschrift von Lutz Seemann, welche er dem
Dozenten dankenswerter Weise zur Verfügung gestellt hat.
Inhaltsverzeichnis
1 Einleitung
1.1 Sequentielle Programmierung . . . . . . . . . . . . . . . . . .
1.2 Begriffe . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
1.2.1 Nebenläufigkeit (Concurrency) . . . . . . . . . . . . .
1.2.2 Parallelität . . . . . . . . . . . . . . . . . . . . . . . .
1.2.3 Verteilte Programmierung (Distributed Programming)
1.3 Inhalt der Vorlesung . . . . . . . . . . . . . . . . . . . . . . .
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
1
1
1
1
1
2
2
. . . . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . . . .
Verwendung von Semaphoren)
. . . . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . . . .
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
2
3
4
4
5
6
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
7
7
8
8
8
9
10
10
11
12
12
14
14
17
17
19
in Haskell
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
22
22
23
24
2 Nebenläufige Systeme
2.1 Kommunikation . . . . . . . . . . . . . . . . .
2.1.1 Semaphor (Dijkstra ’68) . . . . . . . .
2.1.2 Producer-/Consumer Problem (andere
2.1.3 Dinierende Philosophen . . . . . . . .
2.1.4 Monitore (Dijkstra ’71, Hoare ’74) . .
3 Nebenläufige Programmierung in Java
3.1 Java . . . . . . . . . . . . . . . . . . . . . . . . . . .
3.2 Interface der Thread-Objekte . . . . . . . . . . . . .
3.2.1 Dämonthreads . . . . . . . . . . . . . . . . .
3.2.2 sleep()-Methode: . . . . . . . . . . . . . . . .
3.3 Synchronisation . . . . . . . . . . . . . . . . . . . . .
3.4 Synchronized Klassen-Methoden . . . . . . . . . . . .
3.5 Synchronized Anweisungen . . . . . . . . . . . . . . .
3.6 Unterscheidung der Synchronisation im OO-Kontext
3.7 Kommunikation zwischen Threads . . . . . . . . . .
3.7.1 Semantik . . . . . . . . . . . . . . . . . . . .
3.7.2 Variante von wait() mit Timeout: . . . . . .
3.7.3 Fallstudie: einelementiger Puffer . . . . . . .
3.8 Beenden von Threadausführungen . . . . . . . . . .
3.9 Warten auf Ergebnisse . . . . . . . . . . . . . . . . .
3.10 ThreadGroups . . . . . . . . . . . . . . . . . . . . .
4 Nebenläufige Programmierung
4.1 Concurrent Haskell . . . .
4.2 Kommunikation . . . . . .
4.3 Kanäle in Haskell . . . . .
5 Erlang
5.1 Sequentielle Programmierung in Erlang . . .
5.2 Nebenläufige Programmierung . . . . . . . . .
5.3 Ein einfacher Key-Value-Store . . . . . . . . .
5.4 Wie können Prozesse in Erlang synchronisiert
5.5 MVar . . . . . . . . . . . . . . . . . . . . . .
5.6 Nebenläufige Programmierung in Erlang . . .
5.7 Verteilte Programmierung in Erlang . . . . .
5.8 Verbinden unabhängiger Erlang-Prozesse . . .
5.8.1 Veränderungen des Clients . . . . . . .
5.8.2 Veränderungen des Servers . . . . . .
5.9 Robuste Programmierung . . . . . . . . . . .
5.10 Systemabstraktionen . . . . . . . . . . . . . .
. . . . .
. . . . .
. . . . .
werden?
. . . . .
. . . . .
. . . . .
. . . . .
. . . . .
. . . . .
. . . . .
. . . . .
6 Grundlagen der Verteilte Programmierung
6.1 7 Schichten des ISO-OSI-Referenzmodells . . . .
6.2 Protokolle des Internets . . . . . . . . . . . . . .
6.3 Darstellung von IP-Adressen/Hostnames in Java
6.4 Netzwerkkommunikation . . . . . . . . . . . . . .
6.4.1 UDP in Java . . . . . . . . . . . . . . . .
6.5 Socket-Optionen . . . . . . . . . . . . . . . . . .
i
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
25
25
26
27
28
29
30
30
30
30
31
32
33
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
34
34
35
36
36
36
40
6.6
6.7
Sockets in Erlang . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
Verteilung von Kommunikationsabstraktionen . . . . . . . . . . . . . . . . . . . . . . .
40
41
7 Synchronisation durch Tupelräume
7.1 Java-Spaces . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
7.2 Implementierung von Tupelräumen in Erlang . . . . . . . . . . . . . . . . . . . . . . .
42
44
44
8 Spezifikation und Testen von Systemeigenschaften
46
9 Linear Time Logic (LTL)
9.1 Implementierung von LTL zum Testen . . . . . . . . . . . . . . . . . . . . . . . . . . .
9.2 Verifikation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
9.3 Simulation einer Turing-Maschine . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
46
48
52
53
10 Transaktionsbasierte Kommunikation
10.1 Ein Bankbeispiel in Concurrent Haskell . . . . . . .
10.1.1 Gefahren bei der Programmierung mit Locks
10.2 Transaktionsbasierte Kommunikation (in Concurrent
10.2.1 Beispielprogramm . . . . . . . . . . . . . . .
10.3 Synchronisation mit Transaktionen . . . . . . . . . .
54
54
55
56
57
57
ii
. . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . .
Haskell als anderen Ansatz)
. . . . . . . . . . . . . . . .
. . . . . . . . . . . . . . . .
.
.
.
.
.
.
.
.
.
.
.
.
.
.
.
1 Einleitung
1.1 Sequentielle Programmierung
In der sequentiellen Programmierung führen Programme Berechnungen Schritt für Schritt aus. Hierdurch ergeben sich insbesondere Probleme bei der Entwicklung reaktiver Systeme, wie z.B. GUIs,
Betriebssystemprogrammen oder verteilten/Web-Applikationen.
Hier sollen alle möglichen Anfragen/Eingaben quasi “gleichzeitig” behandelt werden können.
Ein erster Ansatz hierzu war Polling. Alle möglichen Anfragen werden in einer großen Schleife bearbeitet.
Ein reaktives System läuft potentiell unendlich lange und reagiert auf seine Umgebung (z. B. GUI,
Textverarbeitung, Webserver, Betriebssystemprozesse). Eine Besonderheit ist hierbei dass meist mehrere Anfragen/Eingaben möglichst gleichzeitig bearbeitet werden sollen. Wie ist es möglich Reaktivität
zu gewährleisten?
Ein erster Ansatz wäre mittels Polling möglich, d. h. der Abfrage aller möglichen Anfragen in einer
großen Schleife, welcher jedoch folgende Nachteile hat:
• Busy Waiting verbraucht Systemressourcen.
• Code ist schlecht strukturiert, da alle alle Anfragen in die Schleife integriert werden müssen.
Modularität ist nur eingeschränkt über Unterprozeduren möglich.
• Bearbeitung einer Anfrage blockiert andere Anfragen, da Rückkehr in Schleife erst nach Beendigung vorheriger Anfragen. Somit sequentielles Bearbeiten der Anfragen und keine Reaktivität
während der Bearbeitung einer Anfrage.
Lösung:Nebenläufigkeit (Concurrency)
Mehrere Threads (Fäden) können nebenläufig ausgeführt werden und die einzelnen Dienste eines
reaktiven Systems zur Verfügung stellen. Hierdurch ergeben sich folgende Vorteile:
• kein Busy Waiting (auf Interruptebene verschoben)
• Bessere Strukturierung: Dienst =
b (ggf. mehreren) Threads
• eventuell können mehrere Anfragen gleichzeitig bearbeitet werden
• System ist immer reaktiv
Beachte hierbei, dass die letzten beiden Punkte von der Implementierung der Threads auf der (Betriebs-)Systemebene abhängen.
1.2 Begriffe
1.2.1 Nebenläufigkeit (Concurrency)
Durch die Verwendung von Threads/Prozessen können einzelne Aufgaben/Dienste eines Programms
unabhängig von anderen Aufgaben/Diensten programmiert werden.
1.2.2 Parallelität
Meist in Verbindung mit High-Performance-Computing. Durch die parallele Ausführung mehrerer
Prozesse (auf in der Regel mehreren Prozessoren) wird eine schnellere Ausführung angestrebt. Beachte
aber, dass manchmal auch auf Ein-Prozessor-Systemen durch Nebenläufigkeit Performance-Gewinne
erreicht werden können, z. B. bei geringerer Performance von Netzwerkkarten.
Früher wurden insbesondere Shared-Memory-Systeme betrachtet, welche heute aber wieder in Form
der Multicore-Processoren an Relevanz gewinnen. Allerdings werden oft auch Rechner-Cluster zur
parallelen Programmierung eingesetzt (Distributed-Memory-Systeme) bis hin zu Netzwerken im Internet. Der Vorteil ist die Unabhängigkeit von Spezialhardware, sowie eine einfache Steigerung der
zur Verfügung stehenden Prozessoren.
Das größte Problem bei der parallelen Programmierung ist in der Regel eine möglichst gleichmäßige
Ausnutzung der Prozessoren (Vermeidung von Sequentialisierung) und eine Minimierung der Kommunikation.
1
Wir werden in dieser Vorlesung nicht weiter speziell auf die parallele Programmierung eingehen. Die
präsentierten Techniken finden hierbei aber durchaus auch Anwendung, wenngleich der Fokus sicherlich meist auf andere Aspekte gerichtet werden muss.
1.2.3 Verteilte Programmierung (Distributed Programming)
Ein verteiltes System besteht aus mehreren physisch getrennten Prozessoren, welche über ein Netzwerk
verbunden sind. Zum Einen werden solche Systeme wie zuvor erwähnt zur parallelen Programmierung
eingesetzt. Darüber hinaus sind aber viele Anwendungen von ihrer Aufgabenstellung schon verteilt, so
dass sie auch entsprechend implementiert werden müssen. Beispiele sind Telefonsysteme oder Chats.
Manchmal stehen auch bestimmte Ressourcen nur an bestimmten Stellen zur Verfügung, so dass
ebenfalls eine Verteilung der Anwendung notwendig ist, z. B. Geldautomaten, spezielle Datenserver.
Eine Verteilung wird manchmal auch zur Performancesteigerung, z. B. Lastverteilung eingesetzt und
kann auch zur Fehlertoleranz durch redundante Informationsvorhaltung eingesetzt werden.
Verteilte Systeme werden in der Regel mit lokaler Nebenläufigkeit kombiniert, um Reaktivität gegenüber der Netzwerkkommunikation zu ermöglichen. Durch mehrere verteilte Prozesse kann es aber
auch ohne Nebenläufigkeit zu den gleichen Problemen wie bei rein nebenläufigen Systemen kommen.
1.3 Inhalt der Vorlesung
• Vergleich unterschiedlicher Konzepte zur nebenläufigen und verteilten Programmierung am Beispiel von Java, Erlang, Concurrent Haskell und weiteren Konzepten.
• Ausgewählte verteilte Algorithmen.
2 Nebenläufige Systeme
Generell sind immer drei Aspekte zu berücksichtigen:
• Definition und Start von Threads (ggf. auch deren Terminierung)
• Kommunikation zwischen Threads
• Synchronisation von Threads
Hierbei hängen die beiden letzten Punkte stark miteinander zusammen, da Kommunikation nur
schwerlich ohne Synchonisation möglich ist und jede Synchronisation natürlich auch eine Art Kommunikation ist.
Wir betrachten folgendes Beispiel in Pseudocode:
1
2
par { while true do w r i t e ( ” a ” ) }
{ while true do w r i t e ( ”b” ) }
wobei Semantik des par-Konstrukts: Spaltet Thread in zwei Unterthreads auf und führt diese nebenläufig aus.
ababab ...
aaabaaabaabbbaa ...
aaaaa ...
bbbbbb ...
Oft (aber nicht immer) deterministische Ausführung d. h. gleiche Ausgabe bei allen Läufen), die durch
den Scheduler geregelt wird. Dies muss aber nicht immer der Fall sein und ein Programmierer darf
sich hierauf auf keinen Fall verlassen. Der Schedule teilt den Prozessen die Prozessorzeit zu. Im Wesentlichen werden dabei folgende Arten von Schedulern unterschieden:
• kooperatives Multitasking ⇒ ein Thread rechnet solange, bis er die Kontrolle abgibt (z. B. mittels
yield). Die anderen warten (suspendiert) auf das Ergebnis (z. B. aaaaa ... und bbbbbb ...)
• präemptives Multitasking ⇒ der Scheduler entzieht den Prozessen regelmäßig die Kontrolle.
Dies erfordert eine aufwändigere Implementierung, bietet aber mehr Programmierkomfort zur
2
Gewährleistung der Reaktivität des Systems (z. B. alle Ausgaben außer aaaaa ... und
bbbbbb ...)
Prinzipiell sollten Programmierer nebenläufiger Systeme möglichst für beliebige Scheduler entwickeln.
Einzige Ausnahme stellt manchmal die Einschränkung auf präemptives Multitasking dar. Aber selbst
dies ist oft nicht notwendig, da durch Synchronisation die Kontrolle häufig abgegeben wird und somit
auch kooperatives Multitasking ausreicht.
2.1 Kommunikation
¿¿¿¿¿¿¿ origin/master Wird oft über geteilte Variablen realisiert.
Listing 1: Beispiel
1
2
3
int i = 0;
par { i := i + 1 } { i := i ∗ 2 }
write ( i ) ;
Semantik: wie zuvor.
Es ergibt sich Nichtdeterminismus mit mehreren Ergebnissen:
i = 0; i = i + 1; i = i * 2
i = 0; i = i * 2; i = i + 1
mit dem Ergebnis 2
mit dem Ergebnis 1
Wann ist ein Programm also korrekt?
• Ein sequentielles Programm ist korrekt, falls es für alle Eingaben ein korrektes Ergebnis liefert.
• Ein nebenläufiges Programm ist korrekt, wenn es für alle Eingaben und alle Schedules das richtige
Ergebnis liefert.
In der Praxis (leider) oft “für einen (den testbaren) Schedule”, wobei folgende Probleme auftreten
können:
• Portierung auf andere Architektur ⇒ anderer Schedule
• Erweiterung des Systems um zusätzliche Threads/zusätzliche Berechnungen ⇒ verändertes Scheduling
• Beeinflussung durch Ein-/Ausgaben
⇒ Deshalb: für alle Schedules !!! (manchmal ist auch eine Einschränkung auf präemptives Multitasking okay)
⇒ Bei Korrektheit für alle Schedules verliert man den Vorteil des präemptiven Multitaskings. Deshalb wird die Korrektheit oft nur “für alle fairen Schedules” verlangt.
Der Begriff fair bedeutet hierbei, dass jeder Thread nach endlicher Zeit wieder an die Reihe kommt,
also falls möglich einen Schritt ausführt.
Als Konsequenz daraus ist das Testen der Programme schwierig, da ggf. unendlich viele Abläufe
betrachtet werden müssen und eine Beeinflussung des Schedules fast unmöglich ist. Deshalb: formale
Spezifikation, formale Verifikation, Testtools und Debugger mit Einfluss auf den Scheduler.
Weiteres Problem im Beispiel: Welche Anweisungen sind atomar? Der Compiler wird ein solches
Programm sicherlich in Maschinencode oder Pseudocode für eine abstrakte Maschine übersetzen.
Listing 2: Stackmaschinencode des Programms
1
2
3
int i = 0;
par { LOAD i ; LIT 1 ; ADD; STORE i }
{ LOAD i ; LIT 2 ; MULT; STORE i }
Aufgrund dieses Codeaufbaus ist dann ebenfalls folgende Ausführung möglich:
LOAD i; LIT 2; MULT;
LOAD i; LIT 1; ADD; STORE i;
STORE i
Diese liefert das Ergebnis i = 0, was sicherlich nicht das gewünschte Ergebnis ist. Bei dieser Ausführung
ist der 1. Thread in den 2. Thread “eingebettet”. Zuweisungen können aber nicht generell atomar
gemacht werden (wegen z. B. großer Berechnungen, Funktionsaufrufe).
3
Lösung: Der Programmierer bekommt die Möglichkeit zur Synchronisation. Im Folgenden werden
zunächst einige bekannte Konzepte wiederholt.
2.1.1 Semaphor (Dijkstra ’68)
Ziel: atomare (ununterbrochene) Ausführung von Programmabschnitten. Semaphore besitzen eine feste Schnittstelle, bestehen aus einem Integer und einer zugeordneten Prozesswarteschlange und stellen
einen abstrakten Datentyp mit zwei atomaren Operationen P und V dar, wobei die Operationen der
Veränderung der Variablen dienen.
Listing 3: Semaphor-Operationen
1
2
3
4
5
P( s ) :
IF ( s >= 1 ) THEN { s := s − 1 ; }
ELSE { P r o z e s s wird i n Warteschlange zu s e i n g e t r a g e n
und s u s p e n d i e r t ;
s := s − 1 ; }
6
7
8
9
10
V( s ) :
s := s + 1 ;
IF ( Warteschlange zu s n i c h t l e e r ) THEN { Erwecke e r s t e n P r o z e s s
d e r Warteschlange zu s ; }
Das P steht hierbei für passieren (passeer) und das V für verlassen (verlaat). Das obige Beispiel lässt
sich mittels Semaphor also wie folgt implementieren, was das Ergebnis i = 0 verhindert:
1
2
3
int i = 0;
semaphor s := 1 ;
par { P( s ) ; i := i + 1 ; V( s ) ; } { P( s ) ; i := i ∗ 2 ; V( s ) ; }
Der initiale Wert des Semaphors bestimmt die maximale Anzahl der Prozesse im kritischen Bereich.
Meist ist dies 1, was zu einem binären Semaphor führt.
2.1.2 Producer-/Consumer Problem (andere Verwendung von Semaphoren)
n Producer erzeugen Waren, die dann von m Consumern verbraucht werden. Zunächst gehen wir bei
der Modellierung von der Verwendung eines unbeschränkten Puffers aus:
Listing 4: Zunächst mit unbeschränktem Puffer
1
semaphor num := 0 ;
2
3
4
5
6
7
8
Producer :
while ( true ) {
produce p r o d u c t ;
push p r o d u c t t o b u f f e r ;
V(num ) ;
}
9
10
11
12
13
14
15
Consumer :
while ( true ) {
P(num ) ;
p u l l p r o d u c t from b u f f e r ;
consume p r o d u c t ;
}
Bemerkungen:
Zeile 1: Füllstand des Puffers
Zeile 12: an dieser Stelle findet ggf. eine Suspension statt
4
Der Semaphor fungiert als Produktzähler für den Puffer. Hochzählen mittel V im Producer und
Herunterzählen mittels P im Consumer.
Wir haben aber zunächst noch keine Synchronisation auf dem Puffer modelliert (kritischer Bereich).
Um hier zusätzlich synchronisieren zu können, fügen wir einen zweiten Semaphor ein, welcher wie
oben skiziert den kritischen Bereich schützt.
Listing 5: Zusätzlich mit Synchronisation
1
2
semaphor num := 0 ;
semaphor b a c c e s s := 1 ;
3
4
5
6
7
8
9
10
11
Producer :
while ( true ) {
produce p r o d u c t ;
P( b a c c e s s ) ;
push p r o d u c t t o b u f f e r ;
V( b a c c e s s ) ;
V(num ) ;
}
12
13
14
15
16
17
18
19
20
Consumer :
while ( true ) {
P(num ) ;
P( b a c c e s s ) ;
p u l l p r o d u c t from b u f f e r ;
V( b a c c e s s ) ;
consume p r o d u c t ;
}
Bemerkung:
Zeile 1: Füllstand des Puffers
Zeile 15: an dieser Stelle findet ggf. eine Suspension statt
Es fällt auf, dass die Verwendung von P-/V-Operationen schnell unübersichtlich werden kann. Es gibt
keine strukturelle Zuordnung von P/V.
2.1.3 Dinierende Philosophen
An einem Tisch mit Essen sitzen 5 Philosophen. Es gibt allerdings nur 5 Stäbchen zum Essen, wobei
eine Person immer 2 Stäbchen benötigt, so dass jeweils 1 Stäbchen mit dem Nachbarn geteilt werden muss. Ein Philosoph denkt so lange, bis er hungrig wird und versucht dann nacheinander beide
Stäbchen aufzunehmen und zu essen. Es findet keine zusätzliche Kommunikation untereiander statt.
Die Implementierung mit Semaphoren sieht wie folgt aus:
Listing 6: Ein Philosoph i von n lässt sich dann wie folgt implementieren
1
2
3
4
5
6
7
8
9
while ( true ) {
think ( ) ;
g e t hungry ( ) ;
P( s t i c k ( i ) ) ;
P( s t i c k ( ( i +1)%n ) ) ;
eat ( ) ;
V( s t i c k ( i ) ) ;
V( s t i c k ( ( i +1)%n ) ) ;
}
Die Reihenfolge der V -Operationen ist hierbei egal. Problem der Implementierung: Deadlocks (Verklemmungen), falls alle Philosophen ein Stäbchen nehmen.
Eine mögliche Lösung wird durch ein Zurücklegen des ersten Stäbchens aufgelöst, falls kein Zweites
verfügbar ist.
5
Listing 7: Implementierung mit Zurücklegen
1
2
3
4
5
6
7
8
9
10
11
while ( true ) {
think ( ) ;
g e t hungry ( ) ;
P( s t i c k ( i ) ) ;
IF ( s t i c k ( ( i +1)%n ) == 0 ) THEN { V( s t i c k ( i
ELSE { P( s t i c k ( (
eat ( ) ;
V( s t i c k ( i
V( s t i c k ( (
}
}
)); }
i +1)%n ) ) ;
));
i +1)%n ) ) ;
An der Stelle P(stick((i+1)%n)) kann ein anderer Philosoph das Stäbchen wegnehmen. Der letzte
Philosoph muss sein Stäbchen aber immer wieder weglegen. Livelocks sind aber nicht ausgeschlossen, d. h. ein oder mehrere Philosophen können verhungern (trotz eines fairen Schedules). Nachteile
von Semaphoren: P-/V-Operatoren sind schwer zuordenbar (sind eigentlich wie Klammern, aber nur
während der Laufzeit; nicht in der Syntax) ⇒ Codestrukturierung ist aus softwaretechnischer Sicht
schwer und bietet viele mögliche Fehlerquellen ⇒ lieber: strukturiertes Konzept
2.1.4 Monitore (Dijkstra ’71, Hoare ’74)
Monitore entsprechen einer Menge von Prozeduren und Datenstrukturen, die als Betriebsmittel betrachtet mehreren Threads zugänglich sind, aber nur von einem Thread zur Zeit benutzt werden
können ⇒ in einem Monitor befindet sich höchstens ein Thread.
Als Beispiel betrachten wir die Monitor-Implementierung eines Puffers:
Listing 8: Implementierung eines beschränkten Puffers als Monitor
1
2
3
4
5
6
7
8
9
10
11
monitor b u f f e r {
[ i n t ] [ 0 . . . ( n −1)] c o n t e n t s ;
i n t num , wp , rp = 0 ;
queue s e n d e r , r e c e i v e r ;
p r o c e d u r e push ( i n t item ) {
IF (num == n ) THEN { d e l a y ( s e n d e r ) ; }
c o n t e n t s [ wp ] = item ;
wp := (wp+1) mod n ;
num := num + 1 ;
continue ( r e c e i v e r ) ;
}
12
function int pull () {
IF (num == 0 ) THEN { d e l a y ( r e c e i v e r ) ; }
i n t item := c o n t e n t [ rp ] ;
rp := ( rp +1) mod n ;
num := num − 1 ;
continue ( s e n d e r ) ;
r e t u r n item ;
}
13
14
15
16
17
18
19
20
21
}
Bemerkungen:
Zeile 4: Prozessqueues
Zeile 6: Puffer ist voll
Zeile 14: Puffer ist leer
Semantik:
• num als Anzahl der Elemente im Puffer
• wp als Pointer zum Schreiben
6
• rp als Pointer zum Lesen
• delay(Q) ⇒ der aufrufende Prozess wird in die Warteschlange Q eingefügt
• continue(Q) ⇒ aktiviert den ersten Thread in der Warteschlange Q, falls ein solcher vorhanden
ist
Vorteile:
• innerhalb der Monitorprozesse können Synchronisationsprobleme ignoriert werden
• alle kritischen Bereiche/Datenstrukturen befinden sich in einem abstrakten Datentyp
Nachteile:
• häufig zu viel Synchronisation ⇒ sequentielles Programm oder Deadlocks
• wird in Programmiersprachen selten unterstützt (in Java existiert ein monitorähnliches Konzept)
3 Nebenläufige Programmierung in Java
3.1 Java
Programmiersprache des Internets ⇒ Reaktivität notwendig ⇒ Nebenläufigkeit.
Klasse Thread im Paket java.lang:
• eigene Threads als Unterklasse von Thread implementierbar
• eigentlicher Code in run()-Methode
• Anlegen von Threads mit new und Konstruktor1
• Ausführung des Threads mittels der Methode start()2
Listing 9: Beispiel
1
2
3
4
5
6
7
8
9
public c l a s s C o n c u r r e n t P r i n t e r extends Thread {
private S t r i n g s t r ;
public C u r r e n t P r i n t ( S t r i n g s t r 1 ) { s t r = s t r 1 ; }
public void run ( ) { while ( true ) { System . out . p r i n t ( s t r + ” ” ) ; } }
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
new C o n c u r r e n t P r i n t e r ( ” a ” ) . s t a r t ( ) ;
new C o n c u r r e n t P r i n t e r ( ”b” ) . s t a r t ( ) ;
}
}
Ausgabe: Menge aus a’s und b’s. Bei der Verwendung von green Threads, ggf. auch nur a’s oder nur
b’s.
Wegen fehlender Mehrfachvererbung in Java ist oft das Erben von der Threadklasse problematisch.
Deshalb besteht alternativ die Möglichkeit der Implementierung des Interfaces Runnable:
1
2
3
4
5
6
7
8
9
10
11
public c l a s s C o n c u r r e n t P r i n t implements Runnable {
String str ;
public C o n c u r r e n t P r i n t ( S t r i n g s t r 1 ) { s t r = s t r 1 ; }
public void run ( ) { while ( true ) { System . out . p r i n t ( s t r + ” ” ) ; } }
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
Runnable aThread = new C o n c u r r e n t P r i n t ( ” a ” ) ;
Runnable bThread = new C o n c u r r e n t P r i n t ( ”b” ) ;
new Thread ( aThread ) . s t a r t ( ) ;
new Thread ( bThread ) . s t a r t ( ) ;
}
}
1 initialisiert
2 diese
nur die Attribute ⇒ ist also sehr klein!
führt die run()-Methode aus
7
Achtung
this liefert jetzt kein Thread-Objekt mehr, sondern ein Runnable-Objekt. Das aktuelle Thread-Objekt
bekommt man mittels Thread.currentThread().
3.2 Interface der Thread-Objekte
Name ⇒ standard main Thread, Thread-0, Thread-1, ... / getName, setName
Prioritäten ⇒ Zahlen zwischen MIN_PRIORITY (1) und MAX_PRIORITY (10) mit Standard-Wert
NORM_PRIORITY (5) / setPriority, getPriority (⇒ auf die Prioritäten darf man sich aber nicht
verlassen!)
Threadgruppe ⇒ Zuordnung von Threads zu Gruppen als Strukturierungskonzept
Zustände:
• erzeugt ⇒ wird durch new Thread() erreicht
• aktivierbar ⇒ wird durch start oder yield sowie dem Eingabeende oder dem Ablauf der
Sleep-Zeit erreicht
• terminiert ⇒ wird erreicht, wenn run beendet wird
• nicht aktivierbar ⇒ wird durch das Warten auf eine Eingabe oder ein sleep erreicht
Das Thread-Objekt bleibt erhalten, solange es referenziert wird. Während der Ausführung existiert
eine “Eigenreferenz”.
Durch Aufruf der Methode yield wird dem Scheduler signalisiert, dass der Prozess die Kontrolle
abgeben kann. Bei green Threads führt dieser Aufruf in der Regel zum Threadswitch, falls dies möglich
ist. Bei präemptivem Scheduling kann yield auch keinen Efekt haben.
3.2.1 Dämonthreads
setDaemon(true) von start(), um einen Thread als Dämonthread zu deklarieren.
Idee: “unwichtiger” Hintergrundthread ⇒ Java Virtual Machine (JVM) terminiert, falls nur noch
Dämonthreads existieren.
Beispiel: AWT-Thread, Serverthreads, Garbage Collector
Scheduler: Es gibt unterschiedliche Implementierungen
• green Threads ⇒ kooperativer Scheduler in JVM (wird in der Regel nicht mehr unterstützt)
• native Threads ⇒ Verwendung von Betriebssystemprozessen mit präemptivem Multitasking
Vorteil:
• Ausnutzung moderner Multicore-Architekturen
• Wiederverwendung des Betriebssystem-Schedulers
Nachteil:
• größerer Verbrauch von Systemressourcen (keine light-weight Threads)
3.2.2 sleep()-Methode:
Die Objekt-Methode sleep deaktiviert den ausführenden Thread um die angegebene Zeit in Millisekunden. sleep kann eine InterruptedException werfen. Hierzu später mehr.
Listing 10: erste Variante
1
2
try { Thread . c u r r e n t T h r e a d ( ) . s l e e p ( 3 0 0 0 ) ; }
catch ( I n t e r r u p t e d E x c e p t i o n s e ) { . . . }
Listing 11: zweite Variante
1
this . s l e e p ( 3 0 0 0 ) ;
8
3.3 Synchronisation
Kombination aus Lock/Unlock-Konzept und Monitoren. Thread-Methoden können als synchronized
deklariert werden:
• In allen synchronized-Methoden eines Objektes darf sich maximal ein Thread aufhalten. Hierzu
zählen auch Unterbrechungen (auch unsynchronisierte Methoden), die in einer synchronizedMethode aufgerufen werden.
• kein Verlassen der synchronized-Methode durch sleep oder yield.
andere Sicht
• jedes Objekt besitzt einen Lock
• beim Versuch der Ausführung einer synchronized-Methode wird wie folgt vorgegangen:
– Lock verfügbar ⇒ Lock nehmen und fortfahren
– eigener Thread hat Lock ⇒ fortfahren
– sonst ⇒ auf Lock suspendieren
• Lock wird beim Verlassen der synchronized-Methode wieder freigegeben (auch bei Terminierung oder Exception)
Im Vergleich zu Semaphoren sind synchronisierte Methoden:
• strukturorientiert
• ein “unlock” kann nicht vergessen werden
• keine Suspension bei “mehrfachem” Nehmen des Locks
Im Vergleich zum Monitorkonzept sind synchronisierte Methoden flexibler, da einzelne Methoden
unsynchronisiert bleiben können, aber auch größere Gefahr von Fehlern.
Ein einfaches Beispiel soll die Verwendung von synchronized-Methoden veranschaulichen. Wir betrachten eine Implementierung einer Klasse für ein Bankkonto:
Listing 12: Beispiel
1
2
3
4
5
6
7
8
c l a s s Account {
private double b a l a n c e ;
public Account ( double i n i t i a l ) { b a l a n c e = i n i t i a l ; }
public synchronized double g e t B a l a n c e ( ) { return b a l a n c e ; }
public synchronized void d e p o s i t ( double amount ) {
b a l a n c e += amount ;
}
}
9
10
Account a = new Account ( 3 0 0 ) ;
11
12
...
13
14
a . deposit (100) . . . a . deposit (100);
// n e b e n l a e u f i g e Ausfuehrung
15
16
...
Die möglichen Ergebnisse ohne Synchronisation sind 400, 500 oder richtiger Unsinn (z. B. falls der
Double teilweise geschrieben wird, da diese Operation nicht atomar ist). Mit Synchronisation ist nur
das Ergebnis 500 möglich. synchronized ist für Konstruktoren nicht erlaubt, aber auch nicht sinnvoll,
da diese immer nur von einem Thread ausgeführt werden. Vererbte synchronized-Methoden müssen
nicht synchronisiert sein (⇒ verfeinerte Implementierung). Nicht überschriebene Methoden bleiben
synchronized. Nicht-synchronisierte Methoden können beim Überschreiben synchronized werden.
9
3.4 Synchronized Klassen-Methoden
Auch Klassenmethoden können synchronisiert werden: static synchronized analog wie oben, nur
auf Klassenebene. Keine Wechselwirkung mit synchronized Objektmethoden.
3.5 Synchronized Anweisungen
Vererbte synchronisierte Methoden müssen nicht zwingend wieder synchronisiert sein. Wenn man
solche Methoden überschreibt, so kann man das Schlüsselwort synchronized auch weglassen. Dies
bezeichnet man als verfeinerte Implementierung. Die Methode der Oberklasse bleibt dabei synchronized. Andererseits können unsynchronisierte Methoden auch durch synchronisierte überschrieben
werden.
Klassenmethoden, die als synchronisiert deklariert werden (static synchronized) haben keine Wechselwirkung mit synchronisierten Objektmethoden. Die Klasse hat also ein eigenes Lock.
Man kann sogar auf einzelne Anweisungen synchronisieren:
synchronized ( expr ) b l o c k
1
Dabei muss expr zu einem Objekt auswerten, dessen Lock dann zur Synchronisation verwendet wird.
Streng genommen sind synchronisierte Methoden also nur syntaktischer Zucker: So steht die Methodendeklaration
1
synchronized A m( a r g s ) b l o c k
eigentlich für
A m( a r g s ) { synchronized ( t h i s ) b l o c k }
1
Einzelne Anweisungen zu synchronisieren ist sinnvoll, um weniger Code synchronisieren bzw. sequentialisieren zu müssen:
private double s t a t e ;
1
2
public void c a l c ( ) { double r e s ;
3
4
// do some r e a l l y e x p e n s i v e c o m p u t a t i o n
...
5
6
7
// s a v e t h e r e s u l t t o an i n s t a n c e v a r i a b l e
synchronized ( t h i s ) {
state = res ;
}
8
9
10
11
}
12
Synchronisierung auf einzelne Anweisungen ist auch nützlich, um auf andere Objekte zu synchronisienen. Wir betrachten als Beispiel eine einfache Implementierung einer synchronisierten Collection:
1
2
3
4
class Store {
public synchronized boolean hasSpace ( ) {
...
}
5
public synchronized void i n s e r t ( int i )
throws N o S p a c e A v a i l a b l e E x c e p t i o n {
6
7
8
...
9
}
10
11
}
Wir möchten diese Collection nun verwenden wie folgt:
10
1
S t o r e s = new S t o r e ( ) ;
2
3
...
4
5
6
7
i f ( s . hasSpace ( ) ) {
s . insert (42);
}
Dies führt jedoch zu Problemen, da wir nicht ausschließen können, dass zwischen den Aufrufen von
hasSpace() und insert(int) ein Re-Schedule geschieht. Da sich das Definieren spezieller Methoden
für solche Fälle oft als unpraktikabel herausstellt, verwenden wir die obige Collection also besser
folgendermaßen:
1
2
3
4
5
synchronized ( s ) {
i f ( s . hasSpace ( ) ) {
s . insert (42);
}
}
3.6 Unterscheidung der Synchronisation im OO-Kontext
Wir bezeichnen synchronisierte Methoden und synchronisierte Anweisungen in Objektmethoden als
server-side synchronisation. Synchronisation der Aufrufe eines Objektes bezeichnen wir als client-side
synchronisation.
Aus Effizenzgründen werden Objekte der Java-API, insbesondere Collections, nicht mehr synchronisiert. Für Collections stehen aber synchronisierte Versionen über Wrapper wie synchronizedCollection, synchronizedSet, synchronizedSortedSet, synchronizedList, synchronizedMap oder synchronizedSortedMap zur Verfügung.
Sicheres Kopieren einer Liste in ein Array kann nun also auf zwei verschiedene Weisen bewerkstelligt
werden: Als erstes legen wir eine Instanz einer synchronisierten Liste an:
1
L i s t <I n t e g e r > u n s y n c L i s t = new L i s t <I n t e g e r > ( ) ;
2
3
4
// f i l l t h e l i s t
...
5
6
L i s t <I n t e g e r > l i s t = C o l l e c t i o n s . s y n c h r o n i z e d L i s t ( u n s y n c L i s t ) ;
Nun können wir diese Liste entweder mit der einfachen Zeile
1
I n t e g e r [ ] a = l i s t . toArray (new I n t e g e r [ 0 ] ) ;
oder über
1
Integer [ ] b ;
2
3
4
5
6
synchronized ( l i s t ) {
b = new I n t e g e r [ l i s t . s i z e ( ) ] ;
l i s t . toArray ( b ) ;
}
in ein Array kopieren. Bei der zweiten, zweizeiligen Variante ist die Synchronisierung auf die Liste
unabdingbar: Wir greifen in beiden Zeilen auf die Collection zu, und wir können nicht garantieren,
dass nicht ein anderer Thread die Collection zwischenzeitig verändert. Dies ist ein klassisches Beispiel
für Client-side synchronisation.
11
3.7 Kommunikation zwischen Threads
Komunikation geschieht über geteilte Objekte. Problem: Wann erhält die Variable den Wert? Als
Beispiel betrachten wir den einfachen Fall, dass ein Thread einem anderen einen Wert mitteilt und
dieser beim Empfänger ausgegeben wird. Den Code können wir in folgender Klasse zusammenfassen:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class C {
private i n t s t a t e = 0 ;
private boolean m o d i f i e d = f a l s e ;
public synchronized void p r i n t N e w S t a t e ( ) {
while ( ! m o d i f i e d ) { } ;
System . out . p r i n t l n ( s t a t e ) ;
modified = false ;
}
public synchronized void s e t V a l u e ( i n t v ) {
state = v ;
m o d i f i e d = true ;
System . out . p r i n t l n ( ” v a l u e s e t ” ) ;
}
}
Diese erste Lösung zeigt den erfolgten Versand durch Veränderung eines geteilten Objekts, hier boolesches Flag modified an. Ein großer Nachteil dieser Lösung ist sicherlich das Busy Waiting beim Leser
(Wurde modified verändert?).
Als Verbesserung können Threads in Java mittels wait()3 und Aufwecken mittels notify(). Es ergibt
sich folgender Code:
1
2
3
4
5
6
7
8
9
10
11
12
class C {
private i n t s t a t e = 0 ;
public synchronized void p r i n t N e w S t a t e ( ) {
wait ( ) ;
System . out . p r i n t l n ( s t a t e ) ;
}
public synchronized void s e t V a l u e ( i n t v ) {
state = v ;
notify ();
System . out . p r i n t l n ( ” v a l u e s e t ” ) ;
}
}
Thread 1 führt printNewState() und Thread 2 führt setValue(42) nebenläufig aus. Dies ergibt die
Ausgabe “value set”, “42” und nicht “42”, “value set” und auch nicht 0!
Bevor wir das Beispiel weiter verfeinern, wollen wir zunächst die Semantik der Methoden wait() und
notify() näher beleuchten.
3.7.1 Semantik
• wait() ⇒ suspendiert den ausführenden Thread und gibt den Lock des Objektes frei
• notif y() ⇒ erweckt einen4 Thead des Objektes und fährt direkt mit der eigenen Berechnung
fort. Wenn kein Thread suspendiert ist, dann wird nur fortgefahren (⇒ ein notify() wird nicht
gespeichert)
Ausgabe von Thread 1 nach der Ausgabe von Thread 2 oder Thread 2 ist fertig und anschließendes Suspendieren von Thread 1 (hier keine Ausgabe). printNewState() kann nur Änderungen
ausgeben, die nach seinem wait()-Aufruf passieren.
Wie können wir gewährleisten, dass auch vorhergehende “Nachrichten” ausgeben werden?
Wie bei der “busy waiting”-Idee verwenden wir zusätzlich noch ein Flag:
1
private boolean m o d i f i e d = f a l s e ;
3 verlässt
4 es
die Methode und versucht beim Aufwecken die synchronized Methode wieder neu zu betreten
ist nicht genau festgelegt welchen!
12
2
3
4
5
6
7
public synchronized void p r i n t N e w S t a t e ( ) {
i f ( ! modified ) { wait ( ) ; }
System . out . p r i n t l n ( s t a t e ) ;
modified = false ;
}
In setValue() wird zusätzlich (irgendwo) modified = true; eingefügt. Somit gehen Nachrichten,
welche früher gesendet wurden nicht verloren.
Was passiert aber bei mehreren schreibenden Threads?
Die erste Zustandsänderung kann übersehen werden und nicht ausgegeben werden. Als Lösung, müssen
wir also auch die “Sender” synchronisieren. Diese müssen also warten, bis ein alter Wert verarbeitet
wurde.
Listing 13: Nichtfunktionierende Lösung
1
2
3
4
5
6
7
public synchronized void s e t V a l u e ( i n t v ) {
i f ( modified ) { wait ( ) ; }
state = v ;
notify ();
m o d i f i e d = true ;
System . out . p r i n t l n ( ” v a l u e s e t ” ) ;
}
Nun stellt sich aber das Problem, dass notify() anstelle eines Lesers einen Schreiber erweckt und der
erste geschriebene Wert immer noch verloren geht. Es ist also notwendig, alle schlafenden Threads zu
erwecken einen lesenden Thread erweckt? Möglicherweise wird ein weiterer Schreiber erweckt und die
Nachricht geht immer noch verloren. Eine mögliche Lösung ist alle schlafenden Threads zu erwecken
und über ein bisschen Busy Waiting, die fälschlicherweise erweckten Threads wieder schlafen legen:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class C {
private i n t s t a t e = 0 ;
private m o d i f i e d = f a l s e ;
public synchronized void p r i n t N e w S t a t e ( ) {
while ( ! m o d i f i e d ) { w a i t ( ) } ;
modified = false ;
notify ();
System . out . p r i n t l n ( s t a t e ) ;
}
public synchronized void s e t V a l u e ( i n t v ) {
while ( m o d i f i e d ) { w a i t ( ) } ;
state = v ;
m o d i f i e d = true ;
notify ();
System . out . p r i n t l n ( ” v a l u e s e t ” ) ;
}
}
⇒ Es werden also alle suspendierten Objekte aufgeweckt und das richtige erkennt sich selber.
Dies ist der Java-Stil zur synchronisierten Kommunikation: Verwendung von wait() in Kombination
mit notifyAll() und einen bisschen Busy Waiting:
while(!Nachricht erhalten) { wait(); } und notifyAll();
Dies hat aber folgende Nachteile:
• sehr unkontrollierte Kommunikation
• Verschwendung von Systemressourcen (Busy Waiting)
• selbst bei passender wait-/notify()-Verwendung können von außen mit synchronisierten Anweisungen weitere wait()-/notify-Aufrufe hinzugefügt werden ⇒ falscher Thread wird aufgeweckt
13
3.7.2 Variante von wait() mit Timeout:
wait(long) oder wait(long, int), wobei long die Zeit in Millisekunden und int die Zeit in Nanosekunden darstellen. Nach Ablauf der Zeit fordert der Thread eigenständig wieder einen Lock an. Bei
notify() ist kein Zeitablauf möglich, wobei der Thread trotzdem aufgeweckt wird.
wait(0) = wait(0,0) = wait()
3.7.3 Fallstudie: einelementiger Puffer
In dieser Fallstudie wollen wir eine Kommunikationsabstraktion in Java entwickeln: einen einelementigen Puffer, welcher einen synchronisierten Zugriff auf einen Wert wie folgt bietet:
• Puffer kann leer oder voll sein
• ein Wert kann in einen leeren Puffer geschrieben werden (⇒ put())
• aus einem vollen Puffer kann ein Wert entfernt werden (⇒ take())
• take() suspendiert, falls der Puffer leer ist
• put() suspendiert falls der Puffer voll ist
Der einelementige Puffer kann auch als veränderbare Variable gesehen werden. Wir nennen ihn deshalb
MVar (mutable variable).
1
2
3
public c l a s s MVar <T> {
private T c o n t e n t = null ;
private boolean empty ;
4
public MVar ( ) {
empty = true ;
}
5
6
7
8
public MVar(T o ) {
empty = f a l s e ;
content = o ;
}
9
10
11
12
13
public synchronized T t a k e ( ) throws I n t e r r u p t e d E x c e p t i o n {
while ( empty ) { w a i t ( ) ; }
notifyAll ();
empty = true ;
return c o n t e n t ;
}
14
15
16
17
18
19
20
public synchronized void put (T o ) throws I n t e r r u p t e d E x c e p t i o n {
while ( ! empty ) { w a i t ( ) ; }
notifyAll ();
empty = f a l s e ;
content = o ;
}
21
22
23
24
25
26
27
}
Bemerkungen:
Zeile 1: das <T> ist eine Typvariable, d. h. eine Variable über Typen, die mit Objekttypen gefüllt
werden kann ⇒ wirkt als Polymorphie
Zeile 9: das Argument ist hier ein Objekt o vom Typ T
Zeile 16: hier werden die Schreiber aufgeweckt, da der Puffer jetzt leer ist
Zeile 23: hier werden die Leser aufgeweckt, da der Puffer jetzt wieder voll ist
Dieser Code ist unschön, da er ein wenig zielgerichtetes Erwecken der suspendierten Threads implementiert.
14
1. Verbesserung
Gruppierung der Threads in Gruppen, was ein Erwecken von jeweils nur relevanten Threads ermöglicht:
(i) take()-Threads
(ii) put()-Threads
Listing 14: Verwendung von Synchronisationsobjekten zur zielgerichteten Erweckung der Threads
1
2
3
public c l a s s MVar <T> {
private T c o n t e n t = null ;
private boolean empty ;
4
private Ob ject r = new Obj ect ( ) ;
w = new Obj ect ( ) ;
5
6
7
public MVar ( ) {
empty = true ;
}
8
9
10
11
public MVar(T o ) {
empty = f a l s e ;
content = o ;
}
12
13
14
15
16
public T t a k e ( ) throws I n t e r r u p t e d E x c e p t i o n {
synchronized ( r ) {
while ( empty ) { r . w a i t ( ) ; }
synchronized (w) {
empty = true ;
w. n o t i f y A l l ( ) ;
return c o n t e n t ;
}
}
}
17
18
19
20
21
22
23
24
25
26
27
public void put (T o ) throws I n t e r r u p t e d E x c e p t i o n {
synchronized (w) {
while ( ! empty ) { w . w a i t ( ) ; }
synchronized ( r ) {
empty = f a l s e ;
content = o ;
r . notifyAll ();
}
}
}
28
29
30
31
32
33
34
35
36
37
38
}
Bemerkungen:
Zeile 5/6: dies sind die beiden Synchronisationsobjekte
Zeie 21: r.wait() darf nur ausgeführt werden, wenn man einen Lock auf r hat
Zeile 23: hier befindet sich maximal 1 Thread und es gilt empty = false
Zeile 26/27 dies stellt einen kritischen Bereich dar, der über r und wait() abgesichert ist
Zeile 36: hier wird kein Lock gehalten
Zeile 38: hier befindet sich maximal 1 Thread und es gilt empty = true
Achtung: take() hält ein Lock auf r und wartet ggf. auf w und put() hält Lock auf w und wartet auf
r! (race condition) ⇒ Gefahr eines Deadlocks! Es ist aber kein Deadlock möglich, da an relevanten
Programmpunkten unterschiedliche Werte für empty vorliegen müssen ⇒ es können keine zwei Threads
gleichzeitig an diesem Punkt sein!
15
2. Verbesserung
Da wir jetzt schon spezielle Lockobjekte zur Unterscheidung der Leser und Schreiber eingeführt haben,
sollte es doch auch möglich sein gezielt nur einen Leser bzw. Schreiber zu erwecken, d. h. notify()
statt notifyAll() zu verwenden.
1
2
3
public c l a s s MVar <T> {
private T c o n t e n t = null ;
private boolean empty ;
4
private Ob ject r , w ;
5
6
7
8
9
10
public MVar ( ) {
empty = true ;
r = new Ob ject ( ) ;
w = new Ob ject ( ) ;
}
public MVar(T o ) {
empty = f a l s e ;
content = o ;
r = new Ob ject ( ) ;
w = new Ob ject ( ) ;
}
11
12
13
14
15
16
17
public T t a k e ( ) throws I n t e r r u p t e d E x c e p t i o n {
synchronized ( r ) {
while ( empty ) { r . w a i t ( ) ; }
synchronized (w) {
empty = true ;
w. n o t i f y ( ) ;
return c o n t e n t ;
}
}
}
18
19
20
21
22
23
24
25
26
27
28
public void put (T o ) throws I n t e r r u p t e d E x c e p t i o n {
synchronized (w) {
while ( ! empty ) { w . w a i t ( ) ; }
synchronized ( r ) {
empty = f a l s e ;
content = o ;
r . notify ();
}
}
}
29
30
31
32
33
34
35
36
37
38
39
}
Auf den ersten Blick scheint es auch möglich, dieses Programm weiter zu vereinfachen und anstelle
von while nur ein if zu verwenden. Dies ist aber nicht möglich, wenn man sich nochmal die genaue
Semantik des wait und notify Mechanismus anschaut. Bei notify wird ein schlafender Thread nicht
unmittelbar erweckt und weiterlaufen. Vielmehr bewirbt sich der erweckte Thread um den Lock,
welcher zunächst noch vom notify ausführenden Thread gehalten wird. Dieser kann ihm aber von
einem gerade neu initierten Leser (oder Schreiber) weggeschnappt werden, wodurch die Semantik der
MVar verletzt würde und ggf. zwei Threads einen Wert auslesen oder setzen würden.
Nach außen bieten MVars eine gute Abstraktionsstruktur, welche durchaus sinnvoll zur synchronisierten Threadkommunikation eingesetzt werden kannn. [⇒] Verwende in Zukunft besser MVars anstatt synchronized, wait() und notify(). Ähnliche Kommunikationsabstraktionen werden im Paket java.util.concurrent.atomic zur Verfügung gestellt. Allerdings Java-typisch mit sehr vielen Kontrollmöglichkeiten.
Als Verwendungsbeispiel für MVars können wir das Producer-/Consumer-Problem betrachten. Auf
den ersten Blick scheint ein einelementiger Puffer nicht besonders geeignet zu sein, da Produzenten
blockiert werden. Dieses Konzept ist in der Praxis aber oft gut geeignet, da auch “automatisch”
eine Lastbalancierung stattfindet. Ein Überangebot führt zu einer Sequentialisierung auf Seiten der
Producer, wodurch mehr Rechenzeit für die Consumer zur Verfügung steht (siehe Abbildung 1).
16
Abbildung 1: Verwendung von MVar’s beim Producer-/Consumer-Problem
Abbildung 2:
3.8 Beenden von Threadausführungen
Die Methode isAlive() liefert true, falls der Thread noch läuft (auch wenn er gerade suspendiert
ist). Beenden eines Threads durch:
a) normales Beenden der run-Methode (durch Terminieren des Codes)
b) Abbruch der run-Methode
c) Aufruf der destroy-Methode (deprecated)
d) Ende aller Nicht-Dämon-Threads
Bei a) und b) werden alle Locks freigegeben. Bei c) wird der Thread wirklich “abgeschossen”, ohne
dass Aufräumarbeiten stattfinden (destroy ist allerdings nicht immer implementiert und sollte nicht
mehr verwendet werden). Alternative Interrupts (s.u.). d) ist unproblematisch, da wir ja sowieso am
Programmende sind.
Unterbrechung von Threads
• Es stehen folgende Methoden der Klasse Thread zur Verfügung:
– public void interrupt() ⇒ sendet einen Interrupt an den Thread, was zu einer
InterruptedException in den Zuständen sleep und wait führt
– public boolean isInterrupted() ⇒ testet, ob der Thread ein Interrupt erhielt
– public static boolean interrupted() ⇒ testet den aktuellen Thread auf einen Interrupt und löscht ggf. den “Interrupt-Zustand” zum Beenden einer langen nebenläufigen
Berechnung (diese Methode kann ein Thread nur auf sich selbst anwenden)
• InterruptedExceptions können entweder mittels try ... catch ... finally aufgefangen
oder weitergeleitet werden (mittels throws InterruptedException)
⇒ in MVar: put / read / take throws InterruptedException
• siehe Abbildung 2
3.9 Warten auf Ergebnisse
Wenn eine Threadberechnung terminiert, kann sie Ergebnisse anderen Threads zur Verfügung stellen,
z. B. als Attribut oder über spezielle Selektor-Methoden. Um dies zu synchronisieren könnte man
isAlive() und busy waiting verwenden. D. h. Threads welche auf das Ergebnis eines anderen Threads
warten, überprüfen ob der Thread noch lebt und fahren fort, wenn dies nicht mehr der Fall ist. Dies
17
verschwendet aber Systemressourcen.
Alternative 1: Kommunikation und Synchronisation über MVars
Alternative 2: join-Methode der Thread-Klasse. Ein Thread, der die join-Methode eines anderen
Threads aufruft, suspendiert, bis der andere Thread terminiert.
Listing 15: Beispiel
1
2
3
4
5
c l a s s Calc extends Thread {
private i n t r e s u l t ;
public void run ( ) { r e s u l t = . . . ; }
public i n t g e t R e s u l t ( ) { return r e s u l t ; }
}
6
7
8
9
10
11
12
13
14
15
16
c l a s s showJoin {
void run ( ) {
Calc c a l c = new Calc ;
calc . start ();
try {
calc . join ();
System . out . p r i n t l n ( ” R e s u l t i s ”+ c a l c . g e t R e s u l t ( ) ) ;
} catch ( I n t e r r u p t e d E x c e p t i o n e ) { . . . }
}
}
Wie sleep() und wait() kann join() durch einen Interrupt unterbrochen werden. Die Semantik von
join() ist ähnlich:
while(calc.isAlive()) { wait(); } mit zugehörigem notifyAll(); bei Terminierung.
Variante mit Timeout
join(long Nanosekunden)
join(long Nanosekunden, int Millisekunden)
Der Grund für die Wiedererweckung (Terminierung oder Zeitablauf) ist nicht unterscheidbar. Durch
ein zusätzliches isAlive() kann dies herausgefunden werden.
Weitere Konzepte
Lese- und Schreiboperationen auf int und boolean sind atomar. Die Kombination von mehreren aber
nicht, genauso wie dies nicht für long und double gilt. Dies kann ggf. zu inkonsistenten Werten führen
(z. B. wenn die erste Hälfte geschrieben wurde)
Lösung: Synchronisieren oder Werte als volatile deklarieren ⇒ atomare Veränderungen
weiteres Problem: Compileroptimierung
Wir betrachten einen Thread, der den Code in Listing 16 ausführt. Wenn nun andere Threads den
Wert der Variablen currentValue verändern, so sollte sich auch die Ausgabe des Threads ändern.
Listing 16: Beispiel
1
2
3
4
5
currentValue = 5;
while ( true ) {
System . out . p r i n t l n ( c u r r e n t V a l u e ) ;
Thread . s l e e p ( 1 0 0 0 ) ;
}
Dies ist aber nicht unbedingt der Fall und es ist möglich, dass weiterhin nur 5 und nicht der neu
gesetzte Wert ausgegeben wird.
Dies liegt an einer Compileroptimierung: Konstantenpropagation. Dadurch wird der Wert der Variablen currentValue vom Compiler direkt in das System.out.println() hinein geschrieben, da der
Wert ja immer gleich ist. Als Lösung sollte die Variable currentValue als volatile deklariert werden,
was eine Compileroptimierung verhindert.
18
3.10 ThreadGroups
Threads können in Gruppen zusammengefasst werden, was die Strukturierung von Threadstrukturen
ermöglicht. Es handelt sich um eine hierarchische Struktur, was bedeutet, dass ThreadGroups sowohl
Threads als auch ThreadGroups enthalten können.
Prinzipiell gehört jeder Thread zu einer ThreadGroup. Standardmäßig ist dies die gleiche ThreadGroup
wie die des Elternthreads. Die ThreadGroup kann verändert werden mittels
public Thread(ThreadGroup g [, String name]), wobei der Name optional ist.
Nun ThreadGroups können mittels public ThreadGroup(String name) als Untergruppe zur aktuellen Threadgruppe generiert werden. ThreadGroups bieten folgende Methoden:
• getName()
• getParent()
• setDaemon(boolean daemon)
• setMaxPriority(int maxPri)
• int activeCount() ⇒ gibt die Anzahl der Threads aus; die Gruppe bekommt man mittels
isAlive()
• int enumerate(Thread[] threadsInGroup, boolean recursive), wobei das int die
Anzahl der Threads, Thread[] die Threads der Gruppe und boolean = true ist (auch bei
Untergruppen)
Die Semantik ist hier etwas anders als bei Threads: ThreadGroups werden gelöscht, wenn sie leer sind.
Außerdem gibt es SecurityHandler auf Gruppen (z. B. Wer darf Thread in ThreadGroups erzeugen?“)
”
ThreadPools
Fast das gleiche Konzept wie ThreadGroups gibt es im Paket java.util.concurrent mit den ThreadPool noch einmal. Hierbei handelt es sich allerdings um eine Abstraktion zur Verwaltung von laufenden Threads. Sie ermöglichen eine Einsparung von System-Ressourcen durch Wiederverwendung von
Threads.
Executer Interface
Das Executer Interface stellt Möglichkeiten zum Ausführen von Threads im Rahmen von ThreadPools
zur Verfügung:
• void execute(Runnable o) ⇒ nimmt einen Task (das Runnable o / nicht Thread!) in einen
ThreadPool auf; also e.execute(r) mit e als Executer-Objekt und r als Runnable-Objekt statt
new(Thread(r)).start()
• die Klasse Executer stellt statische Methoden zur Generierug von Executoren zur Verfügung:
static ExecuterService newFixedThreadPool(int n) startet den ThreadPool mit n
Threads und stellt eine Verfeinerung von Executer dar.
• Ein neuer Task (mit execute hinzugefügt) wird von einem dieser Threads ausgeführt, falls ein
Thread verfügbar ist. Ansonsten wird er in eine Warteschlange eingefügt und gestartet, falls ein
anderer Task beendet wird ⇒ kein Overhead durch Threadstart. Die Threads werden erst beim
Shutdown beendet.
• Es sind alternative Methoden verfügbar (z. B. zum bedarfsgesteuerten Starten von Threads [mit
Obergrenze] oder dynamischen ThreadPools)
Debugging
Es gibt nur eingeschränkte Infomationen, die für ein printf-Debugging genutzt werden können:
• die Klasse Thread stellt einige Methoden zum Debuggen zur Verfügung:
– public String toString() ⇒ liefert Stringrepräsentation des Thread inklusive Name,
Priorität und ThreadGroup
19
Abbildung 3:
– public static void dumpStack() ⇒ liefert einen Stacktrace des aktuellen Threads auf
System.out
• für ThreadGroups gibt es:
– public String toString()
– public void list() ⇒ druckt rekursiv die toString()-Ergebnisse aller Threads/
ThreadGroups aus
Beurteilung von Java Threads
• Kommunikation über geteilten Speicher (Objekte)
• gute Kapselung des wechselseitigen Ausschlusses in Obkjekten mittels synchronized
• Synchronisation der Kommunikation mittels wait(), notify() und notifyAll(), was allerdings
sehr ungerichtet ist
• meistens Verwendung von notifyAll() und busy waiting
• viele zusätzliche Konzepte: Prioritäten, Dämonen, sleep()/yield(), Server-Side- und ClientSide-Synchronisation mittels synchronized-Ausdrücken, Interrupts, isAlive(), join(), ThreadGroups, ThreadPools, Synchronisation auf Klassenebene, . . .
• Vererbung und synchronized-Methoden passen nicht gut zusammen
• Kommunikationsabstraktion (MVar, Chan oder entsprechende Konstrukte in
java.util.concurrent) ersetzen inzwischen viele andere Konzepte und ermöglichen eine Programmierung auf höherem Niveau (Message Passing). Auch hier gibt es aber wieder viele Konfigurationsmöglichkeiten (z. B. timeouts).
Unbeschränkter Puffer (Chan)
Ein Chan dient zur Kommunikation zwischen Schreibern und Lesern, so dass die Schreiber nie suspendieren.
Implementierung mittels verketteter Liste
Leser müssen ggf. suspendieren, falls der Chan leer ist. Leser und Schreiber müssen synchronisiert
werden. Zur Implementierung (der Zeiger) wollen wir MVars verwenden (siehe Abbildung 3).
1
2
c l a s s Chan<T> {
private MVar<MVar<ChanElem<T>>> read , w r i t e ;
3
4
private c l a s s ChanElem<T> {
5
6
7
private T v a l u e ;
private MVar<ChanElem<T>> next ;
8
9
10
11
12
public ChanElem (T v , MVar<ChanElem<T>> n ) {
value = v ;
next = n ;
}
20
Abbildung 4:
13
public T v a l u e ( ) {
return v a l u e ;
}
14
15
16
17
public MVar<ChanElem<T>> next ( ) {
return next ;
}
18
19
20
}
21
22
public Chan ( ) throws I n t e r r u p t e d E x c e p t i o n {
MVar<ChanElem<T>> h o l e = new MVar<ChanElem<T> >();
r e a d = new MVar<MVar<ChanElem<T>>>(h o l e ) ;
w r i t e = new MVar<MVar<ChanElem<T>>>(h o l e ) ;
}
23
24
25
26
27
28
public T r e a d ( ) throws I n t e r r u p t e d E x c e p t i o n {
MVar<ChanElem<T>> rEnd = r e a d . t a k e ( ) ;
ChanElem<T> item = rEnd . r e a d ( ) ;
r e a d . put ( item . next ( ) ) ;
return item . v a l u e ( ) ;
}
29
30
31
32
33
34
35
public void w r i t e (T o ) {
MVar<ChanElem<T>> newHole = new MVar<ChanElem<T>>(),
oldHole = write . take ( ) ;
o l d H o l e . put (new ChanElem<T>(o , newHole ) ) ;
w r i t e . put ( newHole ) ;
}
36
37
38
39
40
41
42
}
Bemerkungen:
Zeile 26: hier findet eine Synchronisation der Leser statt, da durch read die MVar kurzzeitig leer
wird, was dann andere Leser suspendiert
Zeile 27: liest die erste MVar s und liefert das erste Element (v1 )
Zeile 28: hier wird die Blockade der anderen Leser wieder aufgehoben
Zeile 34: hier werden die anderen Schreiber suspendiert
Siehe Abbildung 4
Beachte: Die Implementierung ermöglicht gleichzeitiges Lesen und Schreiben des nicht-leeren Chans!
Als Erweiterung können wir zum Chan noch eine Methode isEmpty hinzufügen, welche den aktuellen
Zustand des Chans analysiert. Sicherlich muss bei der Verwendung einer solchen Methode bedacht
werden, dass erhaltene Information nur eine sehr begrenzte Gültigkeit hat. Man kann sich aber dennoch
Fälle vorstellen, in denen solch eine Methode nützlich sein kann.
1
2
3
4
public boolean isEmpty ( ) {
MVar<ChanElem<T>> rEnd = r e a d . t a k e ( ) ;
MVar<ChanElem<T>> wEnd = w r i t e . t a k e ( ) ;
w r i t e . put (wEnd ) ;
21
r e a d . put ( rEnd ) ;
return ( rEnd==wEnd ) ;
5
6
7
}
In der Übung wird die MVar-Implementierung um eine read-Methode, welche die Mvar nicht leert
erweitert. Mit dieser Methode ist eine Vereinfachung dieser Implementierung möglich.
Beachte aber, dass diese Implementierung nicht in allen Situationen das erwartete Verhalten hat.
Durch die Synchronisierung mit anderen lesenden Threads, kann es dazu kommen, dass isEmpty
blockiert, nämlich genau dann, wenn ein anderer Thread bereits von einem leeren Chan lesen will und
hierbei natürlich suspendiert.
Lösungsideen für dieses Problem werden in den Übungen besprochen. Eine weitere Lösung werden wir
bei abstrakteren Konzepten später in dieser Vorlesung kennen lernen.
Noch ein Konzept: Pipes (oder ein alter Schritt in Richtung Message Passing)
Pipes werden meist für Datei- oder Netzwerkkommunikation verwendet. Mit ihnen ist aber auch eine
Kommunikation zwischen Threads möglich. Eine Pipe besteht aus zwei Strömen:
(i) PipedInputStream
(ii) PipedOutputStream
, welche bei der Konstruktion durch:
PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);
oder mittels in.connect(out) verbunden werden können.
Pipes haben in der Regel eine beschränkte Größe. Threads, die aus einer leeren Pipe lesen oder in eine
volle Pipe schreiben wollen, werden suspendiert. Das Schreiben erfolgt mittels:
• write(int b)
• write(byte[], int off, int len) ⇒ schreibt b[of f ] . . . b[of f + len]
• flush() ⇒ setzt die gepufferte write-Methode um
• close() ⇒ schließt das Schreibende
das Lesen mittels:
• int read() ⇒ liefert -1, falls die Pipe geschlossen ist
• int read(byte[], int off, int len) ⇒ suspendiert bei einer unzureichenden Zahl an Bytes
auf der Pipe
• int available() ⇒ liefert die Anzahl der Bytes in der Pipe
• close()
Andere Daten als Bytes können mittels DataOutputStream oder PipeWriter übertragen werden. Ein
Vergleich mit Chan und MVar findet in der Übung statt.
4 Nebenläufige Programmierung in Haskell
Auch Haskell bietet in Form der Bibliothek Concurrent Haskell die Möglichkeit nebenläufig zu
programmieren. Hierbei stellt eine MVar, wie wir sie in Java implementiert haben die grundlegende
Datenstruktur dar.
4.1 Concurrent Haskell
Erweiterung von Haskell 98 um Konzepte zur nebenläufigen Programmierung.
Modul: Control.Concurrent. Zur Generierung neuer Threads steht die Funktion
forkIO :: IO () -> IO ThreadId
22
zur Verfügung. Das Argument vom Typ IO() stellt den abzuspaltenden Prozess dar. Die Aktion ist
sofort beendet. Die ThreadId identifiziert den neu abgespalteten Thread. Ein Thread kann seine eigene
ThreadId mit folgenden Funktion abfragen:
myThreadId :: IO ThreadId
Threads werden bei Terminierung beendet oder mittels killThread::ThreadId->IO() abgeschos”
sen“. Im Gegensatz zu Erlang werden die ThreadIds aber nicht zur Kommunikation verwendet.
Listing 17: Beispiel
1
2
3
4
5
6
7
main : : IO ( )
main = do
id <− f o r k I O ( l o o p 0 )
s t r <− getLine
i f s t r == ” s t o p ”
then k i l l T h r e a d id
e l s e return ( )
8
9
10
11
12
l o o p : : Int−>IO ( )
l o o p n = do
print n
l o o p ( n+1)
Das Beispiel ist nicht ganz so schön, da das gesamte Programm ohnehin terminiert, falls der Hauptthread terminiert.
4.2 Kommunikation
Haskell bietet Kommunikationsabstraktionen namens MVar und Chan, wie wir sie auch schon in Java
kennengelernt (und dort auch implementiert) haben.
newEmptyMVar :: IO(MVar a) kreiert eine neue leere MVar
newMVar :: a -> IO(MVar a) kreiert neue gefüllte MVar.
takeMVar :: MVar a -> IO a
putMVar :: Mvar a - > a -> IO()
readMVar :: MVar a -> IO a
tryPutMVar :: Mvar a -> a -> IO Bool
...
Dinierende Philosophen
Listing 18: Dinierende Philosophen
1
2
3
4
5
6
7
8
p h i l : : MVar ( ) −> MVar ( ) −> IO ( )
p h i l l r = do
takeMVar l
takeMVar r
−− e a t
putMVar l ( )
putMVar r ( )
phil l r
9
10
11
12
13
14
main : : IO ( )
main = do
s t i c k s <− c r e a t e S t i c k s 5
startPhils sticks
p h i l ( l a s t s t i c k s ) ( head s t i c k s )
15
16
17
18
19
20
c r e a t e S t i c k s : : I n t −> IO ( [ MVar ( ) ] )
c r e a t e S t i c k s 0 = return [ ]
c r e a t e S t i c k s n = do
s t i c k s <− c r e a t e S t i c k s ( n−1)
s t i c k <− newMVar ( )
23
21
return ( s t i c k : s t i c k s )
22
23
24
25
26
27
s t a r t P h i l s : : [ MVar ( ) ] −> IO ( )
s t a r t P h i l s [ ] = return ( )
s t a r t P h i l s ( l : r : s t i c k s ) = do
forkIO ( p h i l l r )
startPhils ( r : sticks )
--eat ist ein Kommentar
[_] matched eine ein-elementige Liste
4.3 Kanäle in Haskell
Da die MVar als grundlegende Datenstruktur auch in Haskell vorhanden ist, ist eine Übertragung der
Chan-Implementierung recht einfach möglich:
1
module Chan where
2
3
import C o n t r o l . Concurrent . MVar
4
5
6
data Chan a = Chan (MVar ( Stream a ) ) −− r e a d end
(MVar ( Stream a ) ) −− w r i t e end
7
8
ty pe Stream a = MVar ( ChanElem a )
9
10
data ChanElem a = ChanElem a ( Stream a )
11
12
13
14
15
16
17
newChan : : IO ( Chan a )
newChan = do
h o l e <− newEmptyMVar
r e a d <− newMVar h o l e
w r i t e <− newMVar h o l e
return ( Chan r e a d w r i t e )
18
19
20
21
22
23
24
writeChan
writeChan
newHole
oldHole
putMVar
putMVar
: : Chan a −> a −> IO ( )
( Chan
w r i t e ) v = do
<−newEmptyMVar
<− takeMVar w r i t e
o l d H o l e ( ChanElem v newHole )
w r i t e newHole
25
26
27
28
29
30
31
readChan : : Chan a −> IO a
readChan ( Chan r e a d ) = do
rEnd <− takeMVar r e a d
( ChanElem v newReadEnd ) <− readMVar rEnd
putMVar r e a d newReadEnd
return v
32
33
34
35
36
37
38
39
isEmptyChan : : Chan a −> IO Bool
isEmptyChan ( Chan r e a d w r i t e ) = do
rEnd <− takeMVar r e a d
wEnd <− takeMVar w r i t e
putMVar w r i t e wEnd
putMVar r e a d rEnd
return ( rEnd==wEnd)
Die Verwendung von Kanälen ermöglicht ein anderes Programmierparadigma, das so genannte Message Passing. Message Passing wird z. B. in Erlang und Scala zur nebenläufigen und auch verteilten
Programmierung eingesetzt. Im folgenden wollen wir uns zunächst intensiver mit Erlang auseinandersetzen.
24
5 Erlang
Erlang wurde von der Firma Ericsson entwickelt. Es ist eine funktionale Programmiersprache mit
speziellen Erweiterungen zur nebenläufigen und verteilten Programmierung. Sie wurde gezielt zur
Entwicklung von Telekommunikationsanwendungen eintwickelt, wird inzwischen aber auch in anderen
Bereichen zur Entwicklung insbesondere verteilter Anwendungen eingesetzt.
5.1 Sequentielle Programmierung in Erlang
Erlang ist eine funktionale Programmiersprache mit strikter Auswertung, d. h. es werden zuerst die
Argumente ausgewertet und erst anschließend die Funktion aufgerufen, wie in imperativen Sprachen
auch. Erlang ist ungetypt/(dynamisch) getypt ⇒ Laufzeittypfehler für unpassende Basistypen (z. B.
true + 7 ⇒ Widerspruch).
Erlang ist verfügbar unter www.erlang.org und auf den SUN’s unter /home/erlang/bin/erl installiert.
Listing 19: Fakultätsfunktion (factorial) als Beispiel
1
2
f a c ( 0 ) −> 1 ;
f a c (N) when N>0 −> N ∗ f a c (N−1).
Erklärungen:
• “;” trennt die Regeln der Funktions-Definition
• Variablen werden groß geschrieben
• der guard when N>0 gehört zur Auswahl der Regel mit hinzu
• jede Funktionsdefinition endet mit einem Punkt.
Statt let-Ausdrücken bind-once-Variablen:
1
2
3
f a c (N) when N>0 −> N1 = N−1,
F1 = f a c (N1 ) ,
N ∗ F1 .
Erklärungen:
• “,” als Trennzeichen der Sequenzen
• die letzte Zeile ist das Ergebnis der Berechnung, welches automatisch ausgegeben wird
Funktionen werden nach ihrer Stelligkeit unterschieden:
fac(N,M) -> ... ist eine andere Funktion als fac/1, was für eine einstellige Funktion wie z. B. fac(N)
steht. jedes Erlang-Programm benötigt einen Modulkopf:
• -modul(math). ⇒ das Modul befindet sich in der Datei math.erl
• -export([fac/1]). ⇒ erlaubt den Zugriff von außen
Ablauf:
> erl
...
> c(math).
> math:fac(6).
720
Erklärungen:
• c(math). compiliert das Modul math in der Datei math.erl.
Alternativ kann in der Shell mittels erlc math.erl kompiliert werden.
• halt(). oder Crtl + C“und dann a führt zum Verlassen der Erlang-Umgebung.
”
Datenstrukturen/Werte:
• Atome: a, b, c, 0, 1 ,true, false, ..., ’Hallo’, ’willi@mickey’
⇒ beginnen mit Kleinbuchstaben oder stehen in Hochkommata.
25
• Tupel: { t1 , . . . , tn } mit beliebigen Werten ti , z. B. Bäume:
{node,{node,empty,1,empty},2,empty}
• Listen: [e|l], wobei e das Kopfelement der Liste ist. l ist eine Restliste und [] steht für die
leere Liste. Als Abkürzung können wir auch [1,2,3] statt [1|[2|[3|[]]]] schreiben.
Beispiel: Konkatenation zweier Listen
app([], Ys) -> Ys;
app([X|Xs], Ys) -> [X|app(Xs, Ys)].
Patern Matching:
• Pat = e, wobei Pat eine der folgenden Formen hat:
– Variable
– Atom
– Tupel {P at1 , . . . , P atn }
– [ P at1 |P ate ]
– []
– [ P at1 , . . . , P atn ]
Das Matching eines Pattern bindet ungebundene Variablen an Teilstrukturen der Werte; z.B:
– {X,[1|Ys]} = {42,[1,2,3]}
das X gebunden wird
[X/42, Ys/[2,3]], wobei X/42 bedeutet, dass die 42 an
– {X,{1,Y},Y} = {3,{1,[]},42}
müsste
– {X,{1,Y},Y} = {3,{1,42},42}
macht nicht, da Y an [] und 42 gebunden werden
[X/3, Y/42]
Beachte: Die Substitution wird auf alle vorkommenden Variablen angewendet, auch in Pattern.
Ein “Umbinden” ist nicht mehr möglich!
Beispiel:
{X,[Y|Ys]} = {42,[1,2]}
[X|Xs] = [3,4]
wobei die zweite Zeile zum Fehler führt, da X bereits an 42 gebunden ist.
5.2 Nebenläufige Programmierung
Erlang-Prozesse sind light-weight, so dass davon gleichzeitig sehr viele existieren können.
Ein Erlang-Prozess besteht aus folgenden drei Bestandteilen:
• Programmterm t, der “ausgewertet” wird
• Prozess-Identifier pid
• Mailbox (Message Queue), die Nachrichten aufnimmt
Starten von Erlang-Prozessen:
• spawn(module, func,[a1 , . . . , an ]) ⇒ startet einen nebenläufigen Prozess, dessen Berechnung
mit module:func(a1 , . . . , an ) startet
• spawn terminiert sofort und liefert den pid des neuen Prozesses zurück
• Beachte: func muss in module exportiert werden (auch wenn spawn im selben Modul ausgerufen
wird)
Alternativ kann man spawn auch eine parameterlose Funktion übergeben, welche dann als separater
Prozess gestartet wird: spawn(fun () -> module:fun(a1 ,. . . ,an ) end). Der Funktionsrumpf muss
hierbei natürlich keine definierte Funktion sein. Es kann ein beliebiger Erlang-Ausdruck sein.
Seinen eigenen pid erhält ein Prozess durch Aufruf von self().
Senden von Nachrichten:
• pid!v ⇒ sendet den Wert v an den Prozess pid. Der Wert wird hinten in die Mailbox von pid
eingetragen.
• pids sind Werte wie Atome oder Listen und können somit auch in Datenstrukturen gespeichert
26
oder auch verschickt werden
• es ist gewährleistet, dass Nachrichten nicht verloren gehen
Empfangen von Nachrichten: receive-Ausdruck
1
2
3
4
5
6
receive
Pat1 −> e1 ;
...
Patn −> e n
[ a f t e r t −> e ]
end
• die Pattern werden der Reihe nach gegen die Werte der eigenen Mailbox gematcht (exakte
Reihenfolge siehe Übung)
• erstes passendes Pattern P ati wird gewählt (→ Substitution σ). Das gesamte receive-Statement
wird durch σ(ei ) ersetzt. Die Berechnung macht mit ei weiter.
• bei after t -> e findet ein Timeout nach t Millisekunden statt und es geht mit e weiter
Spezialfall: t = 0 ⇒ die Pats werden nur genau einmal gegen alle Mailbox-Einträge gematcht
5.3 Ein einfacher Key-Value-Store
Listing 20: Server des Key-Value-Stores
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
−module ( dataBase ) .
−e x p o r t ( s t a r t / 0 ) .
s t a r t ( ) −> dataBase ( [ ] ) .
dataBase (KVs) −>
receive
{ a l l o c a t e , Key , P} −>
case lo oku p ( Key , KVs) o f
n o t h i n g −> P ! f r e e ,
r e c e i v e { v a l u e , V, P} −> dataBase ( [ { Key ,V} | KVs ] )
end ;
{ j u s t ,V} −> P ! a l l o c a t e d ,
dataBase (KVs)
end ;
{ lookup , Key , P} −> P ! lo oku p ( Key , KVs ) ,
dataBase (KVs)
end .
17
18
19
20
l o o k u p (K , [ ] ) −> n o t h i n g ;
l o o k u p (K, [ { K,V} | ] ) −> { j u s t ,V} ;
l o o k u p (K, [ | KVs ] ) −> lo oku p (K, KVs ) ;
Listing 21: Beispiel-Client zum Key-Value-Stores
1
2
3
−module ( c l i e n t ) .
−e x p o r t ( [ s t a r t / 0 ] ) .
−import ( base , [ g e t L i n e / 1 , p r i n t L n / 1 ] )
4
5
6
7
s t a r t ( ) −>
DB = spawn ( database , s t a r t , [ ] ) ,
c l i e n t (DB) .
8
9
10
11
12
13
14
15
16
17
c l i e n t (DB) −>
I n p u t = g e t L i n e ( ” ( l ) ookup / ( i ) n s e r t > ” ) ,
case I nput o f
” i ” −> K = g e t L i n e ( ” key > ” ) ,
DB! { a l l o c a t e , K, s e l f ( ) } ,
receive
f r e e −>
V = getLine ( ” value > ” ) ,
DB! { v a l u e , V, s e l f ( ) } ;
27
18
19
20
21
22
23
24
25
26
27
28
29
a l l o c a t e d −>
p r i n t L n ( ” key a l l o c a t e d ” )
end ;
” l ” −> K = g e t L i n e ( ” key > ” ) ,
DB! { lookup , K, s e l f ( ) } ,
receive
n o t h i n g −> p r i n t L n ( ”Key not a l l o c a t e d ” ) ;
{ j u s t , V} −> p r i n t L n (V)
end ;
X −> p r i n t L n (X)
end ,
c l i e n t (DB) .
Bemerkungen:
Zeile 3: base ist ein Modul mit IO-Funktionen (wird auf der Web-Site bereit gestellt)
Zeile 6: DB enthält den zugehörigen Prozess-Identifier
Zeile 27: dieser Fall fängt alle “sonst”-Fälle ab. Hier kann auch _ -> 42 stehen.
Beachte: Es können auch mehrere Clienten mit einem Server kommunizieren. Dies entspricht einem
Nameserver, kann aber auch als geteilter Speicher gesehen werden.
Dieser Key-Value-Store stellt zum Einen eine einfache erste Anwendung zur nebenläufigen Programmierung in Erlang dar. Er zeigt aber auch, dass man mit Hilfe eines Server-Prozesses und Message
Passing, das Modell eines geteilten Speichers simulieren kann. Der Schlüssel entspricht hierbei der
Speicheradresse und der Wert dem abgelegten wert. Im nächsten Schritt sollten wir also untersuchen,
ob auch ähnliche Synchronisationsmechanismen, wie sie bei geteiltem Speicher verwendet werden,
simuliert werden können.
5.4 Wie können Prozesse in Erlang synchronisiert werden?
Als Beispiel: Implementierung der dinierenden Philosophen
Idee: Repräsentiere den Zustand eines Sticks innerhalb eines speziellen Stick-Prozesses. Die Funktionen
zum Nehmen und Hinlegen eines Essstäbchens können dann wie folgt definiert werden:
Erlang-Prozesse sind seiteneffektsfrei, bis auf das Senden von Nachrichten!
Listing 22: Essstäbchen
1
2
3
4
5
6
stickDown ( ) −>
receive
{ take , P} −> P ! took ,
stickUp ( ) .
−> stickDown ( )
end .
7
8
9
10
11
s t i c k U p ( ) −>
receive
put −> stickDown ( )
end .
12
13
14
15
16
t a k e ( St ) −> St ! { take , s e l f ( ) } ,
receive
took−> ok
end .
17
18
put ( St ) −> St ! put .
Hierbei ist zu beachten, dass ein entsprechender Stabprozess zwischen den Zuständen stickUp()
und stickDown() hin- und herwechselt. Die take-Nachricht wird aber nur im stickDown()-Zustand
verarbeitet. Bei stickUp() verbleibt sie in der Mailbox. Außerdem wartet der Prozess, der take/2
ausführt im receive, bis eine entsprechende Bestätigungsnachricht (took) verschickt wurde.
Nun ist es einfach, die Philosophen zu realisieren:
28
Listing 23: Dinierende Philosophen
1
2
3
4
5
−module ( d i n i n g P h i l s ) .
−e x p o r t ( [ s t a r t / 1 , stickDown ( ) / 0 , p h i l / 3 ] ) .
s t a r t (N) when N >= 2 −>
S t i c k s = c r e a t e S t i c k s (N) ,
startPhils ([ l i s t s : last ( Sticks )| Sticks ] , 0).
6
7
c r e a t e S t i c k s ( 0 ) −> [ ] ;
8
9
10
11
12
c r e a t e S t i c k s (N) −>
S t i c k = spawn ( d i n i n g P h i l s , stickDown , [ ] ) ,
S t i c k s = c r e a t e S t i c k s (N−1) ,
[ Stick | Sticks ] .
13
14
s t a r t P h i l s ( [ ] , N) −> ok ;
15
16
17
18
s t a r t P h i l s ( [ SL , SR | S t i c k s ] , N) −>
spawn ( d i n i n g P h i l s , p h i l , [ SL , SR , N] ) ,
s t a r t P h i l s ( [ SR | S t i c k s ] , N+1).
19
20
21
22
23
24
25
26
27
28
29
30
p h i l ( SL , SR , N) −>
b a s e : p r i n t (N) ,
base : printLn ( ” i s thinking ” ) ,
t i m e r : s l e p (10+N) ,
// #4
t a k e ( SL ) ,
t a k e (SR ) ,
b a s e : p r i n t (N) ,
base : printLn ( ” i s e a t i n g ” ) ,
put ( SL ) ,
put (SR ) ,
p h i l ( SL , SR , N ) .
Bemerkungen:
Zeile 2: stickDown()/0 und phil/3 müssen exportiert werden, damit diese gestartet werden können
Zeile 23: die Zeile ist zur direkten “Deadlockvermeidung” notwendig
An diesem Beispiel erkennt man gut das Prinzip der Synchronisation in Erlang. Als weiteres Beispiel
betrachten wir noch die Implementierung einer MVar in Erlang.
5.5 MVar
Als weiteres Beispiel wollen wir eine MVar in Erlang implementieren. Hierbei können wir genau wie
bei der Stick-Implementierung vorgehen. Wir müssen nur zusätzlich einen Wert als Zustand des MVarProzesses vorsehen.
Listing 24: MVar in Erlang
1
−module (mVar ) .
2
3
−e x p o r t ( [ new/ 0 ,new/ 1 ,mVar/ 1 , t a k e / 1 , put / 2 ] ) .
4
5
new( ) −> spawn (mVar , mVar , [ empty ] ) .
6
7
new(V)−> spawn (mVar , mVar , [ { f u l l ,V } ] ) .
8
9
10
11
12
13
14
mVar( empty ) −> r e c e i v e
{ put , V, P} −> P ! put , mVar( { f u l l ,V} )
end ;
mVar( { f u l l ,V} ) −> r e c e i v e
{ take , P} −> P ! { took ,V} , mVar( empty )
end .
15
16
17
t a k e (MVar) −>
MVar ! { take , s e l f ( ) } ,
29
18
19
20
receive
{ took ,V} −> V
end .
21
22
23
24
25
26
put (MVar ,V) −>
MVar ! { put , V, s e l f ( ) } ,
receive
put −> ok
end .
Im Gegensatz zur Implementierung der Stäbchen, verwenden wir hier keine zwei sich abwechselnd
aufrufenden Funktionen, sondern unterschieden die Zustände durch zwei unterschiedliche Argumente
(empty und {full,...}).
5.6 Nebenläufige Programmierung in Erlang
Wir haben nun gesehen, wie man geteilten Speicher und lockbasierte Synchronisation in Erlang simulieren kann. In der Praxis greift man allerdings selten auf diesen Ansatz zurück. Vielmehr verwendet
man ausgezeichnete Server-Prozesse, welche die Anfragen an ein Objekt sequentialisieren und hierdurch den geteilten Zustand schützen.
5.7 Verteilte Programmierung in Erlang
Nebenläufige Prozesse haben keinen gemeinsamen Speicher und können deshalb problemlos auf mehreren Rechnern verteilt werden. Nachrichten werden automatisch in TCP-Nachrichten umgewandelt.
(siehe Abbildung 5). Es ist auch möglich, dass mehrere Knoten auf einem Rechner laufen.
Starten eines Erlang-Knotens: erl -name willi ⇒ da ansonsten keine veteilte Kommunikation
möglich ist, da der Prozess nicht bekannt ist
Alternative: erl -sname willi ⇒ analog, funktioniert aber nur im gleichen Subnetz
Starten von Prozessen auf anderen Knoten:
• Voraussetzung: Erlang-Shell muss auf dem anderen Knoten/Rechner laufen
• z. B. DB = spawn(’[email protected]’, database, start,[]) in obigem Beispiel
• allgemein: spawn(Knoten, module, func, args) wobei das funktionale Ergebnis wieder eine PID ist, welche jetzt auch IP-Adresse und Knotennamen mitkodiert. Dies eignet sich zum
Auslagern von Berechnungen (z. B. zur Lastverteilung).
5.8 Verbinden unabhängiger Erlang-Prozesse
In offenen Systemen (z. B. Telefon, Chat) ist es nötig, Verbindungen zur Laufzeit herzustellen. Dazu
ist das globale Registrieren eines Prozesses auf einem Knoten notwendig:
register(name, pid).
Senden an restliche Prozesse: {name, knoten}! msg, was meistens nur für den “Erstkontakt” verwendet wird. Danach findet ein Austausch der PID’s statt (z. B. Database-Server läuft auf Knoten
“servermickey” ⇒ erl -sname server auf mickey).
5.8.1 Veränderungen des Clients
Listing 25: Veränderung des Clients
1
2
3
4
5
s t a r t −>
{ dbServer , ’ s e rv e r @m i c ke y ’ } ! { connect , s e l f ( ) } ,
receive
{ c onnected , DB} −> c l i e n t (DB)
end .
30
Abbildung 5:
5.8.2 Veränderungen des Servers
Listing 26: Veränderung des Servers
1
2
3
s t a r t ( ) −>
r e g i s t e r ( dbServer , s e l f ( ) ) ,
database ( [ ] ) .
4
5
6
7
8
9
10
11
d a t a b a s e (L) −>
receive
{ a l l o c a t e , . . . } −> . . . ;
{ lookup , . . . } −> . . . ;
{ connect , P} −> P ! { connected , s e l f ( ) } ,
d a t a b a s e (L)
end .
Es sind also nur wenige Veränderungen beim Übergang von nebenläufiger zu verteileter Programmierung notwendig.
Als weiteres Beispiel wollen wir einen verteilten Chat implementieren, der aus einem (registrierten)
Server und beliebig vielen passenden Clients besteht.
Listing 27: Veränderung des Clients
1
2
−module ( c h a t S e r v e r ) .
−e x p o r t ( [ s t a r t / 0 ] ) .
3
4
5
s t a r t ( ) −> r e g i s t e r ( chat , s e l f ( ) ) ,
run ( [ ] ) .
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
run ( C l i e n t s ) −>
receive
{ connect , Pid , Name} −>
case l oo k up 2 (Name , C l i e n t s ) o f
n o t h i n g −>
Pid ! { connected , names ( C l i e n t s ) , s e l f ( ) } ,
b r o a d c a s t ( C l i e n t s , { l o g i n , Name } ) ,
run ( [ { Pid , Name } | C l i e n t s ] ) ;
{ j u s t , } −>
Pid ! nameOccupied ,
run ( C l i e n t s )
end ;
{ message , Nonsens , Pid } −>
case lo oku p ( Pid , C l i e n t s ) o f
{ j u s t , Name} −> b r o a d c a s t ( C l i e n t s , { msg , Name , Nonsens } ) ,
run ( C l i e n t s ) ;
n o t h i n g −> run ( C l i e n t s )
end ;
{ l o g o u t , Pid } −>
31
case lo oku p ( Pid , C l i e n t s ) o f
{ j u s t , Name} −> R e m a i n i n g C l i e n t s = remove ( Pid , C l i e n t s ) ,
b r o a d c a s t ( R e m a i n i n g C l i e n t s , { l o g o u t , Name } ) ,
run ( R e m a i n i n g C l i e n t s ) ;
n o t h i n g −> run ( C l i e n t s )
end
26
27
28
29
30
31
32
end .
33
34
35
36
names ( [ ] ) −> [ ] ;
names ( [ { , Name } | C l i e n t s ] ) −> [ Name | names ( C l i e n t s ) ] .
%names ( C l i e n t s ) −> l i s t s : map( fun ( { ,X})−>X end , C l i e n t s ) .
37
38
39
40
b r o a d c a s t ( [ ] , ) −> ok ;
b r o a d c a s t ( [ { Pid , } | C l i e n t s ] , Msg ) −> Pid ! Msg ,
b r o a d c a s t ( C l i e n t s , Msg ) .
41
42
43
44
l o o k u p ( , [ ] ) −> n o t h i n g ;
l o o k u p (K, [ { K,V} | ] ) −> { j u s t ,V} ;
l o o k u p (K, [ | KVs ] ) −> lo oku p (K, KVs ) .
45
46
47
48
l o o k u p 2 ( , [ ] ) −> n o t h i n g ;
l o o k u p 2 (V, [ { K,V} | ] ) −> { j u s t ,K} ;
l o o k u p 2 (V , [ | KVs ] ) −> l oo k up 2 (V, KVs ) .
49
50
51
52
remove ( , [ ] ) −> [ ] ;
remove (K, [ { K, } | KVs ] ) −> remove (K, KVs ) ;
remove (K, [ KV| KVs ] ) −> [KV| remove (K, KVs ) ] .
Listing 28: Veränderung des Clients
1
2
−module ( c h a t C l i e n t ) .
−e x p o r t ( [ c o n n e c t / 2 , keyboard / 2 ] ) .
3
4
5
6
7
8
9
10
11
12
c o n n e c t (Name , Node ) −>
{ chat , Node } ! { connect , s e l f ( ) , Name} ,
receive
{ c onnected , Names , S e r v e r P i d } −>
b a s e : p r i n t ( Names ) ,
spawn ( c h a t C l i e n t , keyboard , [ S e r v e r P i d , Name ] ) ,
run ( S e r v e r P i d ) ;
nameOccupied −> 42
end .
13
14
15
16
17
run ( S e r v e r P i d ) −>
receive
??? −> . . .
end .
18
19
20
21
22
23
24
25
26
keyboard ( S e r v e r P i d , Name) −>
S t r = b a s e : g e t L i n e (Name ) ,
case S t r o f
” bye ” −> S e r v e r P i d ! { l o g o u t , s e l f ( ) } ;
−> S e r v e r P i d ! { message , Str , s e l f ( ) } ,
keyboard ( S e r v e r P i d , Name)
end .
5.9 Robuste Programmierung
In Erlang sterben Prozesse normalerweise unbemerkt; Nachrichten an terminierte Prozesse gehen ohne
Fehlermeldungen verloren.
Prozesse können aber auch mittels link(pid) verbunden werden. Durch einen Link wird eine bidirektionale Verbindung zwischen dem ausführendem Prozess und dem Prozess pid aufgebaut. Stirbt
32
einer der gelinkten Prozesse (auch bei Terminierung), so wird der andere ebenfalls terminiert. Dies ist
zum Aufräumen bei Prozessende praktisch.
Alternativ ist es aber auch möglich, anstelle kooperativ mit zu sterben, eine Nachricht zu empfangen,
falls der andere gelinkte Prozesse abstürzt, ist auch der Empfang einer Benachrichtigung möglich.
Wurde mit folgendem Funktionsaufruf process_flag(trap_exit, true) ein Prozessflag gelöscht,
so erhält man anstelle des “Mitsterbens” eine Nachricht der Form: {’EXIT’, reason, pid}, wobei
reason den Grund für die Prozessterminierung und pid den abgestürzten Prozess angibt.
5.10 Systemabstraktionen
Bei der Entwicklung von Erlang-Anwendungen fällt auf, dass in verteilten Systemen immer wieder
ähnliche Prozess-Muster auftreten. Diese wurden als generische Varianten heraus gearbeitet und stehen
als Bibliothek in Erlang zur Verfügung. In der Vorlesung haben wir uns den gen_server etwas genauer
angeschaut. Er eigent sich zur Entwicklung von Client-Server-Systemen. Es werden insbesondere zwei
unterschiedliche Arten der Anfrage von Clienten beim Server unterstützt: synchrone Kommunikation
(wartet auf Antwort; gen_server:call) und asynchrone Kommunikation (kein Warten auf Antwort;
gen_server:cast). Außerdem werden natürlich auch die anderen Erlang-Mechanismen, wie Linking
und Code-Austausch zur Laufzeit unterstützt.
Der Vorteil der Verwendendung des Frameworks ist, dass bei der Implementierung des Protokolls
weniger Fehler gemacht werden können und aufbauend noch weitere Abstraktionen, wie z.B. der
Supervisiontree, definiert werden können.
Als Beispiel für die Verwendung eines gen_servers realisieren wir nun unseren Chat in diesem Framework:
Listing 29: Chat unter Verwendung des generischen Servers
1
2
−module ( genChat ) .
−b e h a v i o u r ( g e n s e r v e r ) .
3
4
5
6
7
−e x p o r t ( [ s t a r t / 0 ] ) .
−e x p o r t ( [ l o g i n / 2 , msg / 2 , l o g o u t / 1 , who / 1 ] ) .
−e x p o r t ( [ i n i t / 1 , h a n d l e c a l l / 3 , h a n d l e c a s t / 2 , h a n d l e i n f o / 2 ,
terminate /2 , code change / 3 ] ) .
8
9
10
s t a r t ( ) −> b a s e : putStrLn ( ” S e r v e r s t a r t e d ” ) ,
g e n s e r v e r : s t a r t l i n k ( { l o c a l , c h a t } , genChat , [ ] , [ ] ) .
11
12
13
i n i t ( ) −> b a s e : putStrLn ( ” S e r v e r i n i t i a l i z e d ” ) ,
{ok , [ ] } .
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
% Asynchronous i n t e r f a c e
h a n d l e c a s t ( { message , Text , P} , C l i e n t s ) −>
case l o oku p (P , C l i e n t s ) o f
nothing
−> ok ;
{ j u s t , Name} −> b r o a d c a s t ( { message , Text , Name} , C l i e n t s )
end ,
{ noreply , C l i e n t s } ;
h a n d l e c a s t ( { l o g o u t , P} , C l i e n t s ) −>
case l o oku p (P , C l i e n t s ) o f
nothing
−> { n o r e p l y , C l i e n t s } ;
{ j u s t , Name} −> NewClients=remove (P , C l i e n t s ) ,
b r o a d c a s t ( { l o g o u t , Name} , NewClients ) ,
{ n o r e p l y , NewClients }
end .
29
30
31
32
33
34
35
36
% Synchronous i n t e r f a c e
h a n d l e c a l l ( { l o g i n , Name} , { ClientP , } , C l i e n t s ) −>
base : p r i n t ( ClientP ) ,
b r o a d c a s t ( { l o g i n , Name} , C l i e n t s ) ,
{ reply ,
{ l o g g e d I n , s e l f ( ) , l i s t s : map( fun ( { ,N} ) −> N end , C l i e n t s ) } ,
[ { ClientP , Name } | C l i e n t s ] } ;
33
37
38
39
40
h a n d l e c a l l ( who , C l i e n t P , C l i e n t s ) −>
{ reply ,
{ o n l i n e , l i s t s : map( fun ( { ,N} ) −> N end , C l i e n t s ) } ,
Clients }.
41
42
43
44
h a n d l e i n f o ( Msg , C l i e n t s ) −>
b a s e : putStrLn ( ” Unexpected Message : ”++b a s e : show ( Msg ) ) ,
{ noreply , C l i e n t s } .
45
46
t e r m i n a t e ( shutdown , C l i e n t s ) −> ok .
47
48
49
50
c o d e c h a n g e ( OldVsn , S t a t e , E x t r a ) −>
% Code t o change S t a t e
{ok , S t a t e } .
51
52
53
%C l i e n t i n t e r f a c e
l o g i n ( Node , Name) −> g e n s e r v e r : c a l l ( { chat , Node } , { l o g i n , Name } ) .
54
55
msg ( Pid , Msg ) −> g e n s e r v e r : c a s t ( Pid , { message , Msg , s e l f ( ) } ) .
56
57
l o g o u t ( Pid ) −> g e n s e r v e r : c a s t ( Pid , { l o g o u t , s e l f ( ) } ) .
58
59
who ( Pid ) −> g e n s e r v e r : c a l l ( Pid , who ) .
60
61
62
63
64
%h e l p e r f u n c t i o n s
b r o a d c a s t ( , [ ] ) −> ok ;
b r o a d c a s t ( Message , [ { P , } | C l i e n t s ] ) −> P ! Message ,
b r o a d c a s t ( Message , C l i e n t s ) .
65
66
67
68
remove ( , [ ] ) −> [ ] ;
remove (K, [ { K, } | KVs ] ) −> remove (K, KVs ) ;
remove (K, [ KV| KVs ] ) −> [KV| remove (K, KVs ) ] .
69
70
71
72
l o o k u p ( , [ ] ) −> n o t h i n g ;
l o o k u p (K, [ { K,V} | ] ) −> { j u s t ,V} ;
l o o k u p (K, [ | KVs ] ) −> lo oku p (K, KVs ) .
Ähnlich funktionieren die generischen Abstraktionen einer Finite-State-Maschine (gen_fsm)und des
Event Handlings (gen_event). Die Prozesse der generische Prozessabstraktionen können dann auch
noch mit Hilfe der Supervisiontree-Abstraktion struturiert werden. Hier können dann Strategien, wie
die Anwendung auf den Ausfall einzelner Komponenten reagieren soll, definiert werden. Hierdurch
wird die Entwicklung robuster Anwendungen zusätzlich unterstützt.
6 Grundlagen der Verteilte Programmierung
Computer bilden Knoten in Netzwerken und können miteinander kommunizieren. Wegen der Komplexität der Netzwerktheorie wurden Schichten (Layer) zur Aufgabenverteilung eingeführt. Gängigste
Ansatz: Open Systems Interconnection (OSI)-Modell der International Standards Organisation (ISO)
6.1 7 Schichten des ISO-OSI-Referenzmodells
1. Physical Layer
Stellt den Strom auf der Leitung bzw. die 0 und 1 dar
2. Data Link Layer
Gruppierung von Bits/Bytes in Frames (Startmarken, Endmarken, Checksummen)
Versand zwischen benachbarten Knoten
defekte Frames werden verworfen
3. Network Layer
Bildung von Datenpaketen statt Frames (Netzwerkadresse, Routing)
34
4. Transport Layer
automatische Fehlererkennung und Fehlerkorrektur
Flusskontrolle zur Begrenzung der Datenmenge (⇒ Vermeidung von Überlast)
5. Session Layer
Anwendung-zu-Anwendung-Verbindung
Kommunikationssitzung, aber auch verbindungslose Kommunikation möglich
6. Presentation Layer
Representation von Daten und ihrer Konvertierung (z. B. Kompression, Verschlüsselung)
7. Application Layer
Anwendungen (z. B. Java-Anwendung)
6.2 Protokolle des Internets
1. Internet Protocol (IP):
• befindet sich auf Schicht 3 (network layer)
• wird sowohl in local area networks (LANs) als auch in wide area networks (WANs) verwendet
• Informationsaustausch zwischen Endknoten (Hosts) mit IP-Paketen oder IP-Datagrammen
• keine Abhängigkeit zwischen Paketen
• verbindungslos
• Versionen: IPv4 (32 Bit-Adressen), IPv6 (128 Bit-Adressen)
• IP-Adresse: z. B. 134.245.248.202 für falbala“
”
• Die IP-Adresse ist für uns eine eindeutige Adresse im Internet. Sie wird beim Routing
verwendet.
• Alternativ: Hostnames, Domain Name System (DNS)
“falbala.informatik.uni-kiel.de” wobei “falbala” der Rechnername und
“informatik.uni-kiel.de” die (Sub-) Domain sowie “uni-kiel.de” die Domain sind
2. Transmisson Control Protocol (TCP):
• befindet sich auf Schicht 4 (transport layer)
• garantiert die Zustellung (durch Wiederholung) und richtige Reihenfolge der gesendeten
Bytes
• verwendet IP
• sendet Byte-Sequenz
• sorgt für faire Lastverteilung
• arbeitet mit Ports (in der Regel) von 0 bis 65535
• hält die Verbindung zwischen Hosts
3. User Datagram Protocol (UDP):
• befindet sich auf Schicht 4 (transport layer)
• sendet nur Datenpakete
• arbeitet mit Ports
• keine garantierte Zustellung
• keine garantierte richtige Reihenfolge
• verbindungslos
• schneller als TCP
• ist fast nur ein Wrapper für IP
35
6.3 Darstellung von IP-Adressen/Hostnames in Java
Die Klasse InetAddress im Paket java.net behandelt sowohl IPv4 als auch IPv6. Es stehen zwei
Klassenmethoden zur Konstruktion zur Verfügung:
• static InetAddress getByName(String hostname) throws
java.net.UnknownHostException mit folgenden Möglichkeiten für den hostname:
– IP-Adresse ⇒ das Objekt wird angelegt, falls die IP-Adresse bekannt ist
– Domain-Name ⇒ DNS-Lookup; das Objekt wird angelegt, falls die IP-Adresse bekannt ist
– “localhost” ⇒ das Objekt wird mit der IP-Adresse 127.0.0.1 angelegt
• getAllByName(String hostname) ⇒ liefert alle IP-Adressen eines Hosts
Zur Ausgabe stehen weiterhin folgende Methoden zur Verfügung:
• String getHostAddress() ⇒ liefert eine IP-Adresse des Hosts
• String getHostName() ⇒ liefert den bei der Konstruktion angegebenen hostname
• boolean equals(InetAddress i) ⇒ vergleicht zwei IP-Adressen
6.4 Netzwerkkommunikation
Der Rechner kommuniziert über sogenannte Ports (durchnummeriert von 0 bis 65535) mit dem Netzwerk. Auf Programmiersprachenseite werden die Ports über Sockets angesprochen, wobei pro Port nur
eine Socketverbindung möglich ist.
6.4.1 UDP in Java
Z. B. nützlich bei DNS-Anfragen, Audio-/Videodaten, Zeitsignalen, bei zeitkritischen Anwendungen.
Eigentlich nur Wrapper für IP-Pakete.
Datagram Packet Klasse:
• repräsentiert UDP-Pakete
• Verwendung beim Senden und Empfangen ⇒ unterschiedliche Bedeutung der Adresse:
– Senden ⇒ Zieladresse
– Empfangen ⇒ Sourceadresse (günstig für Antwort)
• Aufbau: siehe Abbildung 6
Konstruktoren:
• DatagramPacket(byte[] buffer, int length) ⇒ zum Empfang von Paketen
• DatagramPacket(byte[] buffer, int length, InetAdress dest_addr,
int dest_port) ⇒ zum Senden
Methoden zum Modifizieren:
getAddress, setAddress, getData, setData, getLength, setLength, getPort, setPort
zur Anbindung an Ports: Datagram Socket Klasse:
• zum Senden und Empfangen von Datagram Paketen
• Aufbau siehe Abbildung 7
• Konstruktoren:
? DatagramSocket() throws SocketException ⇒ nur zum Senden von UDP-Paketen (freier Port wird verwendet und belegt); eine Exception wird geworfen, falls kein freier Port
mehr vorhanden ist
? DatagramSocket(int port) throws SocketException ⇒ zum Senden und Empfangen
geeignet (ist aber eher für Server gedacht); eine Exception wird geworfen, falls der Port
belegt ist
• wichtigste Methoden:
? close() ⇒ Schließen des Sockets und Freigeben des Ports
? getLocalPort, setLocalPort
36
Abbildung 6:
Abbildung 7:
? getLocalAddress
? getReceiveBuffersize, setReceiveBuffersize ⇒ Abfragen bzw. Setzen der maximalen
Größe des Puffers
? void receive(DatagrammPacket p) throws IOException ⇒ liest UDP-Paket
(gepuffert) und schreibt dieses in p
1. IP- und PortAdresse werden mit Senderadresse und Senderport überschrieben
2. das length-Attribut enthält die tatsächliche Länge, die kleiner oder gleich der Größe
des Pakets ist
3. blockiert, bis das Paket empfangen wurde
Programmierung mit UDP
verwendete Klassen:
• DatagramPacket mit receive und send ⇒ das, was übermittelt werden soll
• DatagramSocket
Für receive gibt es auch einen Timeout, welcher mit der Methode int getSoTimeout() gelesen und
mit setSoTimeout(int t) gesetzt werden kann. Die Zeit wird hierbei in Millisekunden angegeben
und definiert, wie lange receive maximal blockiert; falls keine Nachricht in dieser Zeit eintrifft, so
gibt es eine java.io.InterruptedIOException.
Die möglichen Verbindungen können auch eingeschränkt werden:
• connect(InetAddress remoteAddr, int remotePort) ⇒ es sind nur noch Verbindungen mit
dem angegebenen Rechner möglich (Filter für ein- und ausgehende Pakete)
• disconnect
• getInetAddr
• getPort
Statt byte[] können auch Streams angebunden werden. Als Beispiel betrachten wir eine einfache
Client-/Server-Architektur, die einen Inkrementserver zur Verfügung stellt. Der Client schickt einen
Wert hin und erhält den inkrementierten Wert zurück.
1
import j a v a . n e t . ∗ ;
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public c l a s s Sender {
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
byte [ ] b = new byte [ 1 ] ;
b [ 0 ] = Byte . p a r s e B y t e ( a r g s [ 0 ] ) ;
DatagramPacket p a c k e t = new DatagramPacket ( b , 1 ) ;
try {
p a c k e t . s e t A d d e s s ( I n e t A d d r e s s . getByName ( ” mickey .
i n f o r m a t i k . uni−k i e l . de ” ) ) ;
packet . setPort ( 6 0 0 0 1 ) ;
DatagramSocket s o c k e t = new DatagramSocket ( ) ;
s o c k e t . send ( p a c k e t ) ;
socket . r e c e i v e ( packet ) ;
System . out . p r i n t l n ( ” R e s u l t : ”+b [ 0 ] ) ;
} catch ( E x c e p t i o n e ) { System . out . p r i n t l n ( ”Bad I n t e r n e t c o n n e c t i o n ” ) ; }
}
}
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public c l a s s R e c e i v e r {
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
byte [ ] b = new byte [ 1 ] ;
b [ 0 ] = 42;
try {
DatagramPacket p a c k e t = new DatagramPacket ( b , 1 ) ;
DatagramSocket s o c k e t = new DatagramSocket ( 6 0 0 0 1 ) ;
while ( b [ 0 ] ! = 0 ) {
socket . r e c e i v e ( packet ) ;
System . out . p r i n t l n ( ” R e c e i v e d : ”+b [ 0 ] ) ;
b[0]++;
s o c k e t . send ( p a c k e t ) ;
}
} catch ( E x c e p t i o n e ) { System . out . p r i n t l n ( ”Bad I n t e r n e t c o n n e c t i o n ” ) ; }
}
}
Bemerkungen:
Zeile 5: Byte-Array mit einem Eintrag
Zeile 6: parseByte(...) übersetzt das Hauptargument als Byte (-128 bis 127)
Zeile 7: das erste Argument ist das Byte-Array; das zweite Argument die Packetlänge
Zeile 13: Anfrage an den Server
Zeile 14: Abwarten der Antwort (blockiert bis irgendein Packet ankommt)
Zeile 23: der Inhalt ist hier egal, da als Erstes etwas empfangen und der Inhalt dabei überschrieben
wird
Zeile 26: dient der Definition des zu hörenden Ports (“Server-Port” des Dienstes)
Zeile 27: Beenden des Receivers über java Sender -1 möglich
Zeile 28: blockiert, bis eine Anfrage reinkommt
Zeile 30: Inkrement-Service
Zeile 31: Absenden der Antwort (Adresse und Port sind durch den Paketeingang bereits definiert;
Verwendung der Adresse des Senders für die Antwort)
Problem: falls der Sender keine Antwort bekommt, so blockiert dieser dauerhaft ⇒ eine Lösungsmöglichkeit wäre ein Timeout ggf. mit erneuter Anfrage (unsichere Kommunikation).
Transmission Control Protocol (TCP)
Eigenschaften:
• verbindungsorientiertes Protokoll
• sichere Übertragung ist garantiert (es gehen keine Pakete verloren)
38
• Versenden von Datenströmen ⇒ werden automatisch in TCP-Pakete aufgebrochen
• richtige Reihenfolge ist garantiert
• die Bandbreite wird zwischen mehreren TCP-Verbindungen fair geteilt
• TCP unterscheidet zwischen Client und Server einer Verbindung
• der TCP-Client initiiert eine Verbindung
• der TCP-Server antwortet auf diese Verbindung ⇒ bidirektionale Verbindung ist etabliert
• TCP-Sockets in java.net ⇒ verbinden zwei Ports auf (eventuell) unterschiedlichen Rechnern
(kann auch für UDP genutzt werden)
Konstruktoren:
• Socket(InetAddr addr, int port) thorws java.net.IOException ⇒ stellt Verbindung zu
addr und port her, wobei als lokaler Port dieser Verbindung ein freier Port verwendet wird
• Socket(InetAddr addr, int port, InetAddr localAddr, int localPort) throws
java.net.IOException ⇒ analog wie oben, aber mit festem lokalem Port (→ besser nicht
verwenden, da ein Fehler auftritt, falls Port besetzt ist)
• Socket(String host, int port) throws java.net.IOException
Beachte: Socket blockiert, bis die Verbindung aufgebaut ist.
Methoden:
• close() ⇒ schließt die Verbindung (die Daten sollten vorher “geflushed” werden, d. h. der Puffer
sollte geleert und die Daten übertragen werden)
• InetAddr getInetAddress() ⇒ liefert remoteAddress
• int getPort() ⇒ liefert den remotePort
• getLocalInetAddress
• getLocalPort
• InputStream getInputStream()
• OutputStream getOutputStream()
• es ist auch eine Konfiguration der Socket-Verbindung (z. B. Timeout, etc.) möglich
Lesen und Schreiben geschieht über die Streams.
Listing 30: Im Beispiel verbindet sich der Client mit dem Server und empfängt eine Nachricht
1
2
import j a v a . n e t . ∗ ;
import j a v a . i o . ∗ ;
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public c l a s s D a y t i m e C l i e n t {
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
try {
S o c k e t s o c k e t = new S o c k e t ( ” mickey ” , 6 0 0 0 1 ) ;
System . out . p r i n t l n ( ” Connection e s t a b l i s h e d ” ) ;
B u f f e r e d R e a d e r r e a d e r = new B u f f e r e d R e a d e r (
new InputStreamReader ( s o c k e t . g e t I n p u t S t r e a m ( ) )
);
System . out . p r i n t l n ( ” R e s u l t : ”+r e a d e r . r e a d L i n e ( ) ) ;
socket . close ( ) ;
} catch ( IOException i o e ) { System . out . p r i n t l n ( ” E r r o r : ”+i o e ) ; }
}
}
TCP-Server: Klasse ServerSocket
Konstruktoren:
• ServerSocket(int port) throws java.io.IOException ⇒ “horcht” auf dem Port
• ServerSocket(int port, int NumberOfClients) throws java.io.IOException,
wobei numberOfClients die Größe der Backlog-Queue (Anzahl der wartenden Clients; default:
50) darstellt
Zum Akzeptieren von Verbindungen:
39
• Socket accept() throws java.io.IOException ⇒ wartet auf den Client und akzeptiert diesen → Verbindung
• close() ⇒ schließt den ServerSocket
• getSoTimeout()
• setSoTimeout(int ms) → Setzen eines Timeout für das accept
Listing 31: Server des Beispiels
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public c l a s s DayTimeServer {
public s t a t i c void main ( S t r i n g [ ] a r g s ) {
try {
S e r v e r S o c k e t s e r v e r = new S e r v e r S o c k e t ( 6 0 0 0 1 ) ;
while ( true ) {
Socket nextClient = s e r v e r . accept ( ) ;
OutputStream out = n e x t C l i e n t . getOutputStream ( ) ;
new P r i n t S t r e a m ( out ) . p r i n t (new j a v a . u t i l . Date ( ) ) ;
out . c l o s e ( ) ;
nextClient . close ( ) ;
// Ende d e r S o c k e t −Verbindung
}
} catch ( BindException be ) {
System . out . p r i n t l n ( ” S e r v i c e a l r e a d y r u n n i n g on p o r t 60001 ” ) ;
}
catch ( IOException i o e ) { System . out . p r i n t l n ( ” I /O e r r o r : ”+i o e ) ; }
}
}
6.5 Socket-Optionen
• Zugriff/Einstellung über Socket-Methoden, d. h. Methoden der Klasse Socket
• SO_KEEPALIVE: setKeepAlive(true) ⇒ regelmäßiges Pollen (Anfragen) zum Verbindungstest
• SO_RCVBUF: setReceiveBuffersize(size) ⇒ Betriebssystempuffer wird in seiner Größe beeinflusst (size ist in Byte)
• SO_SNDBUF: setSendBuffersize(size) ⇒ Betriebssystempuffer wird in seiner Größe beeinflusst
(size ist in Byte)
• SO_LINGER: setSoLinger(true,50) ⇒ auch beim Schließen des sockets wird noch für 50 Millisekunden versucht, die Daten zu senden
• TCP_NODELAY ⇒ Veränderung des TCP-Protokolls, so dass keine Transmissionskontrolle (Warten, bis ein Paket voll ist, bevor es gesendet wird) mehr stattfindet
ACHTUNG: die Performance kann sinken
• SO_TIMEOUT: setSoTimeout(int time) ⇒ Leseoperation versucht time Millisekunden zu lesen.
Eventuell tritt eine InterruptedIOException auf, falls keine Antwort kommt (time=0 entspricht
keinem Timeout).
Listing 32: Beispiel
1
2
3
4
5
6
7
try {
s . setSoTimeout ( 2 0 0 0 ) ;
. . . l e s e aus S o c k e t s . . .
} catch ( I n t e r r u p t e d I O E x c e p t i o n i o e ) {
System . e r r . p r i n t l n ( ” S e r v e r a n t w o r t e t n i c h t ! ” ) ;
}
catch ( IOException i o ) { . . . }
6.6 Sockets in Erlang
In der Vorlesung haben wir die TCP-Kommunikation über Sockets in Erlang kennen gelernt. Die Prinzipien sind natürlich identisch, mit denen in Java. Als Beispielanwendung haben wir einen einfachen
Tupelserver entwickelt, welchen wir später auch als Registry verwenden können:
40
Listing 33: TCP-Version eines Key-Value-Servers in Erlang
1
2
−module ( t c p ) .
−e x p o r t ( [ s e r v e r / 0 , c l i e n t / 0 , r e q u e s t H a n d l e r / 3 ] ) .
3
4
5
6
7
8
s e r v e r ( ) −>
{ok , LSock } = g e n t c p : l i s t e n ( 5 0 4 2 , [ l i s t , { packet , l i n e } ,
{ r e u s e a d d r , true } ] ) ,
DB = spawn ( k e y V a l u e S t o r e , s t a r t , [ ] ) ,
acceptLoop ( LSock ,DB) .
9
10
11
12
13
14
acceptLoop ( LSock ,DB) −>
spawn ( tcp , r e q u e s t H a n d l e r , [ LSock ,DB, s e l f ( ) ] ) ,
receive
nex t −> acceptLoop ( LSock ,DB)
end .
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
r e q u e s t H a n d l e r ( LSock ,DB, Parent ) −>
{ok , Sock } = g e n t c p : a c c e p t ( LSock ) ,
Parent ! next ,
receive
{ tcp , Sock , S t r } −>
case l i s t s : s p l i t w i t h ( fun (C) −> C/=44 end , l i s t s : d e l e t e ( 1 0 , S t r ) ) o f
{ ” s t o r e ” , [ | Arg ] } −>
{Key , [ | Value ] } = l i s t s : s p l i t w i t h ( fun (C) −> C/=44 end , Arg ) ,
DB! { s t o r e , Key , Value } ;
{ ” loo kup ” , [ | Key ] } −> DB! { lookup , Key , s e l f ( ) } ,
receive
Ans −> g e n t c p : send ( Sock , b a s e : show ( Ans)++” \n” )
end
end ,
g e n t c p : c l o s e ( Sock ) ;
Other −> b a s e : p r i n t ( Other )
end .
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
c l i e n t ( ) −>
case b a s e : g e t L i n e ( ” ( s ) t o r e , ( l ) ookup : ” ) o f
” s ” −> Key = b a s e : g e t L i n e ( ”Key : ” ) ,
Value = b a s e : g e t L i n e ( ” Val : ” ) ,
{ok , Sock } = g e n t c p : c o n n e c t ( ” l o c a l h o s t ” , 5 0 4 2 , [ l i s t , { packet , l i n e } ] ) ,
g e n t c p : send ( Sock , ” s t o r e , ”++Key++” , ”++Value++” \n” ) ,
g e n t c p : c l o s e ( Sock ) ;
” l ” −> Key = b a s e : g e t L i n e ( ”Key : ” ) ,
{ok , Sock } = g e n t c p : c o n n e c t ( ” l o c a l h o s t ” , 5 0 4 2 , [ l i s t , { packet , l i n e } ,
{ active , false }]) ,
g e n t c p : send ( Sock , ” lookup , ”++Key++” \n” ) ,
{ok , Res } = g e n t c p : r e c v ( Sock , 0 ) ,
b a s e : putStrLn ( Res ) ,
g e n t c p : c l o s e ( Sock )
end .
6.7 Verteilung von Kommunikationsabstraktionen
Nachdem wir nun verstanden haben, wie eine verteilte Kommunikation über TCP funktioniert, können
wir uns überlegen, wie Kommunikationsabstraktionen transparent in ein verteiltes System eingebettet
werden können. Hierbei ist es egal, ob es sich um Pids in Erlang, Remoteobjekte bei RMI oder
Ähnliches handelt. Man benötigt eine externe Darstellung des entsprechenden Objekts im Netzwerk,
welche über IP-Adresse des Rechners, einen Port und ggf. noch eine weitere Identifikationsnummer,
einen Zugriff auf das entsprechende Objekt von Remote ermöglicht.
Wird ein entsprechendes (lokales) Objekt serialisiert, wandelt man es in seine Remote-String-Darstellung um und verschickt diese. Alle Kommunikation auf ein Remote-Objekt wird dann über TCP
serialisiert, wodurch entsprechend auch wieder andere Kommunikationsabstraktionen verteilt werden
41
können.
Als Beispiel hierfür haben wir in Vorlesung und Übung Channels in Java, Erlang bzw. Haskell für
einen Zugriff von Remote erweitert. Hierbei haben wir insbesondere die folgenden Aspekte diskutiert
und realisiert:
• Verwendung des Key-Value-Stores als Registry
• Schreiben eines Channels von Remote
• Optimierung der Kommunikation durch Zusammenlegen der Kommunikation auf einen Port
• Garbage Collection von nicht mehr verwendeten Channels
• Lesen aus einem remote Channel
• Realisierung von Linking
7 Synchronisation durch Tupelräume
Ansatz ohne Kommunikation zwischen zwei Partnern (Server/Client)
Linda: Kommunikation durch Bekanntmachen und Abfragen von Tupeln. Das grundlegende Modell
sieht wie folgt aus:
• zentraler Speicher (ggf. auch mehrere)
Tupelraum als Multimenge von Tupeln
• viele unabhängige Prozesse, die Tupel in den Tupelraum hinzufügen oder Tupel aus diesem lesen
(entfernen) können
• sprachunabhängig: Linda ist ein Kommunikationsmodell, das zu verschiedenen Sprachen hinzugefügt werden kann (z. B. C-Linda, Fortran-Linda, Scheme-Linda, Java-Spaces)
Hierbei basiert Linda auf folgenden Grundoperationen:
• out(t) ⇒ fügt das Tupel t in den Tupelraum ein. Berechnungen in t werden vor dem Einfügen
ausgewertet.
Beispiel: out(("hallo",1+3,6.0/4.0)) fügt das Tupel ("hallo",4,1.5) in den Tupelraum
ein.
• in(t) ⇒ entfernt das Tupel t aus dem Tupelraum, falls t dort vorhanden ist. Ansonsten suspendiert es, bis t im Tupelraum eingefügt wurde. t kann auch Variablen enthalten (Pattern
Matching). Diese parsen immer und werden an entsprechende Werte aus dem Tupelraum gebunden.
Beispiel: in(("hallo",?i,?f)) entfernt obig eingefügtes Tupel und bindet i/4 und f/1.5
• rd(t) ⇒ wie in(t), aber das Tupel wird nicht entfernt
• eval(stmt) ⇒ erzeugt einen neuen Prozess, der stmt auswertet
• inp(t), rdp(t) ⇒ wie in, rd, aber ohne Blockade, falls das Tupel nicht vorhanden ist. Als
Rückgabewert erhält man einen boolschen Wert. Dieser ist false, falls t nicht im Tupelraum
vorhanden ist.
• Beispiel: zwei Prozesse, die abwechselnd aktiv sind (ping-pong) in Linda
1
2
3
4
5
6
1
2
3
4
5
6
ping ( ) {
while ( true ) {
out ( ” p i n g ” ) ;
i n ( ” pong ” ) ;
}
}
pong ( ) {
while ( true ) {
in ( ” ping ” ) ;
out ( ” pong ” ) ;
}
}
42
Listing 34: Hauptprogramm
1
2
3
4
main ( ) {
eval ( ping ( ) ) ;
e v a l ( pong ( ) ) ;
}
Es ist aber auch möglich, Datenstrukturen durch Tupel darzustellen:
Interpretiere bestimmte Werte als Referenzen im Speicher (Tupelraum), z. B. durch Integer-Werte.
Dann ist z. B. ein Array kodierbar als:
{("A",1,r1),("A",2,r2),...,("A",n,rn);...}
Der Zugriff auf einzelne Arraykomponenten kann dann ermöglicht werden. Z. B. können wir den 4.
Index im Array wie folgt mit drei multiplizieren:
in("A",4,?e);
out("A",4,3*e);
Anwendung: verteilte und parallele Bearbeitung komplexer Datenstrukturen
Listing 35: Rechenoperation auf jedem Element
1
2
3
4
5
f o r ( i =1; i<=n ; i ++) { e v a l ( f ( i ) ) ; }
f ( int i ) {
i n ( ”A” , i , ? r ) ;
out ( ”A” , i , compute ( r ) ) ;
}
Dies ist nur sinnvoll, wenn compute aufwendig ist und die Verteilung auf mehrere Prozessoren möglich
ist.
Formulierung von Synchronisationsproblemen: Kodierung der dinierenden Philosophen
1
2
3
4
5
6
7
8
9
10
p h i l ( int i ) {
while ( true ) {
think ( ) ;
in ( ” stab ” , i ) ;
i n ( ” s t a b ” , ( i +1)% j ) ;
eat ( ) ;
out ( ” s t a b ” , i ) ;
out ( ” s t a b ” , ( i +1)% j ) ;
}
}
Listing 36: Initialisierung
1
2
3
4
f o r ( i =0; i <s ; i ++) {
out ( ” s t a b ” , i ) ;
eval ( phil ( i ) ) ;
}
Eine Client/Server-Kommunikation ist auch modellierbar
Idee: Zur Zuordnung der Antworten zu Anfragen werden von dem Server eindeutige ID’s verwendet:
1
2
3
4
5
6
7
8
9
10
server () {
i n t i =1;
eval ( idserver ( 1 ) ) ;
while ( true ) {
in ( ” request ” , i , ? req ) ;
...
out ( ” r e s p o n s e ” , i , r e s ) ;
i ++;
}
}
43
1
2
3
4
5
6
7
1
2
3
4
5
6
i d s e r v e r ( int i ) {
while ( true ) {
i n ( ”newID” ) ;
out ( ”newID” , i ) ;
i ++;
}
}
Client () {
int id ;
out ( ”newID” ) ;
i n ( ”newID” , ? i d ) ;
out ( ” r e q u e s t ” , ? id , 1 ) ;
i n ( ” r e s p o n s e ” , id , ? r e s 1 ) ;
7.1 Java-Spaces
Linda in Java
out t
space.write(Entry e, Transaction tr, long lease) mit:
• Entry e als Interface für Einträge in den Tupelspace
• Transaction tr als Transaktionskonzept zur atomaren Ausführung mehrerer Modelle von Tupelräumen, sonst null
• long lease gibt die Zeit an, wie lange ein Tupelraum verbleibt
in t
space.take(Entry e, Transaction tr, long lease) mit:
• Entry e als Template (Pattern)
• long lease als maximale Suspensionszeit
das Template kann auch Variablen/Wildcards in Form von null enthalten. Das Template matcht den
Eintrag im Space, falls:
1. das Template den gleichen Typ oder Obertyp wie der Eintrag hat
2. alle Felder des Templates matchen:
– Wildcard (null) matcht immer
– Werte matchen nur gleiche Werte
Das Matchen der Variablen erfolgt durch das Ersetzen von null durch den korrekten Wert.
rd(t)
space.read(...)
eval(stmt)
Java Threads
inp(t), rdp(t)
über lease=0
Beachte: Die Objektidentität bleibt in Space nicht erhalten. Objekte werden (de-)serialisiert, da es sich
häufig um verteilte Anwendungen handelt. Spaces können lokal oder im Netzwerk gestartet werden.
Sie können persistent sein. Sie können über Jini-lookup-Sevice oder RMI-Registrierung oder andere
gestartet werden.
7.2 Implementierung von Tupelräumen in Erlang
Um zu verstehen, wie Tupelräume realisiert werden können, betrachten wir eine prototypische Implementierung in Erlang. Erlang verfügt zwar auch über Pattern Matching, Pattern sind aber keine
Bürger erster Klasse und können nicht als Nachrichten verschickt werden. Es ist in Erlang aber möglich,
Funktionen zu verschicken. Somit können wir recht einfach partielle Funktionen verschicken, welche
das Pattern Matching im Tupelraum realisieren. Als Beispiel können wir das Pattern {hallo, X, Y}
durch die Funktionsdefinition
1
fun ( { h a l l o , X,Y) −> {X,Y} end
realisieren. Hierbei liefern wir die Bindungen als Paar der gebundenen Variablen zurück. Die Partialität
der Funktion verwenden wir um das Nicht-Matchen des Patterns auszudrücken. Im Server können wir
diesen Fall mit Hilfe eines catch-Konstrukts erkennen und entsprechend verfahren.
44
Im Tupel-Server benötigen wir zwei Strukturen (Listen) zur Speicherung von Anfragen. In der einen
Liste werden alle Werte im Tupelraum gespeichert. Neue in- bzw. rd-Requests werden zunächst gegen
alle Werte gemacht und ggf. direkt beantwortet. Sollte kein passender Wert gefunden werden, speichern
wir die Requests in einer zweiten Liste. Wird später dann ein neuer Wert eingefügt, wird dieser
entsprechend für alle offenen Requests überprüft und ggf. eine Antwort zurück geschickt.
Die konkrete Implementierung ergibt sich wie folgt:
Listing 37: Erlang Implementierung des Linda-Modells
1
2
−module( l i n d a ) .
−export ( [ s t a r t / 0 , out / 2 , i n / 2 , rd /2 ] ) .
3
4
5
s t a r t ( ) −> r e g i s t e r ( l i n d a , s e l f ( ) ) ,
lindaLoop ( [ ] , [ ] ) .
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
l i n d a L o o p ( Ts , Reqs ) −>
receive
{out , T} −> {Reqs1 , KeepT} = findMatchingReq (T, Reqs ) ,
case KeepT of
true −> l i n d a L o o p ( [T | Ts ] , Reqs1 ) ;
f a l s e −> l i n d a L o o p ( Ts , Reqs1 )
end ;
{InRd , F , P} −>
case f i n d M a t c h i n g T u p l e (F , Ts , InRd ) of
nothing
−> l i n d a L o o p ( Ts , [ {F , P , InRd} | Reqs ] ) ;
{ j u s t , {Match , Ts1}} −> P! {tupleMatch , Match} ,
l i n d a L o o p ( Ts1 , Reqs )
end
end .
21
22
23
24
25
26
27
28
29
30
31
32
33
findMatchingReq ( , [ ] ) −> { [ ] , true} ;
findMatchingReq (T, [ Req | Reqs ] ) −>
{F , P , InRd} = Req ,
case catch F(T) of
{ ’EXIT ’ , } −> {Reqs1 , KeepT} = findMatchingReq (T, Reqs ) ,
{ [ Req | Reqs1 ] , KeepT} ;
Match
−> P! {tupleMatch , Match} ,
case InRd of
i n −> {Reqs , f a l s e } ;
rd −> findMatchingReq (T, Reqs )
end
end .
34
35
36
37
38
39
40
41
42
43
44
45
46
f i n d M a t c h i n g T u p l e ( , [ ] , ) −> n o t h i n g ;
f i n d M a t c h i n g T u p l e (F , [T | Ts ] , InRd ) −>
case catch F(T) of
{ ’EXIT ’ , } −> case f i n d M a t c h i n g T u p l e (F , Ts , InRd ) of
nothing
−> n o t h i n g ;
{ j u s t , {M1, Ts1}} −> { j u s t , {M1, [T | Ts1 ] }}
end ;
Match
−> case InRd of
i n −> { j u s t , {Match , Ts}} ;
rd −> { j u s t , {Match , [T | Ts ] }}
end
end .
47
48
out ( Space , T) −> Space ! {out , T} .
49
50
51
52
53
i n ( Space , F) −> Space ! { in , F , s e l f ( ) } ,
receive
{tupleMatch , Match} −> Match
end .
54
55
56
rd ( Space , F) −> Space ! {rd , F , s e l f ( ) } ,
receive
45
57
58
{tupleMatch , Match} −> Match
end .
8 Spezifikation und Testen von Systemeigenschaften
Das Testen von Systemeigentschaften von nebenläufigen Systemen geht über Ansätze, wie Unit-Tests
oder Zusicherungen im Quellcode hinaus. Man ist in der Regel daran interessiert, globale Systemeigenschaften zu analysieren, die sich aus den Zuständen der einzelnen Prozesse zusammen setzen. Als
Beispiel können wir noch einmal auf das nebenläufige Verändern eines geteilten Zustands zurückkommen, hier als Client-Server-Struktur in Erlang:
Listing 38: Kritischer Bereich
1
2
−module ( c r i t i c a l ) .
−e x p o r t ( [ s t a r t / 0 ] ) .
3
4
5
6
s t a r t ( ) −> S = spawn ( fun ( ) −> s t o r e ( 4 2 ) end ) ,
spawn ( fun ( ) −> i n c ( S ) end ) ,
dec ( S ) .
7
8
9
10
11
s t o r e (V) −> r e c e i v e
{ lookup , P} −> P ! V, s t o r e (V ) ;
{ s e t , V1}
−> s t o r e (V1)
end .
12
13
14
15
16
17
i n c ( S ) −> S ! { lookup , s e l f ( ) } ,
receive
V −> S ! { s e t ,V+1}
end ,
inc (S ) .
18
19
20
21
22
23
dec ( S ) −> S ! { lookup , s e l f ( ) } ,
receive
V −> S ! { s e t , V−1}
end ,
dec ( S ) .
Hierbei stellen die Zustände, bevor der Zustand neu geschireben wird sicherlich kritische Zustände da.
Eigentlich sollten beide Client-Prozesse nie gleichzeitig in diesen Zuständen sein, da dann der Speicher
in Abhängigkeit eines veralteten Wertes verändert wird. Solche Eigenschaften können elegant mit Hilfe
von Temporallogiken ausgedrückt werden.
9 Linear Time Logic (LTL)
Die Lineare temporalale Logik (LTL) ermöglicht es, Eigenschaften von Pfaden eines Systems auszudrücken. Ihre Syntax ist wie folgt definiert:
Syntax der Linear Time Logic (LTL)
Sei Props eine (endliche) Menge von Zustandspropositionen. Die Menge der LTL-Formeln ist definiert
als kleinste Menge mit:
• Props ⊆ LT L
• ϕ, ψ ∈ LT L =⇒ −
−
−
−
Zustandspropositionen
¬ϕ ∈ LT L
ϕ ∧ ψ ∈ LT L
Xϕ ∈ LT L
ϕ U ψ ∈ LT L
46
Negation
Konjunktion
Im nächsten Zustand gilt ϕ
ϕ gilt bis ψ gilt
Eine LTL-Formel wird bzgl eines Pfades interpretiert. Propositionen gelten, wenn der erste Zustand
des Pfades die Propositionen erfüllt. Der Modaloperator Next Xϕ ist erfüllt, wenn ϕ auf dem Pfad
ab dem nächsten Zustand gilt. Außerdem enthält LTL den Until-Operator, welcher erfüllt ist, wenn
ϕ solange gilt, bis ψ gilt. Hierbei mus ψ aber tatsächlich irgendwann gelten.
Formal ist die Semantik wie folgt definiert:
Pfadsemantik von LTL
Ein unendliches Wort über Propositionsmengen π = p0 p1 p2 . . . ∈ P(Props)ω heißt Pfad. Ein Pfad π
erfüllt eine LTL-Formel ϕ (π |= ϕ) wie folgt:
p0 π
π
π
p0 π
p0 p1 . . .
|=
|=
|=
|=
|=
: ⇐⇒
: ⇐⇒
: ⇐⇒
: ⇐⇒
: ⇐⇒
P
¬ϕ
ϕ∧ψ
Xϕ
ϕU ψ
P ∈ p0
π 6|= ϕ
π |= ϕ ∧ π |= ψ
π |= ϕ
∃i ≥ 0 : pi pi+1 . . . |= ψ ∧ ∀j < i : pj pj+1 . . . |= ϕ
Ausgehend vom Kern von LTL können wir praktische Abkürzungen defnieren, welche es häufig einfacher machen, Eigentschaften zu spezifizieren.
Abkürzungen in LTL
f alse
true
ϕ∨ψ
ϕ→ψ
F ϕ
Gϕ
F ∞ϕ
G∞ ϕ
ϕWψ
ϕ Rψ
:=
:=
:=
:=
:=
:=
:=
:=
:=
:=
¬P ∧ P 5
¬f alse
¬(¬ϕ ∧ ¬ψ)
¬ϕ ∨ ψ
true U ϕ
¬F ¬ϕ
GF ϕ
F Gϕ
(ϕ U ψ) ∨ (G ϕ)
¬(¬ϕ U ¬ψ)
der Boolesche Wert false
der Boolesche Wert true
Disjunktion
Implikation
Irgendwann (Finally) gilt ϕ
Immer (Globaly) gilt ϕ
Unendlich oft gilt ϕ
Nur endlich oft gilt ϕ nicht
Schwaches Until (weak until)
ϕ lässt ψ frei (release)
Das Release ist hierbei sicherlich nicht ganz so verständlich, wird aber bei der Implementierung recht
nützlich sein.
Ausgehend von der Pfadsemantik, erfüllt ein System eine LTL-Formel, wenn jeder Pfad des Systems
die LTL-Formel erfüllt. Formal werden Systeme hierbei als Kripke-Strukturen repräsentiert:
Kripke-Struktur
K = (S, Props, −→, τ, s0 ), wobei
• S eine Menge von Zuständen,
• Props eine Menge von Propositionen,
• −→⊆ S × S die Transitionsrelation,
• τ : S −→ P(Props) eine Beschriftungsfunktion für die Zustände und
• s0 ∈ S der Startzustand sind,
heißt Kripke-Struktur. Anstatt (s, s0 ) ∈−→ schreibt man in der Regel s −→ s0 .
Ein Zustandspfad von K ist ein unendliches Wort s0 s1 . . . ∈ S ω mit si −→ si+1 und s0 der Startzustand
von K. Wenn s0 s1 . . . ein Zustandspfad von K und pi = τ (si ) für alle i ≥ 0, dann ist das unendliche
Wort p0 p1 . . . ∈ P(Props)ω ein Pfad von K.
Kripke-Struktur-Semantik von LTL
Sei K = (S, →, τ, s0 ) eine Kripke-Struktur. K erfüllt eine LTL-Formel ϕ (K |= ϕ) genau dann, wenn
für alle Pfade π von K: π |= ϕ.
47
Mit Hilfe von LTL können im Wesentlichen drei unterschiedliche Arten von Eigenschaften definiert
werden:
• Sicherheitseigenschaften (Safety):
Solche Eigenschaften haben in der Regel die Form: G ¬ ϕ
Ein mögliches Beispiel für ϕ kann für zwei Prozesse, für welche der wechselseitige Ausschluss
gewährleistet sein soll cs1 ∧ cs2 sein, wenn der Prozess i ∈ {1, 2} seinen kritischen Bereich durch
die Proposition csi anzeigt.
• Lebendigkeitseigenschaften (Liveness):
Diese Eigenschaften haben in der Regel die Form: F ϕ, wobei sie oft auch nur bedingt gelten
sollen, z.B. in einer Implikation.
Ein Beispiel ist, die Antwort (answ) auf einen Request (req) eines Servers: G (req −→ F answ).
Hierbei ist die eigentlich Lebendigkeitseigenschaft nur erwünscht, wenn eine Anfrage erfolgte.
Mit Hilfe von G fordern wir, dass diese Eigenschaft überall im System gelten soll.
• Fairness:
Fairness kann als F ∞ ϕ beschrieben werden. Hiermit können wir ausdrücken, dass jeder beteiligte
Prozess unendlich oft dran kommt und vom Scheduler nicht ausgeschlossen wird.
Man verwendet Fairnesseingenschaften in der Regel als Vorbedingung einer Implikation, um
“unfaire” Pfade nicht zu betrachten. Dies entspricht der Beschränkung auf präemptive Scheduler,
welche wir einleitend in Betracht gezogen haben.
9.1 Implementierung von LTL zum Testen
LTL wurde ursprünglich als Spezifikationslogik entwickelt, welche dann mit Hilfe von Model Checking
gegenüber einer endlichen Kripke-Struktur (Systembeschreibung) überprüft werden kann. Das Model
Checking ist aber für reale Systeme im Allgemeinen unentscheidbar, wie folgendes Erlang Beispiel
zeigt:
Listing 39: Unentscheidbarkeit von LTL
1
s t a r t ( ) −> f ( ) , prop ( t e r m i n a t e d ) .
Hierbei soll prop(terminated) bedeuten, dass das Programm nachdem der Funktionsaufruf f() beendet wurde, in einen Zustand überführt wird, in dem die Proposition terminated gilt. Will man für
dieses Programm die LTL-Formel
F terminated
entscheiden, so müsste man ja entscheiden, ob die Funktion f() terminiert. Da Erlang ja eine universelle Programmiersprache ist, ist dies aber direkt ein Widerspruch zum Halteproblem und somit im
Allgemeinen unentscheidbar.
Auch wenn die formale Verifikation also nicht möglich ist, kann es aber dennoch sinnvoll sein, LTLEigenschaften zu testen. Inwieweit die sich ergebenden Aussagen zu interpretieren sind, werden wir
später noch besprechen. Zunächst wollen wir den Ansatz zum LTL-Testen implementieren.
Als ersten Schritt müssen wir uns eine geeignete Darstellung von LTL-Formeln in Erlang überlegen.
Wir beschänken und hier auf einen Teil der LTL-Operatoren. Die restlichen Operatoren sollen als
Übung ergänzt werden. Bei der Negation des Untils ist die Verwendung des Release hilfreich, da dies
gerade als “Negation” des Until definiert ist.
Listing 40: LTL-Implementierung in Erlang
1
−module ( l t l ) .
2
3
−e x p o r t ( [ prop / 1 , d i s j / 2 , c o n j / 2 , neg / 1 , x / 1 , f / 1 , g / 1 ] ) .
4
5
6
7
8
9
10
11
prop ( Phi ) −> { prop , Phi } .
neg ( Phi ) −> { neg , Phi } .
d i s j ( Phi , P s i ) −> { d i s j , Phi , P s i } .
c o n j ( Phi , P s i ) −> { c o n j , Phi , P s i } .
x ( Phi ) −> {x , Phi } .
g ( Phi ) −> {g , Phi } .
f ( Phi ) −> { f , Phi } .
12
48
13
14
15
16
17
18
19
20
showLTL ( { prop , P} ) −> b a s e : show (P ) ;
showLTL ( { d i s j , Phi , P s i } ) −> ” ( ”++showLTL ( Phi)++” o r ”++showLTL ( P s i)++” ) ” ;
showLTL ( { c o n j , Phi , P s i } ) −> ” ( ”++showLTL ( Phi)++”and”++showLTL ( P s i)++” ) ” ;
showLTL ( { neg , Phi } ) −> ” ( neg ”++showLTL ( Phi)++” ) ” ;
showLTL ( { x , Phi } ) −> ”X”++showLTL ( Phi ) ;
showLTL ( { f , Phi } ) −> ”F”++showLTL ( Phi ) ;
showLTL ( { g , Phi } ) −> ”G”++showLTL ( Phi ) ;
showLTL ( Phi ) −> b a s e : show ( Phi ) .
Das erste Problem bei der Implementierung stellt die Negation dar. Diese ist über die Negation auf
der mathematischen Meta-Ebene definiert und so nur schwer zu implementieren. Es ist aber möglich,
LTL-Formeln so äquivalent umzuformen, dass die Negation nur noch direkt vor Propositionen auftritt.
Die Negation wird vom Prinzip nach Innen geschoben.
Listing 41: LTL-Implementierung in Erlang
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
n o r m a l i z e ( true ) −> true ;
n o r m a l i z e ( f a l s e ) −> f a l s e ;
n o r m a l i z e ( {prop , P} ) −> prop (P ) ;
n o r m a l i z e ( { d i s j , Phi , P s i } ) −> d i s j ( n o r m a l i z e ( Phi ) , n o r m a l i z e ( P s i ) ) ;
n o r m a l i z e ( { c o n j , Phi , P s i } ) −> c o n j ( n o r m a l i z e ( Phi ) , n o r m a l i z e ( P s i ) ) ;
n o r m a l i z e ( {x , Phi} ) −> x ( n o r m a l i z e ( Phi ) ) ;
n o r m a l i z e ( { f , Phi} ) −> f ( n o r m a l i z e ( Phi ) ) ;
n o r m a l i z e ( {g , Phi} ) −> g ( n o r m a l i z e ( Phi ) ) ;
n o r m a l i z e ( {neg , true} ) −> f a l s e ;
n o r m a l i z e ( {neg , f a l s e } ) −> true ;
n o r m a l i z e ( {neg , {prop , P}} ) −> neg ( prop (P ) ) ;
n o r m a l i z e ( {neg , { d i s j , Phi , P s i }} ) −> c o n j ( n o r m a l i z e ( neg ( Phi ) ) , n o r m a l i z e ( neg ( P s i ) ) ) ;
n o r m a l i z e ( {neg , { c o n j , Phi , P s i }} ) −> d i s j ( n o r m a l i z e ( neg ( Phi ) ) , n o r m a l i z e ( neg ( P s i ) ) ) ;
n o r m a l i z e ( {neg , {x , Phi}} ) −> x ( n o r m a l i z e ( neg ( Phi ) ) ) ;
n o r m a l i z e ( {neg , { f , Phi}} ) −> g ( n o r m a l i z e ( neg ( Phi ) ) ) ;
n o r m a l i z e ( {neg , {g , Phi}} ) −> f ( n o r m a l i z e ( neg ( Phi} ) ) .
Als Nächstes stellen wir eine Funktion zur Verfügung, welche anhand der gültigen Propositionen
Formeln auf ihre Gültigkeit im aktuellen Zustand überprüft. Mögliche Ergebnisse sind hierbei true,
f alse und noch nicht entschieden. Im letzteren Fall erhalten wir eine Formel, welche wir auf dem
weiteren Pfad überprüfen müssen.
Listing 42: LTL-Implementierung in Erlang
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
c h e ck ( true , P r o p s ) −> true ;
c h e ck ( f a l s e , P r o p s ) −> f a l s e ;
c h e ck ( {prop , P} , Props ) −>
l i s t s : member (P , Props ) ;
c h e ck ( {neg , {prop , P}} , Props ) −> not ( check ( prop (P) , Props ) ) ;
c h e ck ( { c o n j , Phi , P s i } , Props ) −>
case check ( Phi , Props ) of
true −> check ( Psi , Props ) ;
f a l s e −> f a l s e ;
Phi1 −> case check ( Psi , Props ) of
true −> Phi1 ;
f a l s e −> f a l s e ;
P s i 1 −> c o n j ( Phi1 , P s i 1 )
end
end ;
c h e ck ( { d i s j , Phi , P s i } , Props ) −>
case check ( Phi , Props ) of
true −> true ;
f a l s e −> check ( Psi , Props ) ;
Phi1 −> case check ( Psi , Props ) of
true −> true ;
f a l s e −> Phi1 ;
P s i 1 −> d i s j ( Phi1 , P s i 1 )
end
end ;
c h e ck ( {x , Phi} , P r o p s ) −> x ( Phi ) ;
49
27
28
29
30
c h e ck ( { f , Phi} , Props ) −> check ( d i s j ( Phi , x ( f ( Phi ) ) ) , Props ) ;
c h e ck ( {g , Phi} , Props ) −> check ( c o n j ( Phi , x ( g ( Phi ) ) ) , Props ) ;
c h e ck ( Phi , P r o p s ) −>
b a s e : putStrLn ( ” Unexpected f o r m u l a i n check : ”++showLTL ( Phi ) ) .
Auf Grund der hier verwendeten dreiwertigen Logik, können wir Konjunktion und Disjunktion nicht
auf die entsprecheden Booleschen Operationen zurückführen, sondern müssen sie selbst implementieren. Die Formel X ϕ können wir nicht entscheiden und geben sie unverändert zurück.
Bei den Formeln F ϕ und G ϕ wickeln wir den Fixpunkt, über den diese Formeln definiert sind ab.
Dies ist auf Grund der folgenden Äquivalenzen möglich:
π |= F ϕ gdw. π |= ϕ ∨ X F ϕ
π |= G ϕ gdw. π |= ϕ ∧ X G ϕ
Beachte, dass die sich ergebenden Formeln nur true, f alse, X ϕ, Disjunktionen oder Konjunktionen
sein können. F , G und prop sind nur unterhalb eines X möglich.
Für den Fall, dass das System einen Schritt macht, können wir nun in den Formeln das X realisieren:
Listing 43: LTL-Implementierung in Erlang
1
2
3
4
s t e p ( {x , Phi} ) −> Phi ;
s t e p ( { c o n j , Phi , P s i } ) −> c o n j ( s t e p ( Phi ) , s t e p ( P s i ) ) ;
s t e p ( { d i s j , Phi , P s i } ) −> d i s j ( s t e p ( Phi ) , s t e p ( P s i ) ) ;
s t e p ( Phi ) −> b a s e : putStrLn ( ” Unexpected f o r m u l a i n s t e p : ”++showLTL ( Phi ) ) .
Die letzte Regel sollte eigentlich nicht auftreten und dient nur dem Erkennen von Programm-Fehlern.
Nun können wir der Server, welcher für das Verwalten und Überprüfen der LTL-Formeln zuständig
ist, implementieren. Er wird global unter dem Atom ltl registriert, damit seine Pid nicht in jeden
Prozess propagiert werden muss und Anwendungen somit einfacher erweitert werden können.
Als Zustand speichern wir in diesem Prozess eine Liste von zur Zeit gültigen Propositionen, sowie die
Formeln, welche in den weiteren Zuständen noch überprüft werden müssen. Außerdem speichern wir
die ursprünglichen mit assert hinzugefügten Formeln, um diese im Fall einer gefundenen Verletzung
ausgeben zu können. Alternaitv könnte man hier sicherlich auch einen String, der die Eigenschaft
geeignet benennt hinterlegen.
Bei den Propositionen ermöglichen wir es den Prozessen beliebige Propositionen zu setzen bzw. zu
löschen. Zur Vereinfachung vermeiden wir doppelte Vorkommen, da wir die Propositionsmenge als
einfache Liste implementieren. Zur einfacheren Kommunikation mit dem LTL-Server, stellen wir geeignete Schnittstellen-Funktionen zur Verfügung.
Erkennen wir die Verletzung einer Assertion, geben wir eine Warnmeldung aus. Diese sollte besser
in eine spezielle Datei geschrieben werden, da sie sonst in anderen Ein-/Ausgaben des eigentlichen
Programms untergeht.
Listing 44: LTL-Implementierung in Erlang
1
2
s t a r t ( ) −> LTL = spawn( fun ( ) −> l t l ( [ ] , [ ] , [ ] ) end ) ,
r e g i s t e r ( l t l , LTL ) .
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
l t l ( Phis , A s s e r t s , Props ) −>
receive
{ a s s e r t , Phi} −> case check ( n o r m a l i z e ( Phi ) , Props ) of
true −> l t l ( Phis , A s s e r t s , Props ) ;
f a l s e −> ptong ( Phi ) ,
l t l ( Phis , A s s e r t s , Props ) ;
Phi1 −> l t l ( [ Phi1 | P h i s ] , [ Phi | A s s e r t s ] , Props )
end ;
{newProp , P} −>
NewProps=[ P | Props ] ,
P h i s 1 = l i s t s : map( fun ( Phi ) −> check ( s t e p ( Phi ) , NewProps ) end , P h i s ) ,
{Phis2 , A s s e r t s 2 } = a n a l y z e ( Phis1 , A s s e r t s ) ,
l t l ( Phis2 , A s s e r t s 2 , NewProps ) ;
{ r e l e a s e P r o p , P} −>
50
18
19
20
21
22
23
24
25
26
27
28
29
NewProps= l i s t s : d e l e t e (P , Props ) ,
P h i s 1 = l i s t s : map( fun ( Phi ) −> check ( s t e p ( Phi ) , NewProps ) end , P h i s ) ,
{Phis2 , A s s e r t s 2 } = a n a l y z e ( Phis1 , A s s e r t s ) ,
l t l ( Phis2 , A s s e r t s 2 , NewProps ) ;
s t a t u s −>
b a s e : putStrLn ( ” Unevaluated A s s e r t i o n s : ” ) ,
l i s t s : z i p w i t h ( fun ( Phi , A s s e r t ) −>
b a s e : putStrLn ( showLTL ( A s s e r t ) ) ,
b a s e : putStrLn ( ”
”++showLTL ( Phi ) ) end ,
Phis , A s s e r t s ) ,
l t l ( Phis , A s s e r t s , Props )
end .
30
31
32
33
34
a s s e r t ( Phi ) −> l t l ! { a s s e r t , Phi} .
newProp (P) −> l t l ! {newProp , P} .
r e l e a s e P r o p (P) −> l t l ! { r e l e a s e P r o p , P} .
s t a t u s ( ) −> l t l ! s t a t u s .
35
36
37
38
39
40
41
42
43
44
a n a l y z e ( [ ] , [ ] ) −> { [ ] , [ ] } ;
a n a l y z e ( [ true | P h i s ] , [ A s s e r t | A s s e r t s ] ) −>
a n a l y z e ( Phis , A s s e r t s ) ;
a n a l y z e ( [ f a l s e | P h i s ] , [ A s s e r t | A s s e r t s ] ) −>
potong ( A s s e r t ) ,
a n a l y z e ( Phis , A s s e r t s ) ;
a n a l y z e ( [ Phi | P h i s ] , [ A s s e r t | A s s e r t s ] ) −>
{Phis1 , A s s e r t s 1 } = a n a l y z e ( Phis , A s s e r t s ) ,
{ [ Phi | P h i s 1 ] , [ A s s e r t | A s s e r t s 1 ] } .
45
46
47
ptong ( Phi ) −> b a s e : putStrLn ( ” A s s e r t i o n v i o l a t e d : ”++showLTL ( Phi ) ) ,
e x i t ( −1).
Die Funktion analyze löscht erfüllte und wiederlegte Formeln aus unseren Listenstrukturen. Im Fehlerfall gibt sie außerdem eine entsprechende Fehlermeldung mittels ptong/1 aus. Abschließend geben
wir dem Benutzer mit der Funktion status noch die Möglichkeit alle noch nicht entschiedenen Formeln
auszugeben.
Als Beispiel für die Benutzung der LTL-Bibliothek betrachten wir den wechselseitigen Ausschluss
zweier Prozesse. Auf Grund des Kommunikationsmodells von Erlang ist es nicht so einfach, überhaupt
einen kritischen Bereich zu definieren, in dem sich zwei Prozesse befinden. Wir verwenden hierzu einen
Speicher(-Prozess), auf dem zwei Prozesse nebenläufig Veränderungen vornehmen:
Listing 45: LTL-Implementierung in Erlang
1
2
−module( c r i t i c a l ) .
−export ( [ s t a r t /0 ] ) .
3
4
5
6
7
8
s t a r t ( ) −> t r y l t l : s t a r t ( ) catch
−> 42 end , \% F a l l s LTL−S e r v e r schon l a e u f t
l t l : a s s e r t ( l t l : g ( l t l : neg ( l t l : c o n j ( l t l : prop ( c s 1 ) , l t l : prop ( c s 2 ) ) ) ) ) ,
S = spawn( fun ( ) −> s t o r e ( 4 2 ) end ) ,
spawn( fun ( ) −> i n c ( S ) end ) ,
dec ( S ) .
9
10
11
12
13
s t o r e (V) −> receive
{ lookup , P} −> P!V, s t o r e (V ) ;
{ s e t , V1}
−> s t o r e (V1)
end .
14
15
16
17
18
19
20
i n c ( S ) −> S ! { lookup , s e l f ( ) } ,
receive
V −> l t l : newProp ( c s 1 ) , S ! { s e t ,V+1}
end ,
l t l : releaseProp ( cs1 ) ,
inc (S ) .
21
22
23
dec ( S ) −> S ! { lookup , s e l f ( ) } ,
receive
51
24
25
26
27
V −> l t l : newProp ( c s 2 ) , S ! { s e t , V−1}
end ,
l t l : releaseProp ( cs2 ) ,
dec ( S ) .
In Abhängigkeit des Schedulings wird die angegebene Eigenschaft nach ein paar Schritten widerlegt.
Bei der bisherigen Implementierung haben wir LTL zum Testen von Systemen eingesetzt. Hierbei
ergibt sich allerdings das Problem, dass einige Formeln nie als falsch (z. B. F ϕ) bzw. andere nie
als korrekt (z. B. Gϕ) beweisen werden können. Besonders problematisch sind somit Eigenschaften,
welche F ∞ oder G∞ verwenden. Sie können durch Testen weder widerlegt noch gezeigt werden.
Möglichkeiten hiermit umzugehen, ist z. B. obere Schranken einzubauen, nach wie vielen Schritten
(Abwicklungen) bzw. für wie viele Schritte ein F oder G gelten soll. Außerdem kann es sinnvoll sein,
zu protokollieren, wie oft F bzw. G bereits abgewicklet wurden, um einen Eindruck zu bekommen,
wie häufig die Bedingung einer Formel schon getestet wurde. Entsprechende Implementierung werden
in den Übungen besprochen.
9.2 Verifikation
Ursprünglich wurde LTL aber zur formalen Verifikation entwickelt. Betrachtet man endliche Kripke
Strukturen ist es nämlich sehr wohl möglich, Eigenschaften zu beweisen bzw. zu wiederlegen: LTL ist
für endliche Kripke Strukturen entscheidbar. Das Verfahren, mit dem dieses entschieden wird, nennt
man Model Checking, näheres hierzu findet man entsprechenden Theorievorlesungen.
Beim Model Checking geht man in der Regel davon aus, dasss man ein System (insb. sein Protokoll)
mit einer (wenig ausdrucksstarken) Spezifikationsprache beschreibt. Deren Semantik definiert eine
endliche Kripke Struktur, welche dann zur Überprüfung von Eigenschaften (LTL) verwendet werden
kann. Ein entsprechendes Tool zur Spezifikation/Verifikation mittels Model Checking ist z.B. Spin für
die Sprache Promela und LTL. Ähnliche Tools existieren für Prozessalgebren, wie den Calculus of
Communicating Systems (CCS) und modale Logiken (CTL, CTL∗ oder den modalen µ-Kalkül).
Hierbei ist die Idee, dass man zunächst die Spezifikation erstellt und gegen spezifizierte Eigenschaften
verifiziert. Danach wird diese dann schrittweise zum tatsächlichen System verfeinert. Als Konsequenz
wird die Kommunikation dann aber in der Regel nur wenig von konkreten Werten abhängen dürfen,
da dies auf Ebende der Spezifikation nicht ausgedrückt werden kann. Spezifikationssprachen bieten in
der Regel nur über einen sehr eingeschränkten Wertebereich, Datenstrukturen stehen gar nicht zur
Verfügung, da sonst die Semantik keine endliche/kleine Kripke Struktur bilden würde.
Ein andere möglicher Ansatz zur formalen Verifikation ist der umgekehrte Weg. Ausgehend von einem
existierenden System, versucht man durch Abstraktion eine endliche Kripke Struktur zu generieren.
Die Idee ist hierbei, dass sämtliche Pfade des echten Systems auch in dem abstrahierten endlichen
System enthalten sind. Hierzu kann man das Programm anstatt über konkreten Weten über abstrakten Werten ausführen. Bei Verzweigungen, welche dann über den abstrakten Werten nicht mehr
entschieden werden können, muss man dann um tatsächlich alle möglichen Pfade zu repräsentieren
zusätzlichen Nichtdeterminismus verwenden.
Da LTL ja auf allen Pfaden eines Systems gelten muss und die Abstraktion eine endliche Kripke
Struktur liefern kann, welche alle Pfade der tatsächlichen Semantik enthält, können wir LTL-Formeln
mittels Model Checking entscheiden. Hierbei müssen wir aber beachten, dass für den Fall, dass eine
LTL-Eigenschaft nicht gilt, zwei mögliche Fälle berücksichtigt werden müssen:
• Der Pfad, welcher ein Gegenbeispiel darstellt, existiert auch in der konkreten Semantik und zeigt
tatsächlich einen Fehler an.
• Der Pfad existiert in der tatsächlichen Semantik nicht, da eine Programm-Verzweigung auf
Grund der Abstraktion nicht korrekt entschieden wurde. In diesem Fall, kann man durch ein
Verfeinerung der Abstraktion versuchen, diese Verzweigung korrekt zu entscheiden, was aber in
der Regel zu einer Vergrößerung des Zustandsraums der abstrakten, endlichen Kripke-Struktur
führen wird.
In der Regel wird dieser Ansatz auch genau in den Fällen erfolgreich sein, in denen auch eine formale
Spezifikation möglich gewesen wäre, bei der genau die entsprechenden abstrakten Werte verwendet
worden wären, d.h. in Fällen, in denen das Systemverhalten, insbesondere die Kommunikation, wenig
von konkreten Werten abhängen.
52
9.3 Simulation einer Turing-Maschine
Die formale Verifikation von Programmen ist schwierig, da fast alle interessanten Programmeigenschaften unentscheidbar sind. Dies gilt bereits für sequentielle Programme, welche mit mehr als zwei
Zahlenwerten rechnen können oder dynamische Datenstrukturen verwenden können.
In diesem Kapitel wollen wir untersuchen, in wie weit auch die Erweiterung einer Programmiersprache
um nebenläufige Konzepte die Entscheidung interessanter eigenschaften, wie z.B. der Deadlockfreiheit,
unentscheidbar macht. Wir simulieren hierzu allein unter Verwendung einer endlichen Anzahl von
Atomen und nur drei Prozessen eine Turingmaschine.
Die Idee der Implementierung beruht auf der Darstellung des Bandes der Turing Maschine mit Hilfe
zweier Stacks. Beim bewegen des Kontrollkopfes, wird der oberste Wert des einen Stacks auf den
anderen Stack verschoben. Die Stacks werden jeweils durch einen Thread implementiert.
Ein Thread, welcher die Datenstruktur eines Stacks implementiert, kann wie folgt konstruiert werden:
Listing 46: Stack
1
2
3
4
5
6
s t a c k (P) −> r e c e i v e
pop −> pop ;
X
−> s t a c k (P) ,
P ! X,
s t a c k (P)
end .
Falls der Stack einen pop Nachricht erhält, tterminiertër, d.h. der darunter liegende Wert wird als
Nachricht an einen Kontrollprozess verschickt. Alle anderen Nachrichten werden als push interpretiert
und mit Hilfe des Laufzeitkellers gespeichert.
Die Funktione push und pop können dann einfach wie folgt definiert werden:
Listing 47: Stackoperationen
1
push ( St ,V) −> St !V.
2
3
4
5
6
pop ( St ) −> St ! pop ,
receive
X −> X
end .
Bei der Verwendung eines Stacks zur Simulation einer Turing Maschine ist aber noch wichtig, dass
auch über einen leeren Keller hinaus gelesen werden kann. Das Band enthält hier ja beliebig viele
Blanks. Diese Blanks können aber einfach beim pop auf einem leeren Stack geliefert werden:
Listing 48: Blank Stack
1
2
3
b l a n k S t a c k (P) −> s t a c k (P) ,
P ! blank ,
b l a n k S t a c k (P ) .
Nun kann eine Turing Maschine einfach wie folgt definiert werden: Sei M = hQ, Γ, δ, q0 , F i eine
deterministische Turing Maschine mit der Übergangsfunktin δ : Q \ F × Γ −→ Q × Γ × {l, r}. Dann
ist kann der Kontrollprozess wie folgt als Erlangfunktion ausgedrückt werden:
Für all q ∈ F , verwende Regel der Form:
delta(SL,SR,q,_) -> pop.
Für alle q ∈ Q\F mit δ(q, a) = (p, b, r):
delta(SL,SR,q,a) -> push(SL,b),
A = pop(SR),
delta(SL,SR,p,A).
und für alle q ∈ Q \ F mit δ(q, a) = (p, b, l):
53
delta(SL,SR,q,a) -> push(SR,b),
A = pop(SL),
delta(SL,SR,p,A).
Die Turing Maschine kann dann wie folgt gestartet werden:
start() -> SL=spawn(blankStack,[self()]),
SR=spawn(blankStack,[self()]),
writeInputToStack(SR),
A = pop(SR),
delta(SL,SR,q0 ,A),
outputStack(SR).
Man beachte, dass diese Turing-Maschinen-Simulation nur sehr wenige Werte verwendet. Außer dem
Bandalphabet Γ und der Zustandsmenge Q, sind dies das Atom pop und die Pid des Hauptprozesses.
Außerdem werden drei Threads verwendet. Es ist sogar möglich, auf noch einen weiteren Thread zu
verzichten. Dies ist jedoch technisch aufwändiger (Übung).
Entsprechend kann man auch mit endrekursiven Erlangprogrammen, die nur endlich viele Werte verwenden, aber beliebig viele Prozesse erlauben, eine Turingmaschine simulieren (Übung). Eine weitere
Möglichkeit ist die Verwendung eines einzelnen Prozesses und beliebig vielen Werten in der Mailbox,
bzw. dynamischen Datenstrukturen.
Durch die Simulation der Turingmaschine können zeigen, dass das Model Checking Problem für die
jeweils betrachtenen Systeme im allgemeinen unentscheidbar ist.
10 Transaktionsbasierte Kommunikation
In diesem Kapitel werden wir uns noch einmal mit den Nachteilen der lockkbasierten Kommunikation
beschäftigen und einen alternativen lockfreien Ansatz kennen lernen. Da der Ansatz besonders vielversprechend in Haskell implementiert wurde, betrachten wir in diesem Kapitel zunächst insbesondere
Haskell.
10.1 Ein Bankbeispiel in Concurrent Haskell
Aufgabe: Modellierung von Konten und zugehörigen Operationen auf den Konten.
Idee: Repräsentation eines Kontos als MVar Int mit folgenden Operationen:
1
2
3
4
withdraw : : MVar I n t −> I n t −> IO ( )
withdraw a c c amount = do
b a l a n c e <− takeMVar a c c
putMVar a c c ( b a l a n c e − amount )
takeMVar lockt quasi die MVar und putMvar gibt sie wieder frei.
1
2
3
1
2
1
2
3
4
5
6
7
8
d e p o s i t : : MVar I n t −> I n t −> IO ( )
d e p o s i t a c c amount =
withdraw a c c (−amount )
b a l a n c e : : MVar I n t −> IO I n t
b a l a n c e a c c = readMVar a c c
limitedWithdraw : : MVar I n t −> I n t −> IO Bool
limitedWithdraw a c c amount = do
b <− b a l a n c e a c c
i f b >= amount
then do
withdraw a c c amount
return True
e l s e return F a l s e
54
Problem: Die atomare Ausführung ist nicht garantiert. Nach der balance-Abfrage kann ein anderer
Thread Geld abheben ⇒ Kontostand ist zwar immer richtig, kann aber negativ werden.
Lösung: Lock müsste über gesamte limitedWithdraw Aktion erhalten werden ⇒ keine Wiederverwendung von withdraw möglich.
In Java werden alle Account-Methoden als synchronized gekennzeichnet, so dass eine Wiederverwendung möglich ist (mehrfaches Nehmen des gleichen Locks ist möglich).
Weitere Operation: sicherer Transfer
1
2
3
4
5
6
7
8
9
10
11
12
l i m i t e d T r a n s f e r : : MVar I n t −> Mvar I n t −> I n t −> IO Bool
l i m i t e d T r a n s f e r from t o amount = do
b <− takeMVar from
i f b >= amount
then do
b <− takeMVar t o
putMVar from ( b − amount )
putMVar t o ( b + amount )
return True
e l s e do
putMVar from b
return F a l s e
Problem: Falls gleichzeitig von A nach B und von B nach A überwiesen wird, landen wir im Deadlock.
Hier hilft auch in Java das synchronized nicht, da unterschiedliche Locks/Objekte beteiligt sind.
Listing 49: Lösung
1
2
3
4
5
6
7
8
9
10
11
l i m i t e d T r a n s f e r from t o amount = do
b <− takeMVar from
i f b >= amount
then do
putMVar from ( b − amount )
b <− takeMVar t o
putMVar t o ( b + amount )
return True
e l s e do
putMVar from b
return F a l s e
Wobei dies zu einem inkonsistenten Zwischenzustand führt.
Eine andere Lösung wäre alle Locks zu Beginn der Aktion gemäß einer globalen Ordnung zu nehmen
(zuerst kleine und dann große Locks nehmen, also immer erst A locken und dann B).
1
2
3
4
5
6
7
i f from <
then do
b1 <−
b2 <−
e l s e do
b2 <−
b1 <−
to
takeMVar from
takeMvar t o
takeMVar t o
takeMVar from
Während der gesamten Aktion sollten dann keine weiteren Locks erlaubt sein ⇒ keine Kompositionalität.
10.1.1 Gefahren bei der Programmierung mit Locks
• Verwendung von zu wenig Locks ⇒ Inkonsistenzen
• Verwendung von zu vielen Locks ⇒ übermäßige Sequentialisierung (im besten Fall) und Deadlocks (im schlechtesten Fall)
• Verwendung falscher Locks, da oft der Zusammenhang zwischen Lock und zu sichernden Daten
fehlt (bei Verwendung zusätzlicher Locks/Lockobjekte in Java) ⇒ Konsequenzen wie zuvor
• Nehmen von Locks in falscher Reihenfolge (Race Condition) ⇒ Deadlock
55
• Robustheit, da beim Absturz von Teilkomponenten Locks eventuell nicht mehr freigegeben werden und inkonsistente Systemzustände zurück bleiben (z. B. Geld wird vernichtet)
• Vergessen der Lockfreigabe ⇒ Deadlock
10.2 Transaktionsbasierte Kommunikation (in Concurrent Haskell als anderen
Ansatz)
Transaktionen sind ein bekanntes Konzept bei Datenbanken zur atomaren Ausführung komplexer
Datenbankanfragen und -modifikationen. Diese Idee kann auch zur nebenläufigen Programmierung
verwendet werden.
Die modifizierbaren Variablen entsprechen der Datenbank und die Operationen entsprechen der Transaktion.
Transaktionen beeinflussen die Welt nur, wenn sie erfolgreich waren. Sie werden in einer eigenen Monade ST M (Software Transactional Memory) definiert.
Das atomare Ausführen einer Transaktion in der Welt geschieht mittels einer Funktion
atomically :: STM a -> IO a. Im Gegensatz zu Datenbanken wird eine ST M immer wieder ausgeführt, bis sie erfolgreich war.
Zur Kommunikation (anstelle der MVars) dienen spezielle Transaktionsvariablen:
1
data TVar a −− abstract
2
3
4
5
newTVar
:: a
−> STM ( TVar a )
readTVar
: : TVar a
−> STM a
writeTVar : : TVar a −> a −> STM ( )
Beachte: Im Gegensatz zu MVars sind TVars niemals leer. Es gibt keinen Lock!
Listing 50: Bankbeispiel
1
ty pe Account = TVar I n t
2
3
4
b a l a n c e : : Account −> STM I n t
b a l a n c e a c c = readTVar a c c
5
6
7
8
9
1
2
3
1
2
3
4
5
6
7
8
withdraw : : Account −> I n t −> STM ( )
withdraw a c c amount = do
b <− b a l a n c e a c c
writeTVar a c c ( b − amount )
d e p o s i t : : Account −> I n t −> STM( )
d e p o s i t a c c amount =
withdraw a c c (−amount )
limitedWithdraw : : Account −> I n t −> STM Bool
limitedWithdraw a c c amount = do
b <− b a l a n c e a c c
i f b >= amount
then do
withdraw a c c amount
return True
e l s e return F a l s e
Fassen wir nun noch einmal das STM-Interface von Haskell zusammen:
Atomare Ausführung einer Transaktion: atomically :: STM a -> IO a
TVars als Kommunikationsabstraktion, welche im Rahmen von Transaktionen verändert werden können:
data TVar a ist abstrakt
Erstellen, Lesen und Verändern von TVar-Inhalten:
newTVar
:: a
-> STM (TVar a)
56
readTVar :: TVar a
-> STM a
writeTVar :: TVar a -> a -> STM ()
10.2.1 Beispielprogramm
Listing 51: Beispielprogramm
1
2
3
4
5
6
7
8
9
10
11
12
main : : IO ( )
main = do
a c c 1 <− a t o m i c a l l y ( newTVar 1 0 0 )
a c c 2 <− a t o m i c a l l y ( newTVar 1 0 0 )
f o r k I O ( do
b <− a t o m i c a l l y ( limitedWithdraw a c c 1 6 0 )
i f b then return ( )
e l s e putStrLn ”Du b i s t p l e i t e ! ”
)
a t o m i c a l l y ( t r a n s f e r acc1 acc2 50)
b <− a t o m i c a l l y ( b a l a n c e a c c 1 )
print b
10.3 Synchronisation mit Transaktionen
Als Nächstes wollen wir untersuchen, in wie weit Transaktionen auch geeignet sind, Synchronisationsprobleme zu lösen. Hierzu betrachten wir wieder die dinierenden Philosophen und versuchen eine
STM-basierte Implementierung zu entwickeln.
Zunächst müssen wir uns überlegen, wie die Stäbchen repräsentiert werden können. Da TVars (im
Gegensatz zu MVars) nicht leer sein können, benötigen wir eine TVar, welche boolesche Werte aufnehmen kann. Hierbei bedeutet der Wert True, dasss der Stab verfügbar ist und False, dass er bereits
vergeben ist.
Listing 52: Dinierende Philosophen
1
ty pe S t i c k = TVar Bool
// True = l i e g t a u f dem T i s c h ( i n i t i a l )
Als Nächstes können wir versuchen Funktionen zum Aufnehmen bzw. Zurücklegen des Stäbchens zu
definieren. Während putStick sehr einfach definiert werden kann, erreichen wir bei der Definition von
takeStick einen Punkt, an dem wir nicht weiter wissen:
Listing 53: Dinierende Philosophen
1
2
3
p u t S t i c k : : S t i c k −> STM ( )
p u t S t i c k s = do
writeTVar s True
4
5
6
7
8
9
10
t a k e S t i c k : : S t i c k −> STM ( )
t a k e S t i c k s = do
b <− readTVar s
i f b then do
writeTVar s F a l s e
e l s e ???
// T r a n s a k t i o n e r n e u t v e r s u c h e n
Falls takeStick ausgeführt wird, wenn die TVar den Wert False enthält, kann die gesamte Transaktion
nicht mehr erfolgreich werden. An diesem Punkt hätte der lockbasierte Ansatz gewartet (suspendiert),
bis ein anderer Prozess den Stab zurück legt.
Denkt man in Transaktionen erkennen wir als Anwender der STM-Bibliothek an dieser Stelle, dass
die Transaktion so insgesamt nicht erfolgreich ist, was wir durch Aufruf der Funktion
retry ::
STM ()
realisieren können. Bei Aufruf von retry wird die Transaktion erfolglos abgebrochen und erneut gestartet. Inwieweit hierbei tatsächlich Busy-Waiting notwendig ist, werden wir später klären. Zunächst
besagt die Ausführung von retry, dass die Transaktion an dieser Stelle fehlgeschlagen ist:
57
Listing 54: Dinierende Philosophen
1
2
3
4
5
6
t a k e S t i c k : : S t i c k −> STM( )
t a k e S t i c k s = do
b <− readTVar s
i f b then do
writeTVar s F a l s e
else retry
// T r a n s a k t i o n e r n e u t v e r s u c h e n
Nun müssen wir nur noch die Philosophen hinzufügen:
Listing 55: Dinierende Philosophen
1
2
3
4
5
6
7
8
9
10
p h i l : : S t i c k −> S t i c k −> IO ( )
p h i l l r = do
// denken
atomically ( takeStick l )
atomically ( takeStick r )
// E a t i n g
a t o m i c a l l y ( do
putStick l
putStick r )
phil l r
Die einzelnen Transaktionen werden mittels atomically ausgeführt. Hierbei fügen wir beide putStick
Aktionen zusammen aus. Unter Hinzunahme einer geeigneten Startfunktion für Sticks und Philosophen, läuft das Programm schon sehr schnell nach seinem Start in einen Deadlock. Diesesmal können
wir den Deadlock aber sehr viel einfacher vermeiden, als zuvor. Wir müssen einfach nur fordern, dass
beide Stäbchen atomar, also innerhalb einer Transaktion, genommen werden. Hierzu komponieren wir
beide sequentiell auf STM-Ebene und verwenden nur noch einen atomically Aufruf:
// Think
atomically(do
takeStick l
takeStick r)
Nun läuft das Programm nicht mehr in die Deadlock-Situation. Ein Philosoph versucht, atomar beide
Stäbchen zu nehmen. Ist dies nicht möglich, wird die gesamte Transaktion neu gestartet. Der Zustand,
in dem also das linke Stäbchen aufgenommen wurde, aber das zweit nicht verfügbar ist, wird außerhalb
der Transition nicht mehr sichtbar.
58
Herunterladen