típus egy olyan delegáltnak felel meg, ami nem kap paramétert, és a visszatérési típusa R. Van neki azonban még nagyon sok verziója, amikkel a paraméterek típusát tudjuk megadni, azaz egy Func egy olyan metódusnak felel meg, ami vár egy P típusú paramétert, a visszatérési típusa pedig R. És ezt lehet fokozni 16 bemeneti paraméterig (P1, P2, P3, …). A jelenlegi példában 3 típusparaméter található, 2 paraméter és a visszatérési típus. Az első típusparaméter egy újabb Func, ami – újabb magyarázat nélkül – egy string típusú paramétert vár, a visszatérési típusa pedig Task>, azaz egy aszinkron metódusról van szó, esetünkben ez a szervizhívás lesz. A második típusparaméter egy string. Ezt fogjuk paraméterül átadni az előző paraméterként átadott delegáltnak. A harmadik típusparaméter, azaz a visszatérési típus, pedig az IObservable>. Ha nem lenne itt a kód többi része, akkor is sejteni lehetne, hogy az első paraméterként átadott aszinkron hívást fogjuk adatfolyammá alakítani. A kód többi része már csak egy Lambda kifejezés, ami ezt a csomagoló metódust reprezentálja. Láthatóan két paramétert kap (név szerint az asyncCall és parameter nevű paramétereket), és egy adatfolyamot ad vissza. Látható, hogy a paraméterül kapott aszinkron metódust meghívjuk a szintén paraméterül kapott paraméterrel, majd adatfolyammá alakítjuk a ToObservable() operátorral, és ráakasztjuk a Timeout() és Retry() operátorokat is. A sok szenvedés végül meghozza a gyümölcsét, most már szépen egyetlen sorban bele tudjuk fűzni a felokosított szervizhívást a keresődoboz adatfolyamába. 2-3-7 kódrészlet: Csomagoló funkció felhasználása var observableSuggestions = queryTextChanged .Select(q => wrapAsyncCall( service.GetSuggestionsForQuery, q));
25
A befűzés a Select() operátorral kezdődik, ám nem azzal fog véget érni. Ez a két sor kód azt eredményezi, hogy minden billentyűlenyomásra (persze lefojtva, megszűrve) elindítunk egy szervizhívást az aktuális tartalmával a keresőmezőnek. Ezzel azonban azt értük el, hogy van egy adatfolyamunk, amin további adatfolyamok jelennek meg, amikben majd végül megjelenik az egyes szervizhívások eredménye. A cél az volna, hogy megszüntessük ezt az „adatfolyam az adatfolyamban” állapotot és valahogy csak a legutolsó hívás által generált adatfolyamra figyeljünk, ezzel megszüntetve az egyes adatfolyamokban párhuzamosan megjelenő eredmények „versenyét”. Szerencsére kifejezetten erre a célra lett kitalálva a Switch() operátor, így nem fog nehezünkre esni, megoldani ezt a problémát. 2-3-8 kódrészlet: Versenyhelyzet kezelése var observableSuggestions = queryTextChanged .Select(q => wrapAsyncCall(service.GetSuggestionsForQuery, q)) .Switch();
Mostanra van egy adatfolyamunk, ami már lényegében mindent tartalmaz, amire csak szükségünk volt. Lefojtottuk és megszűrtük az ismétlődésektől az inputot, és „felokosítottuk” a szervizhívást. Az utolsó lépés, hogy végre a csővezeték végére álljunk, és megjelenítsük az eredményt. Normál esetben erre a célra a Subscribe() metódust használnánk, azonban itt ezt nem tehetjük meg. Az Rx-es adatfolyamok 3 csatornából állnak: OnNext, OnError és OnCompleted. OnNext eseményből bármennyi lehet egy adatfolyamban élete során, OnError vagy OnCompleted eseményből azonban csak egy. Ha valami hiba történik az adatfolyamban, akkor végigmegy rajta egy OnError üzenet, és lezárul a cső. Ilyen eset márpedig ebben a példában előfordulhat, és nekünk arra van szükségünk, hogy mégis csak megjelenhessen akármennyi OnError esemény és még se álljon le az adatfolyam. A hibákat a Retry() operátorral fogjuk eliminálni. Ez gondoskodik arról, hogy elnyelje az esetleges hibát és újraindítsa az adatfolyamot. Ahhoz pedig, hogy mégis értesüljünk a hibákról is (még mielőtt a Retry() elnyelné) a Do() operátort fogjuk használni. Ezt az operátort az adatfolyam tetszőleges részére beszúrhatjuk és megfigyelhetjük az adatfolyam aktuális állapotát, beleértve az OnNext, OnError és OnCompleted csatornákat. Megint csak a végleges kód szépítése érdekében érdemes bevezetni két segédmetódust: egyet az OnNext csatorna kezelésére, egyet pedig az OnError-éra.
26
2-3-9 kódrészlet: Feliratkozás az OnNext és OnError eseményekre var onNext = new Action>(values => { ErrorLabel.Visibility = Visibility.Collapsed; Suggestions.ItemsSource = values; }); var onError = new Action<Exception>(error => { ErrorLabel.Visibility = Visibility.Visible; ErrorLabel.Text = error.Message; });
Ezen két segédmetódus felhasználásával a végleges kódot a 2-3-10 kódrészlet mutatja. 2-3-10 kódrészlet: Végleges Rx kód a keresési javaslatokra var queryTextChanged = Observable.FromEventPattern(SearchBox, "TextChanged") .Select(e => SearchBox.Text) .Throttle(TimeSpan.FromMilliseconds(100)) .DistinctUntilChanged(); var observableSuggestions = queryTextChanged .Select(q => wrapAsyncCall( service.GetSuggestionsForQuery, q)) .Switch() .ObserveOnDispatcher() .Do(onNext, onError) .Retry(); observableSuggestions.Subscribe();
Az ObserveOnDispatcher() operátorra azért van szükség, hogy az utána következő műveletek a Dispatcher-en, azaz a UI szálon fussanak. A Subscribe() hívásra azért van szükség, mert azzal indítjuk be az egész adatfolyamot. A feliratkozás alulról indul el, azaz amikor egy ilyen hosszú operátorlánc végén meghívjuk a Subscribe() metódust, akkor hátulról kezdve elkezdenek feliratkozni egymásra az operátorok, míg nem a sor elejére érnek, ahol megtörténik a SearchBox TextChanged eseményére a feliratkozás és megindul az adatfolyam.
Keresési találatok A keresés találatainak megszerzése nagyon hasonló a fentebb leírtakhoz. A különbség csupán annyi, hogy több esemény hatására is elindulhat a keresés: megnyomja a felhasználó a keresés gombot, vagy entert üt a szövegdobozban, vagy kiválaszt egy javaslatot a listából.
27
2-3-11 kódrészlet: Eseményfeliratkozások var searchButtonClicked = Observable.FromEventPattern(SearchButton, "Click") .Select(_ => SearchBox.Text); var enterKeyPressed = Observable.FromEventPattern (SearchBox, "KeyDown") .Where(e => e.EventArgs.Key == VirtualKey.Enter) .Select(_ => SearchBox.Text); var suggestionSelected = Observable.FromEventPattern(Suggestions, "ItemClick") .Select(e => e.EventArgs.ClickedItem as string);
Ezzel a három adatfolyammal van három azonos típusú forrásunk, melyeken különböző események hatására a keresődoboz tartalma vagy épp egy kiválasztott javaslat jelenik meg. Ezeket a Merge() operátorral tudjuk összefésülni. 2-3-12 kódrészlet: Különböző eseményforrások összefésülése var observableResults = Observable.Merge(searchButtonClicked, enterKeyPressed, suggestionSelected) .DistinctUntilChanged() .Select(q => wrapAsyncCall(service.GetResultsForQuery, q)) .Switch() .ObserveOnDispatcher() .Do(onNext, onError) .Retry();
Érdemes megfigyelni, hogy a DistinctUntilChanged() operátort itt az összefésülés után használjuk, azaz ha valamit beír a felhasználó és entert nyom és rányom a keresés gombra is, akkor sem küldünk extra szervizhívást. A teljesség kedvéért most is mellékelem a teljes kódot. A kód itt mindenféle segédosztályok nélkül teljesen a XAML mögöttes kódjában, azon belül is az oldal konstruktorában helyezkedik el. A kód itt is az inicializációval kezdődik. 2-3-13 kódrészlet: Inicializáció public MainPage() { // Initialization this.InitializeComponent(); this.Loaded += (s, e) => SearchBox.Focus(FocusState.Keyboard); var service = new SearchService();
28
Majd pár helyben definiált segédfüggvénnyel folytatódik, amiktől majd a lényegi kód szép, rövid, átlátható lesz.
2-3-14 kódrészlet: Segédfüggvények // Helper methods var onNext = new Action>(values => { ErrorLabel.Visibility = Visibility.Collapsed; Suggestions.ItemsSource = values; }); var onError = new Action<Exception>(error => { ErrorLabel.Visibility = Visibility.Visible; ErrorLabel.Text = error.Message; }); var wrapAsyncCall = new Func< Func<string, Task>>, string, IObservable>>( (asyncCall, parameter) => { return asyncCall(parameter) .ToObservable() .Timeout(TimeSpan.FromMilliseconds(250)) .Retry(3); });
Végül pedig a két adatfolyam definiálása: A keresési javaslatok a 2-3-15 kódrészletben.
29
2-3-15 kódrészlet: Keresési javaslatok // Suggestions var queryTextChanged = Observable.FromEventPattern(SearchBox, "TextChanged") .Select(e => SearchBox.Text) .Throttle(TimeSpan.FromMilliseconds(100)) .DistinctUntilChanged(); var observableSuggestions = queryTextChanged .Select(q => wrapAsyncCall( service.GetSuggestionsForQuery, q)) .Switch() .ObserveOnDispatcher() .Do(onNext, onError) .Retry(); observableSuggestions.Subscribe();
És a keresési találatok a 2-3-16 kódrészletben:
2-3-16 kódrészlet: Keresési találatok // Results var searchButtonClicked = Observable.FromEventPattern(SearchButton, "Click") .Select(_ => SearchBox.Text); var enterKeyPressed = Observable.FromEventPattern(SearchBox, "KeyDown") .Where(e => e.EventArgs.Key == VirtualKey.Enter) .Select(_ => SearchBox.Text); var suggestionSelected = Observable.FromEventPattern(Suggestions, "ItemClick") .Select(e => e.EventArgs.ClickedItem as string); var observableResults = Observable.Merge(searchButtonClicked, enterKeyPressed, suggestionSelected) .DistinctUntilChanged() .Select(q => wrapAsyncCall(service.GetResultsForQuery, q)) .Switch() .ObserveOnDispatcher() .Do(onNext, onError) .Retry(); observableResults.Subscribe(); }
30
Összefoglalás Már ezen az egyszerű példán is jól látható, hogy mennyivel egyszerűbb, átláthatóbb, érthetőbb, könnyebben karbantartható vagy épp módosítható kódot kapunk az Rx felhasználásával. Érdemes azt is megfigyelni, hogy az operátorok „vakon” kapcsolódnak egymáshoz, azaz bármikor könnyedén ki lehetne cserélni például a keresési javaslatok forrásaként szolgáló keresődoboz eseményét egy rögzített tesztforrásra, amiben különböző előre beállított késleltetésekkel küldünk tovább az adatfolyam többi részének valamilyen szöveget. Vagy épp tesztelési célokból 1-1 sor „kikommentelésével” könnyedén levehetjük a szövegdobozról a fojtást vagy az egyezés vizsgálatot. A következő fejezetben ennél lényegesen mélyebben és átfogóbban fogom bemutatni, hogy hogyan is épül fel az Rx és mi mindent lehet még vele csinálni.
31
Rx = Adatfolyamok + LINQ + Ütemezők Előkészületek Mielőtt bármibe belekezdenénk, készítsük el megint az alapjait annak a projektnek, amivel ebben a fejezetben dolgozni fogunk. Mivel kifejezetten Windows 8 Metro stílusú alkalmazások vannak az írás középpontjában, viszont nekünk most egy konzolalkalmazás kellene, így csinálunk egy Metro stílusú konzolt. Nyissuk meg a Visual Studio-t Indítsunk egy új projektet (File -> New Project), majd a felugró ablakban a baloldali sávban válasszuk ki a Visual C#-on belül a Windows Windows Store-t, jobboldalon pedig a Blank Application-t. Miután elneveztük a projektet, nyomjunk az Ok gombra. Ezzel megvan az üres alkalmazás. Adjuk hozzá a referenciákhoz az Rx-et. Ezt a Project -> Add reference menüpontból tudjuk megtenni, a felugró ablakon belül baloldalt válasszuk a Windows fülön belül az Extensions menüpontot, majd rakjunk egy pipát a "Reactive Extensions for Windows 8" mellé. Ezzel az egy lépéssel hozzáadtuk az összes szükséges dll-t a referenciákhoz. A következő lépés ezután az lesz, hogy elkészítjük a konzolos felületet. Ez nem fog más tartalmazni, mint egy kicsit testreszabott TextBlock-ot, mely a következőképpen fog kinézni: 3-1-1 kódrészlet: A tesztkörnyezet felületének XAML kódja
Ezután pedig elkészítjük a saját WriteLine() metódusunkat a TextBlock osztályhoz, melyet a 3-1-2 kódrészlet szemléltet. 32
3-1-2 kódrészlet: A kiterjesztő metódus public static class Extensions { private static IEnumerable TakeLast(this IEnumerable enumerable, int count) { var takeFrom = enumerable.Count() - count; if (takeFrom < 0) takeFrom = 0; return enumerable.Skip(takeFrom); } public static void WriteLine(this TextBlock textBlock, object obj) { textBlock.Text += Environment.NewLine + obj.ToString(); var lines = textBlock.Text .Split(new string[] { Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries) .TakeLast(25) .Select(str => str += Environment.NewLine); textBlock.Text = string.Concat(lines); } }
Ennek a kis metódusnak a segítségével fogunk tudni az alkalmazásban a 3-1-3 kódrészlethez hasonló kifejezést írni. 3-1-3 kódrészlet: A kiterjesztő metódus használata Console.WriteLine("Hello Rx");
Most, hogy megvan a demózó felület, belefúrhatjuk magunkat a dolgok mélyébe.
Adatfolyamok Az Rx alapja, hogy IObservable típusú adatfolyamokként reprezentálja az események és callbackek sorozatát. Na de mégis miképpen teszi ezt? A rendszer alapját az Observer design pattern képezi, mely két interfészből áll. Az egyik az IObservable (“megfigyelhető”), mely egy adatforrást reprezentál, ami “szól” ha új eleme érkezik, a másik pedig az IObserver (“megfigyelő”), melyet egy megfigyelhető adatforrás megfigyelésére tudjuk feliratkoztatni. Az IObservable interfész mindösszesen egy Subscribe() metódust tartalmaz, ami egy IObserver típusú objektumot vár paraméterként, az IObserver-nek pedig három metódusa van, az OnNext(), OnError() és az OnCompleted(). A legelső kerül meghívásra, ha egy új elem jelenik meg az adatfolyamban, a második, ha valami probléma történne, és az utolsó egy 33
lezáró üzenet, melynek hatására a megfigyelés befejeződik, ezzel jelzi az adatforrás, hogy nem fog több értéket szolgáltatni. Fontos megérteni tehát, hogy az adatfolyam élete során nulla vagy több OnNext értesítést fog kiadni magából, majd a folyam végét (már amennyiben van neki) az OnCompleted vagy OnError üzenet fogja jelezni. Ha e kettő üzenet egyikét megkapjuk, akkor a csatorna megsemmisítésre kerül, a megfigyelés befejeződik, több adatot nem fogunk kapni. A következő alfejezetekben azt fogom bemutatni, hogy miképpen tudunk ilyen adatfolyamokat generálni, vagy meglévő forrást ilyen formába csomagolni. Mielőtt azonban ebbe belemennék, elöljáróban bemutatnám a “feliratkozás” menetét és szolgálnék egy kis jelmagyarázattal az adatfolyamok szemléltetéséhez. A feliratkozást 6 féleképpen tehetjük meg. Paraméterek nélkül, aminek látszólag nem sok értelme van, de később azért ez is értelmet fog nyerni Egy IObserver interfészt megvalósító objektumot átadva paraméterként A maradék négy lehetőség pedig azon esetek kombinációiból áll össze, amikor helyben adjuk meg az OnNext, OnCompleted és OnError eseménykezelőjét például egy-egy Lamdba kifejezéssel.
OnNext
OnNext + OnCompeted
OnNext + OnError
OnNext + OnCompleted + OnError
Egy jó ideig most a legutóbbi verziót fogjuk használni, mert annak a leírása áll a legkevesebb sorból. 3-2-1 kódrészlet: Feliratkozás egy Rx adatfolyamra source.ObserveOnDispatcher().Subscribe( value => Console.WriteLine("OnNext: " + value), error => Console.WriteLine("OnError: " + error.Message), () => Console.WriteLine("OnCompleted"));
Mikor elkészítünk egy tetszőlegesen bonyolult adatfolyamot sok művelet összeláncolásából, csak akkor kezd el dolgozni, mikor feliratkozunk rá. Ez logikus, hiszen ha nincs egyetlen megfigyelő sem, akkor miért dolgozzon feleslegesen a háttérben a sok operátor? Valójában a Subscribe() metódusnak van egy IDisposable visszatérési értéke, és értelemszerűen az adott feliratkozást menet közben ezzel tudjuk explicit módon megszakítani. Mikor meghívjuk a Dispose() metódust ezen a visszakapott objektumon, akkor az egész láncon végigmegy és megszüntet minden feliratkozást visszamenőlegesen. Ez automatikusan is megtörténik, ha az adatfolyamban érkezik egy lezáró üzenet. 34
Az operátorok működését idővonal(ak) segítségével fogom szemléltetni. Egy átlagos adatfolyam idővonalát a 3-2-1 ábra szemlélteti.
3-2-1 ábra: Idővonal szemléltetése Az adatfolyamban kiszámíthatatlanul, különböző időközönként megjelennek adatok (OnNext), majd a végén egy OnCompleted lezáró üzenet. Amennyiben valami hiba történne, akkor az adatfolyam végét egy pirosra színezett karika fogja jelölni, mely az OnError lezáró üzenetet fogja jelképezni.
Generátor függvények Amennyiben generáltatni szeretnénk az adatfolyamunkat, rendelkezésünkre áll egy pár primitív operátor. Ezekkel elő tudjuk állítani a legalapvetőbb adatfolyamokat, melyeknek önmagukban nem feltétlenül van sok értelme, de akár találkozhatunk is olyan szituációval, amikor tényleg nem kell ennél több.
Never A Never() operátor egy olyan adatfolyamot ad, amin soha nem történik semmi, de még csak vége sincs. 3-2-2 kódrészlet: Never függvény var source = Observable.Never<string>();
Ennek az operátornak tehát az idővonala nem túl bonyolult:
3-2-2 ábra: Never függvény idővonala
Empty Az Empty() operátor olyan adatfolyamot ad, mely feliratkozás után azonnal le is zárul egy OnCompleted üzenettel. 35
3-2-3 kódrészlet: Empty függvény var source = Observable.Empty<string>();
3-2-3 ábra: Empty függvény „idővonala”
Return A Return() operátor olyan adatfolyamot ad, mely egyetlen értékből és egy OnCompleted üzenetből áll. 3-2-4 kódrészlet: Return függvény var source = Observable.Return<string>("A");
3-2-4 ábra: Return függvény idővonala
Throw A Throw() operátor egy olyan adatfolyamot ad vissza, mely mindösszesen egy OnError üzenetet küld rögtön feliratkozáskor. 3-2-5 kódrészlet: Throw függvény var source = Observable.Throw<string>(new Exception("X"));
3-2-5 kódrészlet: Throw függvény idővonala
36
Range A Range() operátor segítségével egy megadott kezdőértéktől bizonyos számú elemet tudunk generáltatni. 3-2-6 kódrészlet: Range függvény var source = Observable.Range(1, 5);
3-2-6 ábra: Range függvény idővonala
Generate A Generate() operátorral már nagyobb kontrollt kapunk a generálásban, itt a for ciklushoz hasonló kifejezéseket kell megadnunk. Az első paraméter a kiinduló érték, a második paraméter a megállási feltételt vizsgáló funkció, a harmadik a léptető funkció, a negyedik pedig a kiválasztó, azaz az a függvény, ami megmondja, hogy mit akarunk kezdeni azzal a ciklusváltozóval. 3-2-7 kódrészlet: Generate függvény var source = Observable.Generate(0, i => i < 5, i => i + 1, i => i * i);
3-2-7 ábra: Generate függvény idővonala
ToObservable Ez a függvény nem az Observable osztály egy statikus metódusa, ezt IEnumerable típusú kollekciókon tudjuk használni. Az eredménye elég triviális, egy sima IEnumerable típusú listát IObservable adatfolyammá alakít, majd természetesen mivel egy véges hosszúságú forrásról van szó, a végén lezárja az adatfolyamot egy OnCompleted üzenettel.
37
3-2-8 kódrészlet: ToObservable függvény var list = new List<string> { "A", "B", "C", "D", "E", "F", "G", "H", "I" }; var source = list.ToObservable();
3-2-8 ábra: ToObservable függvény idővonala
Interval A szekvenciát generáló függvények folytatásaként a soron következő operátor az Interval(). Ennek az operátornak a segítségével egy végtelen hosszúságú szekvenciát tudunk generáltatni, melynek elemei nullától kezdve egyesével növekednek, de az igazi különlegessége az, hogy az elemek megadott időközönként jelennek meg a szekvenciában. 3-2-9 kódrészlet: Interval függvény var source = Observable.Interval(TimeSpan.FromMilliseconds(100));
3-2-9 ábra: Interval függvény idővonala
Timer Az előzőtől némileg eltérő, de mégis nagyon hasonló feladatra alkalmas a Timer() operátor. Megadhatunk neki egy relatív időeltolást (a feliratkozáshoz képest), vagy abszolút időpontot, amikor is küld egy üzenetet, majd egy OnCompleted üzenettel rögtön le is zárja az általa szolgáltatott adatfolyamot. 3-2-10 kódrészlet: Timer függvény var source = Observable.Timer(TimeSpan.FromSeconds(3));
38
3-2-10 ábra: Timer függvény idővonala Emellett azonban lehetőség van arra is, hogy periodikusságot adjunk meg, azaz, hogy az első esemény után milyen gyakran generáljon újabb elemeket. Ebben az esetben azonban soha nem lesz lezáró üzenet, a végtelenségig fogja generálni az értékeket. 3-2-11 kódrészlet: Timer függvény eltolt kezdéssel var source = Observable.Timer(TimeSpan.FromMilliseconds(500), TimeSpan.FromMilliseconds(100));
3-2-11 ábra: Eltolt kezdésű Timer függvény idővonala
Konverter függvények Ezeknek a függvényeknek a segítségével tudunk könnyedén adatfolyamot készíteni egy eseményből vagy aszinkron hívásból.
FromEvent Kezdjük a legbonyolultabbnak tűnővel. Teljesen általános esemény adatfolyammá alakítása. Ennek a függvénynek a legbonyolultabb szignatúrája a következőképpen néz ki: 3-2-12 kódrészlet: Hagyományos esemény Rx adatfolyammá alakítása public static IObservable FromEvent( Func, TDelegate> conversion, Action addHandler, Action removeHandler);
Csináljunk hát egy eseményt, aminek kellően szokatlan szignatúrája van és nézzük meg hogyan tudjuk felhasználni ezt a függvényt, hogy becsomagoljuk a készített eseményt. Az esemény nézzen ki például a következőképpen:
39
3-2-13 kódrészlet: Speciális, egyedi esemény public event Action<string, int, double> MySpecialEvent;
Ez ugyebár azt jelenti, hogy az eseménykezelőnk egy string-et, egy int-et és egy double-t kell, hogy paraméterül várjon. Konvertáljuk ezt egy adatfolyammá. 3-2-14 kódrészlet: Feliratkozás a speciális, egyedi eseményre var source = Observable.FromEvent , Tuple<string, int, double>>( handler => new Action<string, int, double>( (s, i, d) => handler.Invoke(new Tuple<string, int, double>(s, i, d))), handler => MySpecialEvent += handler, handler => MySpecialEvent -= handler);
Amikor ilyen kódot látunk, a lényeg, hogy ne essünk pánikba. Vegyük sorra hogyan is épül fel ez a hosszú függvényhívás. Először is kell két generikus típusparaméter TDelegate és TEventArgs. A TDelegate egy olyan delegált típusa, amit felhasználhatunk eseménykezelőként. A második generikus paraméterünk egy csomagolóosztály lesz, ez fogja tartalmazni az eseménykezelő paramétereit. Jelen esetben a beépített Tuple típust használtam, de természetesen beszédesebb nevű tulajdonságokkal ellátott saját osztályt is rakhattam volna oda. A függvény első paramétere az igazán nagy falat, annak a függvénynek lesz az a feladata, hogy nyilvánvalóan tetszőleges saját logika alapján becsomagolja az eseménykezelő paramétereit. Ebben a függvényben paraméterül kapjuk a “csomagolt” eseménykezelőt, azaz azt az Action-t, aminek a paraméterei már egyetlen objektumba vannak csomagolva. Ezt kell majd meghívnunk. A feladatunk pedig az, hogy egy olyan eseménykezelőnek megfelelő típusú függvényt adjunk meg, mely az eredeti esemény paramétereit becsomagolja, és a paraméterül kapott függvényt azzal meghívja. Tehát ami függvényt mi itt megadunk, az lesz feliratkoztatva az eredeti eseményre, de az rögtön be is csomagolja az esemény paramétereit és már ezt a speciális “csomagolt” eseménykezelőt hívja tovább. Ha ezen a paraméteren túltettük magunkat, és megértettük a működését, akkor már csak két egyszerű függvény van hátra, amik az eseményre való feliratkozást és leiratkozást végzik. Azaz megkapjuk az eseménykezelő referenciáját, amit nekünk kell fel és leiratkoztatni a kívánt forrásra/ról. A Subscribe() ebben az esetben egy kicsit eltér az eddigiektől, mivel nem egy egyszerű string jelenik meg a csővezeték végén, hanem egy Tuple<string, int, double>, és ennek megfelelően ennek az értékeit egy kicsit máshogy kell kiírni. 40
3-2-15 kódrészlet: Feliratkozás az adatfolyamra source.ObserveOnDispatcher().Subscribe( value => Console.WriteLine("OnNext: " + value.Item1 + " :: " + value.Item2 + " :: " + value.Item3));
És végül, hogy lássunk is valamit, lőjünk el egy eseményt. 3-2-16 kódrészlet: Esemény kiváltása MySpecialEvent("Rx Event", 42, 3.14);
Ennek természetesen van egy leegyszerűsített változata is, amennyiben az esemény egyetlen paramétert tartalmaz és nincs szükség ilyen bonyolult konverziókra. Ebben az esetben a példa kedvéért térjünk vissza egy egyszerű (de még mindig nem “szabványos”) Action<string> típusú eseményre, és nézzük meg mennyivel leegyszerűsödik az élet egy ilyen helyzetben. Az eseményt a 3-2-17 kódrészlet mutatja. 3-2-17 kódrészlet: „Egyszerű” esemény public event Action<string> MySpecialEvent;
A konverziót pedig a 3-2-18 kódrészlet.
3-2-18 kódrészlet: „Egyszerű” esemény konverziója var source = Observable.FromEvent<string>( handler => MySpecialEvent += handler, handler => MySpecialEvent -= handler);
A Subscribe() metódusból pedig visszaállíthatjuk az eredeti verziót, mert most már megint egy egyszerű string jelenik meg a csővezeték végén, nem kell összetett típusok tulajdonságai között turkálni.
FromEventPattern Ez egy speciális formája az eseményeknek, noha az ilyen típus eseményekkel találkozunk a leggyakrabban. A szignatúrájukban két paraméter van egy Sender (ami általában object de ez nem feltétlenül kell, hogy így legyen), és egy EventArgs, ami az esemény – elvileg – lényegi információját hordozza. 41
3-2-19 kódrészlet: „Mintaszerű” esemény konverziója var source = Observable .FromEventPattern(this, "PointerPressed");
Jelen esetben az alkalmazás aktuális oldalának (this) PointerPressed eseményére iratkoztunk fel, mely a Click-nek felel meg a Windows 8 Windows Store alkalmazásokban. Ha a var kulcsszó felé visszük az egeret, láthatjuk, hogy az adatfolyamunk típusa IObservable<EventPattern>. Ez az EventPattern típus egy ugyanolyan csomagolóosztály, mint amivel az előző példában is dolgoztunk, csak már elő van készítve és kényelmesen a kezünkbe adva. Tartalmazza az eseményt kiváltó objektumnak a referenciáját (Sender) és az esemény argumentumát (EventArgs). Ennek megfelelően most megint alakítsuk át ideiglenesen a feliratkozást, hogy valami értelmes információt láthassunk a klikkelésről.
3-2-20 kódrészlet: Feliratkozás a „PointerPressed Adatfolyamra” source.ObserveOnDispatcher().Subscribe(value => { var position = value.EventArgs.GetCurrentPoint(this).Position; Console.WriteLine("OnNext: " + "X: " + position.X + " Y: " + position.Y); });
Természetesen ennek a függvénynek is jó pár verziója van még, de nagy valószínűséggel ezzel fogunk a legtöbbet találkozni. Mindenképpen fontos megjegyezni mindkét esemény konverter függvénynél, hogy az általuk generált adatfolyamok végtelen hosszúak, sosincs lezáró üzenet, tehát az idővonaluk valahogy így néz ki:
3-2-12 ábra: Eseményből konvertált adatfolyam idővonala
FromAsync A C# 5.0 előtt az aszinkron programkódok írása nem volt egyszerű feladat. Két fő típusa volt az aszinkron programozási sablonoknak, az egyik a BeginXXX/EndXXX metóduspáros, a másik pedig az XXXAsync metódus és Completed eseményes megoldás. Ezek közül a ténylegesen jól használható verziót az előbbi jelentette, és ehhez készítettek is egy konverter függvényt. 42
A BeginXXX/EndXXX metóduspárossal az volt a probléma, hogy amint két-három ilyen aszinkron hívást megpróbáltunk egymás után láncolni, nagyon terjedelmessé és nehezen átláthatóvá vált a kód. Amennyiben valami csoda folytán ilyen kóddal találkozunk, akkor használjuk a Task.Factory.FromAsync() metódust arra, hogy Task alapú aszinkron hívássá alakítsuk. Ugyan van az Rx-nek saját FromAsyncPattern() függvénye is az ilyen típusú aszinkron programozási minta támogatására, de az az ajánlás, hogy a fent említett módszerrel konvertáljuk át az új Task alapú aszinkron műveletté. Ezt pedig például a következőképpen tudjuk megtenni: 3-2-21 kódrészlet: BeginXXX/EndXXX programozása minta Task-ká alakítása Func ToSquare = i => i * i; var ToSquareAsync = Task.Factory.FromAsync( (num, cb, obj) => ToSquare.BeginInvoke(num, cb, obj), (iar) => ToSquare.EndInvoke(iar), 10, null);
Így most már békében beszélhetünk a FromAsync() konverter függvényről. A FromAsync() függvény mindösszesen egy olyan funkciót vár paraméterül, mely vagy nem vár semmilyen paramétert, vagy egy CancellationToken-t vár és visszaad egy Task-ot, mely ugyebár az aszinkron hívást reprezentálja. Készítsünk hát egy egyszerű aszinkron funkciót. 3-2-22 kódrészlet: Aszinkron hatványozás Func> ToSquareAsync = async i => i * i;
A konverter függvényt pedig a következőképpen használhatjuk rajta: 3-2-23 kódrészlet: Aszinkron művelet konverziója Rx adatfolyammá var source = Observable.FromAsync(() => ToSquareAsync(10));
Mivel ez egy nagyon általános feladat (lényegében Task-ből IObservable-be konvertálás), így kapunk hozzá egy még rövidebb elérést is, mely a következőképpen néz ki: 3-2-24 kódrészlet: Aszinkron művelet konverziója Rx adatfolyammá egyszerűbben var source = ToSquareAsync(10).ToObservable();
43
Ha ezt lefuttatjuk, akkor láthatjuk, hogy kapunk egy OnNext: 100 majd egy OnCompleted üzenetet a konzolban, azaz az aszinkron hívásoknak egyetlen értékük van, és ha azt megkapjuk, akkor le is zárul az adatfolyam. Természetesen ha valami kivétel keletkezne az aszinkron hívás során, akkor a lezáró üzenet az ezen kivételt tartalmazó OnError üzenet lenne.
3-2-13 ábra: Aszinkron művelet idővonala Amit még észrevehetünk a kódban, hogy “bele van égetve” a függvényhívás paramétere. Jobb volna helyette egy olyan függvényt csinálni, ami fogadja az eredeti aszinkronhívás paramétereit, majd visszaadja az átkonvertált IObservable-t. Erre sajnos nem kapunk beépített megoldást, nekünk kell megbirkóznunk ezzel a kemény feladattal. 3-2-25 kódrészlet: Aszinkron művelet becsomagolása Func> ToSquareObservable = num => ToSquareAsync(num).ToObservable());
Ezzel a konkrét felhasználás már csak ennyiből áll: 3-2-26 kódrészlet: A csomagolófüggvény felhasználása var source = ToSquareObservable(10);
Hot és Cold Observable Képzeljük el azt a szituációt, hogy ugyanazon forrásra több megfigyelőt is fel szeretnénk iratkoztatni. Az elvárás valószínűleg az volna, hogy mindegyik megfigyelőnk ugyanakkor és ugyanazokat az értékeket kapja. Ez azonban nem feltétlenül van így. Vegyük például a következő kis példát ahol egy egyszerű Interval()-ra feliratkozunk az egyik megfigyelőnkkel, majd direkt egy kis késleltetéssel egy másik megfigyelővel.
44
3-2-27 kódrészlet: Késleltetett feliratkozás ugyanarra a forrásra var source = Observable.Interval(TimeSpan.FromSeconds(1)); source.ObserveOnDispatcher().Subscribe(value => Console.WriteLine("#1 OnNext: " + value)); await Task.Delay(3000); source.ObserveOnDispatcher().Subscribe(value => Console.WriteLine("#2 OnNext: " + value));
Amint láthatjuk a konzolunkon, a második megfigyelőnk három másodperc késleltetéssel becsatlakozik, viszont nem az aktuális értékeket kapja, hanem előröl kezdi neki adagolni a forrás az értékeket. Az ilyen típusú adatfolyamot nevezzük Cold Observable-nek. Ezzel ellentétben, ha például egy eseményből konvertált adatfolyamra csatlakoznánk fel több megfigyelővel, különböző időpontokban, akkor is az aktuális értékeket kapnánk mindegyik megfigyelőben. Ezt pedig Hot Observable-ek nevezzük. Tipikusan az olyan jellegű adatfolyamok, melyeknek nincs beazonosítható kezdőpontja – már a feliratkozás előtt is aktív volt – azokat Hot Observable-nek nevezzük, míg azokat az adatforrásokat, melyeknek beazonosítható kiindulópontja van és ezzel együtt olyan viselkedést produkál, hogy minden megfigyelőnek előröl kezdi el szolgáltatni az adatokat, Cold Observable-nek nevezzük. Ahhoz, hogy egy Cold Observable-t Hot Observable-é alakítsuk, van egy metóduspárosunk, a Publish() és a Connect(). Annak érdekében tehát, hogy több megfigyelő is ugyanazon az adatforráson osztozzon, a Publish() operátort kell használnunk, ekkor az eredeti IObservable adatfolyamunkból egy IConnectableObservable keletkezik, mely egészen pontosan akkor kezdi el publikálni az üzeneteit, amikor meghívjuk rajta a Connect() metódust. A Connect() metódus meghívásáig senki nem kap semmilyen adatot, azután viszont mindenki mindig ugyanazt kapja, közös forráson osztoznak a megfigyelők.
3-2-28 kódrészlet: Hot Observable készítése var source = Observable.Interval(TimeSpan.FromSeconds(1)).Publish(); source.Connect(); source.ObserveOnDispatcher().Subscribe(value => Console.WriteLine("#1 OnNext: " + value)); await Task.Delay(3000); source.ObserveOnDispatcher().Subscribe(value => Console.WriteLine("#2 OnNext: " + value));
Figyeljük meg, hogy mi történik, ha a source.Connect() kódsort ott hagyjuk, ahol van, és mi történik akkor, ha például a három másodperces várakozás (await Task.Dealy(3000)) után rakjuk.
45
Természetesen az imént levezetett problémának megvan a másik oldala is, amikor valami okból kifolyólag szeretnénk egy eredetileg Hot Observable-ből Cold Observable-t csinálni. Erre a Replay() operátort fogjuk tudni használni, mely egészen pontosan ugyanarra az analógiára húzható rá, mint a Publish() - Connect(), azaz a Connect() metódus meghívása után kezdi el megjegyezni a rajta átfolyó adatokat a rendszer, és aztán minden egyes feliratkozó megfigyelőnek el kezdi kiadni előröl az értékeket.
3-2-29 kódrészlet: Cold Observable készítése var source = Observable .FromEventPattern(this, "PointerPressed") .Select(evt => evt.EventArgs.GetCurrentPoint(this).Position) .Replay(); source.Connect(); source.Subscribe(value => Console.WriteLine("#1 OnNext: " + value)); await Task.Delay(3000); source.Subscribe(value => Console.WriteLine("#2 OnNext: " + value));
Futtassuk az alkalmazást és figyeljük meg, hogy amit az első 3 másodpercben összeklikkelgetünk, azt a második késleltetetten feliratkoztatott megfigyelő egyben megkapja. Ha például beraknánk egy késleltetést az első megfigyelő elé is, akkor ugyan nem látnánk eredményét annak, hogy klikkelgetünk a képernyőn, azonban, mikor megtörténik az első számú megfigyelő feliratkoztatása, az megkapja egyből a Connect() metódus hívása óta rögzített adatokat is.
Subjectek A Subject egy olyan speciális típus, mely egyszerre megfigyelő (IObserver) és megfigyelhető (IObservable). A Subject tipikusan valahol az adatfolyam közepén helyezkedik el a lényegi adatforrás(ok) és a megfigyelők között egyfajta proxy szerepet betöltve. Több fajta Subject is létezik, melyek különböző viselkedési modellel rendelkeznek arra vonatkozóan, hogy a rá feliratkozó megfigyelőknek miképpen szolgáltat adatokat.
Subject A beépített Subject-ek legegyszerűbb verziója a “sima” Subject. Ez nem más, mint egy egyszerű Hot Observable. Bármilyen adatforrást, amit ő megfigyel, egy Hot Observable-ként oszt tovább az őt megfigyelők felé.
46
3-2-30 kódrészlet: Subject használata több figyelővel var interval = Observable.Interval(TimeSpan.FromSeconds(1)); var subject = new Subject(); interval.Subscribe(subject); subject.ObserveOnDispatcher().Subscribe(value => Console.WriteLine("#1 OnNext: " + value)); await Task.Delay(3000); subject.ObserveOnDispatcher().Subscribe(value => Console.WriteLine("#2 OnNext: " + value));
Ugyan ez is egy jó példa a működésének szemléltetésére, de én úgy gondolom, hogy nem elhanyagolható pozitívum a felépítéséből adódó lehetőség, hogy teljesen manuálisan is pumpálhatjuk az adatokat az adatfolyamba az OnNext(), OnCompleted() és OnError() metódusok hívásával. 3-2-31 kódrészlet: Manuális üzenetek a Subjecten keresztül var subject = new Subject<string>(); subject.OnNext("1"); // Ezt nem fogjuk megkapni subject.Subscribe( value => Console.WriteLine("OnNext: " + value), () => Console.WriteLine("OnCompleted")); subject.OnNext("2"); // Ezt megkapjuk subject.OnNext("3"); // Ezt megkapjuk subject.OnCompleted(); subject.OnNext("4"); // Ezt már nem kapjuk meg
ReplaySubject A ReplaySubject az előzővel ellentétben pont, hogy egy Cold Observable. Olyannyira hideg, hogy még ha egy eredetileg Hot Observable-höz is kapcsoltuk hozzá, azt is kihűti. Mindenre emlékszik attól a pillanattól kezdve, amikor őt feliratkoztattuk az általa megfigyelt forrás(ok)ra. Másképp megfogalmazva, emlékszik mindenre, ami átfolyt rajta. 3-2-32 kódrészlet: ReplaySubject var subject = new ReplaySubject<string>(); subject.OnNext("1"); // Ezt megkapjuk subject.Subscribe(value => Console.WriteLine("#1 OnNext: " + value)); subject.OnNext("2"); // Ezt megkapjuk subject.Subscribe(value => Console.WriteLine("#2 OnNext: " + value)); subject.OnNext("3"); // Ezt megkapjuk
47
Érdemes megfigyelni a konzol tartalmát. Mikor az első megfigyelőnket feliratkoztatjuk, az megkapja a feliratkozás előtt megjelent elemet, majd értelemszerűen megkapja a másodikat is, ezután feliratkoztatjuk a második megfigyelőnket, aki megkapja visszamenőleg az első és második elemet, és végül a harmadikat már egyszerre kapja meg a két megfigyelő.
BehaviorSubject A BehaviorSubject olyan speciális működéssel bír, hogy mindig csak a legutolsó elemre emlékszik az adatfolyamból. Annak érdekében, hogy akkor is kaphassunk valamilyen értéket, amikor még nem is jelent meg valójában érték az adatfolyamban, létrehozáskor meg kell adni egy alapértéket. Ezután bármikor is iratkozunk fel az adatfolyamra, rögtön a feliratkozás pillanatában meg fogjuk kapni a legutolsó elemet a feliratkozás előtt vagy az alapértéket, utána pedig gyakorlatilag egy Hot Observable-ként fog már viselkedni. 3-2-33 kódrészlet: BehaviorSubject var subject = new BehaviorSubject<string>("0"); subject.OnNext("1"); // Ezt nem fogjuk megkapni subject.OnNext("2"); // Ezt megkapjuk subject.Subscribe(value => Console.WriteLine("OnNext: " + value)); subject.OnNext("3"); // Ezt is megkapjuk subject.OnNext("4"); // Ezt is megkapjuk
Ha nem teljesen tiszta a működése, érdemes eljátszani azzal, hogy a feliratkozást mozgatjuk a sorok között és megfigyeljük, hogy miképpen változik a konzol tartalma. A lényeg még egyszer: ez annyiban különbözik egy sima Hot Observable-től, hogy a feliratkozás előtti utolsó értéket (vagy az alapértéket) is megkapjuk rögtön feliratkozáskor.
AsyncSubject Az AsyncSubject típusnak nem sok köze van az újonnan bevezetett async kulcsszóhoz. Működése roppant egyszerű: bármi is történt az adatfolyamban, mi csak egyetlen értéket fogunk kapni, méghozzá a legutolsó értéket (vagy OnError üzenetet) az adatfolyam lezárása előtt (OnCompleted / OnError). 3-2-34 kódrészlet: AsyncSubject var subject = new AsyncSubject<string>(); subject.OnNext("1"); // Ezt nem fogjuk megkapni subject.Subscribe(value => Console.WriteLine("OnNext: " + value)); subject.OnNext("2"); // Ezt nem fogjuk megkapni subject.OnNext("3"); // Ezt megkapjuk subject.OnCompleted();
48
LINQ Azzal, hogy az események sorozatát egy ilyen “csomagolótípus” segítségével adatfolyamokként reprezentáljuk, lehetővé válik, hogy a konkrét adattól függetlenül különböző operátorokat használjunk, akárcsak az IEnumerable típusnál. Az Rx ereje lényegében ezekben az operátorokban rejlik és az elkövetkezendő jó néhány oldalon ezeket fogom bemutatni.
Projektor operátorok Select Az egyik legáltalánosabb feladat a lekérdezéseknél a projekció. Ez azt a műveletet jelenti, amikor a bemeneti adat típusát valamilyen művelettel megváltoztatjuk. Ezzel az operátorral jellemzően egy komplex típus valamely tulajdonságát vagy tulajdonságait választjuk ki, hogy végül már csak a releváns adatokkal kelljen dolgozni, mint például amikor egy PointerMoved eseményből kicsomagoljuk a számunkra fontos információt, jellemzően a koordinátákat. 3-3-1 kódrészlet: Select operátor var source = Observable .FromEventPattern(this, "PointerMoved") .Select(evt => evt.EventArgs.GetCurrentPoint(this).Position);
SelectMany Sokszor adódik olyan szituáció, hogy egy olyan adatforrásunk van, ami egy lista, ami listákat tartalmaz, nekünk pedig egy nagy listára volna szükségünk, ami a belső listák összefűzéséből adódik. A SelectMany() operátor egészen pontosan ezt oldja meg számunkra. Ha például van egy webszolgáltatás, amitől egy lekérdezés eredményét oldalanként kapjuk vissza, akkor ezzel az operátorral ezeket a csomagokat egybefűzhetjük. Az alábbi példakód ennek az operátornak a használatát mutatja. 3-3-2 kódrészlet: SelectMany operátor var webService = new Func> (page => Enumerable.Range(page * 5, 5)); var source = Observable.Range(0, 5) .Select(num => webService(num)) .SelectMany(page => page);
49
Annak érdekében, hogy a lehető legegyszerűbben szimuláljuk a webszolgáltatás hívást, kell egy egyszerű metódus, ami kap egy számot (oldalszámot), és visszaadja az oldal tartalmát, ami nem más, mint a számok növekvő sorrendben ötösével (01234, 56789, stb). A következő feladat, hogy ezt a webszolgáltatást a lehető legkisebb erőfeszítéssel meghívjuk, tehát szükség van a Range() generátorfüggvényre, ami elszámol nullától négyig (ami azt reprezentálja, hogy öt oldalnyi adatot kérdezünk le). Ez után a Select() operátorral felhasználva ezeket a számokat, meghívjuk a webszolgáltatást és lekérdezzük a megfelelő oldalakat. Ezen a ponton egy IObservable> típusú adatfolyamunk van. Nekünk azonban arra lenne szükségünk, hogy a csomagok össze legyenek fésülve, ezért ráeresztjük a SelectMany() operátort, mely ezt megteszi, és ezután már egy IObservalbe adatfolyamunk lesz és az eredmény nem 5 darab 5 elemű lista, hanem 1 darab 25 elemű lista (adatfolyam) lesz.
Amb Míg a SelectMany() “kiteríti” a többszörösen egymásba ágyazott adatfolyamokat, addig az Amb() operátorral lehetőség van kiválasztani azt amelyikben a leghamarabb jelenik meg egy elem. Leginkább azt az analógiát lehetne erre a működésre ráhúzni, amikor egy csoportnyi embernek felteszünk egy kérdést, de csak annak a válasza érdekel, aki a leghamarabb reagál. 3-3-3 kódrészlet: Amb operátor var source = Observable.Amb(source1, source2);
3-3-1 ábra: Amb operátor idővonala
Switch Ennek az operátornak a segítségével lehetőség van arra, hogy egy többszörösen egymásba ágyazott adatfolyamból mindig a legutoljára megjelent adatfolyam értékeit kapjuk meg. Elég jó példája ennek a keresési javaslatok felajánlása. A felhasználó ír valami szöveget egy szövegdobozba, mi pedig megpróbálnánk mindig az aktuálisan beírt szöveg vagy szövegrészlet alapján a kedvében járni és 50
keresési javaslatokat feldobni neki. Ehhez azonban jellemzően egy aszinkron műveletre van szükség, melyet minden betű leütésekor útjának indítunk, az eredmények pedig egyáltalán nem biztos, hogy ugyanabban a sorrendben érkeznek be, így elképzelhető, hogy mire a felhasználó beírja a keresőbe, hogy “Budakalász”, még mindig a “Buda” kezdetű szavakra próbálnánk keresési javaslatokat felajánlani. Az ilyen és hasonló problémák (versenyhelyzetek) megoldására alkalmas a Switch() operátor, mely biztosítja, hogy mindig az adatfolyamban megjelent legutolsó beágyazott adatfolyamot figyeljük csak és a korábbiakat pedig figyelmen kívül hagyja. Hasonló, mint a SelectMany(), csak épp mindig csak a legutolsó beágyazott adatfolyam értékeit figyeli. 3-3-4 kódrészlet: Switch operátor var source = Observable.FromEventPattern(SearchField, "TextChanged") .Select(e => (e.Sender as TextBox).Text) .Select(q => GetObservableSuggestion(q)) .Switch();
3-3-2 ábra: Switch operátor idővonala Az ábra tehát egy olyan többszörösen egymásba ágyazott adatfolyamot jelképez, melynek az első beágyazott adatfolyamában az (0, 1, 2, 3, 4) számok jelennek meg valamennyi idő alatt, a második adatfolyamában a (10, 20, 30, 40, 50), míg a harmadikban a (100, 200, 300, 400, 500) számok. Mikor az első “al-adatfolyam” megjelenik a fő adatfolyamon, azon elkezd megjelenni az 1, majd a 2, ám közben megjelenik a második beágyazott adatfolyam is így az eredmény-adatfolyamon innentől kezdve már annak az értékeit olvashatjuk ki (10, 20). Végül pedig megjelenik a harmadik beágyazott adatfolyam is, melynek következtében az eredmény adatfolyamban már csak annak az újabb értékeit láthatjuk. Fontos meglátni, hogy mikor egy-egy újabb beágyazott adatfolyam megjelenik, akkor a korábbinak a további értékeit már nem fogjuk megkapni. 51
Szűrő operátorok Where A projekció mellett a másik leggyakoribb felhasználása a LINQ lekérdezéseknek a szűrés. Ez azt a műveletet jelenti, amikor egy forráslistának csak azon elemeire van szükségünk, amik egy bizonyos feltételnek megfelelnek. A Select() operátornál használt pointerkövető példából kiindulva könnyedén lehet egy olyan lekérdezést csinálni, ami a felületet 100x100-as régiókra bontja, és azt érzékeli, hogy a mutató melyik régióban van. 3-3-5 kódrészlet: 100x100-as régiók elkészítése a Select operátorral var source = Observable .FromEventPattern(this, "PointerMoved") .Select(e => e.EventArgs.GetCurrentPoint(this).Position) .Select(p => new Point((int)(p.X / 100), (int)(p.Y / 100)));
Most, hogy az alapok le vannak fektetve, ezen az adathalmazon lehet szűrést végezni. Például mondhatjuk azt, hogy minket csak azok az események érdekelnek, amikor az egér valamelyik átlós irányú régióban mozog (a régió X és Y koordinátája megegyezik). Mi sem egyszerűbb, fűzzünk hozzá egy sort a lekérdezéshez. 3-3-6 kódrészlet: Szűrés az átlós régiókra a Where operátorral var source = Observable .FromEventPattern(this, "PointerMoved") .Select(e => e.EventArgs.GetCurrentPoint(this).Position) .Select(p => new Point((int)(p.X / 100), (int)(p.Y / 100))) .Where(p => p.X == p.Y);
Az operátor működését a következő ábra szemlélteti:
3-3-3 ábra: Where operátor idővonala
52
Distinct, DistinctUntilChanged Az előbb tárgyalt Where() szűrőfeltételnek köszönhetően, most már csak arról kapunk értesítést, hogy ha a mutató valamelyik átlós régióban mozog. Mi ezzel a probléma? Amíg az egér egy régión belül mozog, rengeteg eseményt kapunk, és ezeknek az eseményeknek a tartalma megegyezik. Mit lehet ezzel tenni? A lekérdezésekben jellemzően az ilyen jellegű duplikátum értékeket a Distinct() operátorral szoktuk kiszűrni. Ez azonban egy olyan operátor, ami az egész adatfolyamra nézve szűri meg a duplikátumokat, azaz, ha a mutató járt már egy régióban, utána soha többé nem kapunk róla értesítést, ha visszatér bele. Ennek a problémának a megoldására van a DistinctUntilChanged() operátor, mely csak az egymás mellett lévő duplikátumokat szűri ki az adatfolyamból.
3-3-7 kódrészlet: DistinctUntilChanged operátor var source = Observable .FromEventPattern(this, "PointerMoved") .Select(e => e.EventArgs.GetCurrentPoint(this).Position) .Select(p => new Point((int)(p.X / 100), (int)(p.Y / 100))) .Where(p => p.X == p.Y) .DistinctUntilChanged();
Ennek az operátornak a működését a következő ábra szemlélteti:
3-3-4 ábra: DistinctUntilChanged operátor idővonala
Skip LINQ lekérdezéseknél jellemzően ez (és a Take()) az operátor az, ami egy nagy adathalmazt több kis részre bont. Rx-nél az adatfolyam jellege miatt azonban egy kicsit több lehetőség adódik, és a megszokottnál több féle művelet végrehajtására is használható. A Skip() operátorral lehetőség van átugrani valamennyi elemet vagy időtartamot az adatfolyam kezdetéhez viszonyítva. Azaz megmondhatjuk, hogy minket nem érdekel az első 5 elem, vagy az első 5 másodpercben keletkezett elem.
53
3-3-8 kódrészlet: Skip operátor var source = Observable.Interval(TimeSpan.FromSeconds(1)) .Skip(5);
3-3-5 ábra: Skip operátor idővonala Ennek a párja a SkipLast() operátor, mely az adatfolyam végéhez viszonyítva ugorja át az utolsó X elemet, vagy az utolsó Y másodpercben keletkezett elemet. Mivel ennek az operátornak tudnia kell, hogy mikor van vége az adatfolyamnak, így ennek a kimenetén csak akkor jelennek majd meg értesítések, ha a forrás adatfolyam már lezárult. 3-3-9 kódrészlet: SkipLast operátor var source = Observable.Interval(TimeSpan.FromSeconds(1)) .SkipLast(3);
3-3-6 ábra: SkipLast operátor idővonala A SkipUntil() operátorral lehetőségünk van arra, hogy megadjunk egy abszolút időpontot, ameddig nem vagyunk kíváncsiak az adatfolyam elemeire, vagy átadjunk egy másik adatfolyamot, mondván, hogy addig nem vagyunk kíváncsiak az eredeti adatfolyam elemeire, amíg a másik adatforráson meg nem jelenik egy elem. 3-3-10 kódrészlet: SkipUntil operátor var source = Observable .Interval(TimeSpan.FromSeconds(1)) .SkipUntil(DateTimeOffset.Now.Add(TimeSpan.FromSeconds(5)));
54
3-3-7 ábra: SkipUntil operátor idővonala Végül pedig a SkipWhile() operátorral arra van lehetőség, hogy addig lépkedjük át az elemeket, amíg az átadott feltétel teljesül. Ez nem egyezik meg a sima Where() operátorral, ugyanis amint a feltétel egyszer nem teljesül, onnantól kezdve minden további elemét megkapjuk az adatfolyamnak, akár teljesülne rá a feltétel, akár nem.
3-3-11 kódrészlet: SkipWhile operátor var source = Observable.Interval(TimeSpan.FromSeconds(1)) .SkipWhile(num => num % 2 == 0);
3-3-8 ábra: SkipWhile operátor idővonala
Take A Take() operátor pontosan a Skip() ellentétje. Míg a Skip() átugrik elemeket különböző feltételektől és paraméterektől függően, addig a Take() ugyanezen feltételek és paraméterek hatására visszaadja ezeket az elemeket. A "sima" Take() operátor tehát visszaadja az első X elemet, vagy az első Y másodpercben keletkezett elemet. 3-3-12 kódrészlet: Take operátor var source = Observable.Interval(TimeSpan.FromSeconds(1)) .Take(5);
55
3-3-9 ábra: Take operátor idővonala A TakeLast() operátor ennek kiegészítéseként az utolsó X elemet, vagy az utolsó Y másodpercben keletkezett elemet adja vissza. 3-3-13 kódrészlet: TakeLast operátor var source = Observable.Interval(TimeSpan.FromSeconds(1)) .TakeLast(3);
3-3-10 ábra: TakeLast operátor idővonala A TakeUntil() operátorral egy abszolút időpontot adhatunk meg amíg szükségünk van a folyam elemeire, vagy egy másik adatfolyamot adhatunk át, mondván, addig van szükségünk az eredeti forrás elemeire, amíg az átadott folyamon meg nem jelenik egy új elem.
3-3-14 kódrészlet: TakeUntil operátor var source = Observable .Interval(TimeSpan.FromSeconds(1)) .TakeUntil(DateTimeOffset.Now.Add(TimeSpan.FromSeconds(5)));
3-3-10 ábra: TakeUntil operátor idővonala 56
Végül pedig a TakeWhile() operátorral addig szedjük ki az elemeket az adatfolyamból, amíg a megadott feltétel teljesül. 3-3-15 kódrészlet: TakeWhile operátor var source = Observable.Interval(TimeSpan.FromSeconds(1)) .TakeWhile(num => num % 2 == 0);
3-3-11 ábra: TakeWhile operátor idővonala Ez a két operátor (Skip() és Take()) természetesen keverhető is, azaz könnyedén megmondhatjuk, hogy nekünk a 3-as indextől (negyedik elemtől) kezdve 4 elemre van szükségünk. 3-3-16 kódrészlet: Skip és Take operátorok ötvözése var source = Observable.Interval(TimeSpan.FromSeconds(1)) .Skip(3) .Take(4);
3-3-12 ábra: Skip és While operátorok ötvözésének idővonala
Sample Ez az operátor lehetővé teszi, hogy mintavételezzük az adatfolyamot. Például ha statisztikai információkat gyűjtünk az egér pozíciójáról a képernyőn, nem feltétlenül szeretnénk másodpercenként 100+ értéket lementeni, hanem megelégszünk másodpercenként egy értékkel. 57
Ennek a megvalósításában a Sample() operátor lesz segítségünkre, amivel ezt a logikát igen könnyedén meg tudjuk valósítani. 3-3-17 kódrészlet: Sample operátor var source = Observable .FromEventPattern(this, "PointerMoved") .Select(e => e.EventArgs.GetCurrentPoint(this).Position) .Sample(TimeSpan.FromSeconds(1));
Időintervallum megadása mellett pedig lehetőség van egy másik adatfolyamot is megadni. Ilyen esetben a másik adatfolyamon megjelenő elemek fogják a mintavételezési időpontokat jelenteni.
Kiválasztó operátorok Ezek az operátorok egyetlen elem kiválasztására használatosak.
First Az adatfolyam legelső elemét a FirstAsync() operátorral tudjuk kiválasztani, mely egy olyan adatfolyamot produkál, amiben egyetlen elem lesz, ami pedig értelem szerűen a forrás adatfolyam első eleme. Ennek az operátornak átadhatunk opcionálisan egy feltételt is, így az első olyan elemet fogja visszaadni, ami megfelel a megadott feltételnek. Ez az operátor kivételt dob, ha egyetlen elem sincs az adatfolyamban lezáráskor. Ezen probléma orvoslására használhatjuk a FirstOrDefaultAsync() operátort, mely üres adatfolyam esetén a típus alapértelmezett értékét fogja visszaadni (referencia típusoknál null, struktúráknál pedig az alapérték, int-nél például 0).
3-3-18 kódrészlet: FirstAsync operátor var source = Observable.Interval(TimeSpan.FromSeconds(1)) .FirstAsync();
3-3-13 ábra: FirstAsync operátor idővonala
58
Last A LastAsync() operátor a FirstAsync() "ellentétje", ugyanis az adatfolyam lezárása (OnCompleted esemény) előtti utolsó elemet vagy az utolsó olyan elemet adja vissza, ami megfelel az átadott feltételnek. Természetesen a LastAsync() operátornak is megvan a LastOrDefaultAsync() párja, mely az esetleges üres adatfolyam problémáját orvosolja. 3-3-19 kódrészlet: LastAsync operátor var source = Observable.Range(0, 5) .LastAsync();
3-3-14 ábra: LastAsync operátor idővonala
ElementAt Amennyiben a keresett elem egy adott indexen helyezkedik el (ami nem az első és nem az utolsó a folyamban), akkor ezzel az operátorral azt megtalálhatjuk. A First() és Last() operátorokkal ellentétben, ennek a teljes neve nem ElementAtAsync(), hanem csak simán ElementAt(). Ez az operátor is rendelkezik az *OrDefault() verzióval. 3-3-20 kódrészlet: ElementAt operátor var source = Observable.Interval(TimeSpan.FromSeconds(1)) .ElementAt(2);
3-3-15 ábra: ElementAt operátor idővonala
59
Single A SingleAsync() operátorral arra az esetre készülhetünk fel, hogy ha kifejezetten egy elemet várunk az adatfolyamban. Ha ennél kevesebb vagy több eleme van az adatfolyamnak, akkor kivételt dob. Ennek az operátornak is van *OrDefault verziója. 3-3-21 kódrészlet: SingleAsync operátor var source = Observable.Return(42) .SingleAsync();
3-3-16 ábra: SingleAsync operátor idővonala
DefaultIfEmpty Ezzel az operátorral kifejezetten arra készülhetünk fel, ha a forrás adatfolyam nulla elemű. Ebben az esetben a típusnak megfelelő alapértéket, vagy egy explicit módon megadott alapértéket fogunk az adatfolyam végén kapni. 3-3-22 kódrészlet: DefaultIfEmpty operátor var source = Observable.Empty() .DefaultIfEmpty(42);
3-3-17 ábra: DefaultIfEmpty idővonala
60
StartWith Ez az operátor nem annyira illik ebbe a csoportba, hiszen az eredménye nem egy egy elemű adatfolyam, azonban ezt felhasználva a korábbiak előtt, lényegében adhatunk az adatfolyamnak egy alapértéket. Ez után persze nem túl sok értelme van egy FirstAsync() operátort használni, hiszen az azonnal visszatérne az épp előtte beállított alapértékkel. Egy Last() vagy Single() operátor esetében azonban hasznos kiegészítést adhat. Emellett ennek az operátornak van egy kiemelten hasznos funkcionalitása. Gondoljunk bele, hogy jellemzően hogyan néz ki egy olyan logika egy oldalon, ahol valamilyen tulajdonság értékétől függően (például orientáció) akarjuk az oldal vizuális állapotát beállítani. Valószínűleg lesz egy metódus, ami paraméterül várja az aktuális állapotot, és az oldal konstruktorában vagy Loaded eseményében meghívjuk ezt a metódust az aktuális értékével a figyelt tulajdonságnak, és feliratkozunk valamiféle Changed eseményre, melynek az eseménykezelőjében a megváltozott értékkel hívjuk meg a metódust. 3-3-23 kódrészlet: Alkalmazás méretének figyelése hagyományos módon SetVisualState(ApplicationView.Value); Window.Current.SizeChanged += (snd, arg) => SetVisualState(ApplicationView.Value);
Rx esetében ez úgy néz ki, hogy elkészítjük az adatfolyamot a megfelelő eseményből, és a StartWith() operátorral lekérdezzük az aktuális értékét a figyelt tulajdonságnak, így az adatfolyamra feliratkozva azonnal megkapjuk az aktuális értéket, és természetesen a későbbiekben a majdani változásokat. 3-3-24 kódrészlet: Alkalmazás méretének megfigyelése Rx-szel var source = Observable.FromEventPattern(Window.Current, "SizeChanged") .Select(_ => ApplicationView.Value) .StartWith(ApplicationView.Value);
3-3-18 ábra: StartWidth operátor idővonala
61
Nem feltétlenül kevesebb a kód, de mindenképpen olvashatóbb, elegánsabb és például ebben az esetben könnyedén kiszűrhetőek a fals értesítések egy DistinctUntilChanged() operátorral.
Matematikai operátorok Ezekkel az operátorokkal lehetőségünk van a teljes (már lezárult) adatfolyamon matematikai műveleteket végezni, úgy, mint maximum –és minimum keresés, átlagszámítás, összegzés, illetve az elemek megszámlálása. Ezeket a műveleteket rendre a Max(), Min(), Average(), Sum() és Count() operátorokkal tudjuk elvégezni. Példaként álljon itt a Max() operátor kódja és folyamatábrája. 3-3-25 kódrészlet: Matematikai operátorok (Max) var source = Observable.Range(0, 5).Max();
3-3-19 ábra: Matematikai operátorok idővonala Az érdekessége ezeknek a műveleteknek, hogy nem csak szám típusú adatfolyamokon használhatóak, hanem tetszőleges adatfolyamon. Amennyiben viszont nem számok vannak az adatfolyamban, szolgáltatnunk kell egy transzformációs függvényt, ami az elemeket valamilyen számszerű adattá alakítja.
Időzítéssel kapcsolatos operátorok Delay A Delay() operátor a Timer() első verziójának egy általánosításaként fogható fel, mely arra használható, hogy annak az adatfolyamnak az értékei, amihez hozzá van kapcsolva, késleltetve jelenjenek meg annak a bizonyos csővezetéknek a végén. De ez csak egyszeri késleltetést jelent, gyakorlatilag bármikor is jelentek volna meg a szóban forgó szekvencia elemei, ennek a megjelenésnek az időpontja el lesz tolva egy megadott mennyiségű idővel. Most éppen az egyszerűség kedvéért egy Interval()-lal generált adatfolyamhoz fogom hozzácsatolni, és késleltetni fél másodperccel. Ezzel azt is láthatjuk, hogy csak később jelennek meg az elemek, de nem az elemek megjelenése közti idő fog megváltozni.
62
3-3-26 kódrészlet: Delay operátor var source = Observable.Interval(TimeSpan.FromMilliseconds(100)); .Delay(TimeSpan.FromMilliseconds(500));
3-3-20 ábra: Delay operátor idővonala
Throttle A Throttle() (magyarul lefojtás) egy olyan operátor, mellyel lehetőségünk van visszafogni az adatfolyam végén megjelenő adatok gyakoriságát. Vegyük alapul a következő kis példát. Szeretnénk statisztikai adatokat gyűjteni arról, hogy a felhasználó merre mozgatja az egerét. Mivel az egér az USB szabvány szerint 125Hz-es frissítést produkál, valószínűleg nem szeretnénk az összes koordinátát elmenteni ahol csak az egeret érzékeltük az alkalmazásban, hanem mondjuk azokat a pontokat szeretnénk, ahol fél vagy egy másodpercnél hosszabb ideig állt egy helyben a kurzor. Nyilván ilyen jellegű statisztikákból ki tudjuk találni, hogy a képernyő mely részén pihen a legtöbbet az egér, és ezen adatokból (és egyéb statisztikák összevonásából) optimalizálhatjuk a felhasználói felület elrendezését. Na de a lényeg: készítsük el az egér figyelését. 3-3-27 kódrészlet: Egér pozíciójának figyelése Rx segítségével var source = Observable.FromEventPattern(this, "PointerMoved") .Select(e => e.EventArgs.GetCurrentPoint(this).Position); source.ObserveOnDispatcher().Subscribe(value => { var position = value.EventArgs.GetCurrentPoint(this).Position; Console.WriteLine(string.Format( "OnNext: X:{0:0.00}, Y:{1:0.00}", position.X, position.Y)); });
Ezzel azonban még csak azt értük el, hogy iszonyatos mennyiségű OnNext üzenetet látunk a konzolunkban. Hogyan tudnánk “lecsitítani” ezt az adatfolyamot? Természetesen a Throttle() operátorral. 63
3-3-28 kódrészlet: Adatfolyam lefojtása var source = Observable.FromEventPattern(this, "PointerMoved") .Select(e => e.EventArgs.GetCurrentPoint(this).Position) .Throttle(TimeSpan.FromSeconds(1));
Innentől kezdve már csak akkor kapunk frissítést (természetesen mindig a legutolsót), ha az egér egy másodpercig nyugalomban van és egy helyben áll.
3-3-21 ábra: Throttle operátor idővonala
Hibakezelő operátorok Timeout A Timeout() operátor terminálja az adatfolyamot, ha két elem megjelenése között (vagy az első elem megjelenése előtt) egy megadottnál hosszabb idő telik el. Vegyünk egy egyszerű példát, ahol egy Timer() operátorral generálunk egy olyan adatfolyamot, melynek az első eleme 500ms késleltetéssel jelenik meg és adjuk hozzá a Timeout() operátort 250ms “türelmi idővel”, aminek következtében a program elindulása után 250ms-al kapni fogunk egy OnError üzenetet. 3-3-29 kódrészlet: Timeout operátor var observableTimer = Observable.Timer( TimeSpan.FromMilliseconds(500), TimeSpan.FromMilliseconds(100)); var source = observableTimer.Timeout(TimeSpan.FromMilliseconds(250));
3-3-22 ábra: Timeout operátor idővonala 64
Retry A Retry() operátorral lehetőség van arra, hogy ha hiba keletkezik az adatfolyamban, akkor azt nyelje el, és iratkozzon fel újra a forrásra. Ezt, ha nem paraméterezzük fel, akkor addig próbálkozik, amíg egyszer sikerrel nem jár, vagy megadhatjuk azt is, hogy maximum hányszor próbálkozzon újra. Jellemzően webszolgáltatás hívásoknál lehet ez hasznos, hiszen lehetséges, hogy valami okból kifolyólag az egyik hívás eredménye nem jár sikerrel vagy kifut az időből, de a következő már sikeres lehet. Ilyenkor természetesen a csatorna végén feliratkozva semmit nem látunk abból, hogy az a webszolgáltatás-hívás hányszor futott ki az időből és próbálkozott újra, csak azt látjuk, hogy a leges leg végén a folyamatnak végül sikerült e valami használható értékkel visszatérnie a hívásnak vagy nem. Az imént leírtakat a következő kódrészlettel lehet szimulálni. 3-3-30 kódrészlet: Retry operátor használata egy hibára hajlamos forráson var failingWebService = new Func>(async () => { var random = new Random(); // Random Response Time await Task.Delay(random.Next(500)); // Random Exception if (random.Next(100) < 50) throw new Exception(); return "Siker"; }); var source = failingWebService() .ToObservable() .Timeout(TimeSpan.FromMilliseconds(250)) .Retry(3);
A szimulált webszolgáltatás 50% eséllyel fog kifutni az időből és további 50% eséllyel fog kivételt doni. A Retry() operátor háromszor fog újrapróbálkozni. Ha mindháromszor kivétel keletkezik, akkor a csatorna OnError üzenettel fog lezárulni egyetlen érték nélkül, egyéb esetben pedig egy értékkel (“Siker”) és egy OnCompleted üzenettel.
OnErrorResumeNext Ezzel az operátorral lehetőség van arra, hogy ha az eredeti adatfolyamban valami hiba keletkezik, akkor észrevétlenül átváltson egy ugyanolyan típusú másik adatfolyamra. Például ha van valamilyen tartalom az interneten, amit le akarunk tölteni, és ez több helyre is tükrözve van, akkor elő lehet 65
készíteni mindegyik elérhetőséghez egy webszolgáltatás hívást (Rx módra), és aztán felsorolni OnErrorResumeNext() operátorokkal őket. Ez az operátor csak akkor aktiválja magát, ha az előtte lévő adatfolyamban hiba történik. Ilyenkor a hibát elnyeli, és a forrást lecseréli a megadott másodlagos forrásra. Ennek megfelelően, ha többet egymás után fűzünk, akkor az első akkor lép életbe, ha az eredeti forrásban hiba történik, a második akkor, ha a másodlagos forrás is csődöt mond, és így tovább. 3-3-31 kódrészlet: OnErrorResumeNext operátor var source = source1.OnErrorResumeNext(source2);
3-3-23 ábra: OnErrorResumeNext operátor idővonala
Catch Ez az operátor tud úgy is viselkedni, mint az OnErrorResumeNext(), de itt már lehetőségünk van arra is, hogy elkapjuk a keletkezett hibát, és annak függvényében válasszuk ki a csere-adatfolyamot. Ezt az operátort nem szabad összekeverni a feliratkozásnál megadható OnError csatorna figyelőjével, ugyanis ott már valóban a csatorna legvégét szemléljük, itt pedig szolgáltatni kell egy új adatfolyamot, hogy a folyam ne szakadjon meg. Ez az operátor is elnyeli az OnError üzenetet és helyette egy alternatív adatfolyamot szúr be. 3-3-32 kódrészlet: Catch operátor var source = source1 .Catch((TimeoutException ex) => source2) .Catch((Exception ex) => source3);
66
Finally Ezzel az operátorral lehetőség van valamilyen feladatot végrehajtani akkor, amikor az adatfolyam lezárul akár természetesen (OnCompleted), akár valamilyen hibával (OnError).
Logikai operátorok Ezekkel az operátorokkal különböző, egész adatfolyamra vonatkozó feltételeket tudunk megvizsgálni, eredményük minden esetben egy logikai (bool) érték.
SequenceEqual Ezzel az operátorral lehetőség van arra, hogy két adatfolyamot vagy egy adatfolyamot és egy listát összehasonlítsunk. Amennyiben azok tartalma megegyezik (beleértve az elemek sorrendjét is), igaz értéket fog visszaadni.
Contains Ezzel az operátorral azt tudjuk megvizsgálni, hogy egy adott elem benne van e az adatfolyamban.
IsEmpty Az operátor neve mindent elárul, azt tudjuk meg, hogy az adatfolyam üres e.
All Az All operátorral azt tudjuk leellenőrizni, hogy az adatfolyam összes eleme megfelel e az átadott kritériumnak.
Any Az Any operátorral azt tudjuk leellenőrizni, hogy van e akár egy olyan eleme is az adatfolyamnak, ami megfelel az átadott kritériumnak.
Ismétlő operátorok Repeat A Repeat() operátor nagyon hasonlít a hibakezelésnél tárgyalt Retry() operátorhoz, azzal a különbséggel, hogy ez normál véget érés esetén iratkozik fel újra a forrásra.
67
DoWhile A DoWhile() operátor a hagyományos while ciklus Rx-es verziója. Megadhatunk egy feltételt, és amíg az teljesül és nem keletkezik valamilyen lekezeletlen hiba az adatfolyamban, addig normál véget érések esetén újra és újra visszairatkozik a forrásra.
Monitorozó operátorok Do Ez az operátor megfigyelheti, de nem módosíthatja az adatfolyamot. Ennek az operátornak a segítségével lehetőség van arra, hogy az adatfolyam különböző részeit megfigyeljük, monitorozzuk. Hasznos lehet például látni az egyes elemek értékét és megjelenésének időpontját. Többek közt az ilyen jellegű megfigyelések elősegítésére szolgálnak a következő operátorok.
Timestamp Ez az operátor hozzáfűzi az adatfolyam értékeihez megjelenésük időpontját.
TimeInterval Ez az operátor hozzáfűzi az elemekhez az előző elem megjelenése óta eltelt időt.
Materialize és Dematerialize Ezzel az operátorpárossal lehetőség van arra, hogy típusos formába öntsük az adatfolyam különböző csatornáit. Lényegében kapunk egy Notification típusú objektumot, amibe be van csomagolva az OnNext, OnCompleted, és az OnError csatorna, és ennek az objektumnak a tulajdonságait tudjuk figyelni az egyes csatornák helyett.
Összefűző operátorok Ezeknek az operátoroknak a segítségével lehetőség nyílik több adatfolyam összefűzésére különböző logikák alapján.
Concat Ez az operátor egyszerűen egymás után rakja a forrás-adatfolyamokat. Az első adatforrásnak sikeresen le kell zárulnia (OnCompleted) ahhoz, hogy a másodikból elkezdje szedni az elemeket. 3-3-33 kódrészlet: Concat operátor var source = source1.Concat(source2);
68
3-3-24 ábra: Concat operátor idővonala
Merge Ennek az operátornak a felhasználásával azonos típusú adatfolyamokat tudunk összefésülni, melynek eredménye egy olyan adatfolyam, amin a két forrás elemei fognak megjelenni ugyanabban az időpontban, mint amikor a forrásban megjelent volna. 3-3-34 kódrészlet: Merge operátor var source = source1.Merge(source2);
3-3-25 ábra: Merge operátor idővonala
Zip A Zip() operátor már egy fokkal trükkösebb összefésülési logikával rendelkezik a Merge()-nél. Ez azt biztosítja, hogy időben bármilyen sorrendben is érkezzenek be az elemek, az eredmény adatfolyamban mindig párban fognak megjelenni a két forrásból érkező elemek, az egyik forrás első eleme a másik forrás első elemével, majd második a másodikkal “összecipzárazva” és így tovább. Fontos figyelembe venni, hogy az eredmény-adatfolyam csak annyi elemet fog tartalmazni, amennyit a kisebbik forrás-adatfolyam tartalmazott. 69
3-3-35 kódrészlet: Zip operátor var source = source1.Zip(source2, (s1, s2) => s1 + " " + s2);
3-3-26 ábra: Zip operátor idővonala
CombineLatest Ez az operátor a Zip()-hez hasonlóan elempárokat fog alkotni. Az elempárok ebben az esetben nem kötött sorrendben fognak érkezni, hanem mindig a forrás-adatfolyamok utolsó elemeinek kombinációját fogja visszaadni. 3-3-36 kódrészlet: CombineLatest operátor var source = source1.CombineLatest(source2, (s1, s2) => s1 + " " + s2);
3-3-27 ábra: CombineLatest operátor idővonala
70
Ablak operátorok Window és Buffer Az alap koncepciója ezeknek az operátoroknak az, hogy az eredeti adatforrás elemeit valamilyen logika alapján a sorrendjüket megtartva egymást követő vagy akár átfedésekkel is rendelkező csoportokra bontja. Az ablak széleinek meghatározására azonban sok féle lehetőségünk van. A legegyszerűbb, ha azt mondjuk, hogy ötösével szeretnénk az elemeket csoportba rendezni. 3-3-37 kódrészlet: Window operátor elemszámmal var source = dataStream.Window(5);
Ilyenkor paraméterként átadhatunk egy másik számot is, amivel azt határozzuk meg, hogy hány elem kihagyása után induljon egy új ablak. Ezzel lényegében lehetőségünk van átfedő, vagy „csúszó” ablakot létrehozni. 3-3-38 kódrészlet: Window operátor elemszámmal és átfedéssel var source = dataStream.Window(5, 1);
Ezzel tehát azt érjük el, hogy az adatfolyamban megjelenő minden elemre nyitunk egy 5 elemet „hosszúságú” ablakot. Ha mindezt a Buffer() operátorral tennénk, akkor eredményül egy olyan adatfolyamot kapnánk, amiben az első 4 elem után minden elem megjelenésekor megkapnánk az utolsó 5 elemét az adatfolyamnak egy listában. Ez tökéletes eszköz lehet például egy szenzorból érkező adatok szűrésére vagy simítására. Mondhatjuk azt is, hogy 5 másodperces ablakokra szeretnénk bontani az adatfolyamot. 3-3-39 kódrészlet: Window operátor időtartammal var source = dataStream.Window(TimeSpan.FromSeconds(5));
A korábbihoz hasonlóan itt is megadhatunk extra paraméterként egy plusz időtartamot, hogy milyen időközönként nyíljon új ablak. Emellett pedig egy számot is megadhatunk, amivel maximalizálhatjuk az ablak elemszámát, így egy olyan ablakot kapunk, ami például vagy 5 másodpercig vagy 5 elemig tart. Átadhatunk egy másik adatfolyamot is paraméterül, így az azon megjelenő elemek fogják időben az ablakok határát jelenteni. 71
3-3-40 kódrészlet: Window operátor eseményforrással var source = dataStream.Window(observableButtonClick);
A legáltalánosabb paraméterezésnél pedig külön adhatunk meg egy adatfolyamot az ablakok nyitásához és egy funkciót mely megkapja mindegyik ablaknyitó értéket és visszaad egy adatfolyamot, melyen amint megjelenik egy elem, bezárul az ablak.
3-3-41 kódrészlet: Window operátor nyitó –és záróeseményforrásokkal var observableAKey = Observable .FromEventPattern(this, "KeyDown") .Select(e => e.EventArgs.Key) .Where(k => k == VirtualKey.A); var observableBKey = Observable .FromEventPattern(this, "KeyDown") .Select(e => e.EventArgs.Key) .Where(k => k == VirtualKey.B); var source = Observable.Interval(TimeSpan.FromSeconds(1)) .Window(observableAKey, _ => observableBKey);
Ebben a példában minden “A” betű lenyomáskor megnyílik egy ablak, és minden “B” betű lenyomásakor bezárulnak az addig megnyitottak. Ezt persze dinamikusabban is meg lehetne oldani, hogy például ablaknyitó karakterek lehessenek a [1, 2, 3] karakterek és az „1”-et az „A” zárhassa le, a „2”-t az „S”, a „3”-at pedig a „D”.
72
3-3-42 kódrészlet: Window operátor nyitó –és záróeseményforrásokkal – v2 var openingKeys = new List(); openingKeys.Add(VirtualKey.Number1); openingKeys.Add(VirtualKey.Number2); openingKeys.Add(VirtualKey.Number3); var closingKeys = new Dictionary(); closingKeys.Add(VirtualKey.Number1, VirtualKey.A); closingKeys.Add(VirtualKey.Number2, VirtualKey.S); closingKeys.Add(VirtualKey.Number3, VirtualKey.D); var observableOpeningKey = Observable.FromEventPattern(this, "KeyDown") .Select(e => e.EventArgs.Key) .Where(k => openingKeys.Contains(k)); var closingKeySelector = new Func>(ok => { return Observable.FromEventPattern(this, "KeyDown") .Select(e => e.EventArgs.Key) .Where(k => k == closingKeys[ok]); }); var source = Observable.Interval(TimeSpan.FromSeconds(1)) .Window(observableOpeningKey, ok => closingKeySelector(ok));
Az alfejezet címe az, hogy Window és Buffer. A két operátor messziről nézve megegyezik, azonban van egy jelentős eltérés a kettő működésében. Míg a Window() az ablakokat adatfolyamként adja vissza, így azt már a megnyílásától kezdve figyelhetjük, addig a Buffer() az ablakok lezárulásakor adja vissza azok tartalmát egy listában.
Scan és Aggregate Ezekkel az operátorokkal egy akkumulátor elem segítségével aggregálhatjuk az adatfolyam elemeit. Ez az akkumulátor elem bármi lehet. Lehet egy szám, amibe az adatfolyam összegzését tudjuk vezetni, vagy lehet egy lista, aminek a segítségével lementhetjük az adatfolyam történetét és átlagolást, mediánszámítást vagy tetszőleges műveletet végezhetünk rajta. A Scan() és az Aggregate() között (hasonlóan a Window() és Buffer() közti eltéréshez) az a különbség, hogy míg a Scan() minden új elem megjelenésekor meghívja az átadott műveletet és az eredményét rögtön ki is értékeli, addig az Aggregate() csak az adatfolyam lezárásakor megy végig az adatfolyamon és adja vissza végül az aggregálás eredményét. 3-3-43 kódrészlet: Scan operátor var source = Observable.Range(0, 5) .Scan((accumulator, actual) => accumulator += actual);
73
3-3-28 ábra: Scan operátor idővonala
3-3-44 kódrészlet: Aggregate operátor var source = Observable .Range(0, 5) .Aggregate((accumulator, actual) => accumulator += actual);
3-3-29 ábra: Aggregate operátor idővonala
GroupBy A GroupBy() operátorral lehetőség van egy adatfolyamot valós időben csoportokra bontatni. A csoportosítás hagyományos listák esetében egy olyan struktúrát szokott eredményezni, ahol az egyszerű egyszintű listából valamilyen logika alapján egy kétszintű listát csinálunk, ahol az első szint a csoportokat tartalmazza, melyekben pedig találunk egy kulcsot (a csoportot “leíró” tulajdonság) és az adott csoportba tartozó elemeket. Egy egyszerű példa a számok “páros” és “páratlan” csoportokba osztása a következőképpen néz ki: 3-3-45 kódrészlet: GroupBy operátor használata egy hagyományos listán var groups = Enumerable .Range(0, 10) .GroupBy(n => n % 2 == 0 ? "Páros" : "Páratlan");
Ez tehát egy olyan kollekciót fog eredményezni, mely két elemből áll: a páros elemeket tartalmazó csoportból és a páratlan elemeket tartalmazóból.
74
Rx esetében mindez annyiban különbözik, hogy az elemek nem állnak rögtön rendelkezésre, hanem minden egyes elem megjelenésekor értékelődik ki, hogy az beletartozik e egy már meglévő csoportba vagy új csoportot kell neki nyitni. 3-3-46 kódrészlet: GroupBy operátor használata egy Rx-es adatfolyamon var groups = Observable.Range(0, 10).GroupBy(n => n % 2 == 0 ? "G1" : "G2");
Ezt pedig egy IObservable> típusú visszatérési értékkel tudja nekünk a rendszer biztosítani. A külső IObservable a csoportok adatfolyamát jelöli, a belső pedig azok tartalmát. Tehát amint megjelenik egy elem az eredeti adatforrásban, az azonnal létre fog hozni magának egy csoportot, amire aztán fel lehet iratkozni és várni, hogy a későbbiekben megjelenő elemek ebbe a csoportba kerülnek e. Lényegében majdnem ugyanazt érjük el, mintha tudatosan több adatforrást hoznánk létre a Where() operátor segítségével, csak itt a létrejövő csoportok száma nincs “beleégetve” a kódba, hanem teljesen dinamikusan jönnek létre. Az imént említett példa esetében ez még nem jelentene problémát, mert két lehetséges csoportunk van (páros/páratlan), akár kézzel is előkészíthetnénk őket, de ha szavakat szeretnénk csoportosítani a kezdőbetűjük alapján, akkor már egy kicsit kellemetlenebb lenne kézzel definiálni 20-30-40-50 (angol abc, számok, speciális karakterek, stb.) változót. Az operátorhoz tartozó szemléltetődiagram a következőképpen néz ki:
3-3-30 ábra: GroupBy operátor idővonala
75
Ütemezők Mikor Rx-el dolgozunk, alapvetően nem igazán tudunk róla, hogy épp milyen szálon (vagy szálakon) futnak az adatfolyamunk különböző részei, ha épp valamilyen időzítés/késleltetés van, akkor azt valószínűleg egy háttérszálon fogja végezni, annak érdekében, hogy azzal ne a UI szálat blokkolja, vagy ha egy aszinkron szervizhívás eredményére várunk, az is háttérszálon fog történni. Arról azonban nem mi nyilatkozunk explicit módon, hogy mi hol fusson, mennyire legyen párhuzamosítva, akárcsak például a LINQ-nál, ahol hozzáadva a lekérdezés forrásához, egy AsParallel() operátort, a lekérdezés párhuzamosítva fog lefutni. Ezek mellett pedig azzal a problémával is szembe tudjuk találni magunkat, ha több forrásból párhuzamosan jönnek adatok, akkor azoknak a szinkronizálását miképpen végezzük el, hogyan kezeljük a konkurenciát. Ezeknek a problémáknak a megoldására vezették be az ütemezőket, melyek segítségével – szükség esetén – szabályozni tudjuk, hogy a műveletek mikor és hol (melyik szálon) hajtódjanak végre. Az ütemezők mind megvalósítják az IScheduler interfészt, mely tartalmaz egy DateTimeOffset típusú Now nevű tulajdonságot, és három verzióját a Schedule() metódusnak. A Now tulajdonságon keresztül van a műveleteknek “időérzéke”. Azaz ha készítünk egy megfelelő logikájú ütemezőt, akkor átverhetjük a rendszert (pl tesztelés céljából) és egy valójában 5 év hosszú eseményláncot lefuttathatunk 5 másodperc alatt. A Schedule() függvény pedig a végrehajtandó feladatot fogja megkapni paraméterül és azt, hogy mikor kell ezt neki végrehajtania. A legtöbb Rx operátor opcionálisan fogadni tud egy IScheduler típusú paramétert. Ennek a segítségével tudjuk az alapbeállításokat felülbírálni és adott esetben optimalizálni a kódot. Az ütemezőket tehát a következőképpen tudjuk felhasználni: Egy operátor paramétereként: 3-4-1 kódrészlet: TaskPoolScheduler var source = Observable.Range(0, 10, TaskPoolScheduler.Default);
Ennek következtében a generált értékek új szálon fognak megjelenni. Emellett van egy konkrét operátorunk is arra, hogy közbeiktassunk egy explicit szálváltást, mely nem más, mint az ObserveOn() operátor. Ennek paraméterül szintén egy IScheduler típusú objektumot adhatunk, és lényegében azt teszi, hogy az utána következő műveletet (ha az alapértelmezetten mást nem használ) a megadott kontextusba fogja helyezni. Tipikusan a felhasználói felület módosítása az amit nem tehetünk meg más szálról, így ha a soron következő műveletet szeretnénk, hogy a UI szálon fusson le, akkor az ObserveOn() művelettel erre explicit kérést tehetünk a következőképpen:
76
3-4-2 kódrészlet: CoreDispatcherScheduler – tereljük a végrehajtást a UI szálra var source = Observable.Interval(TimeSpan.FromSeconds(1)) .ObserveOn(CoreDispatcherScheduler.Default);
Mivel ez egy tényleg nagyon gyakori igény, így konkrétan erre a műveletre van egy még egyszerűbb operátorunk, az ObserveOnDispatcher(). A rendelkezésre álló beépített ütemezők tehát a teljesség igényével a következők:
CoreDispatcherScheduler Mikor egy olyan adatfolyam végén vagyunk, ahol nem tudhatjuk biztosra, hogy az pontosan melyik szálon is kóborol, és a végeredmény felhasználásával szeretnénk a felhasználói felületet frissíteni, ez az az ütemező, ami a segítségünkre lesz.
TaskPoolScheduler Ezzel arra kényszeríthetjük a műveletet, hogy egy TaskPool-ból vett háttérszálon hajtódjon végre. Ez a .NET beépített szálkezelő mechanizmusa.
ThreadPoolScheduler Ez annyiban különbözik a TaskPoolScheduler-től, hogy a WinRT szálkezelését használja.
ImmediateScheduler Ezzel arra kényszeríthetjük a folyamatot, hogy az aktuális szálon azonnal hajtsa végre a műveleteket.
HistoricalScheduler Ez a legutolsó a felsorolásban és ez egy elég speciális verzió. Ezzel lehetőségünk van átverni az Rx időérzékét és kedvünkre manipulálni az időt. Ezzel fogjuk tudni megoldani, hogy egy amúgy hosszú ideig tartó folyamatot pár másodperc alatt lefuttassunk. Ahhoz, hogy ezt látványosan szemléltessem, levezetek egy egyszerű példát, melyből minden világossá fog válni. Kezdjük a legelején a dolgot, hozzuk létre a HistoricalScheduler-t. 3-4-3 kódrészlet: HistoricalScheduler var hist = new HistoricalScheduler();
Ezzel megvan maga az ütemező, ami azonban még nem csinál semmit. A dolog trükkös része a következő két sorban rejlik: az idő manipulálása. 77
3-4-4 kódrészlet: Virtuális idő manipulálása var interval = Observable.Interval(TimeSpan.FromSeconds(1)); interval.Subscribe(_ => hist.AdvanceBy(TimeSpan.FromMinutes(30)));
Mi is történik itt pontosan? Létrehozunk az Interval() generátorfüggvénnyel egy adatfolyamot, mely másodpercenként ad egy új értéket, erre feliratkozunk és ennek következtében minden másodpercben előrepörgetjük az ütemezőnk óráját 30 perccel az AdvanceBy() metódus segítségével. Tehát azt szimuláljuk, hogy másodpercenként eltelik fél óra. Ez után egy egyszerű, majd egy fokkal bonyolultabb, de ennek megfelelően látványosabb példát fogok mutatni. Elsőnek hozzunk létre egy olyan adatfolyamot az Interval() függvénnyel, mely 20 percenként generál egy új elemet. 3-4-5 kódrészlet: 20 perc 1 másodperc alatt var longStream = Observable .Interval(TimeSpan.FromMinutes(20), hist) .ObserveOnDispatcher() .Subscribe(value => Console.WriteLine(value.ToString()));
Ott van tehát az Interval(), mely 20 percenként generál egy értéket és a fontos kis részlet, hogy átadtuk neki paraméterül az ütemezőnket. Innentől kezdve az Interval() operátor időérzéke a mi kezünkben van. Ez után pedig jön a szokásos feliratkozás, hogy lássunk is valamit az egészből. Ha futtatjuk az alkalmazást, láthatjuk, hogy bár elvileg 20 percenként kellene, hogy generálódjon egy új elem, mégis másodpercenként jelennek meg új elemek az adatfolyamban. Némileg realisztikusabb a következő – bonyolultabb – példa, ahol egy listányi előre elkészített elemből generálunk adatfolyamot. Készítsük el a segédosztályt. A kódot a 3-4-6 kódrészlet mutatja.
78
3-4-6 kódrészlet: Példaadat public class TestData { public TimeSpan Offset { get; set; } public string Data { get; set; } public TestData(TimeSpan offset, string data) { Offset = offset; Data = data; } }
Majd térjünk vissza az oldal Loaded eseményébe vagy az oldal konstruktorába és hozzunk létre ebből egy listát. 3-4-7 kódrészlet: Példaadatok „generálása” var testData = new List { new TestData(TimeSpan.FromMinutes(25), "Elso"), new TestData(TimeSpan.FromMinutes(45), "Masodik"), new TestData(TimeSpan.FromMinutes(135), "Harmadik"), new TestData(TimeSpan.FromMinutes(180), "Negyedik") };
Végül pedig csináljunk ebből egy olyan adatfolyamot, mely a tesztadatoknak megfelelő időeltolással helyezi be a megfelelő elemeket az adatfolyamba. Azaz 25, 45, 135 majd 180 perccel az indítás után.
3-4-8 kódrészlet: A példaadatok felhasználása és tesztelése var testDataStream = testData.ToObservable() .Select(data => Observable.Timer(data.Offset, hist) .Select(_ => data.Data)) .SelectMany(timer => timer) .ObserveOnDispatcher() .Subscribe(Console.WriteLine);
A trükk megint a Timer() használatánál van, amikor is átadjuk neki a HistoricalScheduler-ünket paraméterként. Amit érdemes megfigyelni, hogy az időzítés a mélyén úgy történik, hogy van a végrehajtandó feladatoknak egy időbélyege (a mi tesztadatunktól függetlenül), hogy mikor kellene azt végrehajtani. Emellett pedig normál esetben körülbelül milliszekundumonként pörgetve van az óra (1 79
milliszekundummal persze, tehát valós időben telik az idő) és aztán amint elhagyja az idő valamelyik betárazott feladat időpontját (pontos egyezést nem fogunk tudni kapni), akkor azt a feladatot végrehajtja. Ebből a működésből eredően történt az a korábbi példában, hogy harminc perccel állítgatjuk előre az órát, de 20 percenként “lövünk el” egy eseményt, így aztán az egyik fázisban egy a következőben pedig kettő elem fog megjelenni egyszerre. Ezt a hatást kikerülhetjük azzal, ha a másodpercenkénti 30 perces előreállítás helyett fél másodpercenként állítjuk előre 15 perccel a virtuális óramutatót, azaz kisebb szeletekre bontjuk az eseményteret, melyeken gyorsabban ugrálunk át. Ezt egészen addig le lehet tornászni, hogy 1 milliszekundumonként állítjuk előre a virtuális óramutatót például 1 perccel, ami ugyebár 60.000-szeres gyorsítást jelent, mégis az aktuális példánk tesztelésére tökéletesen alkalmas lehet. Érdemes emellett még figyelembe venni, hogy ezzel a módszerrel azt tudjuk elérni, hogy egy előre definiált (esetleg felvett) adatfolyamon gyorsan végigmenjünk, de egy “élő” adatfolyamot nem fogunk tudni felgyorsítani, elvégre csak a virtuális időre tudunk hatást gyakorolni, a valós időre nem.
Rx + Async A végére még extraként érdemes említést ejteni egy nagyon hasznos képességéről az Rx-nek. Mint azt a könyv legelején írtam, az az alapvető különbség a sima aszinkron hívások és az Rx között, hogy előbbinek egy visszatérési értéke van, utóbbinak pedig több is lehet. Azonban ott van a dolognak az az oldala, hogy előbbivel nagyon kényelmesen lehet programot írni az új async-await kulcsszavaknak köszönhetően. Mi volna, ha egy adatfolyam is “bevárható” volna? Mit jelentene ez egyáltalán? A következő pár sor a szemfülesek számára valószínűleg mindent el fog árulni. 3-5-1 kódrészlet: Adatfolyam „bevárása” // események másodpercenként var interval = Observable.Interval(TimeSpan.FromSeconds(1)); // számok 1-től 5-ig var range = Observable.Range(1, 5); // számok 1-től 5-ig másodpercenként adagolva var source = interval.Zip(range, (i, r) => i.ToString()); string result = await source; Console.WriteLine(result);
Azzal, hogy “bevárjuk” az adatfolyamot, azt érjük el, hogy megkapjuk az utolsó értékét az OnCompleted üzenet előtt. Fontos észben tartani, hogy utolsó értéke csak egy véges adatfolyamnak lehet, tehát például az Interval()-t egymagában megpróbálni bevárni nem volna túl bölcs gondolat. Emellett a másik fontos tudnivaló, hogy a bevárással a szekvencia utolsó elemét fogjuk megkapni, 80
vagy ha keletkezett kivétel, és az utolsó üzenet egy OnError üzenet volt, akkor egy kivételt fog dobni a bevárásnál, melyet egy hagyományos try-catch blokkal fogunk tudni elkapni. El lehet képzelni azt a szituációt – melyet a könyv elején is említettem – hogy van egy webszolgáltatás hívásunk, melyre rádobunk egy TimeOut() meg egy Retry() operátort, majd végül ugyanolyan kényelmesen, mintha csak egy sima aszinkron hívás volna, bevárjuk az await kulcsszó segítségével. 3-5-2 kódrészlet: Hibára hajlamos adatfolyam „bevárása” try { var result = await Observable.FromAsync(() => getWeatherAsync()) .Timeout(TimeSpan.FromSeconds(1)) .Retry(3); } catch { }
Vagy ha például van egy összetettebb szervizhívásunk, mely több részből áll, mert például a lekérdezett adat lapozva van és csak több részletben lehet lekérdezni... nem probléma, Rx-el összefésüljük, majd a legvégén egy darabban visszaadjuk az eredményt, mintha csak egy szimpla szervizhívás lett volna.
3-5-3 kódrészlet: Oldalakra bontott lekérdezés visszaadása egy darabban var result = await Observable.Range(0, 3) .Select(i => Observable.FromAsync(() => getBigDataPartAsync(i))) .SelectMany(oo => oo) // IObservable> feloldása .SelectMany(oe => oe) // IObservable> feloldása .ToList(); // A teljes adatfolyam tartalmának listába gyűjtése
Vagy például ha egy MessageDialog-hoz hasonló működést akarunk csinálni (megjelenítünk valamit, majd arra várunk, hogy nyomja meg valamelyik gombot a felhasználó és utána menjen tovább a végrehajtás)... nem probléma, a gombokhoz tartozó eseményeket megfelelően elkészítjük, majd egyszerűen megjelenítjük a dialógust és várunk az adott metódusban, hogy nyomja meg valamelyik gombot a felhasználó, és utána pedig kényelmesen egyetlen sorral lejjebb leellenőrizzük, hogy mit nyomott és mehet tovább a programkód végrehajtása.
81
3-5-4 kódrészlet: Hagyományos esemény „bevárása” var okEvent = Observable.FromEventPattern(okButton, "Click") .Select(_ => "OK") .FirstAsync(); var cancelEvent = Observable.FromEventPattern(cancelButton, "Click") .Select(_ => "CANCEL") .FirstAsync(); var dialogResult = await Observable.Amb(okEvent, cancelEvent); if (dialogResult == "OK") Console.WriteLine("Az OK gombra kattintott"); else if (dialogResult == "CANCEL") Console.WriteLine("A CANCEL gombra kattintott");
Összefoglalás Ebben a fejezetben kívül-belül megismerhettük az Rx-et. Megtanulhattuk, hogy hogyan tudunk a legkülönfélébb módokon adatforrásokat készíteni, vagy meglévő adatforrásokat (mint pl. aszinkron műveletek, események, listák, és „manuális” adatforrások) Rx formátumba konvertálni. Megtanulhattuk, hogy ezeken az adatfolyamokon milyen műveleteket tudunk végezni a LINQ operátorok segítségével és miképpen kombinálhatjuk őket. Megtekintettük az ütemezőket is, melyek az aszinkronitás és a vele járó problémák lekezeléséért felelnek. Végül pedig megnéztük, hogy miképpen képes az Rx a C# nyelvi elemeit kihasználni.
82