Java (22) - Vlákna

Programy, které jsme dosud tvořili, byly striktně sekvenční – žádné dvě jejich části nebyly vykonávány ve stejném čase. V tomto dílu si ukážeme, jak vytvářet programy tak, aby některé úseky kódu probíhaly paralelně. Tohoto chování pak využijeme v dalších dílech věnovaných grafickému uživatelského rozhraní, kdy budeme vyžadovat, aby se v jednu chvíli zároveň překreslovalo rozhraní a probíhal výpočet (složitého) úkolu.

Jelikož je však paralelní programování poměrně náročné na schopnosti vývojáře a programy se velmi špatně debugují, tak si ukážeme pouze jeho základní vlastnosti, možnosti, pasti a propasti.

Co je to vlákno?

Vlákno můžeme chápat jako abstrakci nad stavem programu – pozice zpracovávané instrukce v kódu a předané parametry jednotlivých volání metod. Každý běžící program obsahuje minimálně jedno vlákno, které začíná svou práci u javovských desktopových aplikací v metodě main(String[] args) a přestává existovat v okamžik, kdy tuto metodu opět opustí (tak dosud pracovaly všechny naše programy).

Je ale důležité říci, že vlákna můžeme také vytvářet a dávat jim ke zpracování určité části kódu. Pokud má náš počítač více jader procesorů, než je aktuálně spuštěných vláken, tak vlákna budou vykonávána souběžně. Pokud je volných jader méně než vláken, tak mezi nimi byde procesor ve velmi krátkých intervalech přepínat, čímž vytvoří zdání souběžnosti.

Program terminuje v okamžiku, kdy jej opustí poslední vlákno, které není označeno jako daemon. Takto označené vlákno je bráno jako pomocné a při terminaci programu na něj není brán ohled.

Odbočka stranou: procesy

Na podobném principu jako vlákna jsou založeny i procesy. Tyto jsou ovšem abstrakcí nad celým běžícím programem (proces může obsahovat mnoho vláken, avšak vždy minimálně jedno), jednotlivé procesy mají vlastní paměťový kontext (nikoliv pouze zásobník volání) a vlastní zámky nad soubory.

Vlákna vs. procesy

Z výše uvedených rozdílů plyne, že výměna vlákna na procesoru, stejně jako jeho vytvoření nebo zrušení, je oproti stejné akci s procesem výrazně levnější operací (není třeba měnit paměťový kontext). Dále je komunikace vláken výrazně snazší než komunikace procesů, protože stačí využít sdílených struktur (objektů a jejich fieldů), zatímco u procesů by bylo zapotřebí využít volání vzdálených metod, sdílet data skrze soubor namapovaný do paměti atp. (ani jedna z těchto technik se nedá označit za přívětivou).

Nevýhodou vláken pak je zejména obtížné ošetření jejich korektního souběhu – jedno vlákno nesmí druhému modifikovat data pod rukama, jelikož by tak mohlo dojít k ovlivnění výpočtu a navrácení zcela nesmyslného výsledku.

Vlákna a procesy v Javě

Z pohledu javovského programátora nás však tyto rozdíly moc trápit nemusí, protože budeme pracovat vždy pouze s vlákny. Procesy lze sice také využívat, ale zde se jedná o velmi speciální případy – například o volání externích programů.

Jednoduchá aplikace

/**
 * Demonstrace jednovlaknove aplikace
 * @author Pavel Micka
 */
public class Main {
    /**
     * Do teto metody vstoupi hlavni vlakno aplikace
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<Integer>();
        list.add(1);
        list.add(2);
        printList(list);
    }
    
    /**
     * Vypise obsah predaneho seznamu
     * @param l seznam
     */
    public static void printList(List l){
        for(Object o : l){
            System.out.println(o);
        }
    }
}

Ukažme si nejprve průchod vlákna skrz jednoduchou aplikaci, která pouze přidá do seznamu dvě čísla a posléze obsah seznamu vypíše.

Vlákno vstoupí do hlavní metody main na řádce 10, vytvoří seznam na haldě (společný paměťový prostor, ve kterém jsou vytvářeny všechny objekty), odkaz na něj je však uložen na zásobník volání (proměnná list je lokální). Nakonec je seznam předán metodě printList, která seznam vypíše. Po návratu z volání vlákno program na řádce 15 opustí, čímž jej terminuje.

V případě, že bychom volali metodu main z více vláken, tak by nemohlo dojít k jejich interakci, jelikož jsou reference na použité proměnné uložené na zásobníku, který má každé vlákno samo pro sebe. Stejně tak je při volání metody printList odkaz na předávaný seznam uložen na zásobníku daného vlákna. Jediným problematickým místem je samotný výpis, kde by mohlo hrozit, že budou vlákna interagovat při zápisu do výstupního proudu (uvnitř volání metody System.out.println). Zde je ale situace ošetřena již uvnitř zmíněného volání – jak ošetřit souběžnou modifikaci sdílené struktury si ukážeme později v tomto článku.

Vícevláknová aplikace

Nyní si ukažme, jak by ten vypadal obdobný kód vícevláknové aplikace. Pro tvorbu nových vláken využijeme třídu Thread (dokumentace), respektive její metodu run. Obsah metody run bude při spušnění nového vlákna (zavolání jeho metody start) vykonán asynchronně na volajícím vláknu.

/**
 * Demonstrace dvouvlaknove aplikace
 * @author Pavel Micka
 */
public class Main {

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        t1.start();
        t2.start();
    }
}

class MyThread extends Thread{

    @Override
    public void run() {
        List<Integer> list = new ArrayList<Integer>();

        for(int i = 0; i < 1000; i++){
            list.add(i);
        }

        printList(list);
    }

    /**
     * Vypise obsah predaneho seznamu
     * @param l seznam
     */
    public static void printList(List l) {
        for (Object o : l) {
            System.out.println(o);
        }
    }
}
0
0
1
1
2
2
3
3
4
4
5
6
7
5
8
6
9
10
11
7
8
12
9
13
10
14
11
15
12
16
13
17
14
18
15
19
16
20
17
21
18
19
22
20
23
21
24
22
25
23
26
24
25
27
28
26
29
27
30
28
31
29
32
30
33
31
32
33
34
34
35
35
36
36
37
37
38
38
39
39
40
40
41
42
41
43
42
44
43
45
46
44
47
45
48
46
49
47
48
49

Ve výpisu na konzoli si všimněme, že jsou seznamy pomíchány, což je způsobeno tím, že je vnitřně chráněno pouze samotné volání System.out.println, nikoliv celé výpisy. Pokud by však nebylo chráněno ani samotné volání, tak bychom na výpis dostali rozsypaný čaj, jelikož by došlo k situaci, kdy by jedno vlákno zapsalo pouze část výstupního řetězce a procesor by přepnul na druhé vlákno (jež by začalo vypisovat svůj výstup).

Synchronizace

V minulém případě jsme využívali toho, že metody byly tzv. thread-safe – vláknově bezpečné, jelikož žádným způsobem nemodifikovaly sdílený stav. Ukažme si, jak vypadá vláknově nebezpečný kód – kód, který modifikuje stav neatomickými operacemi (atomická operace je taková operace, která proběhne na procesoru v rámci jediného taktu – je nedělitelná).

/**
 * Demonstrace chyby v praci nad spolecnym zdrojem
 * Ctenari se omlouvam za antiobjektovost tohoto prikladu :-)
 * @author Pavel Micka
 */
public class NonAtomicExample {

    public volatile static int i = 0; //volatile == necachuj na procesoru

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new MyThread();
        Thread t2 = new MyThread();

        t1.start();
        t2.start();

        Thread.sleep(1000); //na vterinu uspime hlavni vlakno a nechame dve pridana pracovat

        System.out.println(i); //melo by byt 20000
    }
}

class MyThread extends Thread {

    @Override
    public void run() {
        for(int i = 0; i < 10000; i++){
            NonAtomicExample.i++; // NonAtomicExample.i = NonAtomicExample.i + 1 - dve operace: scitani a prirazeni
        }
    }
}

19286

Tento kousek kódu, inkrementující společnou proměnnou, ukazuje hned dvě pasti, do kterých ochotně spadne téměř každý programátor.

Neatomická inkrementace

První problém spočívá v inkrementaci společné proměnné, jelikož ji téměř každý považuje za jednu operaci. Ve skutečnosti se ale jedná o operace dvě: součet a přiřazení. Proto může dojít k situaci, kdy vlákno A přečte hodnotu proměnné (1000) a přičte k ní 1, v ten okamžik je přerušeno ve prospěch vlákna B, které opět přečte hodnotu (1000) a přičte k ní 1 a je přerušeno ve prospěch původního vlákna A. Nakonec obě vlákna postupně zapíší hodnotu 1001, ačkoliv došlo ke dvěma inkrementacím (má být 1002). Tato situace je ještě velmi příznivá, protože nikde není psáno, za jak dlouho budou vlákna vystřídána, stejně tak by mohlo dojít k situaci, že první vlákno přečte hodnotu (0) a je vystřídáno, druhé vlákno 10000x inkrementuje a je vystřídáno a vlákno původní přepíše hodnotu 10000 číslem 1.

Právě kvůli této chybě je výstupem programu 19286 místo 20000.

Volatile

Druhou věcí, na kterou je třeba myslet, je cacheování na procesoru. Pokud pracujeme stále s jednou proměnnou, tak je tato proměnná z důvodu optimalizace přiřazena do vyrovnávací paměti (jádra) procesoru (práce s ní je tak řádově rychlejší, než kdyby byla v RAM). Vyrovnávací paměť procesoru ovšem nemusí být sdílená mezi jednotlivými jádry. Proto pokud vlákno A na prvním jádře pracuje se svojí lokální kopií, tak o případných modifikacích (v našem případě inkrementacích) není nijak informováno vlákno B na druhém jádře pracující nad další lokální kopií proměnné. Obě jádra tak pracují nezávisle s neaktuálními daty a výsledek bude samozřejmě nekorektní.

Abychom tomuto předešli, tak u sdílených proměnných, jejichž hodnotu modifikujeme, uvádíme klíčové slovo volatile, které zajistí, že proměnná nebude nikdy cacheována (jádro si vždy vyžádá novou hodnotu a zápis provádí do RAM).

Synchronized

Abychom napravili výše zmíněnou chybu neatomické inkrementace, tak musíme zabránit tomu, aby se v kritické oblasti kódu v jeden okamžik vyskytovalo více než jedno vlákno. K tomuto využijeme synchronizovanou sekci, což je blok, před nímž je uvedeno klíčové slovo synchronized a v závorce objekt zámku.

Zámek je libovolný javovský objekt, který slouží jako pešek – může ho vlastnit pouze jedno vlákno, a právě toto vlákno smí vstoupit do kritické sekce a vykonat zde obsažené instrukce, zbylá vlákna čekají na okamžik, kdy je zámek opět dostupný. Po opuštění kritické sekce vlákno zámek automaticky uvolní.

/**
 * Spravna implementace inkrementace spolecne promenne
 * @author Pavel Micka
 */
public class NonAtomicExample {

    public volatile static int i = 0; //volatile == necachuj na procesoru

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new MyThread();
        Thread t2 = new MyThread();

        t1.start();
        t2.start();

        Thread.sleep(1000); //na vterinu uspime hlavni vlakno a nechame dve pridana pracovat

        System.out.println(i); //melo by byt 20000
    }
}

class MyThread extends Thread {

    @Override
    public void run() {
        for(int i = 0; i < 10000; i++){
            synchronized(NonAtomicExample.class){ //objekt tridy NonAtomicExample slouzi jako zamek
                NonAtomicExample.i++; // NonAtomicExample.i = NonAtomicExample.i + 1 - dve operace: scitani a prirazeni
            }
        }
    }
}
20000

Synchronizované metody

Kromě bloků lze také synchronizovat celé metody, stačí uvést klíčové slovo synchronized v jejich hlavičce. Je však třeba dávat pozor na to, co slouží jako zámek. V případě statických metod se jedná o objekt obalující třídy, u nestatických metod se jedná o instanci, nad níž je metoda volána.

/**
 * Demonstrace dvouvlaknove aplikace nad sdilenym seznamem
 * @author Pavel Micka
 */
public class Main {

    /**
     * @param args the command line arguments
     */
    private List list = new ArrayList();

    public static void main(String[] args) throws InterruptedException {
        //promenne vyuzivane ve vnitrnich tridach musi byt konecne (final)
        final Main m = new Main(); //vytvorime instanci objektu Main (kvuli seznamu list)

        Thread t1 = new Thread() {

            @Override
            public void run() {
                for (int i = 0; i < 50; i += 2) {
                    synchronized (m) {
                        m.list.add(i);
                    }
                }
            }
        };
        Thread t2 = new Thread() {

            @Override
            public void run() {
                for (int i = 1; i < 50; i += 2) {
                    synchronized (m) {
                        m.list.add(i);
                    }
                }
            }
        };

        t1.start();
        t2.start();

        Thread.sleep(1000);
        Collections.sort(m.list);
        m.printList();
    }

    /**
     * Vypise obsah predaneho seznamu
     * Metoda je synchronizovana, u statickych metod je zamkem objekt obalujici tridy
     * u nestatickych metod obalujici objekt (pozor na tento zasadni rozdil)
     *
     * V tomto pripade se tedy jedna o synchronizaci nad objektem, ktery referencujeme
     * v programu jako "m"
     */
    public synchronized void printList() {
        for (Object o : this.list) {
            System.out.println(o);
        }
    }
}
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

V tomto příkladě přidáváme ve dvou vláknech čísla do sdíleného seznamu. Jedno vlákno přidává sudá čísla do 50, druhé lichá čísla do 50. Hlavní vlákno obě vlákna odstartuje a počká vteřinu (práce obou vláken by za vteřinu měla být velmi pohodlně hotova), seznam seřadí a vypíše.

Pro synchronizaci bloků používáme instanci třídy Main, nad kterou pracujeme. Stejný zámek je implicitně použit při volání instanční metody printList.

Deadlock

Další pastí, kterou jsme si dosud nezmínili, je deadlock (uváznutí). K deadlocku může dojít u vícevláknového programu, pokud synchronizujeme podle více zdrojů a zároveň nedodržujeme stálé pořadí zámků.

synchronized (pliers) {
    synchronized (hammer) {
        //do something
   }
}
.
.
.
synchronized (hammer) {
    synchronized (pliers) {
        //do something
   }
}

K demonstraci deadlocku se obvykle využívá problému dvou kovářů (zastupují vlákna). Každý z nich potřebuje ke své práci kleště a kladivo. První kovář bere do ruky nejprve kleště a teprve poté si vezme kladivo, druhý kovář volí opačný postup – nejprve kladivo, pak kleště. Vše funguje perfektně dokud jsou oba kováři dostatečně rychlí a vždy jeden stihne sebrat oba kusy nářadí. V okamžiku, kdy první kovář sebere kleště a druhý kladivo, dojde k uváznutí. Oba kováři budou do konce věků čekat na uvolnění druhého nástroje (zámek může vlákno uvolnit až po opuštění kritické sekce).

Komunikace vláken

Výše uvedené příklady mají jednu nectnost, kterou jsme přecházeli bez povšimnutí – volání Thread.sleep(time) nad hlavním vlákem. V takto jednoduché situaci nám dané řešení zcela postačuje, ale častěji budeme potřebovat, aby vlákno čekalo na nějakou událost (např. modifikace seznamu je hotova), čímž se jednak vyhneme zbytečnému čekání, za druhé se pak nedostaneme do situace, kdy zpracování trvalo nečekaně déle a čekající vlákno tak začne zpracovávat nehotová data.

Wait, notify, notifyAll

Pro čekání na událost můžeme využít volání monitor.wait(), kde monitor je objekt zámku (volající vlákno musí být v synchronizované sekci – tj. musí zámek vlastnit). Vlákno, které narazí na tuto operaci se zablokuje (tj. odevzdá veškerý svůj procesorový čas a bude čekat na probuzení) a uvolní držený zámek.

Inverzní operací je volání monitor.notify(), které probudí některé z čekajících vláken. Metodu notify() musí vždy volat vlákno, které vlastní příslušný zámek. Probuzené vlákno pak čeká, než se mu podaří získat zámek pro sebe a poté pokračuje ve vykonávání zbytku kritické sekce.

Posledním voláním pro komunikaci mezi vlákny je metoda notifyAll(), která probudí všechna čekající vlákna.

Pro úplnost si uveďme, že všechny zmíněné metody má každý objekt, protože jsou zděděny od společného předka všech tříd – třídy Object (dokumentace).

Spurious wakeup

Aby situace nebyla příliš jednoduchá, tak na operačních systémech implementujících standard POSIX (např. Linux, Unix) může zřídka dojít k probuzení vlákna, které nebylo explicitně notifikováno. Toto chování není chyba (bug), ale vlastnost (feature) – ošetření situace, která toto chování způsobuje, by bylo na některých typech systémů příliš výpočetně drahé.

Kvůli spurious wakeupu by mělo vždy probuzené vlákno kontrolovat, že invariant jeho opětovného spuštění je skutečně splněn. Pokud tomu tak není, tak by mělo pokračovat ve spánku.

V reálném světě bychom toto stejně museli vždy kontrolovat, protože nemáme pod kontrolou rozvrhování na procesoru a probuzené vlákno může získat zámek buď téměř okamžitě, za milisekundu nebo také za týden, kdy by zmíněný invariant téměř jistě také neplatil.

/**
 * Demonstrace dvouvlaknove aplikace nad sdilenym seznamem s notifikaci vlaken
 * @author Pavel Micka
 */
public class Main {

    /**
     * @param args the command line arguments
     */
    private List list = new ArrayList();

    public static void main(String[] args) throws InterruptedException {
        //promenne vyuzivane ve vnitrnich tridach musi byt konecne (final)
        final Main m = new Main(); //vytvorime instanci objektu Main (kvuli seznamu list)

        Thread t1 = new Thread() {

            @Override
            public void run() {
                for (int i = 0; i < 50; i += 2) {
                    synchronized (m) {
                        m.list.add(i);
                    }
                }
                if (m.list.size() == 50) {
                    synchronized (m) {
                        m.notify();
                    }
                }
            }
        };
        Thread t2 = new Thread() {

            @Override
            public void run() {
                for (int i = 1; i < 50; i += 2) {
                    synchronized (m) {
                        m.list.add(i);
                    }
                }
                if (m.list.size() == 50) {
                    synchronized (m) {
                        m.notify();
                    }
                }
            }
        };

        t1.start();
        t2.start();

        synchronized (m) {
            while (m.list.size() != 50) { //spurious wakeup
                m.wait(); //wait until notified
            }
        }

        Collections.sort(m.list);
        m.printList();
    }

    /**
     * Vypise obsah predaneho seznamu
     * Metoda je synchronizovana, u statickych metod je zamkem objekt obalujici tridy
     * u nestatickych metod obalujici objekt (pozor na tento zasadni rozdil)
     *
     * V tomto pripade se tedy jedna o synchronizaci nad objektem, ktery referencujeme
     * v programu jako "m"
     */
    public synchronized void printList() {
        for (Object o : this.list) {
            System.out.println(o);
        }
    }
}
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

V kódu vidíme, že obě vlákna po přidání čísel zkontrolují, zda-li není již celá práce hotova (seznam obsahuje 50 položek). Pokud ano, tak probudí hlavní vlákno uspané na řádku 54. Samotné ověření podmínky notifikace nemusí být v synchronizované sekci, protože pokud v daný okamžik není podmínka splněna, tak ji splní později druhé vlákno. Pokud je splněna právě pro jedno vlákno, tak dojde k notifikaci. A pokud je splněna pro obě vlákna (z důvodu přepnutí vláken před podmínkou), tak druhá z notifikací neprobudí žádné čekající vlákno (což nijak nevadí).

Na řádku 53 si pak povšimněme cyklu, který zajistí, že se v případě spurious wakeupu hlavní vlákno znovu uspí.








Doporučujeme

Internet pro vaši firmu na míru