concurrentPaket

Werbung
Das concurrent Paket
1
2
3
4
5
Überblick
Lock
Condition
Queue
Executors
Prof. Dr. Alois Schütte
Advanced System Programming
1
Concurrrent Paket: 1 Überblick


Standard-Mittel von Java bezogen auf Nebenläufigkeit sind:

Threads mit dem Interface Runnable

Synchronisations-Mechanismen synchronized, wait, notify und
notifyAll.
Ab Java 5 sind im Paket java.util.concurrent Klassen für Standard-Muster
der parallelen Programmierung enthalten, z.B.:

Locks

Queues

Thread-Pooling

Scheduling

Semaphore

Exchanger

CountDownLatch

CyclicBarrier
Prof. Dr. Alois Schütte
Advanced System Programming
2
Concurrrent Paket: 2 Locks



Lockkonzept

Ein Lock ist ein Mittel, um in multithreading Umgebungen den
gemeinsamen Zugriff auf Ressourcen zu koordinieren.

Um eine Ressource nutzen zu können, muss ein Thread den zugehörigen
Schlüssel anfordern.

Solange ein Thread den Schlüssel besitzt, kann kein anderer Thread die
Ressource verwenden, er muss warten.

Der den Schlüssel besitzende Thread gibt ihn frei, daraufhin kann ein
wartender Thread den Schlüssel bekommen und die Ressource
verwenden.
Dieses Lockkonzept könnte mit synchronized umgesetzt werden. Dabei hat
man aber immer die Blockstruktur als Einschränkung.
java.util.concurrent.locks beinhaltet Interfaces und Klassen für Locks.
Prof. Dr. Alois Schütte
Advanced System Programming
3
Concurrrent Paket: 2 Locks

Die Klasse TimeUnit wird im Zusammenhang mit Locks verwendet, um eine
Zeitdauer in SECONDS, MICROSECONDS, MILLISECONDS oder NANOSECONDS
angeben zu können.

Class TimeUnit

Beispiel:TimeUnit/MainClass.java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
import static java.util.concurrent.TimeUnit.*;
public class MainClass extends Thread {
// This field is volatile because two different threads may access it
volatile boolean keepRunning = true;
public void run() {
while (keepRunning) {
long now = System.currentTimeMillis();
System.out.printf("%tr%n", now);
try { Thread.sleep(1000); // millisecs
} catch (InterruptedException e) {
return;
}
}
} // run
Prof. Dr. Alois Schütte
Advanced System Programming
4
Concurrrent Paket: 2 Locks
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
public void pleaseStop() {
keepRunning = false;
}
public static void main(String[] args) {
MainClass thread = new MainClass();
thread.start();
try { SECONDS.sleep(10); // = MILLISECONDS.sleep(10000)
} catch (InterruptedException ignore) {
}
thread.pleaseStop();
} // main
as@hal:TimeUnit> java MainClass
}
10:19:51 AM
10:19:52 AM
10:19:53 AM
10:19:54 AM
10:19:55 AM
10:19:56 AM
10:19:57 AM
10:19:58 AM
10:19:59 AM
10:20:00 AM
as@hal:TimeUnit>
Prof. Dr. Alois Schütte
Advanced System Programming
5
Concurrrent Paket: 2 Locks

Das Interface Lock spezifiziert das Verhalten von Lock-Objekten.
public interface Lock {
void lock();
void lockInterruptible() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws
InterruptedException
void unlock();
Condition newCondition();
// Erklärung später
}

lock wartet, bis der Objektschlüssel verfügbar ist und belegt ihn dann.

unlock gibt das Objekt frei.

lockInterruptible funktioniert wie lock, aber es wird eine Ausnahme
geworfen, wenn ein anderer Thread den Thread durch interrupt unterbricht.


tryLock liefert false, wenn das Objekt nicht verfügbar ist; ansonsten wird
das Objekt in Besitz genommen und true returniert.
tryLock(long, TimeUnit) funktioniert wie tryLock, aber es wird eine
maximale Zeitspanne gewartet, wenn das Objekt nicht verfügbar ist.
Prof. Dr. Alois Schütte
Advanced System Programming
6
Concurrrent Paket: 2.1 ReentrantLock

Die Klasse ReentrantLock implementiert die Schnittstelle Lock.
public class ReentrantLock implements Lock, Serializable {
public ReentrantLock(boolean fair);
public ReentrantLock;
// Methods of Lock
void lock();
void lockInterruptible() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws
InterruptedException
void unlock();
Condition newCondition();
// additional Methods
public boolean isFair();
public int getHoldCount();
public int getQueueLength();
public boolean isHeldByCurrentThread();
public boolean isLocked();
protected Thread getOwner();
protected Collection<Thread> getQueuedThreda();
}
Prof. Dr. Alois Schütte
Advanced System Programming
7
Concurrrent Paket: 2.1 ReentrantLock



Der Konstruktor kann die Angabe eine fair-Paramerters haben. Wenn
mehere Threads auf den Lock warten, garantiert fair==true, dass der am
längsten wartende Thread das Lock-Objekt erhält.
isFair liefert den fair-Parameter des Konstruktors zurück.
Ein Lock enthält eine Zähler, der bei jedem lock inkrementiert, bei unlock
dekrementiert wird. Ein Thread kann also öfter lock aufrufen.
getHoldCount liefert den Wert des Zählers.

getQueueLength returniert die Anzahl der auf einen Lock wartenden
Threads.
isHeldByCurrentThread ist true, wenn der aufrufende Thread den Lock hält.

isLocked ist true, wenn irgendein Thread den Lock hält.


getOwner(), Collection<Thread> getQueuedThreads liefern den Besitzer
und die wartenden Threads.
Prof. Dr. Alois Schütte
Advanced System Programming
8
Concurrrent Paket: 2.1 ReentrantLock

Beispiel: Klasse Konto (Account), Geldabholer (Withdrawer als Thread)

Zunächst die Realisierung der Klasse Account mit synchronized
1.
2.
3.
4.
5.
class Account {
private float balance;
public Account (float initialBalance) {
balance = initialBalance;
} // Account
6.
7.
8.
public synchronized float getBalance() {
return balance;
} // getBalance
9.
10.
11.
12.
13.
14.
15.

public synchronized void withdraw( float amount) {
if (amount < 0 || balance < amount)
throw new IllegalArgumentException("withdraw: wrong amount " +
amount);
try { Thread.sleep(1000); } catch (Exception e) {};
balance -= amount;
} // withdraw
} // Account
synchronized ist erforderlich, da ein Konto von mehreren Threads verwendet
werden kann und mindestens einer den Zustand per withdraw ändern kann.
Prof. Dr. Alois Schütte
Advanced System Programming
9
Concurrrent Paket: 2.1 ReentrantLock

1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
Geldabheber sind Threads, die vom Konto die withdraw-Methode verwenden.
class Withdrawer extends Thread {
private Account account;
private float amount;
Withdrawer (Account account, float amount) {
this.account = account;
this.amount = amount;
}
public void run () {
try { account.withdraw(amount); }
catch (Exception e) { System.out.println(e); }
} // run
} // Withdrawer
Prof. Dr. Alois Schütte
Advanced System Programming
10
Concurrrent Paket: 2.1 ReentrantLock

1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
Zwei Kontobevollmächtigte (Jenni und Hannah) wollen Geld vom
gemeinsamen Konto abheben.
public class WithdrawApp {
public static void main(String[] args) throws InterruptedException {
Account account = new Account(1000);
Withdrawer hannah = new Withdrawer(account, 400);
Withdrawer jenni = new Withdrawer(account, 500);
hannah.start();
jenni.start();
hannah.join(); jenni.join();
System.out.println("balance = " + account.getBalance());
}
}
as@hal:synchronized> java WithdrawApp
balance = 100.0
as@hal:synchronized>
Prof. Dr. Alois Schütte
Advanced System Programming
11
Concurrrent Paket: 2.1 ReentrantLock

Nun die Realisierung der Klasse Account mittels Locks.

Die Blockstruktur von synchronized muss mittels lock und unlock nachgebildet werden:
import java.util.concurrent.locks.*;
private final ReentrantLock lock = new ReentrantLock(true);
lock.lock();
try {
...
}
finally {
lock.unlock();
}



Wichtig:

Da im try-Block Ausnahmen auftreten können ist mittels finally sicherzustellen,
dass stets unlock aufgerufen wird!

Nur so werden „gelockte“ Objekte immer freigegeben.
Die Verwendung von lock-unlock ist also aufwendiger, dafür aber universell: ein Thread
kann lock aufrufen, ein andere unlock.
Soll anstelle einer Objektsperre eine Klassensperre deklariert werden, wird die LockVariable als static definiert.
Prof. Dr. Alois Schütte
Advanced System Programming
12
Concurrrent Paket: 2.1 ReentrantLock
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
import java.util.concurrent.locks.*;
class Account {
private float balance;
private final ReentrantLock lock = new ReentrantLock(true);
public Account (float initialBalance) {
balance = initialBalance;
} // Account
public float getBalance() {
lock.lock();
try {
return balance;
}
finally {
lock.unlock();
}
} // getBalance
public void withdraw( float amount) {
lock.lock();
try {
if (amount < 0 || balance < amount)
throw new IllegalArgumentException("withdraw:...);
try { Thread.sleep(1000); } catch (Exception e) {};
balance -= amount;
}
finally {
lock.unlock();
}
} // withdraw
} // Account
Prof. Dr. Alois Schütte
Advanced System Programming
13
Concurrrent Paket: 2.1 ReentrantLock

Was muss geändert werden, wenn Jenni und Hannah nicht gleichzeitig Geld
abholen dürfen?


Lösung:

1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
Der erste Abholer hält den Lock, der zweite muss abgewiesen werden.
trylLock anstelle von lock
public void withdraw( float amount) {
if (lock.tryLock() == false) return;
try {
if (amount < 0 || balance < amount)
throw new IllegalArgumentException("withdraw: ...);
try { Thread.sleep(1000); } catch (Exception e) {};
balance -= amount;
}
finally {
lock.unlock();
as@hal:synchronized> java WithdrawApp
}
balance = 600.0
} // withdraw
as@hal:synchronized>
Prof. Dr. Alois Schütte
Advanced System Programming
14
Concurrrent Paket: 3 Condition

Die Methode newCondition des Interface Lock liefert ein Condition-Objekt zurück.
Genauer, ein Objekt einer Klasse die die Schnittstelle Condition implementiert.
9.
public interface Condition {
void await() throm InterruptedException;
void awaitUninterruptibly();
boolean await(long time Timeunit unit) throm
InterruptedException;
long awaitNanos(long time) throm InterruptedException;
boolean awaitUntil(Date deadline) throm InterruptedException;
void signal();
void signalAll();
}

Die Methoden haben Ähnlichkeit zu wait und notify.
1.
2.
3.
4.
5.
6.
7.
8.




Eine Condition ist signalisiert oder nicht signalisiert. Sofort nach ihrer Erzeugung ist
sie signalisiert.
Ein await-Aufruf auf einer signalisierten Condition kehrt sofort zurück. Vor Rückkehr
von await wird die Condition in den nicht signalisierten Zustand versetzt.
signal versetzt eine Condition in den signalisierten Zustand, weckt also einen
wartenden Thread
signalAll weckt alle auf die Condition wartenden Threads.
Prof. Dr. Alois Schütte
Advanced System Programming
15
Concurrrent Paket: 3 Condition

1.
2.
3.
4.
Beispiel: BoundedBuffer, zunächst mit synchronized
class BoundedBuffer {
private float[] buffer;
private int first, last;
private int numberInBuffer = 0, size;
5.
6.
7.
8.
9.
10.
BoundedBuffer(int length) {
size = length;
buffer = new float[size];
first = last = 0;
}
11.
12.
13.
14.
15.
16.
17.
public synchronized void dumpBuffer() {
System.err.print("Buffer: "); // use err channel to log
for (int i=(first+1)%size, j=0; j<numberInBuffer; j++,
i=(i+1)%size)
System.err.print(buffer[i] + " ");
System.err.println(" ");
}
18.
Prof. Dr. Alois Schütte
Advanced System Programming
16
Concurrrent Paket: 3 Condition
public synchronized void put(float item) throws
InterruptedException {
while(numberInBuffer == size) wait();
last = (last+1)%size;
numberInBuffer++;
buffer[last] = item;
dumpBuffer();
notifyAll();
}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
public synchronized float get() throws InterruptedException {
while(numberInBuffer == 0) wait();
first = (first+1)%size;
numberInBuffer--;
dumpBuffer();
notifyAll();
return buffer[first];
}
// BoundedBuffer
18.
}

Die Methoden put und get sind mittels synchronized synchronisiert.
Prof. Dr. Alois Schütte
Advanced System Programming
17
Concurrrent Paket: 3 Condition
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.

class Producer extends Thread {
private BoundedBuffer buffer;
public Producer(BoundedBuffer b) {
buffer = b;
}
public void run() {
for(int i = 0; i < 100; i++) {
try { buffer.put(i);
System.out.println("put
} catch (InterruptedException
}
}
} // Producer
class Consumer extends Thread {
...
public void run() {
for(int i = 0; i < 100; i++) {
try { float x = buffer.get();
System.out.println("got
} catch (InterruptedException
}
}
} // Consumer
" + i);
ingnored) {};
" + x);
ingnored) {};
Ein Produzent ruft die put-Methode auf, ein Konsoment die get-Methode des
gemeinsamen Buffers.
Prof. Dr. Alois Schütte
Advanced System Programming
18
Concurrrent Paket: 3 Condition

1.
2.
3.
4.
Wie kann man dies nun mittels Condition realisieren und wo sind die Vorteile?
class BoundedBuffer {
private float[] buffer;
private int first, last;
private int numberInBuffer = 0, size;
5.
6.
7.
8.
private ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
9.
10.
11.
12.
BoundedBuffer(int length) {
...
}
13.
14.
15.
16.

public void dumpBuffer() {
...
}
Es gibt zwei Condition Attribute, notFull und notEmpty für das Objekt
lock.
Prof. Dr. Alois Schütte
Advanced System Programming
19
Concurrrent Paket: 3 Condition
1.
public void put(float item) throws InterruptedException {
lock.lock();
try {
while(numberInBuffer == size) notFull.await();
last = (last+1)%size;
numberInBuffer++;
buffer[last] = item;
dumpBuffer();
notEmpty.signal();
} finally {lock.unlock();}
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.


}
Wenn der Buffer voll ist, wird gewartet, bis eine Condition notFull
signalisiert wird.
Nach dem Schreiben in den Buffer wird signaliert notEmpty.
Prof. Dr. Alois Schütte
Advanced System Programming
20
Concurrrent Paket: 3 Condition
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.



public float get() throws InterruptedException {
try {
lock.lock();
while(numberInBuffer == 0) notEmpty.await();
first = (first+1)%size;
numberInBuffer--;
dumpBuffer();
notFull.signal();
return buffer[first];
} finally {lock.unlock();}
}
} // BoundedBuffer
Wenn der Buffer leer ist, wird gewartet, bis eine Condition notEmpty
signalisiert wird.
Nach dem Lesen des Buffer wird signaliert notFull.
Insgesamt ist man also mit Locks und Conditions flexibler, man kann
unterschiedliche Bedingungen signalisieren und so gezielt nur bestimmte
Threads wecken (eben die die auf die Condition warten).
Prof. Dr. Alois Schütte
Advanced System Programming
21
Concurrrent Paket: 4 Executors


1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
Bisher gab es stets eine enge Verbindung, zwischen dem was ein Thread
macht (definiert im Runnable Objekt) und dem Thread selbst.
Beispiel HelloWorld
public class HelloWorld {
public static void main (String args [ ]) {
HelloWorldThread t = new HelloWorldThread ("Hello, World!");
new Thread(t).start();
// creation and starting a thread
}
}
class HelloWorldThread implements Runnable {
private String str;
HelloWorldThread(String s) {
str = new String(s);
}
// task of a thread
12.
public void run ( ) {
System.out.println(str);
}
13.
14.
15.
16.

}
In größeren Anwendungen macht es Sinn, strikt zwischen ThreadManagement und Anwendungsfunktionalität des Thread zu unterscheiden.
Prof. Dr. Alois Schütte
Advanced System Programming
22
Concurrrent Paket: 4 Executors

Objekte, die das Management von Threads übernehmen, werden Executors genannt.

Es existieren drei Schnittstellen für Executor:


Executor erlaubt das Erzeugen und Ausführen von Threads

ExecutorService ist ein Subinterface von Executor, das den Lebenszyklus von
Thread beeinflussen kann

ScheduledExecutorService erlaubt das Definieren von zukünftigen oder
periodischen Abläufen
Executor hat eine Methode execute, mit der ein Thread erzeugt und gestartet werden
kann.

Wenn r ein Runnable Objekt ist und e ein Executor, dann gilt:
(new Thread(r)).start();


==
e.execute(r);
ExecutorService beinhaltet neben execute noch die Methode submit, die ebenfalls
ein Runnable-Objekt aufnehmen kann. Zusätzlich ist die in der Lage, ein CallableObjekt als Parameter zu erhalten. Ein Callable liefert einen Rückgabewert eines
Thread. submit liefert ein Future-Objekt zurück, das verwendet wird um den
Rückgabewert des Callable zu verarbeiten.
Die meisten der Executor-Schnittstellen-Implementieirungen benutzen
Threadpools.
Prof. Dr. Alois Schütte
Advanced System Programming
23
Concurrrent Paket: 4 Executors





1.
2.
3.
Die Meisten Implementierungen der Executor-Schnittstellen benutzen Threadpools,
die aus Workerthreads bestehen.
Die Idee ist es, eine Menge von Workerthreads zu haben, die einmal erzeugt werden
und unterschiedliche Aufgaben im Verlauf der Zeit ausführen können.

Vorteil: die Threaderzeugung geschieht nur einmal

Alternativ müsste für jede Aufgabe immer ein Thread erzeugt werden, dann
gelöscht werden, ein neuer Thread müsste erzeugt werden usw.
Es existieren unterschiedliche Arten von Threadpools. Hier sei eine stellvertretend
behandelt.
Bei einem fixed Threadpool gibt es eine feste Menge von Threads. Wenn mehrere
Aufgabe zu bearbeiten sind, als Threads verfügbar sind, werden sie in eine
Warteschlage eingereiht.
Beispiel 3 Aufgaben mit Pool von 2 Threads
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
Prof. Dr. Alois Schütte
Advanced System Programming
24
Concurrrent Paket: 4 Executors
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
class PrintTask implements Runnable {
private int sleepTime; // random sleep time for thread
private String threadName; // name of thread
private static Random generator = new Random();
// assign name to thread
public PrintTask(String name) {
threadName = name; // set name of thread
sleepTime = generator.nextInt(5000); // sleep time between 0 and 5 s
} // end PrintTask constructor
public void run() {
try {
// put thread to sleep for sleepTime amount of time
System.out.printf("%s going to sleep for %d millisecs.\n",
threadName, sleepTime);
Thread.sleep(sleepTime); // put thread to sleep
} // end try
catch ( InterruptedException exception ) {
exception.printStackTrace();
} // end catch
System.out.printf( "%s done sleeping\n", threadName );
} // end method run
} // end class PrintTask
Prof. Dr. Alois Schütte
Advanced System Programming
25
Concurrrent Paket: 4 Executors
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
public class Threadpool {
public static void main( String[] args ) {
// create and name each runnable
PrintTask task1 = new PrintTask( "thread 1" );
PrintTask task2 = new PrintTask( "thread 2" );
PrintTask task3 = new PrintTask( "thread 3" );
System.out.println( "Starting threads" );
// create ExecutorService to manage threads
ExecutorService threadExecutor = Executors.newFixedThreadPool( 2 );
// start threads and place in
threadExecutor.execute( task1
threadExecutor.execute( task2
threadExecutor.execute( task3
runnable state
); // start task1
); // start task2
); // start task3
threadExecutor.shutdown(); // shutdown worker threads: no more worker
// allowed, but running worker run to complete
System.out.println( "Threads started, main ends\n" );
as@hal:FixedThreadPool> java Threadpool
} // end main
Starting threads
} // end class Threadpool
Threads started, main ends
thread 1 going to sleep for 722 millisecs.
thread 2 going to sleep for 4965 millisecs.
thread 1 done sleeping
thread 3 going to sleep for 3703 millisecs.
thread 3 done sleeping
thread 2 done sleeping
as@hal:FixedThreadPool>
Prof. Dr. Alois Schütte
Advanced System Programming
26
Herunterladen