ConcurrentPaket

Werbung
Muster nebenläufiger Programmierung
concurrent Packet von Java
Alois Schütte
AOSD
1
Concurrent Packet
In diesem Teil der Veranstaltung werde Muster nebenläufiger Programmierung diskutiert.
Dazu wird das concurrent Packet von Java betrachtet.
Alois Schütte
AOSD
2
Concurrent Packet - Überblick
Standard-Mittel von Java bezogen auf Nebenläufigkeit sind:
• Threads mit dem Interface Runnable
• Synchronisations-Mechanismen synchronized, wait, notify und notifyAll.
Ab Java 5 sind im Paket java.util.concurrent Klassen für Standard-Muster der parallelen
Programmierung enthalten, z.B.:
• Locks
• Queues
• Thread-Pooling
• Scheduling
• Semaphore
• Exchanger
• CountDownLatch
• CyclicBarrier
Alois Schütte
AOSD
3
Concurrent Packet - Locks
Lockkonzept
• Ein Lock ist ein Mittel, um in multithreading Umgebungen den gemeinsamen Zugriff auf
Ressourcen zu koordinieren.
• Um eine Ressource nutzen zu können, muss ein Thread den zugehörigen Schlüssel
anfordern.
• Solange ein Thread den Schlüssel besitzt, kann kein anderer Thread die Ressource
verwenden, er muss warten.
• Der den Schlüssel besitzende Thread gibt ihn frei, daraufhin kann ein wartender Thread den
Schlüssel bekommen und die Ressource verwenden.
Dieses Lockkonzept könnte mit synchronized umgesetzt werden. Dabei hat man aber immer
die Blockstruktur als Einschränkung.
java.util.concurrent.locks beinhaltet Interfaces und Klassen für Locks.
Alois Schütte
AOSD
4
Concurrent Packet - Locks
Die Klasse TimeUnit wird im Zusammenhang mit Locks verwendet, um eine Zeitdauer in
SECONDS, MICROSECONDS, MILLISECONDS oder NANOSECONDS angeben zu können.
$ cat TimeUnit/MainClass.java
import static java.util.concurrent.TimeUnit.*;
public class MainClass extends Thread {
// This field is volatile because two different threads may access it
volatile boolean keepRunning = true;
public void run() {
while (keepRunning) {
long now = System.currentTimeMillis();
System.out.printf("%tr%n", now);
try { Thread.sleep(1000); // millisecs
} catch (InterruptedException e) {
return;
}
}
} // run
In der run-Methode wird die Methode sleep von Thread verwendet. Es wird eine Sekunde
geschlafen.
Alois Schütte
AOSD
5
Concurrent Packet - Locks
public void pleaseStop() {
keepRunning = false;
}
public static void main(String[] args) {
MainClass thread = new MainClass();
thread.start();
try { SECONDS.sleep(10); // = MILLISECONDS.sleep(10000)
} catch (InterruptedException ignore) { }
thread.pleaseStop();
} // main
}
$
Innerhalb main wird
• ein Thread gestartet, der die Uhrzeit jede Sekunde
ausgibt;
• SECONDS von TimeUnit verwendet, um das Programm
10 Sekunden lang laufen zu lassen.
Alois Schütte
AOSD
$ java MainClass
10:19:51 AM
10:19:52 AM
10:19:53 AM
10:19:54 AM
10:19:55 AM
10:19:56 AM
10:19:57 AM
10:19:58 AM
10:19:59 AM
10:20:00 AM
$
6
Concurrent Packet - Locks
Das Interface Lock spezifiziert das Verhalten von Lock-Objekten.
public interface Lock {
void lock();
void lockInterruptible() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws
InterruptedException
void unlock();
Condition newCondition();
// Erklärung später
}
•
•
•
•
•
lock wartet, bis der Objektschlüssel verfügbar ist und belegt ihn dann.
unlock gibt das Objekt frei.
lockInterruptible funktioniert wie lock, aber es wird eine Ausnahme geworfen, wenn
ein anderer Thread den Thread durch interrupt unterbricht.
tryLock liefert false, wenn der Objektschlüssel nicht verfügbar ist; ansonsten wird
derObjektschlüssel in Besitz genommen und true returniert.
tryLock(long, TimeUnit) funktioniert wie tryLock, aber es wird eine maximale
Zeitspanne gewartet, wenn das Objekt nicht verfügbar ist.
Alois Schütte
AOSD
7
Concurrent Packet – Locks - ReentrantLock
Die Klasse ReentrantLock implementiert die Schnittstelle Lock.
public class ReentrantLock implements Lock, Serializable {
public ReentrantLock(boolean fair);
public ReentrantLock;
// Methods of Lock
void lock();
void lockInterruptible() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws
InterruptedException
void unlock();
Condition newCondition();
// additional Methods
public boolean isFair();
public int getHoldCount();
public int getQueueLength();
public boolean isHeldByCurrentThread();
public boolean isLocked();
protected Thread getOwner();
protected Collection<Thread> getQueuedThreda();
}
Alois Schütte
AOSD
8
Concurrent Packet – Locks - ReentrantLock
•
Der Konstruktor kann die Angabe eine fair-Paramerters haben. Wenn mehrere Threads auf den
Lock warten, garantiert fair==true, dass der am längsten wartende Thread das Lock-Objekt
erhält.
•
isFair liefert den fair-Parameter des Konstruktors zurück.
•
Ein Lock enthält eine Zähler, der bei jedem lock inkrementiert, bei unlock dekrementiert wird.
Ein Thread kann also öfter lock aufrufen. getHoldCount liefert den Wert des Zählers.
•
getQueueLength returniert die Anzahl der auf einen Lock wartenden Threads.
•
isHeldByCurrentThread ist true, wenn der aufrufende Thread den Lock hält.
•
isLocked ist true, wenn irgendein Thread den Lock hält.
•
getOwner(), Collection<Thread> getQueuedThreads liefern den Besitzer und die wartenden
Threads.
Alois Schütte
AOSD
9
Concurrent Packet – Locks - ReentrantLock
Beispiel: Klasse Konto (Account), Geldabholer (Withdrawer als Thread)
Zunächst die Realisierung der Klasse Account mit synchronized.
$ cat ReentrantLock/synchronized/WithdrawApp.java
class Account {
private float balance;
public Account (float initialBalance) {
balance = initialBalance;
}
public synchronized float getBalance() {
return balance;
} // getBalance
public synchronized void withdraw( float amount) {
if (amount < 0 || balance < amount)
throw new IllegalArgumentException("withdraw: wrong amount "
+ amount);
try { Thread.sleep(1000); } catch (Exception e) {};
balance -= amount;
} // withdraw
} // Account
synchronized ist erforderlich, da ein Konto von mehreren Threads verwendet werden kann und
mindestens einer den Zustand per withdraw ändern kann.
Alois Schütte
AOSD
10
Concurrent Packet – Locks - ReentrantLock
Nun die Realisierung der Klasse Account mittels Locks.
• Die Blockstruktur von synchronized muss mittels lock und unlock nachgebildet werden:
import java.util.concurrent.locks.*;
private final ReentrantLock lock = new ReentrantLock(true);
lock.lock();
try {
...
}
finally {
lock.unlock();
}
•
•
•
Wichtig:
– Da im try-Block Ausnahmen auftreten können ist mittels finally sicherzustellen, dass
stets unlock aufgerufen wird!
– Nur so werden „gelockte“ Objekte immer freigegeben.
Die Verwendung von lock-unlock ist also aufwendiger, dafür aber universell: ein Thread kann
lock aufrufen, ein andere unlock
Soll anstelle einer Objektsperre eine Klassensperre deklariert werden, wird die Lock-Variable
als static definiert.
Alois Schütte
AOSD
11
Concurrent Packet – Locks - ReentrantLock
$ cat ReentrantLock/ReentrantLock/WithdrawApp.java
import java.util.concurrent.locks.*;
class Account {
private float balance;
private final ReentrantLock lock = new ReentrantLock(true);
public Account (float initialBalance) {
balance = initialBalance;
} // Account
public float getBalance() {
lock.lock();
try { return balance;
} finally {lock.unlock();}
} // getBalance
public void withdraw( float amount) {
lock.lock();
try {
if (amount < 0 || balance < amount)
throw new IllegalArgumentException("withdraw: wrong amount "
+ amount);
try { Thread.sleep(1000); } catch (Exception e) {};
balance -= amount;
} finally {lock.unlock();}
} // withdraw
} // Account
Alois Schütte
AOSD
12
Concurrent Packet – Locks - ReentrantLock
Was muss geändert werden, wenn Jenni und Hannah nicht gleichzeitig Geld abholen dürfen?
Idee:
Der erste Abholer hält den Lock, der zweite muss abgewiesen werden.
Lösung:
tryLock anstelle von lock
$ cat ReentrantLock/tryLock/WithdrawApp.java
public void withdraw( float amount ) {
if (lock.tryLock() == false) return;
try {
if (amount < 0 || balance < amount)
throw new IllegalArgumentException("withdraw: ...);
try { Thread.sleep(1000); } catch (Exception e) {};
balance -= amount;
} finally {
lock.unlock();
}
} // withdraw
Alois Schütte
AOSD
13
Concurrent Packet – Locks - Condition
Die Methode newCondition des Interface Lock liefert ein Condition-Objekt zurück. Genauer, ein
Objekt einer Klasse die die Schnittstelle Condition implementiert.
public interface Condition {
void await() throm InterruptedException;
void awaitUninterruptibly();
boolean await(long time Timeunit unit) throm InterruptedException;
long awaitNanos(long time) throm InterruptedException;
boolean awaitUntil(Date deadline) throm InterruptedException;
void signal();
void signalAll();
}
•
•
•
•
•
Die Methoden haben Ähnlichkeit zu wait und notify.
Eine Condition ist signalisiert oder nicht signalisiert. Sofort nach ihrer Erzeugung ist sie
signalisiert.
Ein await-Aufruf (≈wait) auf einer signalisierten Condition kehrt sofort zurück. Vor Rückkehr
von await wird die Condition in den nicht signalisierten Zustand versetzt.
signal (≈notify) versetzt eine Condition in den signalisierten Zustand, weckt also einen
wartenden Thread
signalAll (≈notifyAll) weckt alle auf die Condition wartenden Threads.
Alois Schütte
AOSD
14
Concurrent Packet – Locks - Condition
Beispiel: BoundedBuffer, zunächst mit synchronized.
$ cat Condition/BoundedBuffer/synchronized/BoundedBufferApp.java
class BoundedBuffer {
private float[] buffer;
private int first, last;
private int numberInBuffer = 0, size;
producer
BoundedBuffer(int length) {
size = length;
buffer = new float[size];
first = last = 0;
}
put
buffer
get
consumer
public synchronized void dumpBuffer() {
System.err.print("Buffer: "); // use err channel to log
for (int i=(first+1)%size, j=0; j<numberInBuffer; j++, i=(i+1)%size)
System.err.print(buffer[i] + " ");
System.err.println(" ");
}
•
•
float Puffer fester Grösse
dumpBuffer zum Debuggen des Puffers über stderr
Alois Schütte
AOSD
15
Concurrent Packet – Locks - Condition
Beispiel: BoundedBuffer, zunächst mit synchronized.
public synchronized void put(float item) throws InterruptedException {
while(numberInBuffer == size) wait();
last = (last+1)%size;
numberInBuffer++;
buffer[last] = item;
proddumpBuffer();
put
buffer get
ucer
notifyAll();
}
consumer
public synchronized float get() throws InterruptedException {
while(numberInBuffer == 0) wait();
first = (first+1)%size;
numberInBuffer--;
dumpBuffer();
notifyAll();
return buffer[first];
}
} // BoundedBuffer
•
•
•
Die Methoden put und get sind mittels synchronized synchronisiert.
last ist Einfügestelle.
von first wird gelesen.
Alois Schütte
AOSD
16
Concurrent Packet – Locks - Condition
Der Produzent verwendet die put-Methode:
class Producer extends Thread {
private BoundedBuffer buffer;
public Producer(BoundedBuffer b) {
buffer = b;
}
public void run() {
for(int i = 0; i < 100; i++) {
try {
buffer.put(i);
System.out.println("put " + i);
}
catch (InterruptedException ingnored) {};
}
}
} // Producer
Alois Schütte
AOSD
producer
put
buffer
get
consumer
17
Concurrent Packet – Locks - Condition
Wie kann man dies nun mittels Condition realisieren und wo sind die Vorteile?
$ cat Condition/BoundedBuffer/condition/BoundedBufferApp.java
class BoundedBuffer {
private float[] buffer;
private int first, last;
private int numberInBuffer = 0, size;
private ReentrantLock
lock
= new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
BoundedBuffer(int length) {
...
}
producer
put
buffer
get
consumer
public void dumpBuffer() {
...
}
•
•
lock ist ein ReentrantLock Objekt.
Es gibt zwei Condition Attribute, notFull und notEmpty für das Objekt lock.
Alois Schütte
AOSD
18
Concurrent Packet – Locks - Condition
put:
public void put(float item) throws InterruptedException {
lock.lock();
try {
while(numberInBuffer == size) notFull.await();
last = (last+1)%size;
numberInBuffer++;
buffer[last] = item;
proddumpBuffer();
put
ucer
notEmpty.signal();
} finally {
lock.unlock();
}
}
•
•
buffer
get
consumer
Wenn der Buffer voll ist, wird gewartet, bis eine Condition notFull signalisiert wird.
Nach dem Schreiben in den Buffer wird signaliert notEmpty.
Alois Schütte
AOSD
19
Concurrent Packet – Locks - Condition
get:
public float get() throws InterruptedException {
lock.lock();
try {
while(numberInBuffer == 0) notEmpty.await();
first = (first+1)%size;
numberInBuffer--;
dumpBuffer();
producer
notFull.signal();
return buffer[first];
} finally {
lock.unlock();
}
}
•
•
put
buffer
get
consumer
Wenn der Buffer leer ist, wird gewartet, bis eine Condition notEmpty signalisiert wird.
Nach dem Lesen des Buffer wird signaliert notFull.
Insgesamt ist man also mit Locks und Conditions flexibler, man kann unterschiedliche
Bedingungen signalisieren und so gezielt nur bestimmte Threads wecken (eben die die auf die
Condition warten).
Alois Schütte
AOSD
20
Concurrent Packet – Executor
Bisher gab es stets eine enge Verbindung, zwischen dem was ein Thread macht (definiert im
Runnable Objekt) und dem Thread selbst.
public class HelloWorld {
public static void main (String args [ ]) {
HelloWorldThread t = new HelloWorldThread ("Hello, World!");
new Thread(t).start();
// creation and starting a thread
}
}
class HelloWorldThread implements Runnable {
// task of a thread
private String str;
HelloWorldThread(String s) {
str = new String(s);
}
public void run ( ) {
System.out.println(str);
}
}
•
•
In main wird ein Runnable-Objekt t erzeugt (new). Dann muss es explizite gestartet werden.
HelloWorldThread definiert das runnable-Objekt.
In größeren Anwendungen macht es Sinn, strikt zwischen Thread-Management und
Anwendungsfunktionalität des Thread zu unterscheiden.
So sollte es auch möglich sein, dass ein Thread mehrere Aufgaben nacheinander ausführt, also
die "Threadhülse" mehrfach verwendet werden kann.
Alois Schütte
AOSD
21
Concurrent Packet – Executor
Objekte, die das Management von Threads übernehmen, werden Executor genannt. Es existieren
drei Schnittstellen für Executor:
• Executor erlaubt das Erzeugen und Ausführen von Threads
• ExecutorService ist ein Subinterface von Executor, das den Lebenszyklus von Thread
beeinflussen kann
• ScheduledExecutorService erlaubt das Definieren von zukünftigen oder
periodischen Abläufen
Executor hat eine Methode execute, mit der ein Thread erzeugt und gestartet werden kann.
Wenn r ein Runnable Objekt ist und e ein Executor, dann gilt:
e.execute(r); == new Thread(r)).start();
ExecutorService beinhaltet neben execute noch die Methode submit, die ebenfalls ein
Runnable-Objekt aufnehmen kann.
Die meisten der Executor-Schnittstellen-Implementieirungen benutzen Threadpools.
Alois Schütte
AOSD
22
Concurrent Packet – Executor
Die meisten Implementierungen der Executor-Schnittstellen benutzen Threadpools, die aus
Workerthreads bestehen.
Die Idee ist es, eine Menge von Workerthreads zu haben, die einmal erzeugt werden und
unterschiedliche Aufgaben im Verlauf der Zeit ausführen können.
Vorteil:
die Threaderzeugung geschieht nur einmal
Aufgaben
^
Threadpool
erledigte Aufgaben
Alternativ müsste für jede Aufgabe immer ein Thread erzeugt werden, dann gelöscht
werden, ein neuer Thread müsste erzeugt werden usw.
Es existieren unterschiedliche Arten von Threadpools. Hier sei eine stellvertretend
behandelt.
Bei einem fixedThreadpool gibt es eine feste Menge von Threads. Wenn mehrere
Aufgabe zu bearbeiten sind, als Threads verfügbar sind, werden sie in eine
Warteschlage eingereiht.
Alois Schütte
AOSD
23
Concurrent Packet – Executor - Threadpools
Beispiel: 4 Aufgaben mit je zwei Schritten durch einem Threadpool mit 2 Threads.
$ cat Executor/ThreadPool/Threadpool.java
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class Threadpool {
public static void main( String[] args ) {
^
Runnable r1 = new Runnable() {
public void run() {
System.out.println( "A1 " + Thread.currentThread() );
System.out.println( "A2 " + Thread.currentThread() );
}
};
Runnable r2 = new Runnable() {
public void run() {
System.out.println( "B1 " + Thread.currentThread() );
System.out.println( "B2 " + Thread.currentThread() );
}
};
•
•
r1 und r2 sind zwei Runnable-Objekte, die jeweils eine Aufgabe mit zwei Schritten
nacheinander erledigen.
r1 und r2 sollen nebenläufig ablaufen können.
Alois Schütte
AOSD
24
Concurrent Packet – Executor Executor - Threadpools
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute( r1 );
executor.execute( r2 );
try {Thread.sleep(5000);} catch (Exception e) {}
System.out.println();
^
executor.execute( r1 );
executor.execute( r2 );
executor.shutdown();
System.out.println( "Threads started, main ends\n" );
} // end main
A1 Thread[pool-1-thread-1,5,main]
} // end class Threadpool
B1 Thread[pool-1-thread-2,5,main]
$
A2 Thread[pool-1-thread-1,5,main]
B2 Thread[pool-1-thread-2,5,main]
•
•
•
•
A1 Thread[pool-1-thread-1,5,main]
A2 Thread[pool-1-thread-1,5,main]
Threads started, main ends
Mittels newFixedThreadPool wird ein Pool mit
zwei Threads erzeugt.
execute startet einen Thread aus dem Pool.
shutdown verhindert, dass weitere
Workerthreads verwendet werden können.
Ohne shutdown läuft der ExecutorService
unendlich (außer System.exit(0);) !!!!!!
B1 Thread[pool-1-thread-2,5,main]
B2 Thread[pool-1-thread-2,5,main]
Name des Pools
Alois Schütte
AOSD
Name des Thread
Priorität des
Thread
25
Concurrent Packet – Callable
Problem:
Ein nebenläufige Thread kann nur über Umwege dem aufrufenden Programm/Thread
Ergebnisse mitteilen.
Etwa:
Runnable runnable = ...;
Thread t = new Thread(runnable);
t.start();
t.join();
String value = someMethodtoGetSavedValue()
Lösung:
In der Schnittstelle Callable, die Runnable erweitert, lässt sich eine Datenstruktur
übergeben, in die der Thread das Ergebnis hineinlegt. Die Datenstruktur kann dann vom
Aufrufer auf Änderungen untersucht werden.
interface java.util.concurrent.Callable<V>{
V call()
}
Diese Methode enthält den parallel auszuführenden Programmcode und liefert eine
Rückgabe vom Typ V.
Alois Schütte
AOSD
26
Concurrent Packet – Callable
Beispiel: int-Felder sortieren im Hintergrund durch Callable
$ cat Callable/CallableApp.java
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
class Worker implements Callable<int[]> {
private final int[] data;
Worker(int[] data) {
this.data = data;
}
public int[] call() { // overwrite call
Arrays.sort(data);
return data;
}
} // Worker
•
•
Worker implementiert Callable.
Callable bietet die Methode call an, die in Worker überschrieben wird..
Alois Schütte
AOSD
27
Concurrent Packet – Callable
public class CallableApp {
public static void main(String[] args) {
int[] unsorted = {106,101,110,110,105};
Callable<int[]> c = new Worker(unsorted);
ExecutorService executor = Executors.newCachedThreadPool();
Future<int[]> result = executor.submit(c); // worker starts
try {
int[] sorted = result.get(); // blocks until worker finished
for (int i=0; i<sorted.length;i++) System.out.print(sorted[i] + " ");
executor.shutdown(); // !!! without shutdown, the executor waits
//
infinitely
} catch (Exception e) {}
} // end main
} // end class CallableApp
•
•
•
Der ExecutorService bietet eine submit-Methode, die das Callable c annimmt und
einen Thread für die Abarbeitung aussucht.
Weil das Ergebnis asynchron ankommt, liefert submit das Future-Objekt result zurück, über
das man erkennen kann, ob das Ergebnis schon verfügbar ist. Mittesl result.get() wird
auf das Ergebnis gewartet.
alternativ result.isDone()== true oder sorted = result.get(2,
TimeUnit.SECONDS); um 2 Sekunden zu warten, nach 2 Sekunden wird Ausnahme
geworfen.
Alois Schütte
AOSD
28
Herunterladen