CNAM Wintersemester 2014 / 2015 Nebenläufige Anwendungen in Java (J2SE, Rich Client, J2EE) Im Rahmen von: Betriebssysteme und nebenläufige Anwendungen Agenda Nebenläufigkeit in Java Anwendungen Basisfunktionen in allen Java Objekten Threads Thread Pools Fork/Join Monitor Semaphores Besonderheiten in Container Managed und Plug-In Umgebungen WorkManager Job APIs Java unterstützt Nebenläufigkeit seit der ersten Version Basisfunktionen in Object Java wurde als Sprache für nebenläufige Anwendungen konzipiert Jedes Java Objekt erbt von java.lang.Object wait notify notifyAll Jedes Objekt erhält durch die Objektspezialisierung automatisch die Methoden wait, notify und notifyAll Dadurch hat jedes Objekt eine einfache start/stop- Logik “im Bauch“ java.lang. Object Thread Programmierung - Basics Threads werden durch Vererbung von Thread oder durch Implementierung des Runnable Interfaces angelegt Objekte, welche Runnable implementieren müssen über eine Instance von Thread gestartet werden Die Laufzeitlogik wird durch Überschreibung der run Methode implementiert Callable (ab 1.5) ermöglicht Rückgabewerte Über die statische Methode sleep kann ein Thread schlafen gelegt werden run java.lang. Runnable start java.lang. Thread sleep Einfache Threads public class SetupThreads { public class Runner implements Runnable { public void run() { //do something useful } } public class MyFirstThread extends Thread { public void run() { //do something useful } } public SetupThreads() { new MyFirstThread().start(); new Thread(new Runner()).start(); } public static void main(String[] args) { new SetupThreads(); } Threads mit Schlafzeit public class SetupThreadsTwo { public static final int SLEEP_TIME = 200; public class MySecondThread extends Thread { public void run() { try { Thread.sleep(SetupThreadsTwo.SLEEP_TIME); } catch (InterruptedException e) { e.printStackTrace(); } } } public SetupThreadsTwo(int count) { MySecondThread thread [] = new MySecondThread[count]; for (int i=0; i < thread.length;i++) { thread[i] = new MySecondThread(); thread[i].start(); } } } for (MySecondThread handle : thread) { try { handle.join(); } catch (InterruptedException e) { e.printStackTrace(); } } Werten wir das letzte Beispiel mit Zahlen aus. h eP erT upTim read+ " et ; ; n + d "+s start LEEP_TIME count; e a k e a r h T ) time wo.S lis( p er T uble) meMil pThreadsT me / (do aken " + up time i T t n t t i u e e t Time - Set .curr s | S ToRun ystem imeTaken e)offset +count+" time+" m S = = t aken ToRun ads " doubl pub timeT ToRuntime read = ( ber thre "+offset g l n o m h t l u e T e f N m r s i e f or ( ic s of lt: p t meP u e i s e T e l p R s u long new int tati tln( " t to e set Set i=1 c vo } doubl .out.prin s | Offse upT 000 id m m e " t } Sys h re ;i< ma i ads =10 n(S Two 000 t ri ; ) " (i) ; i ng[ ms =i+ ] ; 100 arg 0) { s) { Gebrauchte Zeit = Endzeit – Startzeit Offset Zeit = Gebrauchte Zeit – Wartezeit Thread Thread Management Zeit = Offset Zeit – Anzahl Threads Result: Result: Result: Result: Result: Result: Result: Result: Result: Result: Number Number Number Number Number Number Number Number Number Number threads threads threads threads threads threads threads threads threads threads 1000 Time taken 267 ms | Offset to sleep time 67 ms | Setup time per Thread 0.067 ms 2000 Time taken 315 ms | Offset to sleep time 115 ms | Setup time per Thread 0.0575 ms 3000 Time taken 374 ms | Offset to sleep time 174 ms | Setup time per Thread 0.058 ms 4000 Time taken 447 ms | Offset to sleep time 247 ms | Setup time per Thread 0.06175 ms 5000 Time taken 502 ms | Offset to sleep time 302 ms | Setup time per Thread 0.0604 ms 6000 Time taken 567 ms | Offset to sleep time 367 ms | Setup time per Thread 0.06116666666666667 ms 7000 Time taken 638 ms | Offset to sleep time 438 ms | Setup time per Thread 0.06257142857142857 ms 8000 Time taken 702 ms | Offset to sleep time 502 ms | Setup time per Thread 0.06275 ms 9000 Time taken 779 ms | Offset to sleep time 579 ms | Setup time per Thread 0.06433333333333334 ms 10000 Time taken 852 ms | Offset to sleep time 652 ms | Setup time per Thread 0.0652 ms Was bringen Threads. Beispiel: Zerlegen von 1000 Zahlen Lauf/ Threadanzahl Summe Durchschnitt 1 2 4 16 1000 1 21943 6910 9992 2957 3646 2 15239 8070 4959 3611 3504 3 14485 7412 4573 3499 3280 4 15347 4905 4905 3521 3479 5 14649 4746 3792 3792 3360 6 23970 6870 10241 3425 3168 7 21830 11049 6262 3340 3019 8 22191 12353 6423 3952 3062 9 21039 10978 5721 3166 2844 10 21674 11369 5877 3565 3002 192367 84662 62745 34828 32364 19,2 Sek 8,5 Sek 6,3 Sek 3,4 Sek 3,2 Sek Pooling von Threads Basisfunktionen von Thread Pools Thread Pools sind fester Bestandteil der Concurency API in Java Pools können fix, dynamisch oder scheduled sein new Cached Thread Pool Wurde wie viele Erweiterungen über den JSR Prozess in Java aufgenommen (JSR 166) Seit Java 5 im Framework verankert Ältere Java Versionen benötigen externe Frameworks Das Framework enthält neben den allgemeinen Vorteilen von Thread Pools auch diese Vorteile: • Entlastung des Entwicklers (Thread Handling ist im Framework gekapselt) • Damit ebenfalls eine best-practise Implementierung • Man spart sich den manuellen Umgang mit notify, wait, synchronize etc. new Fixed Thread Pool java.util. concurrent .Executors java.util. concurren. Executor Service execute shutdown new Scheduled Thread Pool Verwendung von Thread Pools public class SetupThreadPool { Poo public class Runner implements Runnable { Poo l: 1 public void run() { Poo l: 2 take: try { Poo l: 3 take: 4005 Thread.sleep(200); tak l: 20 P 4 o ol: } catch (InterruptedException e) {e.printStackTrace();} tak e: 14 01 Poo 00 5 t e: } l 1 a : Poo } 6 t ke: 000 l : Poo 7 t ake: 800 l public void startAllThreads(int poolsize) { Poo : 8 ake: 800 long start = System.currentTimeMillis(); Poo l: 9 take: 600 tak l: 60 P 1 o ol: 0 t e: 6 0 ExecutorService executor = Poo 11 ake: 00 Executors.newFixedThreadPool(poolsize); l Poo : 12 take 401 for (int i = 0; i < 20; i++) { : Poo l: 13 take 400 executor.execute(new Runner()); : } Poo l: 14 take 401 : executor.shutdown(); Poo l: 15 take 401 : while (!executor.isTerminated()) { Poo l: 16 take 400 : } Poo l: 17 take 401 : Poo l: 18 take 400 System.out.println("Pool: "+poolsize+" take: "+ : Poo l: 19 take 400 (System.currentTimeMillis()-start)); : l: } 20 take: 400 tak 400 e : public static void main(String[] args) { 203 for(int i=1;i<=20;i++){ } } } new SetupThreadPool().startAllThreads(i); Weitere Implementierungen in Java 1.5 Barrieren Barrieren ermöglichen das Warten auf eine Gruppe von Threads Üblicherweise wird der CyclicBarrier verwendet • Der CyclicBarrier gibt eine Anzahl an Threads an, welche durchlaufen werden müssen • Nach Abschluß wird ein weiterer Thread angestoßen Queue Über Queues können ThreadSafe Queues generiert werden Dies bietet sich besonders für Consumer Provider Anwendungen an Beispiel: Eine BlockingQueue aufbauen und mit take und put aus mehreren Threads arbeiten class WorkerRun implements Runnable { private CyclicBarrier barrier; public WorkerRun(CyclicBarrier barrier) { this.barrier = barrier; } } public void run() { try { barrier.await(); } catch (Exception e) {} } public void start() { CyclicBarrier barrier = new CyclicBarrier (3, new AbschlussRun()); for (int i = 0; i < 3; i++) { new Thread( new WorkerRun(barrier) ).start(); } } Fork Join Ab Java 1.7 sind Funktionen ForkJoin neu im Java SDK Die Grundidee ist die „work-stealing“ Prinzip Im Gegensatz zum ThreadPool warten sie nicht per join auf das Ergebnis von allen Workern. Die Teil der Arbeit wird auf die freien Arbeitsprozesse verteilt. In der Java Doku wird angegeben, dass damit die Systemressourcen besser aufgeteilt werden Im Code wird ein ForkJoinPool aufgebaut Die Arbeit und der fork() wird durch eine RecursiveAction oder RecursiveTask durchgeführt (Action hat kein Return) java.util. concurrent .ForkJoin Pool java.util. concurrent .Recursive Task java.util. concurrent .ForkJoin Task java.util. concurrent .Recursive Action Arbeiten mit Monitor Es ist möglich kritische Bereiche in Java anzulegen Einzelne Abschnitte können mit synchronized „gesichert“ werden Ebenfalls kann das Schlüsselwort auf ganze Methoden angewendet werden Durch den wait / notify Mechanismus können andere Threads über den „nun freien kritischen Bereich“ benachrichtigt werden public class Runner implements Runnable { int criticalVariable; public void run() { synchronized (this) { criticalVariable ++; } criticalArea(2); Ebenfalls kann das Schlüsselwort volatile auf Variablen angewendet werden Diese Variablen haben das Prinzip „lesen vor schreiben“ und „erst den Cache leeren“ //non critical code } public synchronized void criticalArea(int v) { CriticalVariable += v; } } Sperren Ab Java 1.5 sind ebenfalls Lock Interfaces und Lock Klassen in der Implementierung Mit diesen Implementierungen wird das sperren und entsperren mittels Bedingungen ermöglicht Im Gegensatz zum klassischen Monitor gibt es zahlreiche Vorteile • Funktioniert tryLock nicht, blockiert der Code nicht • Locking kann an einzelne Bedingungen geknüpft werden • Lock / Unlock ist Methodenübergreifend • Schreib- und Leselock Lock sperre = new ReentrantLock(); sperre.tryLock(); //something critical sperre.unlock(); ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); readWriteLock.readLock().lock(); readWriteLock.readLock().unlock(); readWriteLock.writeLock().lock(); readWriteLock.writeLock().unlock(); Atomic Implementierungen Ab Java 1.5 sind verschiedene atomare Objekte in der JVM vorhanden Die Implementierungen betreffen Repräsentationen von • primitiven Datentypen wie z.B. int, long, boolean • Arrays • HashMaps • FieldUpdater (als wrapper) Die atomaten Klassen bieten einen Angebot an nicht unterbrechbaren Funktionen zur Verfügung. Dazu gehört zum Beispiel das „lesen und addieren“, „erhöhen und lesen“, „lesen und erhöhen“ public class AtomicSample { private AtomicInteger zaehler = new AtomicInteger(0); public void hochZaehlen() { zaehler.incrementAndGet(); } public int getZaehler() { return zaehler.get(); } } Alternative zum Monitor … Semaphoren Mit der Concurrency API haben auch die Semaphoren Einzug in Java gehalten. Sie bieten eine gute Alternative zum Monitor (synchronized) Semaphoren sollten static definiert werden, um übergreifend verwendet zu werden Mit acquire und release können Semaphoren „gebucht“ werden. Bei Acquire blockt der Code bis die semaphore wieder frei ist Alternativ gibt es den tryAcquire Aufruf, der ein boolean zurückliefert import java.util.concurrent.Semaphore; public class SemaHandling extends Thread{ private static Semaphore semaphore = new Semaphore(1); public void run() { try { semaphore.acquire(); //Alternative: semaphore.tryAcquire(); } catch (InterruptedException e) { e.printStackTrace(); } //Kritischer Code semaphore.release(); } public static void main(String[] args) { new SemaHandling().start(); } } Nebenläufigkeit in J2EE und Eclipse Apps Auf dem Application Server In J2EE Umgebungen sollten alle Threads vom Container verwaltet werden und ein „Wildwuchs“ an Threads muss vermieden werden Bei hosted AppServern und Cloud-Lösungen wird dieser Ansatz um so wichtiger. Der Server benötigt die volle Kontrolle über alle Arbeitsprozesse Aktuell steigt in der Industrie allerdings auch der Bedarf nach Konsolidierung Ebenfalls stieg der Wunsch nach Batch Processing in den letzten Jahren deutlich an Interessante Alternativen bieten ebenfalls die Async Schnittstellen von J2EE Komponenten wie Services, Servlets und Beans JSR 237 Timer & WorkManager wurde zurückgezogen und mit JSR 236 zusammengeführt JSR 236 „Concurrency Utilities for Java EE“ Teil von JEE 7 Im Rich Client Eclipse hat sich in den letzten Jahren als bevorzugte Komponente für viele Rich Client Lösungen durchgesetzt Leider verwenden viele Anwendungen Threads direkt und einige Plug-Ins starten nach dem Laden bis zu 20 Threads “auf Vorrat” Gerade für nebenläufige Arbeitsvorgänge bietet Eclipse neben den Unit of Work, Jobs, Progress Monitor noch viele andere Build-in Funktionen Die direkte Verwendung von Threads umgeht dabei diese Standardfunktionen und bestpractise Patterns Setup von WorkManager WorkManager werden im AppServer (z.B. IBM WebSphere) bekanntgemacht. Jede Anwendung referenziert die entsprechenden Einträge in der Deployment Description <resource-ref> <res-ref-name>wm/utlul</res-ref-name> <res-type>commonj.work.WorkManager</res-type> <res-auth>Container</res-auth> <res-sharing-scope>Shareable</res-sharing-scope> </resource-ref> Beispiele CommonJ WorkManager Initia Wor lContext kMa nage ctx = n ctx.l r wm = ew Initia ooku lCon (W o s t n r p k (“jav e Man text(); m e a l ager :com p m i ) p/en r e k r v /wm o /Mei yW M nWM s { s ) a l ( ”); c n c u i l r b pu k{ oid v c r i Wo publ gic //lo } MyWorker work1 = new MyWorker(); WorkItem item1; Item1=wm.schedule(work1 ); Collection col1 = new ArrayList(); col1.add(item1); wm.waitForAll(col1, WorkManager.INDEFINITE); Collection finished = wm.waitForAny(col1, 1000); Beispiel für den Rich Client – Eclipse Jobs Job job = new Job("My Job") { @Override protected IStatus run(IProgressMonitor monitor) { // do something long running //... // If you want to update the UI sync.asyncExec(new Runnable() { @Override public void run() { // do something in the user interface // e.g. set a text field } }); return Status.OK_STATUS; } }; // Start the Job job.schedule();