Budapesti M¶szaki és Gazdaságtudományi Egyetem
Big Data elemzési eszközök nyílt forráskódú platformokon el®adásjegyzet
Készítette:
Cseh Gábor Kócsó Balázs Váradi Szabolcs El®adó: Lektorálta:
Prekopcsák Zoltán
Holczer Tamás, Prekopcsák Zoltán
BMEVITMAV15
Budapest, 2014 AT X L E
Tartalomjegyzék
1. Bevezetés
4
2. Elosztott rendszerek, Hadoop, HDFS
6
2.1.
Elosztott rendszerek
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
6
Elosztott fájlrendszerek . . . . . . . . . . . . . . . . . . . . . . . . .
7
2.2.
Hadoop története . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
7
2.3.
Hadoop HDFS . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
8
2.4.
Hadoop szerver szerepek
9
2.1.1.
2.5.
. . . . . . . . . . . . . . . . . . . . . . . . . . . .
2.4.1.
Name Node (NN) . . . . . . . . . . . . . . . . . . . . . . . . . . . .
9
2.4.2.
Secondary Name Node (SNN) . . . . . . . . . . . . . . . . . . . . .
10
2.4.3.
Data Node (DN)
. . . . . . . . . . . . . . . . . . . . . . . . . . . .
10
Adatm¶veletek
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
10
2.5.1.
Olvasás
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
10
2.5.2.
Írás . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
11
3. MapReduce 3.1.
12
Az általános MapReduce modell . . . . . . . . . . . . . . . . . . . . . . . .
12
3.1.1.
1. Feladat . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
13
3.1.2.
2. Feladat . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
13
3.1.3.
Hadoop MapReduce felépítése . . . . . . . . . . . . . . . . . . . . .
14
3.1.4.
Hadoop Cluster . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
14
3.1.5.
Meghibásodások . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
16
4. MapReduce programozási minták, Streaming
17
4.1.
1. feladat
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
17
4.2.
2. feladat
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
17
4.3.
3. feladat
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
18
4.4.
4. feladat
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
18
4.5.
Streaming
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
19
4.6.
Adattömörítés . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
20
5. Hadoop csomagok, SQL for Hadoop: Hive 5.1.
22
Hadoop csomagok . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
22
5.1.1.
Flume
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
22
5.1.2.
Sqoop
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
22
5.1.3.
Hive
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
22
5.1.4.
Pig . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
23
5.1.5.
HBase
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
23
5.1.6.
Mahout
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
23
1
TARTALOMJEGYZÉK
5.1.7. 5.2.
Zookeeper . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
SQL for Hadoop: Hive 5.2.1.
23
. . . . . . . . . . . . . . . . . . . . . . . . . . . . .
24
. . . . . . . . . . . . . . . . . . . . . . . . . . . .
24
A Hive felépítése
6. Pig programozás
26
6.1.
Bevezetés
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
26
6.2.
A fájlok feltöltése . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
26
6.3.
6.4.
A script megírása . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
27
6.3.1.
Reláció deniálása
. . . . . . . . . . . . . . . . . . . . . . . . . . .
27
6.3.2.
Reláció deniálása sémával . . . . . . . . . . . . . . . . . . . . . . .
28
6.3.3.
Reláció létrehozása egy másik relációból
. . . . . . . . . . . . . . .
28
6.3.4.
Adatok kiírása . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
29
6.3.5.
Oszlop kiválasztása a relációból
. . . . . . . . . . . . . . . . . . . .
29
6.3.6.
Adatok írása a HDFS-re
. . . . . . . . . . . . . . . . . . . . . . . .
29
6.3.7.
Két reláció illesztése (Join m¶velet) . . . . . . . . . . . . . . . . . .
29
6.3.8.
Adatok rendezése . . . . . . . . . . . . . . . . . . . . . . . . . . . .
30
6.3.9.
Adatok sz¶rése és csoportosítása . . . . . . . . . . . . . . . . . . . .
30
További források
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
7. Hadoop klaszterek kongurációja és üzemeltetése . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
31
32
7.1.
Hardver
7.2.
Operációs rendszer
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
33
7.3.
Hálózat
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
33
7.4.
Disztribúció . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
34
7.5.
Feladatok ütemezése és kvóták használata
34
. . . . . . . . . . . . . . . . . .
8. Hadoop 2.0 - YARN, Tez, Spark 8.1.
8.2.
Hadoop 2.0
32
35
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
35
8.1.1.
Kialakulása
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
35
8.1.2.
Különbségek . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
35
8.1.3.
YARN job futtatás
. . . . . . . . . . . . . . . . . . . . . . . . . . .
36
Tez . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
37
8.2.1.
. . . . . . . . . . . . . . . . . . .
37
8.3.
Impala . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
Eloszott rendezési példa Tez-ben
38
8.4.
Spark
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
39
8.5.
Storm
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
39
9. Hadoop & NoSQL: HBase 9.1.
Adatbázis-kezel® rendszerek
. . . . . . . . . . . . . . . . . . . . . . . . . .
40
Elosztott adatbázis-kezel® rendszerek . . . . . . . . . . . . . . . . .
40
NoSQL adatbázis-kezel® rendszerek . . . . . . . . . . . . . . . . . . . . . .
41
9.1.1. 9.2. 9.3.
40
HBase
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
9.3.1.
M¶ködése
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
9.3.2.
HBase adatszervezési példa
. . . . . . . . . . . . . . . . . . . . . .
A. MapReduce Patterns, Algorithms, and Use Cases A.1. Basic MapReduce Patterns . . . . . . . . . . . . . . . . . . . . . . . . . . . A.1.1. Counting and Summing
42 43 44
45 46
. . . . . . . . . . . . . . . . . . . . . . . .
46
A.1.2. Collating . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
47
2
TARTALOMJEGYZÉK
A.1.3. Filtering (Grepping), Parsing, and Validation . . . . . . . . . . . .
47
A.1.4. Distributed Task Execution
. . . . . . . . . . . . . . . . . . . . . .
47
A.1.5. Sorting . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
48
A.2. Not-So-Basic MapReduce Patterns
. . . . . . . . . . . . . . . . . . . . . .
A.2.1. Iterative Message Passing (Graph Processing)
48
. . . . . . . . . . . .
48
A.2.2. Distinct Values (Unique Items Counting) . . . . . . . . . . . . . . .
51
A.2.3. Solution II:
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
52
A.2.4. Cross-Correlation . . . . . . . . . . . . . . . . . . . . . . . . . . . .
53
A.3. Relational MapReduce Patterns . . . . . . . . . . . . . . . . . . . . . . . .
54
A.3.1. Selection . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
54
A.3.2. Projection . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
54
A.3.3. Union
55
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
A.3.4. Intersection A.3.5. Dierence
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
55
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
55
A.3.6. GroupBy and Aggregation . . . . . . . . . . . . . . . . . . . . . . .
56
A.3.7. Joining . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
56
3
1. el®adás Bevezetés BigData deníciója:
Olyan nagy és komplex adathalmaz, amelyet már a szokásos al-
kalmazásainkkal nem tudunk kezelni. Ez nem egy túl pontosan értelmezhet® deníció, ezért ez felhasználási területenként változik (1.1. ábra). Ha a felhasználási terület
Excel, akkor 1 millió sor, ha
az
valamilyen
memória alapú feldolgozóeszköz,
akkor amennyi memóriánk van,
azaz 4-8 GB, ha
adatbázis pl.
valamilyen
az általunk készített hatékony
fölött számít BigData-nak.
MySQl, akkor 50-100 GB, ha
diszket jól használó programok, akkor 1 TB
Nagy általánosságban azt lehet mondani, hogy 50-100 GB
alatt nem számít az adat BigData-nak, mert egy nagy memóriával rendelkez® felh® szolgáltatással ezek az adatok még nagyon jól kezelhet®ek.
1.1. ábra. A BigData értelmezése a felhasználási terület függvényében
4
ELADÁS 1.
BEVEZETÉS
Ett®l nagyobb adatmennyiség esetén már csak valamilyen elosztott rendszerrel tudjuk hatékonyan kezelni az adatainkat, ilyen elosztott rendszer a Hadoop. A scale up felfogás szerint, ha egy gépen szeretnénk minél nagyobb számítási kapacitást, akkor csak egyszer¶en felskálázzuk (azaz több memóriát és er®sebb processzort teszünk bele), míg a scale out szerint teljesen hagyományos PC-ken elosztott rendszerben gondolkodunk. Egy másik felfogás szerint onnantól beszélünk BigData problémákról, amikor az adat olyan nagy, hogy az már részévé válik a problémának, például amikor már nem férünk a memóriába, vagy amikor már egy számítógépen napokig fut az adott feladat. Összefoglalva bármilyen probléma, amely abból származik, hogy túl sok adatot kell kezelni. Ezek legtöbbször valamilyen hardver korlátból adódnak (1.2. ábra), amelyek egy normál PC esetén
12-24 TB méret¶ merevlemezt,
32-64 GB méret¶ memóriát
jelent.
Ezen felül jelent®s probléma az is, ha egy átlagos 2 TB méret¶ merevlemezr®l
le akarjuk olvasni a teljes tartalmát, akkor az 6 órába tellene a merevlemez átereszt® képessége miatt.
1.2. ábra. A hardver korlátok egy átlagos PC esetén
5
2. el®adás Elosztott rendszerek, Hadoop, HDFS
2.1. Elosztott rendszerek Az elosztott rendszerek lehet®vé teszik azt, hogy bármilyen m¶veletet több számítógép között elosztottan hajtsunk végre, ezáltal megnövelve annak sebességét, kapacitását, rendelkezésre állását. Egy elosztott rendszer rengeteg számítógépb®l állhat, melyek egy közös cél érdekében együtt tudnak dolgozni, és m¶ködni. Ezek a számítógépek egy hálózaton (vagy akár az interneten) keresztül vannak egymással összekötve, így tudnak kommunikálni. Fontos megemlíteni, hogy a meghibásodás ténye teljesen más, mint egy egyedülálló számítógép esetében. Az elosztott rendszerek ezt rugalmasan tudják kezelni, folytatódik a rendszer futása szinte észrevehetetlenül. Egyedülálló gép esetében természetesen ez nem lehetséges (a hiba típusától függ®en). Ez a fajta rugalmasság azért is fontos, mert egy több tíz vagy száz gépb®l álló hálózat esetén a meghibásodás egy napi vagy heti rendszerességgel el®forduló esemény, és nagyon fontos, hogy 1-2 gép kiesése esetén a többi gond nélkül tovább m¶ködjön, s®t lehet®leg a feladatok futása se szakadjon meg. Többféle célra is használják ezeket a rendszereket:
Elosztott adattároló központok:
Ezeket a rendszereket kifejezetten nagy mennyi-
ség¶ adatok tárolására találták ki. Mivel egy adathordozón és számítógépen meglehet®sen limitált a tárolható adatmennyiség, így ezt több számítógépen több adathordozón tudjuk tárolni. Többnyire az adatok megfelel®en vannak replikálva is, így az adatvesztés kockázata jelent®sen lecsökken.
Elosztott számítási kapacitás:
A másik nagy felhasználás, amikor az elosztott
rendszereket a teljesítményük miatt kapcsolják össze egy nagy számítási felh®vé. Ilyenkor egy bizonyos nagy számításigény¶ feladatot nagyon gyorsan és hatékonyan végre lehet hajtani, míg ez egy önálló számítógépen több évig eltarthatna (pl. genetikai minták elemzése). Az ilyen típusú rendszerek lehetnek el®re konguráltak, és telepítettek, de lehetnek akár önkéntesek is, ahova egyszer¶ felhasználói számítógépeket az interneten keresztül kapcsolnak be a rendszerbe, és használják az er®forrásaikat. (Ezek lehetnek a GRID computing, HPC, Volunteer computing, Distributed programming, MPI. Ezek mindegyike számítás- és nem adatigényes feladatokra vannak optimalizálva.)
6
ELADÁS 2.
ELOSZTOTT RENDSZEREK, HADOOP, HDFS
2.1.1. Elosztott fájlrendszerek Régen egy 1 GB-os merevlemezt körülbelül 4 MB/s sebességgel tudtunk olvasni. Mára a technológia sokat javult a tárolható adatmennyiség terén, de az olvasási sebesség növekedése nem a várt értékeket mutatja. Egy ma kapható 2 TB-os merevlemezt 100 MB/s sebességgel tudunk olvasni, ami még így is lassúnak mondható abban az esetben, ha a teljes lemeztartalmat szekvenciálisan végig szeretnénk olvasni (kb.
5,5 óra).
Ha ezt az
adatmennyiséget elosztva 100 db merevlemezen tárolnánk, akkor az adatokat ugyanilyen olvasási sebesség mellett szinte néhány perc alatt fel tudnánk dolgozni. Emellett a statisztikák szerint 100 db merevlemezb®l 5 hibásodik meg évente. Ezen okok miatt kezdték el létrehozni az elosztott fájlrendszereket. Az elosztott fájlrendszerek lényege, hogy a fájljainkat egy kliens-szerver architektúra segítségével tárolhatjuk illetve érhetjük el.
Egy elosztott fájlrendszerben egy vagy több
központi szerver található, amelyek tárolják az alapvet® információkat a többi szerverr®l, melyek száma elméletben korlátlan lehet. Ezek a kitüntetett szerepkörrel rendelkez® szerverek tárolják a fájlstruktúra hierarchikus felépítését, a metaadatokat, és a fájlok konkrét szerveren való helyét. A gyakorlatban legalább kett® központi szervert használnak annak érdekében, hogy redundancia biztosított legyen. Ha egy elosztott fájlrendszerbeli fájlt egy kliens géppel szeretnénk elérni, akkor gyakorlatilag semmilyen különbséget nem veszünk észre, tehát teljesen olyan, mintha az adott fájlt egy darab szerveren tároltuk volna, vagyis a tároló Node-ok struktúrája elfedésre kerül. Err®l az elfedésr®l az elosztott rendszer gondoskodik. Az elosztott rendszerek er®ssége abban rejlik, hogy általuk könnyen elérhet®ek több kliens számára is ugyanazok az adatok, és gyakorlatilag egy központi tárolóegységr®l beszélünk, a kliensek nem tárolnak lokálisan olyan fájlokat, amelyek más kliensek számára is fontosak lehetnek.
2.2. Hadoop története A 2.1. ábrán látható a Hadoop rendszer fejl®déstörténete. Eredetileg a Hadoop egy elosztott fájlrendszerként indult, amelyet a Google kezdett el fejleszteni Google File System néven. Ezután többen megpróbálták ezt reprodukálni, és elkészíteni Open Source projekt keretein belül.
2.1. ábra. Hadoop kialakulásának története
Ezek után a Yahoo is elkezdte ugyanezt a rendszert saját maga elkészíteni. A próbálkozásoknak és a fejlesztéseknek köszönhet®en egyre többen elkezdték használni a Hadoop rendszert (pl. Facebook, Last.FM), és ennek következtében az Apache felkarolta, és egy
7
ELADÁS 2.
ELOSZTOTT RENDSZEREK, HADOOP, HDFS
dedikált Open Source fejlesztési projekt keretein belül a mai napig ®k foglalkoznak a témával.
Több nagy cég üzleti szolgáltatásként is elindította a saját továbbfejlesztett és
kiegészített verzióját. A 2.2. ábrán látható, hogy 2012-ben a Hadoop rendszert céges környezetben 1 éven belül nagyon sokan tervezik bevezetni, tehát jelent®s növekedés várható a piacon.
2.2. ábra. Bevezetni kívánt technológiák céges környezetben
2.3. Hadoop HDFS A Hadoop rendszer egy teljesen saját, egyedülálló elosztott fájlrendszert használ, a Hadoop Distributed File System-et (HDFS). Ez a fájlrendszer arra lett tervezve, hogy kifejezetten nagy méret¶ fájlokat tároljunk rajta, mivel a rendszerben nagy adathalmazokon tudunk valamilyen lekérdezést, vagy m¶veletet egyszer¶en végrehajtani. Például log-ok esetén érdemes nagyobb egységekben tárolni az adatokat, így nagy fájlméret érhet® el. Az adatokat általában egyszer letároljuk, és többször teljesen végig kell olvasnunk ®ket (full scan). A Hadoop fájlrendszer arra lett tervezve, hogy átlagos otthoni PC-kre vagy átlagos szerverekre is telepíthet® legyen, ezzel is azt ösztönözve, hogy költséghatékonyabban tudjunk egy elosztott rendszert kialakítani (ez nagy mértékben különbözik a SAN szemlélett®l). A HDFS egy blokkalapú fájlrendszer (tipikusan 64-256 MB egy blokkméret), tehát a nagy fájlok (tipikusan a GB-os nagyságrendben) ilyen méret¶ blokkokra darabolódnak szét, és a blokkok egymástól függetlenül tárolhatók más-más er®forrásokon, és számítógépeken. A blokkméretet az elérési id® miatt kell nagyobbra venni, mint egy átlagos fájlrendszer esetében.
A fentebb említett 64-256 MB blokkméretnél nagyobbat már nem érdemes
választani, mert ezzel elveszítenénk a párhuzamosítás lehet®ségét, mivel a számítási feladatok esetében a blokkok betölt®dnek a memóriába, tehát így több is befér egyszerre, nem kell állandóan cserélgetni ®ket a számítás során. Ez a gyakorlatban azt jelenti, hogy egy 1 GB-os fájl esetében, 1 MB-os blokkok esetén az olvasás során a merevlemez fejének
8
ELADÁS 2.
mozgatása
ELOSZTOTT RENDSZEREK, HADOOP, HDFS
1000 · 10ms
keresési id®t jelent. Nagyobb blokkméret esetén ez a késleltetés
jóval csökken. Ezen kívül nagyon fontos, hogy minden blokk replikálódik legalább egy - de általában több (2-3) - másik számítógépen is, tehát ezzel elkerülhet® az adatvesztés (Hadoop esetén az alapértelmezett replikációs faktor a 3, ez azt jelenti, hogy minden blokk 3 különböz® helyen van eltárolva). Ha kiesik egy számítógép, azt egyszer¶en kivehetjük a rendszerb®l, és egy újat tehetünk a helyébe. Ezáltal a rendszer nagyon jól skálázható, rendkívül jól m¶ködik óriási géppark esetén is. A blokkok helyének meghatározását egy komplex terheléselosztó algoritmus határozza meg, ami többek között gyelembe veszi az egyes gépek terhelését, a replikálásból származó adatforgalom minimalizálását, illetve a minél nagyobb megbízhatóságot (ennek érdekében lehet®leg több rack illetve adatpark között is megosztja a replikákat).
2.4. Hadoop szerver szerepek A fájlrendszerben több kitüntetett szereppel rendelkez® gépnek kell lennie ahhoz, hogy az egész rendszer megfelel®en tudjon m¶ködni (2.3. ábra).
2.3. ábra. Hadoop szerver szerepek forrás:
http://www.atlantbh.com/how-to-build-optimal-hadoop-cluster/
2.4.1. Name Node (NN) Ebb®l általában egy van egy Hadoop rendszerben, feladata a metaadatok tárolása és karbantartása. Gyakorlatilag ® tudja azt, hogy milyen mappák, milyen fájlok találhatóak a fájlrendszeren. Ezt két fájl segítségével tudja tárolni:
Namespace image:
Edit log:
Ez egy lenyomat a fájlokról, mappákról.
A legutóbbi namespace image-hez képest történt változtatásokat írja le,
ezáltal nem kell mindig azt az image-t frissíteni, ami jelent®s er®forrásigénnyel járna. Ennek ellenére ezt a log-ot egy id® után bele kell f¶znünk a namespace image-be, mert feleslegesen nagy méret¶vé válna.
9
ELADÁS 2.
ELOSZTOTT RENDSZEREK, HADOOP, HDFS
Fontos megemlíteni, hogy a Data Node-ok rendszeresen jelentik a náluk lév® blokkok listáját a Name Node-nak, amit természetesen ® nyilván is tart. Így ha egy kliens a Name Node-hoz fordul egy fájlért, akkor ebb®l a naplóból tudja, hogy melyik blokkok kellenek, és a jelentésekb®l el tudja irányítani a klienst a megfelel® Data Node-hoz, ugyanis ezen jelentések nélkül a Name Node nem tudná, hogy milyen blokkok találhatóak a Data Nodeokon (amelyek nem kerültek mentésre még a namespace image-be). A metaadatok tárolásának hátránya, hogyha a Name Node kiesik, és a metaadatokról nincs biztonsági másolatunk, akkor minden adatunk elveszik annak ellenére, hogy zikailag az adatblokkok megmaradnak a Data Node-okon.
Ennek kiküszöbölésére RAID
és/vagy NFS (Network File System) technológiákat használnak, így mind helyileg, mind hálózaton is tudjuk ezeket tárolni. Létezik egy Name Node High Avaibility megoldás, amely szerint több Name Node van a rendszerben. Így mindegyik folyamatosan tárolja az információkat és közülük valamelyik aktív, a többi pedig passzív, a kliensek pedig mindig az aktívhoz fordulnak.
Az aktív
kiesése esetén ekkor bármelyik passzív rögtön átveheti az aktív szerepét. Ez a megoldás természetesen jelent®sen nagyobb terhet ró a Name Node-ra, hiszen biztosítani kell a konzisztenciát az aktív és passzív Name Node-ok között.
2.4.2. Secondary Name Node (SNN) Ez a szerepkör ahogy els®re gondolnánk nem egy másodlagos Name Node, tehát az elnevezése megtéveszt® lehet. Feladata az, hogy segítsen a NameNode-nak az általa tárolt adatok kezelésében, és a namespace image karbantartásában. gondoskodik arról, hogy a fentebb említett Edit log-ot belef¶zze a Namespace image-be, és ezeket újra átadja az els®dleges Name Node-nak.
2.4.3. Data Node (DN) Ezek a számítógépek tárolják gyakorlatilag a fentebb említett adatblokkokat. k jelentkeznek be a Name Node-nak egy fájl keresése során, hogy annak a fájlnak egy blokkja náluk található meg (reporting). A reporting folyamatosan megy, így szerez tudomást a Name Node a túl és alul replikált blokkokról, illetve így tudja a klienst a megfelel® Data Node-hoz irányítani.
2.5. Adatm¶veletek 2.5.1. Olvasás Az olvasást egy példán keresztül mutatjuk be. A 2.4. ábrán láthatjuk, hogy a kliens el®ször a Name Node-dal veszi fel a kapcsolatot (Ê), akinek megmondja, hogy a Results.txt állományt szeretné olvasni. A Name Node az általa tárolt metaadatok alapján megmondja a kliensnek, hogy ennek a fájlnak három blokkja létezik, és a blokkok külön-külön melyik Data Node-okon vannak tárolva (Ë). Ezután a kliens egyesével közvetlenül az adatblokkokat tartalmazó valamelyik Data Node-hoz fordul, és letölti azokat (Ì) (minden adatblokk esetében az azt tartalmazó valamelyik Data Node-hoz).
10
ELADÁS 2.
ELOSZTOTT RENDSZEREK, HADOOP, HDFS
2.4. ábra. HDFS olvasás forrás:
http://blog.csdn.net/suifeng3051/article/details/17288047
2.5.2. Írás Az írás hasonló módon történik, mint az olvasás.
A 2.5.
ábrán látható, hogy a kliens
el®ször felveszi a kapcsolatot a Name Node-al (Ê), hogy ® egy File.txt-t szeretne tárolni, amelyet 3 blokkra tudott felbontani. A Name Node visszaküld annyi Data Node címet, amennyi blokk szükséges (Ë). A kliens ezután közvetlenül a Data Node-oknak elküldi az egyes blokkokat (Ì). Ha ez készen van, akkor a Data Node-ok replikálják az új blokkokat (Í), és jelentik ezt a Name Node-nak (Î).
2.5. ábra. HDFS írás forrás:
http://blog.csdn.net/suifeng3051/article/details/17288047
11
3. el®adás MapReduce A MapReduce nem egy programnyelv hanem programozási modell, egy programozási keretrendszer, amely megadja, hogy hogyan hozzunk létre elosztott programokat. Kifejezetten alkalmas arra, hogy nagy adathalmazon dolgozzon elosztott módon, sok adat párhuzamos feldolgozására van kitalálva. Mivel nem egy programnyelv, ezért van megvalósítása többféle programozási nyelv esetén is, például Java, Python, R stb. A MapReduce nem csak Hadoop specikus, más adattárházakat is lehet MapReduce-ban programozni, ilyen például a MongoDB.
3.1. Az általános MapReduce modell Egy példán keresztül nézzük meg az általános MapReduce modell m¶ködését.
Adott
egy adathalmaz, amely h®mérsékleti értékeket tárol. Határozzuk meg az évi maximum h®mérsékletet.
Adat: Eredmémy:
év, hónap, nap, óra, perc, város, h®mérséklet (nagy adathalmaz) év, maximum h®mérséklet (kis adathalmaz)
Ha egy gépen szeretnénk leprogramozni ezt a feladatot, akkor egy sima maximum kiválasztást kell valamilyen nyelven implementálnunk, így nem kell memóriában tartani az adatot, egyszeri végig olvasással meg tudjuk mondani a kérdésre a választ. Ha viszont már annyi adatunk van, hogy egy gépen ezt nem tudjuk tárolni, akkor valamilyen elosztott rendszeren kell az adott problémát megoldani. Ez viszont nem egyszer¶, ezért találták ki a MapReduce keretrendszert, amely elfedi a programozótól a párhuzamosítás feladatát (pl.: nem kell foglalkozni a gépek közötti kommunikációval, különböz® hibák és kiesések kezelésével, er®forrás kiozstással stb).
A keretrendszer alapvet®en kulcs-érték párokon
dolgozik, egy szöveges tartalom esetén a kulcs az, hogy a fájl hanyadik sora, és az érték pedig a sor tartalma szövegként. A MapReduce a nevéb®l adódóan két lépésb®l áll egy Map-b®l és egy Reduce-ból.
Map: (K1 , V1 ) −→ list(K2 , V2 ) Reduce: (K2 , list(V2 )) −→ list(K3 , V3 ) A Map lépésben kulcs-érték pár jön bemenetként és egy újfajta kulcs-érték párt, listát fog kiadni a kimenetén. A Reduce lépés a Map kimenetén lév® kulcs-érték párokat kapja meg, olyan formában, hogy az azonos kulcsokhoz tartozó összes érték egy listába kerül.
12
ELADÁS 3.
MAPREDUCE
Err®l a lépésr®l a keretrendszer gondoskodik. Végül a Reduce kimenetén újabb kulcs érték párokat fogunk kapni. Nézzük meg ez alapján az el®bbi példát:
Map: Reduce:
(sorID, sor)
−→
(év, h®mérséklet)
(év, list(h®mérséklet))
−→
(év, max(h®mérséklet))
A Map bemenetén kulcs-érték párokat kapunk, ami jelen esetben sorazonosító-sor párokat jelent. Érdemes kiadnunk minden sorból, hogy melyik évr®l szól az a sor és mi volt az ottani h®mérséklet.
Ezzel lényegében a Map lépésben ebb®l az adatból kisz¶rjük azt
amire szükségünk van, és minden sorra kiadjuk ezt a párost úgy, hogy az év lesz a kulcs. A Reduce bemenetén megjelenik az év mint kulcs és az összes abban az évben mért h®mérséklet. A Reduce-ból pedig kiadjuk az évet és a maximum h®mérsékletet. A MapReduce keretrendszerben megírt program esetén a programozónak annyi a feladata, hogy logikailag megírjon egy Map és egy Reduce függvényt majd a keretrendszer gondoskodik róla, hogy elossza ezt a feladatot. A Map függvény bemenete lehet blokk, fájl de akár sor is és a bemeneten egymástól függetlenül soronként hajtódik végre. Tehát a Map lépés triviálisan párhuzamosítható. A Reduce lépés is, hogyha megkapja az egy kulcshoz tartozó összes elemet akkor az is egyszer¶en párhuzamosítható.
A keretrend-
szer azt biztosítja számunkra, hogy amikor kijönnek ezek a kulcs érték párok akkor ez biztosan megérkezzen egy Reduce függvény bemenetére úgy, hogy az összegy¶jtés már tulajdonképpen megtörtént.
Ha két gépr®l beszélünk akkor úgy lehet elképzelni, hogy
meg van írva egy Map függvény, (hogy hogyan lehet kiszedni az évet és a h®mérsékleteket az adatokból) és itt a Map függvénynek meg van adva az, hogy páros évszámú adatokat az egyikhez a páratlan évszámú adatokat a másik géphez küldi. A Reduce gépen történik ezeknek a listába rendezése, kulcs alapján a beérkez® adatokat rendezi és utána könnyedén létre tudja hozni ezeket a listákat. Hadoop esetén az MapReduce keretrendszer gyel a lokalitás elvére is, tehát minden adatot lehet®leg ott dolgoz fel, ahol az rendelkezésre áll, így nagyban csökkentve a hálózati kommunikációt, és növelve a sebességet.
3.1.1. 1. Feladat Vannak dokumentumaink vagy valamilyen szöveges fájlok amik soronként tetsz®leges szöveget tartalmaznak, az elvárt eredmény az, hogy minden szóra mondjuk meg, hogy hányszor szerepel.
Map: Reduce:
−→ (szó, 1) list(#)) −→ (szo, #)
(sorID, sor) (szó,
Ahogy látunk egy szót, azt rögtön kiadjuk a Map oldalon, míg a Reduce oldalon egy sima számlálót kell megvalósítani. Ennek egy variációja, ha a Map oldalon megszámláljuk az egyforma szavakat a sorban, és a szavak számát adjuk meg a konstans 1 helyett. Az utóbbi megoldás nagyobb terhet rak a Map oldalra, viszont csökkenti a hálózati kommunikációt és a Reduce oldal terhelését.
3.1.2. 2. Feladat Kérem az összes sort amiben szerepel a BME szó. A feladatot Map oldalon lehet a legegyszer¶bben megoldani. A kiadott kulcs bármi lehet. A Reduce egyszer¶en megismétli a kimenetén a bemenetét.
13
ELADÁS 3.
Map: Reduce:
MAPREDUCE
(sorID, sor)
−→
(?, sor)
dummy reduce
3.1.3. Hadoop MapReduce felépítése
3.1. ábra. A MapReduce m¶ködése Hadoop esetén forrás:
http://www.drdobbs.com/database/hadoop-the-lay-of-the-land/240150854
Vannak HDFS blokkjaink, a Hadoopban az esetek jelent®s részében egy Map task egy HDFS blokkon dolgozik (egy HDFS blokk egy számítógépet jelent). A Map feladat soronként egymástól független, általában JAVA-ban megírt program részletr®l beszélünk, indul egy JVM és az soronként elkezdi feldolgozni. A kimenetét pedig megfelel®en irányítani kell a Reduce-ok felé. Az a feladata, hogy a megfelel® kulcsú elemeket a megfelel® Reduce-hoz juttassa el. A Map után létezik még egy Partitioning függvény ami kap egy kulcsot és megmondja, hogy melyik Reduce folyamathoz tartozik, lényegében ez egy hash függvény.
A Particioning-et már a Map gépen végezzük.
Tudunk csinálni egy Combi-
ner függvényt is ami egy adatblokkon a Partitioning után lefuttat egy másik függvényt is. Tudunk írni saját particionáló függvényt is de alapértelmezetten egy egyenletes hash függvény, amely csak nagyon egyenetlen adateloszlás esetén nem m¶ködik jól. Ahhoz, hogy majd a Reduce-nál össze tudjuk gy¶jteni a kulcshoz tartozó összes értéket egy nagy Sortolást kell végrehajtani, történik egy lokális sort így lesznek kisebb eredményhalmazaink amik már rendezettek és ezt küldik el a Reducer-nek.
A kisebb eredményhalmazain-
kat egy összefésüléses rendezéssel érdemes összef¶zni (természetesen err®l a keretrendszer gondoskodik). A Reduce a kimenetét egy HDFS fájlba szokta tipikusan kiírni. Ahogy az architektúrából látszik egy MapReduce feladat indításának van egy elég nagy overhead-je (ez akár perces nagyságrendbe is eshet), tehát csak kell®en nagy feladatok esetén érdemes használni, és interaktív feladatokra nem igazán alkalmas.
3.1.4. Hadoop Cluster Vannak hasonló szerepek mint a fájlrendszer esetében. Master szerep: JobTraceker, ® felel azért, hogy egy teljes job végrehajtásra kerüljön. A TaskTraceker pedig a végrehajtásért
14
ELADÁS 3.
MAPREDUCE
felel®s, vannak neki szabad kapacitásai, Map illetve Reduce slot-ja (mennyi az ® kapacitása) és a JobTraceker ezzel gazdálkodik, egyszer¶en odaadja neki a feladatokat, hogy melyik Map függvényt melyik Reduce függvénnyel futassa le. A TaskTrackernek semmi információja nincs arról, hogy a többiek hogy állnak, néha visszaszól a JobTracekernek, hol tart a feladatával (HeartBeat). Lényegében minden szervezési logika itt is a Masterben (JobTrackerben) összpontosul. A 3.2. ábra az írás és olvasás m¶ködését szemlélteti. Mi történik amikor egy számítási feladatot szeretnénk végrehajtani. Van egy kliensünk, egy JobTrackerünk illetve több TaskTracker. Illetve van egy HFDS (közös fájlrendszer) amivel mindenki tud kommunikálni.
Az els® lépés az az, hogy a kliens bejelentkezik a
JobTrackerhez, hogy szeretne egy feladatot végrehajtani. Erre válaszul kap egy azonosítót és egy jogosultságot, hogy onnantól kezdve azzal a job-al foglalkozzon. Különböz® szükséges dolgokat a HDFS-re másolja a kliens (Map és Reduce kódja illetve a el®re megírt library-k amiket ezek a függvények felhasználnak) lényegében a job-nak egy teljes információs csomagját felrakja a HDFS-re. A TaskTracker sebessége nagyban függ attól, hogy mekkora replikáció számot állítunk be a HDFS-en. Nagyon sokszor a replikációk számát magasra állítják, így minden TaskTracker gyorsan hozzáférhet az HDFS-en tárolt adatokhoz. A következ® lépés a submit vagyis a kliens szól a JobTrackernek, hogy induljon el ez a job. A HDFS-en megvan minden információ ami a job-ot leírja, ez tipikusan egy XML vagy egy JAR fájl stb.
3.2. ábra. A JobTracker m¶ködése Hadoop esetén Amikor a JobTracker megkapja ezt a submit-ot megnézi, hogy a leíró helyes illetve az adatot is leírja amin le kell futtatni ezt a job-ot. Így ki tudja számolni hogy melyik fájl hány blokkból áll illetve hány Map és hány Reduce folyamat lesz. Rendelkezésre kell álljon minden olyan információ ami a feladatok kiosztását és elkezdését lehet®vé teszi. Nagyon fontos dolog, hogy nem a JobTracker fog kapcsolatot létesíteni a TaskTrackerek-kel, hanem várja, hogy a TaskTracker-ek bejelentkezzenek (HeartBeat)és elküldjék mennyi üres slot-juk van illetve az egyes taskok, hogy állnak náluk. A JobTracker tudja, hogy az egyes
15
ELADÁS 3.
MAPREDUCE
adatblokkok hol helyezkednek el és melyik gépen vannak HDFS-en, továbbá tud arról, hogy milyen aktív jobok vannak és melyek azok amelyek nem futnak sehol és erre válaszul ad egy task-ot a TaskTrackernek amiben benne van, hogy a HDFS-r®l hol tudja elérni annak a task-nak az információját. A JobTracker kiadja a task-ot a TaskTracker begy¶jti az információkat, hol érhet® el a Map függvény illetve a különböz® Partícionáló függvényeket és ezeket a szükséges adatokat a HDFS-r®l betölti. Így m¶ködik minden HDFS esetén, amíg vannak szabad Taskok amiket végre kell hajtani, addig mindegyik bejelentkezésnél meg fogják kapni a feladatokat. Ha pedig végeznek küldenek egy HeartBeatet, hogy ez a task készen van. Adott esetben egy Map task szól hogy ® készen van és amikor a Reduce ezt kiolvasta akkor lesz befejezettnek tekintve a Map-nek a feladata.
3.1.5. Meghibásodások Ha meghal egy task akkor nincs semmi baj mert újraindítja azt a taskot a TaskTracker. Sajnos a JobTracker nem tudhatja, hogy egy-egy task azért halt meg mert rossz a felhasználó kódja, elment az áram, elromlott egy switch stb. Ilyenkor van egy limit, hogy hányszor lehet újraindítani azt a taskot. Igyekszik egy másik Node-on újraindítani az elhalt taskot a JobTracker. Létezik egy spekulatív futtatás nev¶ eljárás, miszerint ugyanazt a taskot több TaskTrackeren is futtatja. Ennek az a célja, hogy ha bármikor van egy olyan node ami túl lassú, akkor az ne blokkolja az egész rendszert ezért több példányon elindul és ha valami befejez®dött a másikat eldobja (ezt leginkább az utolsó task-ok esetén szokta csinálni a JobTracker). Ez abban az esetben jó, ha viszonylag kihasználatlan a cluster és van szabad er®forrás. Ha egy TaskTracker kiesik akkor nem fog jönni t®le több HeartBeat ilyenkor a JobTracker azt látja, hogy az összes Task ami ott futott elhalt és újraindítja máshol. Ha a JobTracker áll meg, akkor köztes állapotban maradnak a Nodeok és ha újra elindul akkor elölr®l indulnak az adott task-ok. Erre megoldás a redundancia alkalmazása, létezik egy High Availability konguráció ahol két JobTracker van.
16
4. el®adás MapReduce programozási minták, Streaming
4.1. 1. feladat Adott egy adathalmaz, amely h®mérsékleti értékeket tárol.
Határozzuk meg egy város
átlagh®mérsékletét egy adott hónapban.
Adat1: Eredmény: Map: Reduce:
év, hónap, nap, óra, perc, város, h®mérséklet város, hónap, átlagh®mérséklet
(sorID, sor)
−→
((város + hónap), h®mérséklet)
((város + hónap), list(h®mérséklet))
−→
((város + hónap), avg(h®mérséklet))
A szövegfájloknál a bemeneti kulcs-érték szinte mindig (sor, sorID) lesz, amikor valamilyen aggregálást végzünk.
Ha SQL lekérdezést készítenénk erre a feladatra, akkor a Map
kimenetének a kulcsa az lesz, amit a
GROUP BY
mögé írnánk, ami szerint aggregálunk.
Ezért lesz a Map kimenetének kulcsa a (város + hónap). A keretrendszer automatikusan ezen kulcs alapján csinál egy listát, amely a Reduce bemenete lesz majd, így egy kulcshoz tartozó értékek egy Reducehoz kerülnek.
A Reduce pedig kiszámolja az egy kulcshoz,
azaz a (város + hónap)-hoz tartozó átlagh®mérsékletet.
4.2. 2. feladat Adott egy adathalmaz, amely egy weblog adatait tárolja. Határozzuk meg hogy melyik nap hány oldalletöltés volt.
Adat2: Eredmény:
dátum, id®, sessionID, url dátom, oldalletöltések száma
Ha ez egy strukturált SQL tábla lenne, akkor a következ® lekérdezéssel lehetne a választ megadni:
SELECT date, COUNT(*) FROM table GROUP BY date A szövegfájloknál a bemeneti kulcs-érték szinte mindig (sor, sorID) lesz, amikor valamilyen aggregálást végzünk. A
GROUP BY
mögött a dátum van, ezért a Map kimeneti kulcsa a
17
ELADÁS 4.
Map: Reduce:
MAPREDUCE PROGRAMOZÁSI MINTÁK, STREAMING
(sorID, sor)
−→
(dátum, list(1))
(dátum, 1)
−→
(dátum, cnt(1))
dátum lesz, és csak meg kell számolni, hogy abból hány darab van. A Reduce-nak már csak ezeket az 1-eseket kell megszámolnia.
4.3. 3. feladat Az adathalmaz ugyanaz, mint az el®bb, de most azt szeretnénk megtudni, hogy hány egyedi látogató volt az oldalon egy napon. Egyedi látogatás alatt az egy sessionID-hoz tartozó lekéréseket értjük.
Adat2: Eredmény:
dátum, id®, sessionID, url dátum, egyedi látogatószám
Ha ez egy strukturált SQL tábla lenne, akkor a következ® lekérdezéssel lehetne a választ megadni:
SELECT date, COUNT (DISTINCT(sessionID)) FROM table GROUP BY date
Map: Reduce:
(sorID, sor)
−→
(dátum, sessionID)
(dátum, list(sessionID))
unique
−−−→
(dátum, cnt(distinct(session)))
A szövegfájloknál a bemeneti kulcs-érték szinte mindig (sor, sorID) lesz, amikor valamilyen aggregálást végzünk. A Map kimeneti kulcsa a dátum lesz és a Reduce fogja az egyediséget garantálni (unique).
Az egy Reduce folyamatnak kell az egy dátumhoz tartozó összes
kérést a memóriában tartani és ez sz¶k keresztmetszet lehet, ha egy napon nagyon sok adat keletkezett, akkor a folyamatunk nem tud skálázódni. Ezért érdemesebb szétbontani ezt két külön MapReduce folyamatra:
Map 1: Reduce 1: Map 2: Reduce 2:
(sorID, sor)
−→
((dátum + sessionID), 1)
−→
((dátum + sessionID), list(1))
((dátum + sessionID), 1) (dátum, list(1))
−→
−→
((dátum + sessionID), 1)
(dátum, 1)
(dátum, cnt(1))
Az els® MapReduce folyamat kimeneti kulcsa a (dátum + sessionID), így minden naphoz egy sessionID csak egyszer tartozhat. A második folyamatban már csak meg kell számolnunk, hogy az adott dátumhoz hány bejegyzés található. Ezzel módszerrel skálázhatóvá tettük a megoldást még akkor is, ha egy naphoz nagyon sok adat tartozik, mert nem kell egy Reduce folyamatnak a memóriában tartani az egy naphoz tartozó összes adatot.
4.4. 4. feladat Az adathalmazunk az el®bb már megismert weblog adatai, amihez felveszünk még egy kapcsolótáblát, amely megmondja, hogy melyik sessionID melyik felhasználóhoz tartozik. Gyakori feladat, hogy a két táblát össze kell illeszteni (join).
18
ELADÁS 4.
Adat2: Adat3: Map: Reduce:
MAPREDUCE PROGRAMOZÁSI MINTÁK, STREAMING
dátum, id®, sessionID, url - LEFT sessionID, userID - RIGHT (sorID, sor)
−→
(sessionID, (L/R + sor))
(sessionID, list(L/R + sor)
Erre az egyik lehet®ség a
−→
list(sessionID, (sorR + sorL))
REDUCE-SIDE join:
Az Adat2 halmazt nevezzük el baloldali táblának, az Adat3 halmazt pedig nevezzük el jobb oldali táblának. A Map bemenete a mindkét tábla soraiból adódik, de tudnia kell, hogy melyik táblából érkezett az adat. Ha SQL lekérdezést készítenénk erre a feladatra, akkor a Map kimenetének a kulcsa az lesz, ami alapján végeznénk a join m¶veletet. Ezért a Map kimenetén a kulcs a sessionID lesz, az érték a sor tartalma és hogy melyik táblából származik (L/R). A Reduce folymat a sessionID-hoz tartozó értékeket a memóriában tartja és úgy végzi el az összeillesztést (például egy for ciklussal összeilleszt mindenkit mindenkivel). Ez akkor okozhat problémát, ha az összeilleszt® elemhez (jelen esetben a sessionID-hoz) nagyon sok adat tartozik, amely nem fér el a Reducer memóriájában. A másik hibája ennek a megoldásnak, hogy lehet olyan sessionID, amir®l nincs rekordunk a másik táblában. Ekkor ezt feleslegesen küldjük át és generálunk vele nagy adatforgalmat. (Sok esetben el®fordul, hogy a felhasználó nem lép be az adott oldalra, így csak sessionID-t tudunk hozzá társítani, de userID-t nem.) Ez a probléma egy másfajta joinnal elkerülhet®.
A Map kimenetén nem adjuk ki már
azokat az elemeket, amikhez nem lehet másik táblából adatokat társítani, ez lesz a
SIDE (broadcast) join: Map: Reduce:
(sorID, sorL)
−→
MAP-
(sorID, (sorL + sorR))
-
A joinok nagy része olyan szokott lenni, hogy van egy nagy táblánk és van egy kisebb táblánk, például minden felhasználóhoz egy rekord. Ha ez a kisebb tábla elfér a memóriában, akkor el®ször minden Mapnek elküldjük ezt a táblát, hogy csináljon bel®le egy sessionID-userID struktúrát a memóriában. Eztán a másik táblából átküldjük a sorokat és megnézzük, hogy melyiket kell ehhez hozzáilleszteni. Így a Map bemenete csak a sorID és a bal oldali tábla sorai lesznek, míg a jobb oldali tábla a Map memóriájában van. Az összeillesztés már a Map lépésben megtörténik, tehát nincs szükség Reduce lépésre. Általában ez a két join típus használatos, mindkett®nek megvannak a saját korlátai, de ha van valamilyen plusz információnk az adatok szerkezetér®l vagy rendezettségér®l, akkor gyorsabb és jobban skálázódó join függvényeket is tudunk készíteni. További MapReduce példák az A. függelékben találhatóak.
4.5. Streaming Java-ban MapReduce kódot írni nehézkes, mert meglehet®sen sok kódot kell írni, ezért valós igény az, hogy ne csak Hadoop-Java API-val írhassunk kódot. Erre van egy olyan megoldás, aminek a neve Hadoop-streaming: bármilyen nyelvben meg lehet írni MapReduce függvényeket, amelyek a standard bemenetr®l (stdin) tudnak olvasni és standard kimenetre (stdout) tudnak írni. A keretrendszer a standard bemenetre fogja küldeni az
19
ELADÁS 4.
MAPREDUCE PROGRAMOZÁSI MINTÁK, STREAMING
adatot és a standard kimeneten fogja várni a függvény eredményét. A kulcs-érték párok szeparálása soronként tabbal történik. A következ® példán keresztül nézzük meg, hogy ezt hogyan lehet megvalósítani pythonban.
Bemenet: Kimenet:
dátum, h®mérséklet adott évi maximum h®mérséklet
Ennek a MapReduce megoldása:
Map: Reduce:
(sorID, sor)
−→
(év, h®mérséklet)
(év, list(h®mérséklet) )
−→
(év, maxh®mérséklet)
A Map és a Reduce függvények megvalósítása pythonban:
1 2 3
for line in sys . stdin : vals = line . split (" ,") print vals [0] \t vals [1] 4.1. Kódrészlet. map.py
1 2
last_key = None max_val = -9999
3 4 5 6 7 8 9 10 11 12
for line in sys . stdin : ( key , value ) = line . split ( "\t ") if key == last_key : if max_val < value : max_val = value else : print last_key max max_val = -9999 last_key = key 4.2. Kódrészlet. reduce.py
4.6. Adattömörítés Általános felhasználás mellett a Hadoop keretrendszer inkább tárhely, mintsem processzorigényes. Nagyon nagy adathalmazokat kell tárolnunk, illetve egy Map és egy Reduce lépés között nagyon sok adatot kell mozgatnunk. Ezért ezeket a lépéseket érdemes optimalizálni, amelynek az els®dleges módja az adattömörítés. Adattovábbítás a hálózaton Snappy tömörítéssel történhet, mivel nagyon kicsi a CPU overheadje. Nem olyan hatékony tömörít® algoritmus, mint például a gzip, de streamként tud tömöríteni és így elég sokat tud spórolni az adatátvitelen (20-30 %-os teljesítménynyereség). A másik lehet®ség a tömörített fájlok használata. A rendszer automatikusan felismeri a tömörített fájlokat és kitömöríti a különböz® feladatok el®tt. tároláshoz a következ® tömörítéseket szokták alkalmazni:
20
A merevlemezen történ®
ELADÁS 4.
gzip:
MAPREDUCE PROGRAMOZÁSI MINTÁK, STREAMING
Tegyük fel hogy van egy 1 GB-os fájlunk ami 16 blokkból áll és tömörítve
van. A blokkméret 64 MB, így ekkora részekre van vágva a tárolónkon. Ha csak egy rész kell a fájlból, akkor is ki kell ki kell tömöríteni az egészet, hogy ki tudjuk olvasni bel®le az adatot. Egy Map folyamat el®tt mindenhova el kellene juttatni a teljes fájlt, hogy az adott blokkot a Map fel tudja dolgozni. Ezért a gzip esetén a fájlokat úgy érdemes szervezni, hogy a fájlok a blokkmérettel azonosak legyenek.
bzip2:
Olyan tömörítési eljárás, amelyet ha bárhol elkezdünk olvasni, akkor is ki
lehet olvasni a fájl tartalmát, de ezt nem annyira használják, mert eléggé lassú és nem annyira hatékony.
lzo:
Az ezzel a tömörített fájlt sem lehet bárhonnan olvasni alapesetben anélkül,
hogy ki kellene az egész fájlt tömöríteni, de emellé kitaláltak egy metaadat fájlt, amely tartalmazza a Map folyamat számára, hogy mely blokkot kell kitömöríteni. A legegyszer¶bb eset, ha nyers adatokkal dolgozunk, de nagyon nagy fájlméretek esetén már szükségessé válik a tömörítés, akkor ha lehetséges a gzip-es tömörítést érdemes alkalmazni a blokkmérettel azonos nagyságú fájlokkal. Ezen kívül sok kicsi fájl összevonására még a SequenceFile formátum is használatos, ami egy Hadoop specikus formátum, leginkább a
tar
formátumra hasonlít.
21
5. el®adás Hadoop csomagok, SQL for Hadoop: Hive
5.1. Hadoop csomagok Minden csomag vagy projekt opensource, Apache oldalán megtalálható a megfelel® aloldalon forráskóddal és dokumentációval együtt.
5.1.1. Flume Logok tárolása és gy¶jtése a szerepe, képes HDFS-en tárolni a logokat.
Egy általános
use-case, hogy van egy webes cég ahol kiszolgálják felhasználókat. Minden webszerveren indítanak egy Flume ágenst aminek van valami bemeneti pontja, és bármilyen üzenetet kap onnantól az ® feladat az, hogy továbbítsa ezt az üzenetet. Ez egy megbízható logolási rendszer.
Fontos, hogy a logok megmaradjanak ugyanis kés®bb ezek alapján lehet
elemezni a program m¶ködését és a felhasználói élményt. Megkönnyíti, hogy bármilyen számítógépt®l logokat gy¶jtsünk és nem kell a webalkalmazásnak azzal foglalkoznia, hogy a logokat elküldje, a logszerver megkapja, hanem mindezt átveszi a Flume.
Nem egy
speciális Hadoop csomag de a Hadoophoz nagyon sokan használják. Fontos megjegyezni, hogy ez egy egyirányú adat közvetítés.
5.1.2. Sqoop Különböz® SQL alapú adatbázisokkal tud adatot cserélni. Valamilyen JDBC kapcsolaton keresztül elér egy adatbázist és képes arra, hogy ha valahol van egy 10 gépb®l álló MySQL cluster akkor a sqoop, ezt a táblát át tudja másolni a HDFS-re.
Ilyenkor több Mapet
elindít és elosztottan tud másolni adatbázisok és Hadoop között. Ez természetesen odavissza m¶ködik és az a nagy el®nye, hogy habár ezt bármilyen adatbázissal meg tudná csinálni, képes arra, hogy párhuzamosítsa ezt feladatot így sokkal gyorsabb. A folyamat lényegében az, hogy indul egy MapReduce job a JDBC connection kiépül és jönnek az adatok.
5.1.3. Hive Ez a csomag már az elemzési réteghez tartozik míg a korábbiak az adatelérési rétegben nyújtanak szolgáltatásokat.
Tulajdonképpen egy SQL lekérdez® nyelvet nyújt Hadoop
22
ELADÁS 5.
HADOOP CSOMAGOK, SQL FOR HADOOP: HIVE
felé. A Hive-nak az a feladata, hogy SQL lekérdezéseket lehet vele végrehajtani. Az SQL lekérdezésb®l csinál egy MapReduce job-ot és ezt lefuttatja a clusteren majd visszaadja strukturált táblázatos formában az eredményeket. Ez kényelmes mivel csak SQL kifejezéseket kell írni és nem kell gondolkozni a Map és a Reduce függvényeken. Sok mindenben különbözik egy hagyományos adatbázisnak az SQL lekéréseit®l, sokkal több id®be telik egy lekérdezés, ezért tipikusan analitikus elemzésekhez használják.
5.1.4. Pig Ez egy szkript nyelv ami céljában hasonlít az SQL-hez. A feladata lényegében ugyanaz, MapReduce job-okat csinál a szkriptb®l. Nagyon rugalmasan kezeli a sorok feldolgozását bármit megeszik. Preziben szinte kizárólag piget használnak, viszonylag könnyen tanulható és hasonló szerepe van mint a Hive-nak, hogy helyetted MapReduce jobokat hozzon létre.
5.1.5. HBase Minden amir®l eddig szó volt az HDFS és MapReduce alapokon m¶ködött.
Az HBase
egy specialitása, hogy nem indít MapReduce jobot mivel az HBase az egy kulcs-érték adatbázis mely megvalósítja a NoSQL paradigmát. A NoSQL (egyes értelmezések szerint Not only SQL, azaz nem csak SQL, más értelmezés szerint egyszer¶en csak nem SQL) adatbázis szoftverek gy¶jt®neve.
A NoSQL adatbázisok els®sorban nem táblázatokban
tárolják az adatokat és általában nem használnak SQL nyelvet lekérdezésre. Ez egy kulcsérték adatbázis amelynek hatalmas el®nye az, hogy nem MapReduce alapokon dolgozik, hanem közvetlen hozzáférése van a HDFS-hez.
Így képes milliszekundumos lekérések
végrehajtására (sokkal gyorsabb mint a Hive).
5.1.6. Mahout Ez egy szintén népszer¶ csomag, statisztikai gépi tanulási algoritmusokat tartalmaz, osztályozó algoritmusok, gyakori mintakeres®, szöveg osztályozásra használják sokan.
5.1.7. Zookeeper Létezik egy állatgondozó nev¶ csomag (Zookeeper). Minden egyéb Hadoop csomag felhasználja ennek a szolgáltatásait, mivel egy elosztott rendszerben elég nehéz a konkurens dolgoknak a kezelése. Ha két kliens ugyanazt a HDFS blokkot akarja írni vagy ugyanazt a táblát akarják írni, ezek nem triviális problémák. Ezeket a szervezési koordinációs dolgokat valósítja meg a Zookeeper.
23
ELADÁS 5.
HADOOP CSOMAGOK, SQL FOR HADOOP: HIVE
5.1. ábra. A Hadoop csomagok felépítése forrás:
http://www.colfax-intl.com/nd/clusters/hadoop.aspx
5.2. SQL for Hadoop: Hive A Hadoop most már lassan 10 éves, de kezdetben eléggé üres volt és kevés volt hozzá az eszköz.
Szükség volt valami egyszer¶ felületre amit tömegesen tudnak használni az
emberek. A Facebook hozta létre, kellett egy SQL alapú csomag mert a JAVA MapReduce programozás nem igazán m¶ködött és eléggé nehéz volt. Elkezdték a HIVE fejlesztését ami egy SQL interface a hadoopra (egy plusz réteg de nem tárház).
Maga a Hadoop
platformot azért választották mert olcsó és jól skálázódik.
5.2.1. A Hive felépítése
5.2. ábra. A Hive m¶ködése forrás:
http://blog.octo.com/en/how-to-crunch-your-data-stored-in-hdfs/
Van egy Driver ami a fordításért felel és az SQL kifejezéseket fordítja MapReduce jobokra.
A következ® lépcs® az optimalizálója, ezt ® kitalálja maga, hogy hogyan legyen
hatékonyabb a lekérdezés.
(minden SQL motorban található egy ilyen).
24
Az Executor
ELADÁS 5.
HADOOP CSOMAGOK, SQL FOR HADOOP: HIVE
pedig az aki operatívan ezt a futtatást elvégzi. Kommunikál a MapReduce-szal és elvégzi ennek a futtatását. Ezen kívül meg találhatók a következ® elemek még benne: Command Line Interface (CLI), JDBC, ODBC kapcsolat távoli elérésekhez.
Létezik továbbá egy
MetaStore nev¶ komponens is amelyben olyan bejegyzések vannak, hogy milyen tábláink vannak, milyen a fájlok formátuma, mi a szeparátor karakter, milyen típusú adatok illetve mez®k vannak (ez írja le, hogy lehet a szöveges fájlokat adatbázis táblaként értelmezni). Az esetek túlnyomó többségében ez egy MySQL adatbázis amely általában kis méret¶, egy gépen futó SQL adatbázis. Amikor err®l a számítógépen elindítanak egy lekérést akkor ez végig megy az összes említett komponensen és a végén megkapjuk az eredményeket. Furcsa lehet azonban, hogy adatot feltöltünk és utólag valami sémát deniálunk rá, ez az eddigi tudásunkkal pont ellentétesen m¶ködik. Ezt a folyamatot úgy hívják, hogy schemeon-write. Amikor bekerül az adat akkor ellen®rzöm a sémát (amikor a táblába betöltöm) és onnantól tudom, hogy jó az adat ami bekerül a táblába. A scheme-on-read pedig kiolvasáskor ellen®rzi a sémát, ha rossz az adat azokon a helyeken null-t fogunk visszakapni nem hibát. További különbségek az SQL és a Hive között: nincs olyan hogy UPDATE. Egyszer¶en nincs rá szükség, logokat gy¶jtünk és miért akarnánk frissíteni a tegnapi adatokat (persze ha meg akarjuk hamisítani akkor lehet) de ráadásul az adatok is blokkokban vannak tárolva és egy blokkban egy adatot módosítani nem éri meg. Indexeléssel vannak közepesen kiforrott próbálkozások, de alapvet®en adattáblák végig olvasására találták ki és nem arra, hogy egy felhasználó adataival m¶veleteket végezzünk, majd pedig ezeket az adatokat újra beleírjuk az adatbázisba.
25
6. el®adás Pig programozás
6.1. Bevezetés A Pig egy magas szint¶, az SQL nyelvhez hasonló szkript nyelv, amit az Apache Hadoop használ MapReduce job-ok létrehozására. Lehet®vé teszi komplex adat transzformációk írását Java tudás nélkül. A nyelv eszközkészlete teljes, ezért mindenféle adatmanipulációs m¶veletet tudunk vele végrehajtani. A felhasználó által deniált függvényeken keresztül (User Dened Functions - UDF), pedig bármilyen kódot meghívhatunk különféle nyelvekb®l mint például: JRuby, Jython és Java.
A scriptünket más nyelvekbe is beágyazhatjuk, ennek eredményeképp
komponensként használhatjuk, hogy nagyobb rendszereket készítsünk, és még komplexebb alkalmazásokat, amik releváns problémákra adnak megoldást. A Pig többféle forrásból származó adattal tud dolgozni, beleértve strukturált és strukturálatlan adatokat, az eredményt pedig eltárolhatjuk a HDFS-en.
A Pig szkripteket
MapReduce job-okra fordítják, amik az Apache Hadoop klaszteren futnak. Ebben a fejezetben követve a Hortonworks leírását (Hogyan használjuk az alapvet® pig parancsokat
http://hortonworks.com/hadoop-tutorial/how-to-use-basic-pig-commands/),
át-
nézzük az alapvet® m¶veleteket, és a szkript nyelv használatát. A következ® területeket érintjük:
Egy reláció készítése séma nélkül
Egy új reláció készítése egy létez® relációból
Két reláció illesztése (Join m¶velet)
Adatok rendezése az
Az adatok sz¶rése és csoportosítása a
ORDER BY
használatával
GROUP BY
használatával
A leírás a Hortonworks rendszeren lett elkészítve, de valószín¶ könnyedén átültethet® egyéb rendszerekre is.
6.2. A fájlok feltöltése Els®ként szerezzük be a szükséges adatokat a feladat elvégzéséhez.
Err®l a linkr®l le
tudjátok tölteni a szükséges adatokat, amik a New York Stock Exchange adatait tartal-
https://s3.amazonaws.com/hw-sandbox/tutorial1/ infochimps_dataset_4778_download_16677-csv.zip mazzák 2000 és 2001-es évb®l.
26
ELADÁS 6.
PIG PROGRAMOZÁS
Töltsük fel az adatokat a File Browser segítségével.
6.1. ábra. Adatok feltoltese
6.3. A script megírása A Pig felhasználói felületét a Pig ikonra kattintással tudjuk el®hozni. A baloldali panelen a szkriptjeink listáját láthatjuk, a jobb oldalon pedig a szerkeszt® felületet, a szkriptek írásához és módosításához. Egy speciális funkció az oldal alján található, mégpedig a Pig helper, ami templateket nyújt a Pig kifejezésekhez, függvényekhez, I/O m¶veletekhez, stb. Az oldal alján látható egy státusz felület, amikor futtatjuk a szkriptet itt láthatjuk majd az eredményeket és a log fájlokat. Adjunk egy nevet a szkriptnek és kezdjük el a szkript írását egy reláció létrehozásával.
6.2. ábra. Pig script szerkeszt® felülete
6.3.1. Reláció deniálása Ebben a lépésben beolvassuk az adatokat és létrehozunk egy relációt.
1 2
STOCK_A = LOAD ' nyse / NYSE_daily_prices_A . csv ' using PigStorage ( ','); DESCRIBE STOCK_A ; Az els® sorban deniálunk egy relációt STOCK_A néven és beolvassuk a *.csv fájlunkat, a második sorban pedig kiírjuk a STOCK_A relációt. A "Save" gombbal elmenthetjük
27
ELADÁS 6.
PIG PROGRAMOZÁS
a szkriptet az "Execute" gombbal pedig futtathatjuk. Ez a m¶velet egy, vagy több MapReduce job-ot fog indítani. Egy kis id® múlva a script futni kezd, és az "Execute" gomb egy "Kill" gombra fog változni. Ezzel megállíthatjuk a szkript futását. A Progress bar mutatja, a folyamat el®rehaladását. Ha kék a színe, akkor a job fut. Ha piros a színe, akkor valami probléma, vagy hiba történt. Ha zöldre vált, a szkript helyesen lefutott. Az eredményt megtekinthetjük a zöld dobozban a Progress bar alatt, vagy el is menthetjük. Jelenleg a STOCK_A relációnak nem lesz sémája, mert nem deniáltunk neki sémát mikor betöltöttük az adatot a STOCK_A relációba.
6.3. ábra. A séma kiírása
6.3.2. Reláció deniálása sémával Módosítsuk az el®z® szkriptünknek az els® sorát és adjuk hozzá a "AS" kifejezést, hogy egy sémát deniáljunk az adatokra.
1
2
STOCK_A = LOAD ' NYSE_daily_prices_A . csv ' using PigStorage ( ',') AS ( exchange : chararray , symbol : chararray , date : chararray , open : float , high : float , low : float , close : float , volume : int , adj_close : float ); DESCRIBE STOCK_A ; Mentsük el és futtassuk a szkriptet. A kimeneten látható lesz a séma, amit deniáltunk.
6.4. ábra. A létez® séma kiírása
6.3.3. Reláció létrehozása egy másik relációból Egy létez® relációból tudunk új relációkat létrehozni. Például szeretnénk egy B relációt készíteni, ami a STOCK_A relációnak az els® 100 bejegyzését tartalmazza. Adjuk hozzá a szkriptünkhöz a következ® sort.
28
ELADÁS 6.
1 2
PIG PROGRAMOZÁS
B = LIMIT STOCK_A 100; DESCRIBE B; DESCRIBE B; Mentsük el és futtassuk a szkriptünket. Láthatjuk, hogy a B relációnak ugyanaz a sémája, mint a STOCK_A relációnak, mivel egy részhalmaza a STOCK_A relációnak.
6.3.4. Adatok kiírása Az adatok kiírásához használjuk a
DUMP
parancsot a Pig szkriptünkben.
A szkriptünk
végéhez f¶zzük hozzá a következ® sort. Mentsük el és futtassuk újra a szkriptünket.
1
DUMP B ;
6.3.5. Oszlop kiválasztása a relációból Töröljük az eddig megírt szkriptet, mivel a következ®kben nem lesz rá szükségünk. Az egyik el®nye annak, hogy a Piget használjuk az adatok transzformációja. Új relációkat tudunk létrehozni, a meglév® relációkban lév® mez®k kiválasztásával, a használatával.
FOREACH
parancs
Deniáljunk egy új relációt C néven, ami csak a symbol, adat és close
mez®ket tartalmazza a B relációból. A teljes kód a következ®:
1
2 3
STOCK_A = LOAD ' NYSE_daily_prices_A . csv ' using PigStorage ( ',') AS ( exchange : chararray , symbol : chararray , date : chararray , open : float , high : float , low : float , close : float , volume : int , adj_close : float ); B = LIMIT STOCK_A 100; C = FOREACH B GENERATE symbol , date , close ; DESCRIBE C; Mentsük és futtassuk a szkriptünket, a kimeneten a következ® fogjuk látni.
6.3.6. Adatok írása a HDFS-re Ebben a részben megnézzük, hogy hogyan tudunk írni a HDFS-re a szkriptünkb®l.
A
következ® parancsot írjuk bele a szkriptünkbe:
1
STORE C INTO ' output /C '; Ezúttal is indulni fog egy MapReduce job, így várnunk kell egy kicsit az eredményre. Amikor a Job futása befejez®dik nyissuk meg a File Browsert, és keressük meg az újonnan készített könyvtárunkat, aminek a neve output.
Ebben lesz egy alkönyvtár C néven,
amiben találni fogunk egy part-r-00000 nev¶ fájlt. Ha rákattintunk láthatjuk a tartalmát.
6.3.7. Két reláció illesztése (Join m¶velet) Készítsünk egy új Pig szkriptet Pig-Join néven, majd hozzunk létre egy új relációt DIV_A néven. Ezután illesszük a két relációt A és B a symbol és dátum mez®k alapján, majd
29
ELADÁS 6.
PIG PROGRAMOZÁS
6.5. ábra. Adatok kiírása a HDFS-re
írjuk ki az új reláció sémáját.
1
STOCK_A = LOAD ' NYSE_daily_prices_A . csv ' using PigStorage ( ',') AS ( exchange : chararray , symbol : chararray , date : chararray , open : float , high : float , low : float , close : float , volume : int , adj_close : float );
2 3
DIV_A = LOAD ' NYSE_dividends_A . csv ' using PigStorage ( ',') AS ( exchange : chararray , symbol : chararray , date : chararray , dividend : float );
4 5 6
C = JOIN STOCK_A BY ( symbol , date ) , DIV_A BY ( symbol , date ) ; DESCRIBE C; Ha meggyeljük a C reláció mind a két relációból tartalmazni fogja az összes mez®t. A
DUMP
paranccsal tudjuk kiírni a C reláció tartalmát.
6.3.8. Adatok rendezése Az
ORDER BY
parancs segítségével tudjuk a relációt rendezni egy, vagy több mez®jére.
Készítsünk egy új Pig szkriptet Pig-Sort néven, majd adjuk ki a következ® parancsokat az adatok rendezéséhez.
1
2
DIV_A = LOAD ' NYSE_dividends_A . csv ' using PigStorage ( ',') AS ( exchange : chararray , symbol : chararray , date : chararray , dividend : float ); B = ORDER DIV_A BY symbol , date asc ; DUMP B ;
6.3.9. Adatok sz¶rése és csoportosítása A
GROUP
paranccsal tudjuk egy mez®re csoportosítani az adatokat, a
FILTER
paranccsal
pedig sz¶rhetjük ®ket egy mintára. Készítsünk egy új Pig szkriptet Pig-group néven és írjuk be a következ® parancsokat a szerkeszt® felületen.
30
ELADÁS 6.
1
2 3 4 5
PIG PROGRAMOZÁS
DIV_A = LOAD ' NYSE_dividends_A . csv ' using PigStorage ( ',') AS ( exchange : chararray , symbol : chararray , date : chararray , dividend : float ); B = FILTER DIV_A BY symbol == ' AZZ '; C = GROUP B BY dividend ; DESCRIBE C; DUMP C ;
6.4. További források Az alapvet® parancsok átnézése után, ha szeretnél jobban elmélyedni a Pig szkriptek írásában, akkor remek elfoglaltság a Pig dokumentációjának az átolvasása, amelyet a következ® linken értek el.
http://pig.apache.org/docs/r0.7.0/tutorial.html.
31
7. el®adás Hadoop klaszterek kongurációja és üzemeltetése Egy Hadoop cluster felállítása annyira nem is egyszer¶ feladat, mint ahogy azt gondolnánk. Sok szempontot gyelembe kell vennünk az elosztott rendszerek megtervezésekor.
7.1. Hardver A cluster legfontosabb épít®elemei a számítógépeket (Node-okat) kiszolgáló hardverek. Minden Node-ként egy olyan hardverrel felszerelt gépet kell üzemeltetnünk, amely kell®en sok háttértárral rendelkezik, és a számítási feladatokhoz er®s CPU és jelent®s memóriamennyiség jellemzi. A hardverméretezést egy példán keresztül mutatjuk be.
Tegyük fel, hogy egy cég át
szeretne térni Hadoop cluster-es szerverparkra, amelyen naponta 100 GB új bejöv® adatot szeretnének tárolni. Ezt a mennyiséget egy évre levetítve kívánt adat.
∼36
TB-ra becsülhet® a tárolni
Tudjuk, hogy a Hadoop cluster egy adatot 3 különböz® helyen tárol el a
replikáció miatt, tehát ez a mennyiség
∼100
TB lesz.
Az egyéb dolgok és elemzések
céljából +25%-kal számolhatunk, tehát 125 TB-nál tartunk. A Hadoop rendszer elég sok mindenr®l készít log-ot, valamint emellett minden gépen kell operációs rendszernek futnia, tehát szükség van olyan adatok tárolására is, amik nem a HDFS-en belül találhatóak. Erre +10%-ot számolunk, tehát nagyjából
∼140
TB tárhelyre van szükségünk.
Egy átlagos
szervergépben 4-6 HDD található, ha 3 TB-os egységekben gondolkozunk, akkor ez 12-18 TB. Ha a fentebbi adatmennyiséget nézzük, akkor
∼10
gépes cluster-t kell felállítanunk.
Mivel tipikusan az IO a sz¶k keresztmetszet Hadoop esetén, ezért érdemesebb több kisebb HDD-t berakni, mint kevés extrém nagyot. Hozzá kell tennünk, hogy ez az adatmennyiség nem tömörített, tömörítéssel sokkal kevesebb adatmennyiséggel is lehet számolni. Például egy olyan log esetében, ahol ugyanaz az érték sokszor szerepel, vagy ismétl®dik, ott gzip esetén akár 2-3-szoros tömörítési ráta is elérhet®, ezzel rengeteg helyet tudunk spórolni. Memória tekintetében a master szerepeket birtokló gépek esetében az ajánlott mennyiség 24 GB, a CPU-nak pedig érdemes 8 magost választani (2013-as adatok alapján). Emellett a fontos szerepeket kiszolgáló gépek esetében érdemes egy bizonyos védelmet is beiktatni (pl. RAID). Az egyszer¶ Data Node-ok esetében elég egy 4 magos CPU, és 8-24 GB memória. A Hortonworks által ajánlott hardverméretezésr®l a 7.1. táblázatban találhatunk részletesebb bemutatást.
32
ELADÁS 7.
HADOOP KLASZTEREK KONFIGURÁCIÓJA ÉS ÜZEMELTETÉSE
Machine Type
Workload Pattern/ Cluster Type
Storage
Processor (# of Cores)
Memory (GB)
Slaves Masters
Balanced workload
Four to six 2 TB disks
One Quad
24
HBase cluster
Six 2 TB disks
Dual Quad
48
Balanced and/or HBase cluster
Four to six 2 TB disks
Dual Quad
24
Network 1 GB Ethernet all-to-all
1
7.1. ábra. Hardver méretezés ajánlások 5-50 gépes cluster-ekhez
7.2. Operációs rendszer Az operációs rendszert tekintve az összes Hadoop disztribúció UNIX-os környezetben használható, ezekre lett alapvet®en kialakítva.
Legtöbbet használt Linux disztribúciók
Hadoop cluster-ekhez a RedHat, és a CentOS. Céges környezetben leginkább az el®bbivel találkozhatunk a legtöbbször, míg otthoni felhasználók körében az Ubuntu, és különböz® Debian verziók az elterjedtebbek. Mivel nincs nagy különbség Hadoop szempontjából a különböz® operációs rendszerek között, ezért általános ajánlás, hogy a támogatási id®intervallum és a rendszergazdák tapasztalata alapján érdemes OS-t választani. Létezik emellett egy Windows Server számítógépekre átalakított verzió is, amelyet újszer¶sége és kisebb közösségi támogatása miatt kockázatos lehet használni.
7.2. ábra. Hadoop futtatására alkalmas operációs rendszerek
7.3. Hálózat A jó és alapos hálózati beállítások rendkívül fontosak, mivel ezek szokták okozni a legtöbb problémát és furcsa hibajelenséget telepítés és üzemeltetés közben. Emiatt fontos, hogy minden számítógépnek tudnia kell a saját host nevét, IP címét. DNS és reverseDNS-nek hibátlanul kell m¶ködnie privát címek esetén is, mert léteznek olyan feladatok, ahol a Node-tól a rendszer megkérdezi a host nevét, és erre neki válaszolnia kell tudnia, hiszen egyéb esetben mondjuk nem tud az a feladat lefutni, és rengeteg hibát kaphatunk. A cluster általában bels® hálózaton van, és nincs küls® interfésze, tehát csak a bels® hálón érhet® el. cluster.
Erre a bels® hálóra pl.
VPN-el szokás bejelentkezni, és így használható a
Ennek az a célja, hogy a küls® támadásokat teljesen ki tudjuk sz¶rni, és csak
akkor törhet® fel a rendszer, ha valaki a bels® hálóra be tudott jutni.
1 http://hortonworks.com/
ajánlása 33
ELADÁS 7.
HADOOP KLASZTEREK KONFIGURÁCIÓJA ÉS ÜZEMELTETÉSE
7.4. Disztribúció Mivel a Hadoop-nak sok cég által elkészített változata van, így ezek különböz® egyéb szolgáltatásokkal vannak kiegészítve, és más-más feladatokra vagy akár platformokra vannak optimalizálva. A legelterjedtebb a Cloudera (CDH) által készített verzió, amely egy olyan komplex segédszolgáltatásokat nyújtó programokkal, illetve webes felülettel rendelkezik, amelyekkel nagyon könny¶ kezelni az egyedi Node-okat, és a különböz® keretszolgáltatásokat, amelyek segítik a rendszer összhangban való m¶ködését. Ezen felül vannak az Intel, Amazon, Hortonworks által készített verziók is. Általában minden verzióból létezik zet®s és ingyenes vagy akár nyílt forráskódú verzió is. A zet®sök leginkább internetes támogatás meglétében és pár egyéb extra szolgáltatásban térnek el az ingyenes verzióktól, és általában csak nagyvállalati környezetben használják ®ket az áruk miatt. Érdemes megemlíteni még, hogy természetesen a Hadoop minden része letölthet® az Apache oldaláról is, de azok egyesével történ® telepítése és kongurációja nagyságrendekkel nagyobb szakértelmet és id®t igényelnek, mint egy disztribúció használata.
7.3. ábra. Hadoop disztribúciók
7.5. Feladatok ütemezése és kvóták használata MapReduce feladat esetén a FIFO alapelv¶ feladatütemez® volt a legelterjedtebb, de ez túl egyszer¶ volt, és egy kis slotigény¶ (slot - szabad kapacitás) feladatnak is várnia kellett egy nagy slotigény¶re, ami igen sok várakozást eredményezett a lekérdezést futtató számára. Ennek a kiküszöbölésére találták ki a Fair scheduler-t, amely lehet®séget ad arra, hogy a kisebb job-okat a nagyok közé be tudjuk szúrni, ezáltal csökken a várakozási id®. Léteznek slot pool-ok, amelyek lehet®vé teszik, hogy a slot-ok egy csoportját például a kisebb job-oknak fenntartjuk, így a nagy job-ok a többi slot használatával helyet hagynak a kisebbek számára. Az üzemeltetés egy másik tipikus problémája, ha egy HDFS fájl mérete tévedésb®l hatalmasra n® (például két nagy fájl join-olása miatt), és elfoglalja a helyet minden más adat el®l. Ez ellen kvóták bevezetésével lehet védekezni.
34
8. el®adás Hadoop 2.0 - YARN, Tez, Spark
8.1. Hadoop 2.0 8.1.1. Kialakulása A Hadoop 2.0-ban rengeteg új lehet®séget és szolgáltatást vezettek be, amelyek sokkal könnyebbé teszik az elosztott rendszerek használatát, azok kezelhet®ségét és feladatok futtatását. A rendszer újragondolásánál az volt az els®dleges cél, hogy ne csak MapReduce job-okat lehessen indítani, hanem más típusú szolgáltatások és applikációk is gond nélkül fussanak. Emellett a régi rendszerben nem volt lehet®ség különböz® csoportoknak különböz® adathalmazokon dolgozni, elkülönített elemzéseket futtatni. Eddig a HDFS-en erre csak a különböz® jogosultságokkal való tárolás volt a lehetséges mód.
8.1.2. Különbségek A Hadoop 1.0-ban JobTracker-nél volt minden szerep, ® felügyelte az er®forrásokat, osztotta ki a feladatokat a DataNode-ok számára MapReduce job-oknál, valamint monitorozta a feladatok futását.
Felismerték, hogy az er®forrás menedzsmentet és az alkalmazások
kezelését külön kell választani. Emiatt az új YARN architektúra került kialakításra (lásd 8.1. ábra), ami gyakorlatilag egy új generációs MapReduce-t jelent. Az er®források kiosztása és a feladatok ütemezése leválasztódik a JobTracker-r®l, és a JobTracker szerep feldarabolódik. Így kerültek bevezetésre a ResourceManager az ApplicationMaster szerepek. A régi TaskTracker-ek helyett NodeManager-ek vannak az egyes node-okon, amelyek feladata a programkód futtatása és a MapReduce kommunikációk felügyelete. A ResourceManager a job futtatása során kiválasztja, hogy melyik node-on helyezi el a feladat futtatásáért felel®s ApplicationMaster szerepet, amelyb®l minden feladathoz külön példány kerül létrehozásra. Az ApplicationMaster a ResourceManager-t®l kapott információ alapján kell® mennyiség¶ ResourceContainer-t fog igényelni, amely egy er®forrás foglalás a Map és Reduce taskok számára. A ResourceContainer-ek kommunikálnak az ApplicationMasterrel, és jelentik, hogy hol tartanak a feladat futtatásával.
Az ApplicationMaster a job
futása után megsz¶nik. A korábbi gyakorlattal ellentétben az er®forrás foglaláskor a NodeManager-ek - a régi TaskTracker-ek slot és progress információi helyett - szabad memória és CPU állapotot küldenek vissza.
Az alkalmazás futásáért az ApplicationMaster a felel®s, tehát a
ResourceManager-nek nem kell a progress-t jelenteni.
35
ELADÁS 8.
HADOOP 2.0 - YARN, TEZ, SPARK
Mivel a MapReduce job-ok mellett már alkalmazások is lehetnek, így az ApplicationMaster visszajelez, hogy melyik módban indult el a beküldött feladat.
Például az HBase
szolgáltatás is a ResourceManager-en keresztül tudja elérni a cluster-t, így a különböz® folyamatok nem veszik el egymás el®l az er®forrásokat.
8.1. ábra. Az új YARN réteg forrás:
http://hortonworks.com/hadoop/yarn/
8.1.3. YARN job futtatás Az új YARN rendszerben jelent®sen megváltozott a rendszerben indítható feladatok futtatása. A 8.2. ábrán bemutatott módon futtatja a rendszer a beküldött feladatokat: 1. A kliens beküldi a feladatot a rendszernek az alkalmazás specikus ApplicationMaster indításához szükséges információkkal együtt. 2. A ResourceManager eldönti, hogy melyik node-ra delegálja az ApplicationMaster elindításával és futtatásával járó feladatkört. 3. Az ApplicationManager a létrejötte után bejelentkezik a ResourceManager-nél, aki lehet®vé teszi a kliens számára a közvetlen kommunikációt az ApplicationMaster-rel. 4. A futás során az ApplicationMaster a resource-request protokoll segítségével megfelel® ResourceContainer-eket foglal az alkalmazás futásához. 5. A sikeres er®forrás foglalás után az ApplicationManager a NodeManager számára elküldi a megfelel® információkat és adatokat, így elindul a Container futtatása.
Ezek az információk tartalmazzák azt is, hogy a Container miként tud az
ApplicationManager-rel kommunikálni. 6. A Container elkezdni futtatni a programkódot, miközben az applikáció specikus protokoll segítségével folyamatosan jelenti a feladat el®rehaladtát, állapotát. 7. Szintén az applikáció specikus protokoll segítségével a feladatot beküld® kliens az ApplicationMaster-rel kommunikálva kérdezi le a feladat státuszát, illetve az azzal kapcsolatos egyéb információkat. 8. Amikor a feladat elkészült és minden szükséges háttérfolyamat lefutott, az ApplicationMaster törli magát a ResourceManager-nél, és leáll. Ekkor a Container újra felhasználható lesz.
36
ELADÁS 8.
HADOOP 2.0 - YARN, TEZ, SPARK
8.2. ábra. YARN job futtatás forrás:
http://hortonworks.com/hadoop/yarn/
8.2. Tez A Tez projekt létrehozásának célja az volt, hogy az új YARN rendszerhez egy forradalmasított, gyorsabb, egyszer¶bb MapReduce megoldást nyújtsanak, amely csökkenti a rendszer er®forrás használatát. A korábbi MapReduce-ok m¶ködési elve az volt, hogy minden Map és Reduce fázis között a köztes adatok a HDFS-re íródtak ki (replikációval együtt), így jelent®s I/O kapacitást használtak el (különösen több MapReduce job egyszerre történ® futtatásával). Ezt a Tezben azzal küszöbölték ki, hogy nem feltétlen kell külön MapReduce job-okat létrehozni az adatok feldolgozása során, hanem lehet ezeket batch-esíteni (Map
→ Reduce → Reduce),
így nem kell mindig közöttük HDFS-t használni, tehát spórolhatunk az I/O kapacitással. Ezen kialakítás els®dleges célja volt, hogy a Hive és a Pig-beli feladatokat egyetlen MapReduce job-bal valósítsunk meg. A rendszer a feladatokból itt is egy DAG-ot (körmentes gráf ) alakít ki, és készíti el a futtatás sorrendjét. Eltérés, hogy a Tez-ben a MapReduce-al ellentétben nem kell ezt megadni, hogy hány Reduce feladatunk lesz, ezt a rendszer futás közben kioptimalizálja.
8.2.1. Eloszott rendezési példa Tez-ben A 8.4.
ábrán látható egy elosztott rendezési példa, amely módszerrel akár 10-szeresen
gyorsabb adatfeldolgozás érhet® el. A korábbi egy Reduce job-tól eltér®en, itt 3 részb®l áll a folyamat. Az el®feldolgozó szakasz mintákat küld a Sampler-nek, amely az adatokat egyenletesen elosztja rendezett tartományokra osztja. Ezek a tartományok a partícionáló és aggregáló fázisba kerülnek, ahol a megfelel® feldolgozók a saját tartományukat olvassák be, és elvégzik az adatok összef¶zését. Ez az adatfolyam egyetlen Tez feladat, amely elvégzi az
37
ELADÁS 8.
HADOOP 2.0 - YARN, TEZ, SPARK
8.3. ábra. Tez és a YARN kapcsolata forrás:
http://hortonworks.com/hadoop/tez/
egész számítást.
8.4. ábra. Tez elosztott rendezés forrás:
http://hortonworks.com/hadoop/tez/
8.3. Impala Az Impala egy Hive alternatívaként jött létre annak érdekében, hogy egy egyszer¶ lekérdezésre egy kisebb táblán ne kelljen 5-10 sec-et várni. A megvalósítás tisztán C és C++ alapú, és az adatokat is cache-eli, ezért jóval gyorsabb, mint a Hive. A cache-elés lényege ebben az esetben, hogy a már korábban kiszámolt vagy futtatott lekérdezések eredményét memóriában tárolja, így azokat nem kell minden esetben újra a HDFS-r®l olvasni, ha gyakran használjuk.
38
ELADÁS 8.
HADOOP 2.0 - YARN, TEZ, SPARK
8.4. Spark A Spark valójában egy memória alapú MapReduce keretrendszer, amelyet szintén a YARN-hoz fejlesztettek ki.
Ez leginkább iteratív számításokhoz ideális (pl.
logisztikus
regresszió, neurális hálók). Bizonyos adatokról kódszinten meg lehet mondani, hogy legyen cache-elve, tehát akár az adathalmaz tárolható memóriában a futás idején, és csak a végeredményt írjuk HDFS-re. A Shark a Spark rendszerhez nagyon hasonlóan egy memória alapú megoldás, amelyet a Hive futtatásához terveztek.
8.5. Storm A Storm egy valósidej¶, nem batchelt feldolgozó rendszer, amelyben topológiába rendszerezhetünk forrásokat és feldolgozó egységeket, amely folyam végén kerül csak az adat elmentésre a háttértárra vagy adatbázisba (lásd 8.5.
ábra).
Ez a leggyakrabban olyan
felhasználási módoknál jelent hatalmas el®nyt, ahol azonnali jelentéseket vagy valós idej¶ analízist kell készíteni. A feldolgozó egységek lényege, hogy azonnali számolást végeznek, adattárolást egyáltalán nem.
Felhasználási példák közé tartozik gyakori szavak, kulcs-
szavak keresése sok bejöv® rövid üzenetben, amelyet azonnal kivezethetünk egy valós megjelenít® felületre. Ezek mellett természetesen a tárolt adatokon kés®bb Tez-t vagy MapReduce-t is futtathatunk a korábbi gyakorlatnak megfelel®en.
8.5. ábra. Storm topológia forrás:
https://storm.incubator.apache.org/
39
9. el®adás Hadoop & NoSQL: HBase
9.1. Adatbázis-kezel® rendszerek Az adatbázis-kezel® rendszereknek két nagy csoportját különböztetjük meg, amelyek összehasonlítása a 9.1. táblázatban látható.
Célja Formája M¶veletek
OLTP
OLTP
(On-line Transaction Processing)
(On-line Analytical Processing)
tranzakciókezelés
adatelemzés
adatbázis
adattárház
SELECT, UPDATE, DELETE, INSERT
gyors,
Tervezés
kis m¶veletek:
normalizált
forma
(1NF,
2NF,
3NF, BCNF)
komplex
SELECT típusú lekérdezé-
sek nem
normalizált,
csillag
séma:
egy f® tábla és több hozzá f¶z®d® kapcsolótábla
Technológiák
standard SQL adatbázisok
HBase, Accumulo, Cassandra
9.1. táblázat. OLTP vs. OLAP
Általában megpróbáljuk a kétféle adatbázis-kezel® rendszert elszeparálni akkor is, ha ugyanazzal az eszközökkel valósítottuk meg ®ket. Ennek legf®bb szerepe az er®források kiosztásánál van, hiszen egy komolyabb elemzési m¶velettel nem szeretnénk a tranzakciós adatbázist lebénítani.
9.1.1. Elosztott adatbázis-kezel® rendszerek Az adatbázis kezel® rendszerek egygépes környezetben viszonylag egyszer¶en elkészíthet®ek, de elosztott esetben számos problémába ütközhetünk, amelyhez kapcsolódik a következ® tétel:
CAP-tétel:
egy-egy konkrét elosztott rendszer az alábbi három alapvet® képesség közül
legfeljebb kett®t tud megvalósítani:
Consistency Availibility
(konzisztencia): mindig ugyanazt olvassuk ki, mint amit beírtunk
(rendelkezésre állás): mindig kapunk választ a kérdésre
Parition tolerance
(particionálás-t¶rés): mennyire viseli el a rendszer, ha valamilyen
okból kifolyólag szétesik több különálló részre
40
ELADÁS 9.
A 9.1.
HADOOP & NOSQL: HBASE
ábrán látható a különböz® adatbázis-kezel® rendszereknek az összefoglalása a
CAP-tétel alapján, hogy melyik két képességgel rendelkezik.
9.1. ábra. CAP-tétel forrás:
http://robertgreiner.com/2014/06/cap-theorem-explained/
9.2. NoSQL adatbázis-kezel® rendszerek A NoSQL adatbázisok nem relációs adatmodellt használnak, hanem különféle megoldások léteznek:
dokumentum alapú :
MongoDB
Ezek az adatbázis-kezel® rendszerek legf®képp az aggregátum számolásra,
BY-jelleg¶
m¶veletekre vannak specializálva.
vagy nem hatékonyan tudnak elvégezni.
gráf alapú :
Neo4j
oszlop alapú :
HBase, Cassandra
kulcs-érték alapú :
DynamoDB, Riak
41
GROUP
Más m¶veletet vagy nem is tudnak,
ELADÁS 9.
HADOOP & NOSQL: HBASE
9.3. HBase A Google 2006-ban publikálta a BigTable nev¶ oszlop alapú adatbázis-kezel® rendszerét. Célja, hogy a Google által használt HDFS-hez hasonló fájlrendszeren skálázhatóan tudjanak adatokat tárolni.
Ennek mintájára készült az HBase, amelynek deníciója a
következ®:
sparse :
ritka, ugyanis az oszlopok értékét nem kötelez® mindig megadni és nem tárol
különféle NULL értékeket, mint az SQL alapú adatbázisok
consistant :
konzisztens
distributed :
elosztott, mivel az adatok HDFS-en elhelyezett fájlokban találhatóak
multidimensional :
többdimenziós, azaz egy kulcshoz több oszlop (column) is tartoz-
hat és ezek oszlopcsaládokba (columnfamily) vannak rendezve, tehát (columnfamily: column) alapján lehet értéket lekérni. Ezenfelül tárol még timestampet is, vagyis az értékek verziózva vannak és mindig a legfrissebb kerül visszaadásra. (A konzisztenciának feltétele, hogy mindig legyen timestamp a beszúrt értékek mellett.)
sorted :
rendezett, tehát nem csak kulcs alapján kereshet®, hanem tartományt is
egyszer¶en le lehet kérni
map :
kulcs-érték párok tárolása
A következ® m¶veleteket lehet elvégezni az HBase-ben:
put(kulcs: get(kulcs):
{'cf:c':
érték, 'cf:c': érték, ...}):
egy elem beszúrása
egy kulcshoz tartozó értékek lekérése
scan(kezd®érték, végérték):
egy tartomány lekérése, illetve használható prex
keresésre
delete(kulcs):
egy elem törlése
A használatához vegyünk egy webkereséses példát, aminek a célja, hogy a különféle URLekhez adatokat mentsünk le. HBase-ben a kulcsot rowkey-nek szokták nevezni, amely jelen esetben az URL lesz. Mivel nagyon sok URL-hez kell adatot tárolni, ezért mindenképpen szükség van valamilyen elosztott infrastruktúrára. Az értékek pedig a MIME-type, nyelv, tartalmazott szöveg, stb. lesz, amely a oszlopok szabad felhasználhatóságából adódóan bármikor b®víthet® és szabadon kihagyható. Mivel az HBase-nek nagyon kicsi a késleltetése, ezért a különféle modulok (crawlerek) nagyon gyorsan tudnak lekérdezni. Minden változtatás atomi, tehát nem fogják blokkolni egymást és nem okoz problémát az sem, ha két különböz® modul ugyanazt a rowkey-t akarja felülírni. Így kaptunk egy nagyon jól skálázódó, kis késleltetés¶ adatbázist, amelyet (cserébe) csak URL alapján tudunk elérni, de ehhez a felhasználáshoz ez pont elég. Egy columnfamily kerül egy HDFS fájlba, tehát ha egy rowkey-hez több columnfamily tartozik, akkor több fájlból kell az értékeket kiolvasni. Oszlop alapú adattömörítést használ az HBase, de használható a rendszer key-value adatbázis felfogásban, amelyb®l látszik, hogy nincs éles határ a két típusú adatbázis-kezel® rendszer között.
42
ELADÁS 9.
HADOOP & NOSQL: HBASE
9.3.1. M¶ködése HDFS fölött fut, de nem MapReduce alapokon, hiszen tudjuk, hogy a MapReduce-nak másodperces nagyságrend¶ késleltetése van, míg a NoSQL rendszerek
ms-os
válaszid®vel
dolgoznak. Felépítésében hasonlít a HDFS és a MapReduce struktúrára, hiszen MasterSlave architektúrára épül ahogyan azt a 9.2. ábrán látható.
9.2. ábra. HBase felépítése forrás:
http://blog.xebia.fr/2009/11/18/devoxx-jour-1-nosql-avec-hbase/
Van egy HBase master és vannak különböz® régiószerverek. Ezek külön szolgáltatásként futnak a Hadoop clusteren, tehát teljesen függetlenek a MapReduce keretrendszert®l. Mivel az adathalmazunk rendezett, ezért a régiószerverek a kulcs szerint vannak felosztva, tehát minden régiószervernek van egy kulcstartománya (start_key kezel.
stop_key ),
amit az
Ha a kliens le akar kérni egy értéket kulcs szerint, akkor el®ször a master meg-
mondja, hogy melyik régiószerverhez tartozik és utána attól a régiószervert®l fogja elkérni az adott elemet és beírás is hasonlóan m¶ködik. Ahhoz, hogy a rendszer valóban
ms-os
nagyságrendben futhasson, a kliensek cash-elik, hogy a régiók hogyan oszlanak el. Ha ez a felosztás valami miatt megváltozik, akkor az adott régiószerver válaszol, hogy ez a kulcs nem hozzá tartozik és akkor fordul újra a masterhez a kliens. A master elég kicsi szerepet kap a rendszerben, mivel csak a kezdeti kéréseket kell kiszolgálnia, illetve az újraszervezési feladatokat kell neki elvégeznie. A nagy terhelés a régiószerverekre jut. A felépítésb®l már jól látszik, hogy a rendszer miért nem áll mindig rendelkezésre. Ha egy régiószerver kiesik, akkor az ® tartománya aktuálisan kiszolgálatlan lesz addig, amíg a master ezt észre nem veszi (nem érkezik heartbeat).
Ezután kiosztja más régiószer-
vereknek ezt a tartományt, aki meg fogja találni a HDFS-en az ehhez a régiószerverhez tartozó fájlokat, de így vannak olyan átmeneti id®szakok, amely alatt bizonyos rekordok nem elérhet®ek. A particionálás-t¶rés viszont teljesülni fog, hiszen ha a cluster szétesik több részre akkor is adott régiószerverek a hozzájuk tartozó kéréseket teljesíteni tudják. Így a rendszer továbbra is m¶ködni fog, természetesen a HDFS replikációt nem biztos, hogy végre tudja hajtani amíg a teljes cluster újra össze nem kapcsolódik. Ahhoz hogy a kérések minnél el®bb ki legyenek szolgálva a régiószerverek folyamatosan nyitva tartják a HDFS fájlokat, amik hozzájuk tartoznak és általában replikákat is igyekszik a rendszer úgy elhelyezni, hogy legyen a régiószerven egy lokális replika a hozzá tartozó blokkokból. A régiószerver ezekbe a fájlokba nagyon gyorsan bele tud indexelni és visszaadni a megfelel® értéket. Ezen felül az adatok nagy részét memóriában tartják, tehát ezért fontos a sok memória az HBase-t kiszolgáló zikai gépekbe.
43
(Vessük össze
ELADÁS 9.
HADOOP & NOSQL: HBASE
a 7.1. ábrán található Slave-hez tartozó Balanced workload és HBase cluster értékeket.)
Az adatok beírásának folyamata Az HBase írási folyamata a 9.3. ábrán látható szekvenciadiagram szerint történik.
9.3. ábra. HBase változtatás írási folyamata forrás:
http://blog.cloudera.com/blog/2012/06/hbase-write-path/
Inicializáláskor a kliens megkérdezi a master-t, hogy melyik régiószerverhez kell fordulnia, ha ez még ezel®tt nem történt meg. Egy kulcsot és a hozzá tartozó értéke(ke)t szeretné a kliens beszúrni vagy törölni (Ê). A régiószerver el®ször a változást beírja a commit log-ba (Ë), ami a lokális diszken tárolódik, hogy adatvesztésünk ne legyen. Ezután kerül be a memstore-ba, ahol a régiószerver tárolja a változásokat és követi azok verzióit (Ì). Amikor ez a memstore kezd betelni, akkor a commit log-nak egy rendezett formáját HDFS-re írja (Î). A commit log és a memstore írása nagyon gyors és utána visszatér a parancs (Í), míg a HDFS-re írás már aszinkron módon hajtódik végre. Így esetleges hiba esetén, ha visszatér a régiószerver, akkor a commit log-ból újra fel tudja építeni a hiányzó módosításokat.
Ebb®l következik, ha régiószerver többet nem tér vissza, akkor azok a
változások elvesznek, amik HDFS-re nem kerültek át.
9.3.2. HBase adatszervezési példa A legf®bb kérdés: Hogyan válasszuk meg a kulcsokat? Ehhez el®ször is meg kell vizsgálnunk, hogy milyen lekérdezéseket szeretnénk futtatni.
Egy példán keresztül vizsgáljuk
meg, hogy milyen szempontokra érdemes odagyelni. Tegyük fel, hogy id®járási adatokat gy¶jtünk.
Vannak id®járás állomások, amelyekhez
timestamp-et és mérési adatokat szeretnénk letárolni. A lekérdezéseinket arra szeretnénk optimalizálni, hogy a leggyorsabban kapjuk vissza az adott id®járás állomáshoz tartozó legfrissebb mért adatokat.
Ebben az esetben triviálisnak t¶nik, hogy válasszuk az
állomás+timestamp-et kulcsnak. De ez ebben az esetben nem a legoptimálisabb megoldást adja, ugyanis a keresésnél mindig végig kell nézni az összes adott állomáshoz tartozó adatot amíg elérjük az utolsót. Erre azt a megoldást szokták alkalmazni, hogy egy távoli jöv®beli id®pontból kivonják az aktuális timestamp-et, ezzel megfordítva annak növekv® voltát. Ha így futtatjuk le a keresést, akkor els® elemként az állomáshoz tartozó legfrissebb adatokat kapjuk meg.
44
A. függelék MapReduce Patterns, Algorithms, and Use Cases Source:
http://highlyscalable.wordpress.com/2012/02/01/mapreduce-patterns/
In this article I digested a number of MapReduce patterns and algorithms to give a systematic view of the dierent techniques that can be found on the web or scientic articles. Several practical case studies are also provided. All descriptions and code snippets use the standard Hadoop's MapReduce model with Mappers, Reduces, Combiners, Partitioners, and sorting. This framework is depicted in the gure below.
A.1. ábra. MapReduce Framework
45
FÜGGELÉK A.
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
A.1. Basic MapReduce Patterns A.1.1. Counting and Summing
Problem Statement: of terms.
There is a number of documents where each document is a set
It is required to calculate a total number of occurrences of each term in all
documents. Alternatively, it can be an arbitrary function of the terms. For instance, there is a log le where each record contains a response time and it is required to calculate an average response time.
Solution:
Let start with something really simple. The code snippet below shows Mapper
that simply emit 1 for each term it processes and Reducer that goes through the lists of ones and sum them up:
1 2 3 4
class Mapper method Map ( docid id , doc d ) for all term t in doc d do Emit ( term t , count 1)
5 6 7 8 9 10 11
class Reducer method Reduce ( term t , counts [ c1 , c2 ,...]) sum = 0 for all count c in [c1 , c2 ,...] do sum = sum + c Emit ( term t , count sum ) The obvious disadvantage of this approach is a high amount of dummy counters emitted by the Mapper. The Mapper can decrease a number of counters via summing counters for each document:
1 2 3 4 5 6 7
class Mapper method Map ( docid id , doc d ) H = new AssociativeArray for all term t in doc d do H{ t} = H{t } + 1 for all term t in H do Emit ( term t , count H {t }) In order to accumulate counters not only for one document, but for all documents processed by one Mapper node, it is possible to leverage Combiners:
1 2 3 4
class Mapper method Map ( docid id , doc d ) for all term t in doc d do Emit ( term t , count 1)
5 6 7 8 9
class Combiner method Combine ( term t , [c1 , c2 ,...]) sum = 0 for all count c in [c1 , c2 ,...] do
46
FÜGGELÉK A.
10 11
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
sum = sum + c Emit ( term t , count sum )
12 13 14 15 16 17 18
class Reducer method Reduce ( term t , counts [ c1 , c2 ,...]) sum = 0 for all count c in [c1 , c2 ,...] do sum = sum + c Emit ( term t , count sum )
Applications:
Log Analysis, Data Querying
A.1.2. Collating
Problem Statement:
There is a set of items and some function of one item.
It is
required to save all items that have the same value of function into one le or perform some other computation that requires all such items to be processed as a group.
The
most typical example is building of inverted indexes.
Solution:
The solution is straightforward. Mapper computes a given function for each
item and emits value of the function as a key and item itself as a value. Reducer obtains all items grouped by function value and process or save them. In case of inverted indexes, items are terms (words) and function is a document ID where the term was found.
Applications:
Inverted Indexes, ETL
A.1.3. Filtering (Grepping), Parsing, and Validation
Problem Statement:
There is a set of records and it is required to collect all records
that meet some condition or transform each record (independently from other records) into another representation. The later case includes such tasks as text parsing and value extraction, conversion from one format to another.
Solution:
Solution is absolutely straightforward Mapper takes records one by one and
emits accepted items or their transformed versions.
Applications:
Log Analysis, Data Querying, ETL, Data Validation
A.1.4. Distributed Task Execution
Problem Statement:
There is a large computational problem that can be divided into
multiple parts and results from all parts can be combined together to obtain a nal result.
Solution:
Problem description is split in a set of specications and specications are
stored as input data for Mappers. Each Mapper takes a specication, performs corresponding computations and emits results.
Reducer combines all emitted parts into the
nal result.
Case Study: Simulation of a Digital Communication System There is a software simulator of a digital communication system like WiMAX that passes some volume of random data through the system model and computes error probability of throughput. Each Mapper runs simulation for specied amount of data which is 1/Nth of the required sampling and emit error rate. Reducer computes average error rate.
47
FÜGGELÉK A.
Applications:
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
Physical and Engineering Simulations, Numerical Analysis, Performance
Testing
A.1.5. Sorting
Problem Statement:
There is a set of records and it is required to sort these records
by some rule or process these records in a certain order.
Solution:
Simple sorting is absolutely straightforward Mappers just emit all items as
values associated with the sorting keys that are assembled as function of items. Nevertheless, in practice sorting is often used in a quite tricky way, that's why it is said to be a heart of MapReduce (and Hadoop). In particular, it is very common to use composite keys to achieve secondary sorting and grouping. Sorting in MapReduce is originally intended for sorting of the emitted key-value pairs by key, but there exist techniques that leverage Hadoop implementation specics to achieve sorting by values. See this blog for more details. It is worth noting that if MapReduce is used for sorting of the original (not intermediate) data, it is often a good idea to continuously maintain data in sorted state using BigTable concepts. In other words, it can be more ecient to sort data once during insertion than sort them for each MapReduce query.
Applications:
ETL, Data Analysis
A.2. Not-So-Basic MapReduce Patterns A.2.1. Iterative Message Passing (Graph Processing)
Problem Statement:
There is a network of entities and relationships between them. It
is required to calculate a state of each entity on the basis of properties of the other entities in its neighborhood. This state can represent a distance to other nodes, indication that there is a neighbor with the certain properties, characteristic of neighborhood density and so on.
Solution:
A network is stored as a set of nodes and each node contains a list of adjacent
node IDs.
Conceptually, MapReduce jobs are performed in iterative way and at each
iteration each node sends messages to its neighbors. Each neighbor updates its state on the basis of the received messages. Iterations are terminated by some condition like xed maximal number of iterations (say, network diameter) or negligible changes in states between two consecutive iterations.
From the technical point of view, Mapper emits
messages for each node using ID of the adjacent node as a key. As result, all messages are grouped by the incoming node and reducer is able to recompute state and rewrite node with the new state. This algorithm is shown in the gure below:
1 2 3 4 5
class Mapper method Map ( id Emit ( id n , for all id Emit ( id
n , object N) object N) m in N . OutgoingRelations do m , message getMessage (N) )
6 7 8 9
class Reducer method Reduce ( id m , [s1 , s2 ,...]) M = null
48
FÜGGELÉK A.
10 11 12 13 14 15 16 17
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
messages = [] for all s in [ s1 , s2 ,...] do if IsObject (s ) then M = s else // s is a message messages . add (s) M. State = calculateState ( messages ) Emit ( id m , item M) It should be emphasized that state of one node rapidly propagates across all the network of network is not too sparse because all nodes that were infected by this state start to infect all their neighbors. This process is illustrated in the gure below:
Case Study: Availability Propagation Through The Tree of Categories Problem Statement: This problem is inspired by real life eCommerce task. There
is
a tree of categories that branches out from large categories (like Men, Women, Kids) to smaller ones (like Men Jeans or Women Dresses), and eventually to small end-ofline categories (like Men Blue Jeans). End-of-line category is either available (contains products) or not. Some high level category is available if there is at least one available end-of-line category in its subtree. The goal is to calculate availabilities for all categories if availabilities of end-of-line categories are know.
Solution:
This problem can be solved using the framework that was described in the
previous section. We dene getMessage and calculateState methods as follows:
1 2
class N State in { True = 2, False = 1, null = 0} , 49
FÜGGELÉK A.
3
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
initialized 1 or 2 for end -of - line categories , 0 otherwise
4 5 6
method getMessage ( object N) return N. State
7 8 9
method calculateState ( state s , data [d1 , d2 ,...]) return max ( [ d1 , d2 ,...] )
Case Study: Breadth-First Search Problem Statement: There is a graph and it is required to calculate distance (a number of hops) from one source node to all other nodes in the graph.
Solution:
Source node emits 0 to all its neighbors and these neighbors propagate this
counter incrementing it by 1 during each hope:
1 2 3
class N State is distance , initialized 0 for source node , INFINITY for all other nodes
4 5 6
method getMessage ( N) return N. State + 1
7 8 9
method calculateState ( state s , data [d1 , d2 ,...]) min ( [d1 , d2 ,...] )
Case Study: PageRank and Mapper-Side Data Aggregation This algorithm was suggested by Google to calculate relevance of a web page as a function of authoritativeness (PageRank) of pages that have links to this page. The real algorithm is quite complex, but in its core it is just a propagation of weights between nodes where each node calculates its weight as a mean of the incoming weights:
1 2
class N State is PageRank
3 4 5
method getMessage ( object N) return N. State / N. OutgoingRelations . size ()
6 7 8
method calculateState ( state s , data [d1 , d2 ,...]) return ( sum ([ d1 , d2 ,...]) ) It is worth mentioning that the schema we use is too generic and doesn't take advantage of the fact that state is a numerical value.
In most of practical cases, we can perform
aggregation of values on the Mapper side due to virtue of this fact. This optimization is illustrated in the code snippet below (for the PageRank algorithm):
1
class Mapper
50
FÜGGELÉK A.
2 3 4 5 6 7 8 9 10 11
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
method Initialize H = new AssociativeArray method Map ( id n , object N) p = N. PageRank / N. OutgoingRelations . size () Emit ( id n , object N) for all id m in N . OutgoingRelations do H{ m} = H{m } + p method Close for all id n in H do Emit ( id n , value H{n })
12 13 14 15 16 17 18 19 20 21 22 23
class Reducer method Reduce ( id m , [s1 , s2 ,...]) M = null p = 0 for all s in [ s1 , s2 ,...] do if IsObject (s ) then M = s else p = p + s M. PageRank = p Emit ( id m , item M )
Applications:
Graph Analysis, Web Indexing
A.2.2. Distinct Values (Unique Items Counting)
Problem Statement:
There is a set of records that contain elds F and G. Count the
total number of unique values of led F for each subset of records that have the same G (grouped by G). The problem can be a little bit generalized and formulated in terms of faceted search:
Problem Statement:
There is a set of records. Each record has eld F and arbitrary
number of category labels
G = {G1 , G2 , . . . }.
Count the total number of unique values of
led F for each subset of records for each value of any label. Example:
1 2 3 4
Record Record Record Record
1: 2: 3: 4:
F =1 , F =2 , F =1 , F =3 ,
G ={a , b} G ={a , d , e} G ={ b} G ={a , b}
5 6 7 8 9 10
Result : a -> 3 b -> 2 d -> 1 e -> 1
// // // //
F =1 , F =2 , F =3 F =1 , F =3 F =2 F =2
51
FÜGGELÉK A.
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
Solution I: The rst approach is to solve the problem in two stages.
At the rst stage Mapper
emits dummy counters for each pair of F and G; Reducer calculates a total number of occurrences for each such pair. The main goal of this phase is to guarantee uniqueness of F values. At the second phase pairs are grouped by G and the total number of items in each group is calculated. Phase I:
1 2 3 4
class Mapper method Map ( null , record [ value f , categories [g1 , g2 ,...]]) for all category g in [g1 , g2 ,...] Emit ( record [g , f], count 1)
5 6 7 8
class Reducer method Reduce ( record [g , f ], counts [n1 , n2 , ...]) Emit ( record [g , f], null ) Phase II:
1 2 3
class Mapper method Map ( record [f , g ], null ) Emit ( value g , count 1)
4 5 6 7
class Reducer method Reduce ( value g , counts [n1 , n2 ,...]) Emit ( value g , sum ( [ n1 , n2 ,...] ) )
A.2.3. Solution II: The second solution requires only one MapReduce job, but it is not really scalable and its applicability is limited. The algorithm is simple Mapper emits values and categories, Reducer excludes duplicates from the list of categories for each value and increment counters for each category. The nal step is to sum all counter emitted by Reducer. This approach is applicable if th number of record with the same f value is not very high and total number of categories is also limited. For instance, this approach is applicable for processing of web logs and classication of users total number of users is high, but number of events for one user is limited, as well as a number of categories to classify by. It worth noting that Combiners can be used in this schema to exclude duplicates from category lists before data will be transmitted to Reducer.
1 2 3 4
class Mapper method Map ( null , record [ value f , categories [g1 , g2 ,...] ) for all category g in [g1 , g2 ,...] Emit ( value f , category g)
5 6 7
class Reducer method Initialize
52
FÜGGELÉK A.
8 9 10 11 12 13 14 15
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
H = new AssociativeArray : category -> count method Reduce ( value f , categories [g1 , g2 ,...]) [g1 , g2 ,..] = ExcludeDuplicates ( [g1 , g2 ,..] ) for all category g in [g1 , g2 ,...] H{ g} = H{g } + 1 method Close for all category g in H do Emit ( category g , count H{g })
Applications:
Log Analysis, Unique Users Counting
A.2.4. Cross-Correlation
Problem Statement:
There is a set of tuples of items. For each possible pair of items
calculate a number of tuples where these items co-occur. If the total number of items is N then N*N values should be reported. This problem appears in text analysis (say, items are words and tuples are sentences), market analysis (customers who buy this tend to also buy that). If N*N is quite small and such a matrix can t in the memory of a single machine, then implementation is straightforward.
Pairs Approach:
The rst approach is to emit all pairs and dummy counters from
Mappers and sum these counters on Reducer. The shortcomings are:
1 2 3 4 5
The benet from combiners is limited, as it is likely that all pair are distinct
There is no in-memory accumulations
class Mapper method Map ( null , items [i1 , i2 ,...] ) for all item i in [i1 , i2 ,...] for all item j in [i1 , i2 ,...] Emit ( pair [i j], count 1)
6 7 8 9 10
class Reducer method Reduce ( pair [i j ], counts [c1 , c2 ,...]) s = sum ([ c1 , c2 ,...]) Emit ( pair [i j], count s )
Stripes Approach:
The second approach is to group data by the rst item in pair
and maintain an associative array (stripe) where counters for all adjacent items are accumulated. Reducer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.
Generates fewer intermediate keys. Hence the framework has less sorting to do.
Greately benets from combiners.
Performs in-memory accumulation. This can lead to problems, if not properly implemented.
More complex implementation.
53
FÜGGELÉK A.
1 2 3 4 5 6 7
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
In general, stripes is faster than pairs
class Mapper method Map ( null , items [i1 , i2 ,...] ) for all item i in [i1 , i2 ,...] H = new AssociativeArray : item -> counter for all item j in [i1 , i2 ,...] H{ j} = H{j } + 1 Emit ( item i , stripe H)
8 9 10 11 12 13 14
class Reducer method Reduce ( item i , stripes [H1 , H2 ,...]) H = new AssociativeArray : item -> counter H = merge - sum ( [H1 , H2 ,...] ) for all item j in H. keys () Emit ( pair [i j], H{j })
Applications: Text Analysis, Market Analysis References: Lin J. Dyer C. Hirst G. Data Intensive Processing MapReduce A.3. Relational MapReduce Patterns In this section we go though the main relational operators and discuss how these operators can implemented in MapReduce terms.
A.3.1. Selection 1 2 3 4
class Mapper method Map ( rowkey key , tuple t ) if t satisfies the predicate Emit ( tuple t , null )
A.3.2. Projection Projection is just a little bit more complex than selection, but we should use a Reducer in this case to eliminate possible duplicates.
1 2 3 4
class Mapper method Map ( rowkey key , tuple t ) tuple g = project ( t) // extract required fields to tuple g Emit ( tuple g , null )
5 6 7 8
class Reducer method Reduce ( tuple t , array n ) Emit ( tuple t , null )
54
// n is an array of nulls
FÜGGELÉK A.
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
A.3.3. Union Mappers are fed by all records of two sets to be united.
Reducer is used to eliminate
duplicates.
1 2 3
class Mapper method Map ( rowkey key , tuple t ) Emit ( tuple t , null )
4 5 6 7 8
class Reducer // n is an array of one or two nulls method Reduce ( tuple t , array n ) Emit ( tuple t , null )
A.3.4. Intersection Mappers are fed by all records of two sets to be intersected. Reducer emits only records that occurred twice.
It is possible only if both sets contain this record because record
includes primary key and can occur in one set only once.
1 2 3
class Mapper method Map ( rowkey key , tuple t ) Emit ( tuple t , null )
4 5 6 7 8 9
class Reducer // n is an array of one or two nulls method Reduce ( tuple t , array n ) if n. size () = 2 Emit ( tuple t , null )
A.3.5. Dierence Let's we have two sets of records R and S. We want to compute dierence R S. Mapper emits all tuples and tag which is a name of the set this record came from. Reducer emits only records that came from R but not from S.
1 2 3 4
class Mapper method Map ( rowkey key , tuple t ) // t. SetName is either 'R ' or 'S ' Emit ( tuple t , string t. SetName )
5 6 7 8 9 10
class Reducer // array n can be [ 'R '], ['S '], ['R ' 'S '], or ['S ', 'R '] method Reduce ( tuple t , array n ) if n. size () = 1 and n [1] = 'R ' Emit ( tuple t , null )
55
FÜGGELÉK A.
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
A.3.6. GroupBy and Aggregation Grouping and aggregation can be performed in one MapReduce job as follows. Mapper extract from each tuple values to group by and aggregate and emits them.
Reducer
receives values to be aggregated already grouped and calculates an aggregation function. Typical aggregation functions like sum or max can be calculated in a streaming fashion, hence don't require to handle all values simultaneously. Nevertheless, in some cases two phase MapReduce job may be required see pattern
1 2 3 4 5 6 7 8
Distinct Values as an example.
class Mapper method Map ( null , tuple [ value GroupBy , value AggregateBy , value ...]) Emit ( value GroupBy , value AggregateBy ) class Reducer method Reduce ( value GroupBy , [v1 , v2 ,...]) // aggregate () : sum () , max () ,... Emit ( value GroupBy , aggregate ( [ v1 , v2 ,...] ) )
A.3.7. Joining Joins are perfectly possible in MapReduce framework, but there exist a number of techniques that dier in eciency and data volumes they are oriented for. In this section we study some basic approaches. The references section contains links to detailed studies of join techniques.
Repartition Join (Reduce Join, Sort-Merge Join) This algorithm joins of two sets R and L on some key k. Mapper goes through all tuples from R and L, extracts key k from the tuples, marks tuple with a tag that indicates a set this tuple came from (`R' or `L'), and emits tagged tuple using k as a key. Reducer receives all tuples for a particular key k and put them into two buckets for R and for L. When two buckets are lled, Reducer runs nested loop over them and emits a cross join of the buckets. Each emitted tuple is a concatenation R-tuple, L-tuple, and key k. This approach has the following disadvantages:
Mapper emits absolutely all data, even for keys that occur only in one set and have no pair in the other.
Reducer should hold all data for one key in the memory.
If data doesn't t the
memory, its Reducer's responsibility to handle this by some kind of swap. Nevertheless, Repartition Join is a most generic technique that can be successfully used when other optimized techniques are not applicable.
1 2 3 4
class Mapper method Map ( null , tuple [ join_key k , value v1 , value v2 ,...]) Emit ( join_key k , tagged_tuple [ set_name tag , values [v1 , v2 , ...] ] )
5
56
FÜGGELÉK A.
6 7 8 9 10 11 12 13 14 15
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
class Reducer method Reduce ( join_key k , tagged_tuples [t1 , t2 ,...]) H = new AssociativeArray : set_name -> values // separate values into 2 arrays for all tagged_tuple t in [ t1 , t2 ,...] H{ t. tag }. add (t . values ) // produce a cross - join of the two arrays for all values r in H { 'R '} for all values l in H { 'L '} Emit ( null , [ k r l ] )
Replicated Join (Map Join, Hash Join) In practice, it is typical to join a small set with a large one (say, a list of users with a list of log records). Let's assume that we join two sets R and L, R is relative small. If so, R can be distributed to all Mappers and each Mapper can load it and index by the join key. The most common and ecient indexing technique here is a hash table. After this, Mapper goes through tuples of the set L and joins them with the corresponding tuples from R that are stored in the hash table. This approach is very eective because there is no need in sorting or transmission of the set L over the network, but set R should be quite small to be distributed to the all Mappers.
1 2 3 4 5 6
class Mapper method Initialize H = new AssociativeArray : join_key -> tuple from R R = loadR () for all [ join_key k , tuple [r1 , r2 ,...] ] in R H{ k} = H{k }. append ( [r1 , r2 ,...] )
7 8 9 10
method Map ( join_key k , tuple l ) for all tuple r in H{ k} Emit ( null , tuple [k r l] )
References:
Join Algorithms using Map/Reduce, Optimizing Joins in a MapReduce
Environment
57