6 Interaktion über Nachrichten Motivation: Prozesse haben keinen gemeinsamen Speicher, in dem sie gemeinsame Objekte unterbringen könnten z.B. Parallelrechner mit verteiltem Speicher (distributed-memory parallel computer) (auch „Mehrrechnersystem“ – multi-computer) Rechnernetz (computer network) mit enger Kopplung (cluster) oder LAN oder WAN Prozesse in verschiedenen Rechnern müssen über Nachrichten (messages) interagieren, „kommunizieren“ (Nachrichtenaustausch, message passing, Interprozeßkommunikation, inter-process communication, IPC) Prozesse in verschiedenen Rechnern müssen über Nachrichten (messages) interagieren, „kommunizieren“ (Nachrichtenaustausch, message passing, Interprozeßkommunikation, inter-process communication, IPC) Kommunikationsnetz (communication network) ist die HW/SW-Infrastruktur für die Nachrichtenübertragung Rechner Kommunikationsnetz Aber: Nachrichtenbasierte Interaktion kann auch innerhalb eines Rechners sinnvoll sein, z.B. zwischen Prozessen mit disjunkten Adreßräumen Def.: Verteiltes System (distributed system) = System von Prozessoren, Prozessen, Threads, ..., die mangels gemeinsamen Speichers nicht über Datenobjekte, sondern über Nachrichten interagieren (Gegensatz: zentralisiertes System) Beachte: Die Klassifikation eines Systems als verteilt oder zentralisiert ist u.U. abhängig von der Abstraktionsebene, auf der das System betrachtet wird (Beispiel: System lokal kommunizierender Prozesse) ( LV Verteilte Systeme) Prozesse kommunizieren unter Verwendung von Kommunikationsoperationen (communication primitives) send – Nachricht versenden (auch write, ...); die Nachrichtenquelle heißt Sender (sender) oder Produzent (producer) recv – Nachricht entgegennehmen (auch read, ...); die Nachrichtensenke heißt Empfänger (receiver), Verbraucher (consumer) in vielen möglichen Varianten (s.u.) send/recv beziehen sich auf 1. die Nachricht, 2. den Empfänger/Sender – entweder direkt oder indirekt über einen Kanal (channel), der sich wie ein Puffer (3) verhält [auch bei „direkt“ gibt es einen verborgenen Kanal, der dem Empfänger zugeordnet ist und aus dem nur dieser liest (mailbox) ] Asynchrone Nachrichtenübertragung wenn Pufferungskapazität > 0 Synchrone Nachrichtenübertragung wenn Pufferungskapazität = 0 Direkte Zustellung kann an einen oder mehrere Empfänger gehen: Einfachsendung (unicast) oder Rundsendung (multicast oder broadcast) Direkte Zustellung kann an einen oder mehrere Empfänger gehen: Einfachsendung (unicast) oder Rundsendung (multicast oder broadcast) Indirekte Zustellung über Kanal kann mit einem oder mehreren Sendern bzw. Empfängern verbunden sein: Disjunktives Warten (selective wait) – nicht Java beim Empfangen von mehreren Kanälen: SELECT { message = channelA.recv(); ... break; OR message = channelB.recv(); ... break; OR ... } wartet, bis bei mindestens einem der Kanäle eine Nachricht anliegt, führt dann (nichtdeterministisch) eines der möglichen recv und die darauf folgenden Anweisungen aus. Softwaretechnische Klassifizierung : Datenfluß-Architektur (dataflow) (6.1): Filter (filter) wandelt Eingabedaten in Ausgabedaten, weiß nichts von seiner Umgebung Dienst-Architektur (client/server) (6.2): Klienten (clients) beauftragen Dienstanbieter (servers), erwarten bestimmte Funktionalität und Antwort Ereignisbasierte Systeme (event-based systems) (6.3): Abonnenten (subscribers) sind an bestimmten Ereignissen interessiert, über deren Eintreten sie von den auslösenden Ereignisquellen (publishers) benachrichtigt werden Verteilte Algorithmen (distributed algorithms) (6.4): gleichberechtigte Partner (peers) kooperieren, setzen vereinbartes Verhalten voraus 6.1 Datenfluß-Architekturen Prozeß arbeitet als Filter (filter), d.h. empfängt Daten, evtl. über verschiedene Kanäle, berechnet daraus neue Daten, versendet diese, evtl. über verschiedene Kanäle. Nicht notwendig, aber typisch ist: Kommunikationspartner bzw. -kanäle sind formale Parameter des Prozeß-Codes: Ports (ports) Parameter ist entweder Eingabe- oder Ausgabekanal (input port, output port) Beliebte graphische Darstellung: gerichteter Graph mit Filtern als Ecken und Kanälen als Kanten, d.h. genau 1 Sender/Empfänger je Kanal Beispiel 1: Partialsummen von Skalarprodukt: 0 * + Beliebte graphische Darstellung: gerichteter Graph mit Filtern als Ecken und Kanälen als Kanten, d.h. genau 1 Sender/Empfänger je Kanal Beispiel 1: Partialsummen von Skalarprodukt: 0 + * Beispiel 2: Verschmelzen sortierter Zahlenfolgen: merge merge 6.1.1 Imperative Formulierung Kanaltyp repräsentiert durch generische Schnittstelle interface Channel<Message> { void send(Message m); Message recv(); } Prozesse erzeugen, die mit Kanälen parametrisiert sind (mehrere Sender/Empfänger je Kanal möglich!) Beispiel (s.o.): Verschmelzen dreier Zahlenfolgen void merge3(Channel<Integer> Channel<Integer> Channel<Integer> Channel<Integer> in1, in2, in3, out) { Channel<Integer> ch = new ChImp(); FORK merge(in1, in2, ch); FORK merge(ch, in3, out); } mit FORK wie in 2.1.2 und merge wie folgt: void merge(Channel<Integer> in1, Channel<Integer> in2, Channel<Integer> out) { int one = (in1.recv()).intValue(); int two = (in2.recv()).intValue(); while(true) { if(one < two) { out.send(new Integer(one)); one = (in1.recv()).intValue();} else {out.send(new Integer(two)); two = (in2.recv()).intValue();} } } arbeitet nichtterminierend auf unendlichen Eingabefolgen! (Übung: endliche Folgen mit null als Ende-Markierung) Ohne Objektorientierung, Kommunikation als Sprachkonzept: z.B. CSP (Hoare 1978) Occam (Inmos Ltd. 1983, für Transputer) Pascal-FC (Burns/Davies 1993, für Ausbildung) Senden: ChannelExpr ! MessageExpr Empfangen: ChannelExpr ? MessageVar Nebenläufigkeitsanweisung: PAR process1 process2 ... Einfaches Beispiel Produzent und Verbraucher: CHAN ch: PAR REAL64 x: SEQ produce x ch ! x continue REAL64 y: SEQ prepare ch ? y consume y Beachte: Kanäle und Semaphore sind gleich mächtig: Kanäle können mit Semaphoren implementiert werden (3): class Buffer<T> implements Channel<T> {...} Semaphore können mit Kanälen implementiert werden: class ChanSema implements Semaphore { Channel<Object> c = new Ch<Object>(); public void P() {c.recv();} public void V() {c.send(null);} public ChanSema(int init){ for(int i=0; i<init; i++) V(); } } 6.1.2 Deklarative Formulierung in hypothetischer Datenflußsprache (dataflow language) Kanäle benennen Prozesse deklarieren, auf Kanäle bezogen (mehrere Sender/Empfänger je Kanal möglich!) ! Es wird vorausgesetzt, daß Elementarbausteine wie *, +, merge, ... fertig vorliegen (als Sprachelemente oder in Bibliothek) Beispiel Partialsummen: sums(in chan float a, in chan float b, out chan float s) channels float product, float sum, float back(0) filters end sums. times(a,b,product), plus(product,back,sum), copy(sum,back,s) Beispiel Verschmelzen: merge3(in in in out chan chan chan chan int int int int in1, in2, in3, res) channels int ch filters end merge3. merge(in1,in2,ch), merge(ch,in3,res) 6.1.3 Pipes und Pipelines in Unix Ein interaktives Programm mit einfacher, textbasierter Benutzer-Interaktion liest von Standard-Eingabe (standard input) – hinter der sich die Tastatur verbirgt (in Java durch Benutzung von System.in), schreibt nach Standard-Ausgabe (standard output) – hinter der sich der Bildschirm verbirgt (in Java durch Benutzung von System.out) Typische Systemaufrufe (system calls) für Ein/Ausgabe: read (portNumber, bufferAddress, byteCount) write(portNumber, bufferAddress, byteCount) In Unix: Lesen von Standard-Eingabe: read(0,b,c) Schreiben nach Standard-Ausgabe: write(1,b,c) ! 0, 1 sind implizite formale Parameter des Programms, die bei Programmstart mit bestimmten Kanälen aktualisiert werden ! read entspricht recv, write entspricht send. Pipeline, Beispiel: in der Unix-Befehlssprache (Shell) formuliert, ist einfaches, lineares Datenflußsystem in deklarativer Formulierung (2.1.2) ls ls | grep .pdf grep .pdf | wc –l liefert Anzahl der PDF-Dateien im aktuellen Verzeichnis wc –l ! Tiefsinnige Bedeutung von | : Trennzeichen zwischen 2 Prozessen, Kanal, der den Ausgabe-Port 1 des Senders mit dem Eingabe-Port 0 des Empfängers verbindet (nur 1 Sender/Empfänger je Kanal möglich!) Als Kanal fungiert eine Pipe ( = „Rohr“), die mit einem speziellen Systemaufruf eingerichtet werden kann. Befehlsinterpretierer richtet die benötigten Pipes ein und startet die Prozesse mit entsprechend aktualiserten Ports. ( VL Betriebssysteme) Datenflußarchitekturen werden daher häufig auch als pipe-and-filter architectures bezeichnet. 6.1.4 Funktionale Formulierung Zur Erinnerung: Bei Auswertung eines Funktionsaufrufs Argumentübergabe entweder als Wert (sofortige Auswertung, eager evaluation, applicative order evaluation, data-driven) oder als Ausdruck (verzögerte Auswertung, lazy evaluation, normal order evaluation, demand-driven) Haskell praktiziert verzögerte Auswertung und erlaubt damit nichtstrikte Funktionen Verschränkung von Aufbau/Verarbeitung von Listen unendliche Listen Verzögert ausgewertete Listen heißen auch Ströme (streams) und die damit arbeitenden Funktionen stromverarbeitende Funktionen (stream-processing f.) Beispiel 1: Zahlenfolge durch Quadratwurzeln ersetzen roots :: Floating a => [a] -> [a] roots = map sqrt oder mit expliziter Rekursion: roots(h:t) = sqrt h : (roots t) Anwendung z.B. roots [3, 4, 5/0, 6] liefert [1.73205, 2.0, ... (Fehlermeldung) 6.1.4.1 Interaktive Benutzung Noch prägnantere Anwendung: Liste wird vom Benutzer stückweise eingegeben (in Miranda, nicht in Haskell!) roots $+ [ 3 ,1.73205 4 ,2.0 ^D ... (Fehlermeldung weil roots[] nicht definiert) Beispiel 2: wie Beispiel 1, aber vorher negative Zahlen streichen saferoots stream = roots(map abs stream) oder saferoots = map sqrt . map abs Bildlich: sqrt abs Datenfluß-Architektur ! Filter = Applikation einer stromverarbeitenden Funktion Port = formales Argument/Ergebnis mit Listentyp Kanal = (unsichtbar, Kapazität 0) Implementierung koroutinenartig, keine echte Nichtsequentialität 6.1.4.2 Stromverarbeitende Funktion als Unix-Filter Sei f :: String -> String Skript prog.hs enthalte ... main = interact f ... Dann ist runhugs prog ein Unix-Filter, der eine Bytefolge stream in eine Bytefolge f stream umwandelt Achtung bei interaktivem Test: die Eingabe wird nicht geechot ! Beispiel – vgl. mit obigem roots : roots = unlines. (map show). lines (map read). (map sqrt). main = interact roots Merke also: Unix Pipelines erlauben in einfacher Weise, Software zu entwickeln, die sowohl imperativen als auch funktionalen Code enthält 6.1.4.3 Mehrfache E/A-Ströme merge :: Ord a => [a] -> [a] -> [a] merge(x:xs)(y:ys) | x = y | x <= y | x >= y –- duplicates ignored ! = x : merge xs ys = x : merge xs (y:ys) = y : merge(x:xs) ys merge [] l = l merge l [] = l Damit z.B. (6.1.1) merge3 a b c = merge(merge a b)c 2 Eingabe/Ausgabeströme bei einfachem Vergleichselement: x y min(x,y) max(x,y) liest 2 Zahlen an den Eingabe-Ports, schreibt 2 Zahlen an den Ausgabe-Ports sort2 :: Ord a => [a] -> [a] -> ([a],[a]) sort2(x:xs)(y:ys) = ((min x y) : mins, (max x y) : maxs) where (mins,maxs) = sort2 xs ys sort2[][] = ([],[]) Damit z.B. folgendes Sortiernetz: sort3 in1 in2 in3 = (out1,out2,out3) where (a,b) = sort2 in1 in2 (c,out3) = sort2 b in3 (out1,out2) = sort2 a c 6.1.4.4 Rundsendungen und Rückkoppelung Beachte: eine Nachricht erreicht u.U. als Rundsendung mehrere Empfänger – nämlich falls mehrfach auf einen Strom Bezug genommen wird ! Beispiel: Variante von roots: Wurzelberechnung wie in diesem Beispiel: 4 –25 16 ... (4,2) (25,5) (16,4) ... abs (,) sqrt wurzeln x = zip y (map sqrt y) where y = map abs x Auch zyklische Datenflüsse sind möglich ! Beispiel Partialsummen von Skalarprodukt (s.o.): 0 a b * + c sums a b = c where x = times a b c = plus x (0:c) times a b = map2 (*) a b plus a b = map2 (+) a b Beispiel Hamming-Problem: gesucht ist Zahlenfolge, die mit 1 beginnt, mit x auch die Zahlen 2x, 3x, 5x enthält, und nur solche. 12345689... Beispiel Hamming-Problem: gesucht ist Zahlenfolge, die mit 1 beginnt, mit x auch die Zahlen 2x, 3x, 5x enthält, und nur solche. 12345689... merge3 2* 3* 1 h 5* h = 1 : merge3(map(2*)h)(map(3*)h)(map(5*)h) 6.1.5 Systolische Algorithmen = in Hardware gegossene Datenfluß-Algorithmen, mit getakteten, synchronen Parallelschritten Beispiele: Sortiernetze Matrix-Vektor-Multiplikation 6.1.5.1 Beispiel Sortiernetze Sortiernetze aus einfachen Vergleichselementen (6.1.4): x y min(x,y) max(x,y) liest 2 Zahlen an den Eingabe-Ports, schreibt 2 Zahlen an den Ausgabe-Ports 3 Zahlen sortieren: (keine Parallelarbeit) 3 Schritte 4 Zahlen sortieren: 4 Schritte 5 Zahlen sortieren: 5 Schritte 6 Zahlen sortieren: 6 Schritte odd-even transposition sort * n Zahlen sortieren: Maximaler Parallelitätsgrad: n div 2 O(n) Anzahl Bausteine: ½ n(n-1) O(n2) Anzahl Schritte: n (statt typisch O(n log n) ) O(n) amortisiert für k Zahlensätze bei Pipelining: (n+k-1)/k 1 für k * siehe Knuth (vol. 3) oder Cormen et al. 6.1.5.2 Beispiel Matrix-Vektor-Multiplikation Bauelement für einen Schritt des Skalarprodukts von Vektoren: a c‘ c b (b) (a) c‘ = c + a b Damit systolischer Algorithmus für C=A B mit spezieller Matrix: Bandmatrix der Breite k C A c1 c2 a11 a21 a12 a22 a23 c3 a31 a32 a33 a34 a42 a43 a44 a45 b4 a53 a54 . b5 . . . c4 c5 . . B b1 b2 b3 . a33 a23 a42 a32 a22 a12 a31 a21 a11 0 ci bi . . . b2 0 b1 0 0 0 0 a33 a23 a42 a32 a22 a12 a21 0 a11 0 0 0 ci bi a31 0 b1 a33 a42 a23 a32 a22 a12 0 a21 a11 b1 ci bi a31 b2 0 0 b1 a33 a42 a23 a32 0 a22 a11 b1+ a12 b2 a31 a21 b1 ci bi 0 b2 0 b1 a33 a42 a23 ci bi 0 b3 a32 a21 b1+ a22 b2 a31 b1 b2 0 1+ a33 a22 b2 + a23 b3 a42 a31 b1+ a32 b2 0 ci bi b3 b2 a34 a43 a31 b1+ a32 b2 + a33 b3 0 a42 b2 ci bi b4 b3 b2 + a33 b3 + a34 b4 a42 b2 + a43 b3 0 ci bi b4 b3 . . . usw. Vergleich mit Beispiel Sortiernetze: hier typischer Pipelining-Algorithmus k Funktionseinheiten bei Bandbreite k (auch k/2 möglich) für Ergebnisvektor der Länge n sind 2n + k Schritte erforderlich – im Gegensatz zu O(n k) Schritten bei sequentieller Berechnung