Možnosti dotazování nad proudy dat Jan Pešek
Datové proudy: použití - Monitor sítě - Monitor dopravy - Vyhledávání na webu
Model datových proudů - (celá) data nejsou uložena a nejsou dostupná z disku či paměti - data přicházejí průběžně v podobě jednoho či více proudů
Model datových proudů - model datových proudů vs. klasický relační model: ● Datové elementy přicházejí v reálném čase ● Systém nemá kontrolu nad pořadím příchodu dat ● Proud dat může mít neomezenou velikost ● Data jsou po zpracování zahozena nebo archivována (problém s opětovným zpracováním)
Dotazy nad proudy dat - dotazy mohou spojovat (join) jak proudy dat, tak uložená data - podobné jako dotazování nad tradičními databázemi - rozdíly: - jednorázové dotazy vs. nepřetržité dotazy - dopředu definované dotazy vs. ad hoc dotazy
Jednorázové dotazy - třída dotazů, které zahrnují i tradiční RDBMS dotazy - vyhodnocují se jednou za čas nad snapshotem dat
Nepretržité dotazy - odpověď na nepřetržité dotazy je tvořena v průběhu času - výsledek se může ukládat, nebo může vytvářet další proud dat
Předdefinované dotazy - jsou dostupné i před příchodem dat - obvykle nepřetržité dotazy
Ad hoc dotazy -
spouštějí se ve chvíli, kdy přijdou data můžou být jak jednorázové, tak nepřetržité nejsou známé předem -> nelze optimalizovat výsledek dotazu může vyžadovat data, která přišla dříve a již jsou smazané
Využití datových proudů: další příklady ● Traderbot - webový finanční vyhledávací engine ● iPolicy Networks - firewall a detekce průniku do sítě ● monitoring webu
Využití datových proudů: příklad - síť zákazníka ( C ) customer ----> isp - backbone ( B ) isp <-----------> isp src: Ip adresa odesílatele dest: Ip adresa příjemce id: identifikátor paketu len : délka paketu time: kdy byla nahraná hlavička
Využití datových proudů: příklad Q1: nepřetržitý dotaz - výpočet průměrného zatížení linky B po minutách Q1: SELECT notifyoperator(sum(len)) FROM B GROUP BY getminute(time) HAVING sum(len) - šlo by řešit triggery - neefektivní
Využití datových proudů: příklad Q2: blokující operátory - izoluje tok na backbone a zjistí zatížení každého proudu (sekvence dat od stejných send/rcv) Q2: SELECT flowid, src, dest, sum(len) AS flowlen FROM ( SELECT src, dest, len, time FROM B ORDER BY time ) GROUP BY src, dest, getflowid(src, dest, time) AS flowid
Využití datových proudů: příklad Q2: blokující operátory - problémy Q2: - použití klasické techniky pro GROUP BY a ORDER BY - blokace v exekučním plánu
Využití datových proudů: příklad Q3: Ad hoc nepretržitě Q3: (SELECT count (*) FROM C, B WHERE C.src = B.src and C.dest = B.dest and C.id = B.id) - join mezi proudy B a C - výsledek je počet společných paketů - problémy s nekonečným prostorem (neomezený delay mezi pakety) - řešení: udržovat konečnou paměť spojení proudů - spojovat dvojice pouze v určitém časovém okně
Využití datových proudů: příklad Q4: Nepretržitý dotaz - monitorování párů odesílatel - příjemce v top 5% backbone traffic Q4: WITH Load AS (SELECT src, dest, sum(len) AS traffic FROM B GROUP BY src, dest) SELECT src, dest, traffic FROM Load AS L1 WHERE (SELECT count(*) FROM Load AS L2 WHERE L2 .traffic < L1 .traffic) > (SELECT 0.95 * count(*) FROM Load) ORDER BY traffic
Dotazy nad proudy dat - specifické problémy ● Požadavek na neomezenou paměť ● Přibližná odpověď dotazu ● Pohyblivé okno dat ● Dávkové zpracování ● Vzorkování (sampling)
Neomezená paměť - kvůli možné neomezenosti datového proudu může být neomezený i prostor na vyhodnocení dotazu - algoritmy na operace v externí paměti nejsou na nepřetržité dotazy vhodné a na real-time aplikace jsou příliš pomalé - latence algoritmů musí být nízká
Neomezená paměť - rozlišení dotazů na ty, jež se dají ohraničit pamětí a na ty, které se musí aproximovat - jak odhadnout dopředu na základě dotazu jeho požadavky? - otevřený problém
Sliding Windows - namísto dotazu nad celým proudem se dotaz provede pouze nad vybranou částí dat - jednoduché a přirozené řešení - zohledňuje nedávná data, jež jsou pro real time aplikace (obvykle) nejdůležitější
Aproximace dotazu - i bez neomezené paměti je možné výsledek s vysokou přesností odhadnout - Možné techniky: sketches, random sampling, histograms, wavelet
Batch Sampling Synopses - idea: nezpracovávat každou část dat, ale vybrat si vhodné vzorky - předpoklad, že odpovědí je datová struktura, kterou lze spravovat - ta se pak upraví vždy, když přijdou nová data - namísto real time je možné update provádět periodicky v určitém čase - vhodné vzorkování umožní určení hranice chyby
Blokující operátory - některé operátory neumožní vyhodnocení dotazu do chvíle, než dorazí poslední data - např. agregační operátory (SUM, COUNT MAX..) - při nekonečných proudech dotaz nikdy neskončí
Queries Referencing Past Data - v klasickém modelu datových proudů platí, že co bylo jednou zpracované, nemůže být použité znovu - některé dotazy závislé na již zahozených datech tak nevrátí přesný výsledek - omezující pravidlo: ad hoc dotazy se mohou dotazovat pouze na budoucí data - jiná možnost: spravovat sumarizace (přehledy) starých dat
Stream: Implementace DSMS - Stanford University: Stream Stanford Stream Data Manager http://infolab.stanford.edu/stream/
- kód dostupný pod BSD licencí - server pouze pro Linux, klient v Javě - vývoj ukončen - primárně zpracovává online dotazy, ale podporuje i offline (Archive) - HTTP interface, podporuje SOAP
Stream: Dotazovací jazyk - modifikované SQL - klauzule FROM se může kromě relací vztahovat také na proudy - možnost definovat sliding window - SQL99 obsahuje analytické funkce, jež umožňují provádět agregační operace nad sliding window, nicméně toto nestačí pro datový proud, jelikož se nedá uplatnit na neagregační operace jako JOIN (viz. SELECT ... OVER)
Stream: Dotazovací jazyk - sliding windows - vyžadují možnost řazení datových prvků - pomocí časových razítek: buď implicitních (čas), nebo explicitních (integer, ...) - formálně: S = {(s1, i1), ..., (si, ii)}, kde S je datový proud, si n-tice (data) a ii razítko - specifikace SQL je rozšířena o možnost specifikace okna - za specifikací proudu v klauzuli FROM, uzavřeno do hranatých závorek
Stream: Dotazovací jazyk - sliding windows - Příklad: SELECT AVG(S.minutes) FROM Calls S [PARTITION BY S.customer_id ROWS 10 PRECEDING WHERE S.type = 'Long Distance'] - specifikace okna se skládá z: - volitelné klauzule PARTITION, která dělí data do skupin, a udržuje okna zvlášť pro každou skupinu
Stream: Dotazovací jazyk - sliding windows - velikost okna (specifikovaná buď ve "fyzických" jednotkách - počtu prvků v okně, nebo v "logických" jednotkách, např. časový rozsah 30 dnů) - volitelný filtrovací predikát (klauzule WHERE) - velikost okna je specfikována pomocí klíčového slova ROWS (např. ROWS 50 PRECEDING)
Stream: Dotazovací jazyk - sliding windows - alternativně je možno udat velikost okna pomocí klíčového slova RANGE (např. RANGE 15 MINUTES PRECEDING) - filtrovací predikát je přidán kvůli efektivitě, viz. následující příklad: - SELECT AVG(S.minutes) FROM Calls S [PARTITION BY S.customer_id ROWS 10 PRECEDING] WHERE S.type = 'Long Distance'
Stream: Dotazovací jazyk - sliding windows - SELECT AVG(V.minutes) FROM (SELECT S.minutes FROM calls S, Customers T WHERE S.customer_id = T.customer_id AND T.membership = 'Gold') V [ROWS 1000 PRECEDING]
Stream: Dotazovací jazyk - časové razítka - implicitní razítka (tedy takové, jež systém přidává automaticky) se použijí, pokud data nenesou žádnou informaci o jejich pořadí, nebo pokud na přesném čase (pořadí) v souvislosti s daty nezáleží, ale je třeba "porovnávat starší" - explicitní razítka představují informaci relevantní ke zbytku datům v n-tici (např. se vztahují k nějaké vnější události)
Stream: Dotazovací jazyk - časové razítka - explicitní razítka mají problém v tom, že n-tice v proudu nemusí přijít ve stejném pořadí jim odpovídajícím (např. vlivem sítě) - toto prakticky znemožňuje vyvářet sliding windows definovaná vůči explicitním razítkům - nicméně pokud je vstupní proud "prakticky" setříděný, malé odchylky je možné snadno opravit pomocí bufferování
Stream: Dotazovací jazyk - časové razítka - standardně předpokládáme, že razítko jednoznačně určuje pořadí prvku v proudu (ať už se jedná o časový údaj, nebo např. celočíselné číslování) - co když však budou data na výstupu odvozena z více proudů? (např. pokud máme okno nad výstupem dotazu, jež provádí operaci JOIN nad dvojicí proudů) - co přesně pak představuje dané razítko?
Stream: Dotazovací jazyk - časové razítka - buď přidělovat n-ticím produkovaným JOINem nové razítka - předpoklad, že n-tice, které přišli dříve, mají také větší pravděpodobnost skrz tento JOIN projít dříve - tento přístup lépe vyhovuje implicitním razítkům + jednoduchost implementace - nemožnost deterministicky definovat slidingwindow na poddotazech
Stream: Dotazovací jazyk - časové razítka - druhá možnost: uživatel v dotaze určí, které razítko se má použít jako výsledný (pro výstupní n-tici) - lze použít pro implicitní i explicitní razítka - např. jednoduše pomocí pořadí, v jakém jsou proudy v dotazu napsané (v každém JOINu se použijí razítka prvního napsaného proudu) - problém: více n-tic může mít stejné razítko
Stream: Dotazovací jazyk - časové razítka - řešení: v případě, že by se měl přidělit duplicitní razítko, použije se razítko n-tice z druhého proudu - přesněji: nejprve se řadí, "stejné" razítka jsou nahrazeny a dotříděny - např.: SELECT * FROM S1 [ROWS 1000 PRECEDING], S2 [ROWS 1000 PRECEDING] WHERE S1.A = S2.B
Stream: Dotazovací jazyk - časové razítka - druhý přístup - implementační problém - pokud chceme výstupní data z JOINu setříděné dle razítka, musíme bufferovat dokud nemáme jistotu, že budoucí vstup nenaruší pořadí odesílaných dat - př.: spojí-li se n-tice z S1 a S2, je stále možné, že budoucí n-tice z S2 se spojí se starší n-ticí z S1, která stále spadá do současného okna
Stream: Dotazovací jazyk - časové razítka - ve složitějších dotazech se tato chyba může propagovat a zhoršovat - který způsob tedy použít? - záleží na konkrétním dotazu - pokud sliding-window slouží jako prostředek pro zlepšení výkonu, zpravidla stačí použít první "best-effort" způsob -pokud však pořadí n-tic hraje roli ve významu dotazu, je třeba použít druhý způsob
Stream: Dotazovací jazyk - časové razítka - ve STREAMu jsou k dispozici oba způsoby, pro první ("best-effort") způsob se v definici okna nahradí klíčové slovo PRECEDING slovem RECENT - např. "ROWS 10 PRECEDING" určuje okno předchozích 10 n-tic seřazených striktně podle razítka, "ROWS 10 RECENT" pak umožňuje v případě potřeby DSMS použít vlastní systém řazení
Stream: Dotazovací jazyk - časové razítka - klíčové slovo RECENT je možné použít pouze s "fyzickými" velikostmi, není tedy možné specifikovat např. "RANGE 3 DAYS RECENT"
Stream: Dotazovací jazyk provádění dotazu
Stream: Dotazovací jazyk provádění dotazu - operátory - fronty (spojují operátory) - synopse (datové struktury) - paměť je přidělována dynamicky mezi synopse a fronty - narozdíl od jiných systémů (Aurora, Eddies) jsou jednotlivé fronty oddělené - operátor čte ze vstupu, upravuje jemu náležící synopse, a zapisuje výstup
Stream: Dotazovací jazyk provádění dotazu - čas na provádění dostávají operátory od centrálního "plánovače" (scheduler) - doba, kdy může operátor zpracovávat data může být různá - může se jednat o časový úsek, metrikou může být i určitý počet n-tic (na vstupu či na výstupu) - Aurora, Eddies: plánovač vezme n-tici z globální fronty a přidělí čas "jejímu" operátoru - umí i STREAM
Stream: Dotazovací jazyk provádění dotazu - optimalizace především na dostupnou paměť - run-time paměť může být operátoru kdykoli odebrána a předána jinému - přesnost vs. paměť
Stream: Shrnutí - využívá deklarativní jazyk pro dotazy (vycházející z SQL), narozdíl od jiných DSMS (Aurora, Hancock, ...) => vyžaduje komplexní plánovač dotazů - rovněž poskytuje možnost přímého zadávání již naplánovaných dotazů (relační algebra) - monitoring běžících dotazů - změny nastavení za jejich běhu (alokace paměti, nastavení plánovače)
Aurora: boxes and arrows
Algoritmické problémy Algoritmus datových proudů (data streams) příjímá jako vstup sekvenci prvků x1, ... xn, ... zvaných datový proud (tok), přičemž tato sekvence je vždy čtena pouze jednou ve směru vzrůstajících indexů. Algoritmus musí udržovat hodnotu funkce f na základě přečtených prvků této sekvence.
Algoritmické problémy - metriky pro porovnávání algoritmů: - spotřebovaná paměť - čas potřebný na zpracování prvku - čas potřebný na vyhodnocení funkce f z uchovávané datové struktury, pokud algoritmus nějakou má - dle alternativní definice, kde je možné číst proud opakovaně je metrikou rovněž počet průchodů
Algoritmické problémy - měřit je možné vzhledem k N, kde N značí počet dosud přečtených prvků - N je neomezené - ideální algoritmus by byl na N nezávislý - problém je považovaný za "efektivně řešitelný", pokud je řešitelný v místě O(poly(log N)) a v čase O(poly(log N)) na prvek - poly značí polynomiální funkci
Algoritmické problémy: Random Samples - základní metoda pro tvorbu synopsí - z proudu bere náhodný vzorek - uniform sample vs. stratified sample - reservoir sampling (nejdříve naplnit pole, následně se snižující se pravděpodobností měním náhodný prvek tohoto pole za nově příchozí)
Algoritmické problémy: Sketching Techniques - způsob, jak vytvořit "přehled" o proudu s využitím malého množství paměti - pomocí tohoto přehledu je možné poměrně přesně odhadovat některé dotazy
Algoritmické problémy: Sketching Techniques - jak to funguje: S=(x1, ..., xn) xi náleží jedné z domén D={1, ..., d} mi = |{j | xj = i} určuje počet výskytů i v S pro nezáporné k je pak k-tý moment četnosti (kth frequency moment) určen jako
Algoritmické problémy: Sketching Techniques - momenty četnosti zachycují rozdělení hodnot vS - např. F0 je množství odlišných hodnot v sekvenci, F1 je délka sekvence, Foo je četnost nejčetnější hodnoty - pro výpočet těchto hodnot existuje mnoho různých algoritmů různých výsledků (např. Alon, Matias, Szegedy - F0 v O(log d), F2 v O(log d + log N))
Algoritmické problémy: Histogramy - histogram = struktura pro sumarizaci dat - zachycuje rozdělení hodnot v množině dat - používají se např. k odhadu velikosti dotazu, aproximaci odpovědi dotazu či k data miningu - existuje několik vhodných typů histogramů
Algoritmické problémy: Histogramy V-optimal histogram - aproximuje rozdělení množiny v1, ..., vn pomocí "skokové" konstantní funkce v(i), minimalizuje druhou mocninu odchylky - histogram užívá metodu "věder" (buckets) rozděluje data do určitých částí - idea: každý čtený prvek je "aktualizací" vektoru délky N, který se snažíme aproximovat histogramem o B "vědrech"
Algoritmické problémy: Histogramy V-optimal histogram - na setříděném proudu: místo i čas O(B2 log N) - na nesetříděném: místo i čas ohraničeno poly (B, log N, 1/e), kde e je připouštěná relativní odchylka
Algoritmické problémy: Histogramy Equi-width histogram - počítá kvantily - hodnoty, které dělí proud na zhruba stejně velké části - Greenwald, Khanna: O(1/e * log N), garantuje přesnost eN
Algoritmické problémy: Histogramy End-biased histograms - Iceberg queries - často využívané dotazy jsou na jednoduché agregace určitého atributu v určitém rozsahu (např. počet záznamů) = tzv. Iceberg queries - Manku a Motwani: pro určitý atribut se udržuje množství odlišných hodnot spolu s jejich četností, při přidání nového prvku se zkoumá zda již existuje, prvky s nízkou čeností se mažou
Algoritmické problémy: Histogramy End-biased histograms - Iceberg queries - algortmus tedy udržuje přehled o prvcích, jejichž četnost je vysoká(přesněji vyšší než 1/e) - navíc garantuje, že četnost těchto prvků, ač ve skutečnosti menší, než udržovaná hodnota, není menší o více než eN - vyžaduje O(1/e log (eN)) místa
Algoritmické problémy: Wavelets - technika, jak poskytovat přehled o datech - koeficienty jsou projekce daného proudu na ortogonální množinu vektorů - z dané "vlnky" lze celkem přesně rekonstruovat původní množinu (proud)
Algoritmické problémy: Wavelets - efektivní např. pro odhady pro výsledky selectu či vícerozměrné agregace - na setříděném proudu lze použít upravený hladový algoritmus s paměťovou náročností O (B + log N) - implementace zůstává otevřeným problémem
Algoritmické problémy: Sliding windows - zabraňují "zastaralým" datům ovlivňovat statistiky, slouží jako aproximační nástroj - problém: jak efektivně udržovat statistiky nad určeným oknem - jak nejlépe "implementovat" předcházející algoritmy - Datar a spol.: implementace sketches s místem O(1/e log N), kde e je přesnost, N velikost okna
Algoritmické problémy - dále např. data mining - udržování rozhodovacích stromů nad proudem (Domingos: O(Ne) místa a O(poly(log N)) času na prvek)
- multiple streams - práce se sjednocením proudů (Gibbons, Tirthapura: lze využít sketching) - zkoumání seřazenosti - užitečné např. pro volbu třídícího algoritmu - odhad počtu inverzí v permutaci (Ajtai a spol.: O(log N log (log N)) místa, O(log N) času na prvek)
Závěr: zpět na začátek "Meta-otázky" položené Babcockem a spol.: - lze pro efektivní zpracování "on-line" proudů dat udělat lepší systém, než klasické DBMS s jejich triggery, dočasnými strukturami, ...? - je potřeba navrhovat další obecné modely, algoritmy a systému pro datové proudy? - existuje/í "killer app/s" pro systémy zpracování datových proudů?
Závěr: budoucnost DSMS - pokud na předcházející otázky odpovíme kladně, je třeba vyřešit několik otázek, především: - distributivita (přesměrovávat vysoce vytížené proudy na centrální bod ke zpracování je neefektivní) - interface DSMS (modifikované SQL vs. jiný přístup) - timestamping
Závěr: budoucnost DSMS - efektivnost vyhodnocování dotazů, konstrukce synopsí, správa zdrojů, aproximování zpracování dotazů, ... - definování extenze relačních operátorů "stream algebra"?
Závěr: zdroje Babcock, et.al.: Models and Issues in Data Stream Systems Maskei, Madden a spol.: Borealis Distributed Stream Processing Engine Arasu, et. al.: STREAM: The Stanford Stream Data Manager