Fronty (Queue) v JDK 1.5 (1.6)
Java.vse.cz
Knihovny, které jsou nepostradatelné
java.util.concurrent java.util.concurrent.atomic java.util.concurrent.locks
Ve verzi 1.5 jazyka Java byly přidány nové třídy pro konkurenční programování, které vycházejí z myšlenek Douga Leaho (kniha Concurrent Programming in Java). Obsahují tři hlavní balíčky:
java.util.concurrent – Obsahuje prostředky pro vytváření poolů (Executor, Callable, Future) včetně implementace jednoduchého ThreadPoolu, dále implementace konkurenčních front, map, množin a seznamů (poskytují vyšší výkon při konkurenčním přístupu), podpora pro časování s jemnou granularitou a nové synchronizační prostředky (Semaphore, Exchange a podobně).
java.util.concurrent.atomic – Jednoduché proměnné s atomickým přístupem (AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference, AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray …)
java.util.concurrent.locks – Sada rozhraní pro lepší práci se zámky: Lock, ReadWriteLock, Condition.
Od verze 1.5 Nový balíček java.util.concurrent Tento balíček obsahuje třídy pro bezpečnou a pohodlnou práci v prostředí multithread(více vláknových) aplikací Implementována bez významného použití synchronized sekcí=> nedochází k tomu, že by vlákna zbytečně stály a nebo čekaly na zámek Od Javy 5.0 je totiž možné využít neblokující algoritmy.
Třída BlockingQueue
Součastí java.util.concurrent Dědí z tříd: java.until.Queue a java.util.Collection využívá implementace tříd java.util.concurrent umožňuje čekat na položku (např: podle priority, nebo časového intervalu) Toto je zajištěno pomocí těchto implementací: LinkedBlockingQueue ArrayBlockingQueue SynchronousQueue PriorityBlockingQueue DelayQueue
Příklad instance BlockingQueue q = new SynchronousQueue();
Popis jednotlivých implementací
LinkedBlockingQueue - neomezená délka fronty PriorityBlockingQueue - přerovnává položky dle priority ArrayBlockingQueue - pevná délka fronty, o to efektivnější SynchronousQueue - fronta nulové délky, nikdy neobsahuje žádnou položku, k výměně jedné položky se musí sejít producent i konzument, přijde-li dříve producent, čeká na konzumenta a naopak DelayQueue - položky přerovnává podle jejich timeoutu, ven je pustí až timeout doběhne, metoda size() sice může vrátit nějaké číslo, jakožto počet položek, ale neznamená to že už vypršel timeout a jsou tedy k dispozici).
Některé Metody Třídy BlockingQueue
put(E e ) - vloží určitou položku do fronty a nebo čeká dokud se neuvolní místo take() – vrací a maže aktuálně vybranou položku a nebo čeká až bude daná položka dostupná offer(E e, long timeout, TimeUnit unit ) – nastavení časového intervalu, za jak dlouho bude daná položka dostupná pro přidání do fronty poll(long timeout, TimeUnit unit ) – nastavení časového intervalu, za jak dlouho bude daná položka dostupná pro odebrání z fronty add(E e) – umožňuje přidat danou položku do fronty pokud je to možné a pokud ne vyhodí vyjimku (IllegalStateException) remove(Object o) – vymaže jednu instanci z fronty
Dále například metody(které pochází už z dané implementace): size() – velikost fronty clear() – smaže vše ve frontě Další metody zděděné z java.util.Queue: element, peek, poll, remove
Výkonnostní srovnání Způsob Testování
public static void testArrayList(){ System.out.println("Arraylist:"); List<String> list = Collections.synchronizedList(new ArrayList<String>()); //vkladani long casPred = System.nanoTime(); synchronized(list) { for(Integer i = 0;i<100000;i++){ list.add(i.toString()); } } long casPo = System.nanoTime(); System.out.println((casPo-casPred)); //hledani long casPredHledanim = System.nanoTime(); if(list.contains("0")){ long casPoHledani = System.nanoTime(); System.out.println( (casPoHledani-casPredHledanim)); } if(list.contains("49999")){ long casPoHledani = System.nanoTime(); System.out.println((casPoHledani-casPredHledanim)); } if(list.contains("99999")){ long casPoHledani = System.nanoTime(); System.out.println((casPoHledani-casPredHledanim) ); } }
Výsledky Arraylist HashSet LinkedList ArrayBlockingQueue-FIFO Vkládání 128369946 239278344 150712395 71223633 Hledání 1. prvku 52185 54141 63528 62522 Hledání středního prvku 4474535 142867 6771755 5393479 Hledání posledního prvku 9988085 220140 16699832 11669752
ArrayBlockingQueue ConcurrentLinkedQueue LinkedBlockingQueue 88350056
67440918
109588565
53359
57829
322834
1606685
5916227
10952397
5553890
13228329
25016867
PriorityBlockingQueue PriorityQueue 90082790 98255002 52409 44308 3263376 1141877 10606264 5239884
250000000
200000000 Arraylist HashSet 150000000
LinkedList ArrayBlockingQueue-FIFO ArrayBlockingQueue ConcurrentLinkedQueue
100000000
LinkedBlockingQueue PriorityBlockingQueue PriorityQueue 50000000
0 Vkládání
Hledání 1. prvku
Hledání středního prvku
Hledání posledního prvku
Příklad: Producent a konzument package Queue; import java.util.concurrent.*; public class main { /** * @param args */ public static void main(String[] args) { BlockingQueue q = new SynchronousQueue(); Producent p = new Producent(q); Konzument c = new Konzument(q); Thread vlakno1 = new Thread(p); Thread vlakno2 = new Thread(c);
} }
vlakno1.start(); vlakno2.start();
package Queue; import java.util.Random; import java.util.concurrent.BlockingQueue; public class Producent implements Runnable { private final BlockingQueue fronta; //vytvoreni fronty private Random generator; public Producent(BlockingQueue q) { fronta = q; //vlozeni fronty s danou implementaci generator = new Random(); } public void run() { try { while(true) { fronta.put(produkuje()); //u fronty se provede metoda put(), ktera prida polozku System.out.println("velikost fronty: " + fronta.size()); } } catch (InterruptedException ex) { ex.printStackTrace(); } }
}
private int produkuje() { int nahoda = generator.nextInt(30); System.out.println("přídá:" + nahoda); return nahoda; }
package Queue; import java.util.concurrent.BlockingQueue; public class Konzument implements Runnable { private final BlockingQueue fronta; //vytvoreni fronty public Konzument(BlockingQueue q) { fronta = q; //vlozeni fronty s danou implementaci } public void run() { try { while(true) { konzumovat(fronta.take()); //u fronty se provede metoda tak(), ktera vrati polozku a pote odstrani System.out.println("velikost fronty: " + fronta.size()); } } catch (InterruptedException ex) { ex.printStackTrace(); } } private void konzumovat(Object x) { System.out.println("veme:" + x); }
Závěr: třída BlockingQueue umí bezpečně používat více producentů a konzumentů
Třída ConcurrentLinkedQueue
thread-safe, ale i paralelní fronta
Fronta ConcurrentLinkedQueue může mít více producentů i konzumentů. Přechody mezi stavy vlákna
new thread
new
Ve frontě. Možné řešení: Wait, sleep, notify blocked
start
runnable
yield
Přidělen procesor
Metoda „run“ skončí
running
dead
Rozdíl mezi CAS a synchronizovaným seznamem
CAS zkratka pro Compare-And-Swap instrukci, která stojí za celým packagem java.util.concurrent Sémantika CAS instrukce je v podstatě stejná jako synchronized s tím rozdílem, že je to na úrovni hardwaru namísto JVM. Z toho vyplývá mnohem lepší výkonnost
Ještě jedna fronta: sun.misc.Queue Lze použít jako střídaní zámku. Používá synchronizovaní. Méně efektivní, než je CAS Nelze se na ní spoléhat V dalších verzích možná nebude .