Distributed, scalable platform for stream processing
http://s4.io
Molnár András ([email protected])
May 11, 2012
Motivation
● “The emergence of new applications such as real-time search, high frequency trading, and social networks is pushing the limits of what can be accomplished with traditional data processing systems. There is a clear need for highly scalable stream computing solutions that can operate at high data rates and process massive amounts of data. For example, to personalize search advertising, we need to process thousands of queries per second from millions of unique users in real-time, which typically involves analyzing recent user activity such as queries and clicks.”
  – Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)
● “Other typical uses for computing over continuous streams of data include: listening for stock trading signals, watching for fraud in transactions, and monitoring process logs to look for signs of trouble.”
  – http://www.infoq.com/news/2010/11/yahoo-releases-s4
Motivation
● “With the surge of open source projects such as Hadoop, adoption of the MapReduce programming model has accelerated and is moving from the research labs into real-world applications as diverse as web search, fraud detection, and online dating. Despite these advances, there is no similar trend for general purpose distributed stream computing software. There are various projects and commercial engines, but their use is still restricted to highly specialized applications.”
  – Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)
Stream considerations
● “Analysis on streaming data should not rely on storing the data, as the amount of required disk space is unknown. Additionally, the processing of the data is likely to take longer than the rate of transmission would allow. Since the data is not stored, special algorithms must be developed for aggregating and analyzing data.”
  – http://www.bytemining.com/2010/11/exciting-tools-for-big-data-s4-sawzall-and-mrjob/
What does it say about itself?
● Simple Scalable Streaming System
  – “S4 is a general-purpose, distributed, scalable, partially fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous unbounded streams of data.”
  – “We aim to develop a high performance computing platform that hides the complexity inherent in parallel processing system from the application programmer.”
  – http://incubator.apache.org/s4
What does it say about itself?
● “The drivers to read from and write to the platform can be implemented in any language making it possible to integrate with legacy data sources and systems.”
● “S4 was released by Yahoo! Inc. in October 2010 under the Open Source Apache 2.0 license”
  – http://incubator.apache.org/s4
What does it say about itself? - Overview 1.
● “proven”
  – @ Yahoo!: processes several thousand searches / sec
● “decentralized”
  – no single point of failure, all nodes are peers
● “scalable”
  – no upper limit, throughput grows linearly as new nodes are added
● ...
  – http://incubator.apache.org/s4
What does it say about itself? - Overview 2.
● ...
● “extensible”
  – simple API, ready-made applications
● “cluster management”
  – built on ZooKeeper
● “partial fault-tolerance”
  – if a server fails, a stand-by server takes over its tasks
  – the server's state may be lost, but it is rebuilt from the new input data
  – http://incubator.apache.org/s4
S4 vs. Hadoop & Map/Reduce
● “We considered extending the open source Hadoop platform to support computation of unbound streams but we quickly realized that the Hadoop platform was highly optimized for batch processing.
  – MapReduce systems typically operate on static data by scheduling batch jobs.
  – In stream computing, the paradigm is to have a stream of events that flow into the system at a given data rate over which we have no control.
  – The processing system must keep up with the event rate or degrade gracefully by eliminating events, this is typically called load shedding.
● Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)
S4 vs. Hadoop & Map/Reduce
● ... The streaming paradigm dictates a very different architecture than the one used in batch processing. Attempting to build a general-purpose platform for both batch and stream computing would result in a highly complex system that may end up not being optimal for either task.”
● “It is worth mentioning that many real world systems implement a streaming strategy of partitioning the input data into fixed-size segments that are processed by a MapReduce platform. The disadvantage of this approach is that the latency is proportional to the length of the segment plus the overhead required to do the segmentation and initiate the processing jobs.”
● “Rather than trying to fit a square peg into a round hole we decided to explore a programming paradigm that is simple and can operate on data streams in real-time.”
● Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)
- S4 is not “real-time Map/Reduce”!
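The load shedding mentioned in the paper excerpt can be illustrated with a bounded buffer that drops events once it is full. This is only a minimal, single-threaded sketch: the class name SheddingQueue and its methods are hypothetical and are not part of S4, and the paper does not say that S4 sheds load in exactly this way.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of load shedding; not an S4 class.
final class SheddingQueue<E> {
    private final BlockingQueue<E> queue;
    private long dropped; // number of events shed so far

    SheddingQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Accepts the event if there is room; otherwise drops it and returns false.
    boolean offer(E event) {
        boolean accepted = queue.offer(event); // non-blocking: fails when the queue is full
        if (!accepted) {
            dropped++;
        }
        return accepted;
    }

    // Returns the oldest buffered event, or null if the buffer is empty.
    E poll() {
        return queue.poll();
    }

    long dropped() {
        return dropped;
    }
}
```

The producer never blocks: when the consumer falls behind, new events are discarded and counted, so the system degrades instead of stalling.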
Further characteristics
● “Actors” programming model
  – encapsulation & location transparency
● minimize latency
  – using local memory of nodes and avoid disk I/O bottlenecks
● lossy failover is acceptable
  – state is lost, but it can be rebuilt from the input stream;
  – “downstream systems must degrade gracefully”
● cluster nodes can NOT be added/removed while running
Further characteristics
● “Elastic : computing load automatically gets distributed
● Expandable : a simple API has been provided
● Object Oriented : POJOs used for internode communication.”
  – http://jayatiatblogs.blogspot.com/2011/02/introduction-to-s4.html
Relation to other systems
● “S4 represents a free- to low-cost alternative presently available proprietary real-time processing options like multiple IBM InfoSphere products and SAP’s new in-memory HANA appliance”
  – November 3, 2010. http://gigaom.com/cloud/is-yahoo-set-to-open-source-real-time-mapreduce/
● Comparison with IBM Stream Processing Core (SPC):
  – SPC: subscription model
  – S4: combination of actors model and Map/Reduce (no centralized control, simplicity)
  – Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)
● ...
Relation to other systems
● “Such systems have been around for many years from vendors like StreamBase, Oracle, Tibco and Sybase as well as in some open source projects like Active Insight Esper and OpenESB. Many of these vendor systems have been folded into larger Enterprise Service Bus offerings.”
● “S4's sweet spot is in processing huge volumes of short-lived data where most of what the business wants is aggregation, not keeping every detail. The way S4 works, it keeps track of data locality and fault detection and lets the developer concentrate on only writing logic.”
  – November 23, 2010. http://www.infoq.com/news/2010/11/yahoo-releases-s4
● “The S4 design is not new in the industry as it implements the Actor framework. Erlang and Scala already have a similar implementation. But the power of mixing in Zookeeper and a pluggeable architecture can set S4 apart from previous frameworks.”
  – February 27, 2011. http://jayatiatblogs.blogspot.com/2011/02/introduction-to-s4.html
Development status
● alpha version v0.3.0, released in Aug 2011
  – This is an alpha version and should not be used for production. There's no guarantee of backwards-compatibility until the 1.0 release.
● the developers report a benchmark with real-world examples
● directions for further development, e.g. garbage collection with prioritization...
Stream Model
● Stream: sequence of (KEY, VAL) data events
  – KEY: tuple-valued key (optional)
  – VAL: attribute tuple
  – data events are typed (EV)
● events are represented by Java objects
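As a rough illustration of the (KEY, VAL) model above, an event can be sketched as a plain Java object carrying a type, an attribute tuple and an optional keyed attribute. The class StreamEvent and its methods are hypothetical names chosen for this sketch, not S4 classes, and the key is simplified to a single attribute instead of a tuple.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical event class; S4 itself only requires that events are Java objects.
final class StreamEvent {
    private final String type;                // EV: the event type
    private final String keyAttribute;        // name of the keyed attribute, or null if keyless
    private final Map<String, Object> values; // VAL: the attribute tuple

    StreamEvent(String type, String keyAttribute, Map<String, Object> values) {
        this.type = type;
        this.keyAttribute = keyAttribute;
        // Defensive copy so the event cannot be changed after it is emitted.
        this.values = Collections.unmodifiableMap(new LinkedHashMap<>(values));
    }

    String type() {
        return type;
    }

    boolean isKeyed() {
        return keyAttribute != null;
    }

    // KEY: the value of the keyed attribute, or null for a keyless event.
    Object key() {
        return isKeyed() ? values.get(keyAttribute) : null;
    }

    Object get(String attribute) {
        return values.get(attribute);
    }
}
```

A word-count event, for instance, could have type "WordEvent", keyed attribute "word" and values {word, count}.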
Processing Model
data event stream
http://www.slideshare.net/alekbr/s4-stream-computing-platform
Processing Model
● Data events routed (emitted) to PEs (Processing Elements) with internal state
● PEs consume events and
  – emit other events consumed by other PEs
  – or publish results
● events can be routed to appropriate PEs
● new PE instances can be created
http://www.slideshare.net/alekbr/s4-stream-computing-platform
Example - WordCount
● Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)
Example - WordCount
no key
random key
constant key
Processing Model
● “Each instance of PE is uniquely identified by four components:
  – its functionality as defined by PE class and associated configuration
  – the types of events that it consumes
  – the keyed attribute in those events
  – the value of the keyed attribute in events which it consumes”
  http://www.slideshare.net/alekbr/s4-stream-computing-platform
● PE prototype: the first three components, without a key value
● That is, every key value needs its own PE instance; S4 creates it automatically from the prototype (when a new value appears)
● Keyless PE: processes every event of the given type
● “Garbage collection is a challenge to the platform”, e.g. a TTL can be assigned to PEs, so that a PE may be discarded if no data arrives with its key value for a certain time; in that case, however, it loses its state!
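The per-key instantiation and the TTL-based clean-up described above can be sketched together as follows. The names CountingPE and KeyedPEContainer are hypothetical, the Supplier merely stands in for cloning a PE prototype, and the explicit evictExpired call is an assumption of this sketch; it does not reproduce how S4 itself schedules clean-up.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical PE that counts the events seen for one key value.
final class CountingPE {
    long count;          // internal state, lost when the instance is evicted
    long lastSeenMillis; // time of the most recent event for this key value

    void processEvent(long nowMillis) {
        count++;
        lastSeenMillis = nowMillis;
    }
}

// Hypothetical container: one PE instance per key value, created on demand.
final class KeyedPEContainer {
    private final Supplier<CountingPE> prototype; // stands in for cloning the PE prototype
    private final long ttlMillis;
    private final Map<String, CountingPE> instances = new HashMap<>();

    KeyedPEContainer(Supplier<CountingPE> prototype, long ttlMillis) {
        this.prototype = prototype;
        this.ttlMillis = ttlMillis;
    }

    // Routes an event to the instance for its key value, creating it for a new value.
    CountingPE dispatch(String keyValue, long nowMillis) {
        CountingPE pe = instances.computeIfAbsent(keyValue, k -> prototype.get());
        pe.processEvent(nowMillis);
        return pe;
    }

    // Removes instances idle for longer than the TTL and returns how many were removed.
    int evictExpired(long nowMillis) {
        int removed = 0;
        Iterator<CountingPE> it = instances.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().lastSeenMillis > ttlMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return instances.size();
    }
}
```

After an eviction, the next event for the same key value starts from a fresh instance with a count of 1, which is exactly the state loss the slide warns about.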
Standard and custom PEs
● Basic processing operations are available as prebuilt PEs that only need to be configured, e.g.
  – io.s4.processor.AbstractWindowingPE
  – io.s4.processor.JoinPE
  – io.s4.processor.PrintEventPE
  – io.s4.processor.ReroutePE
  – io.s4.processor.SimpleCountingPE
● Any PE can be written in code
  – Java, Spring framework
  – io.s4.processor.AbstractPE
● Eclipse project generation
PE programming
● “developers essentially implement two primary handlers:
  – input event handler processEvent() - invoked for each incoming event of the types the PE has subscribed to
  – output mechanism output() - optional method that implements the output of PE to an external system. Can be configured to be invoked in a variety of ways – at regular time intervals t or on receiving n input events.”
PE programming
● e.g. query counter: how many times each query (queryString) is issued
http://www.slideshare.net/alekbr/s4-stream-computing-platform
PE programming
● the key (queryString) no longer has to be handled in the code; it is specified in the configuration
● output is sent every 10 minutes (it could also be sent on every event or after a given number of events)
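The two handlers can be sketched for the query counter as below. This is a self-contained stand-in rather than real S4 code: QueryCounterSketchPE does not extend io.s4.processor.AbstractPE, the constructor parameters replace what would come from the S4 configuration, and output() is triggered after every n events instead of every 10 minutes so that the behaviour is easy to follow.

```java
import java.util.List;

// Hypothetical stand-in for an S4 PE; it does not use the real S4 API.
final class QueryCounterSketchPE {
    private final String queryString; // key value this instance is bound to
    private final int outputEveryN;   // call output() after this many input events
    private final List<String> sink;  // stands in for an external system
    private long count;
    private int sinceLastOutput;

    QueryCounterSketchPE(String queryString, int outputEveryN, List<String> sink) {
        this.queryString = queryString;
        this.outputEveryN = outputEveryN;
        this.sink = sink;
    }

    // Input handler: called once for each incoming event with this key value.
    void processEvent() {
        count++;
        sinceLastOutput++;
        if (sinceLastOutput >= outputEveryN) {
            output();
            sinceLastOutput = 0;
        }
    }

    // Output handler: publishes the current count to the sink.
    void output() {
        sink.add(queryString + "=" + count);
    }

    long count() {
        return count;
    }
}
```

Because each instance serves exactly one key value, the handler needs no map from query strings to counts; the routing by key has already done that work.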
Processing Model
● PN – Processing Node
  – the logical execution units (hosts) of the PEs
  – performs local processing (PE invocation) and event handling (receiving, forwarding)
  – the Dispatcher forwards events locally or to other nodes, and partitions them (hash)
● Communication layer
  – cluster management, physical-to-logical node mapping, failover handling
  – API for sending input events
  – selectable network protocol: guaranteed or non-guaranteed delivery (control events are always guaranteed; for data it is configurable)
● ZooKeeper for coordination, or “red-button” mode (single computer)
http://www.slideshare.net/alekbr/s4-stream-computing-platform
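Hash partitioning by the Dispatcher can be pictured with the following sketch, which maps a keyed attribute and its value to a logical node index. HashPartitioner is a hypothetical class, and the use of String.hashCode() is an assumption made for illustration; the slides do not say which hash function S4 uses.

```java
// Hypothetical illustration of hash partitioning; not the S4 Dispatcher.
final class HashPartitioner {
    private final int nodeCount;

    HashPartitioner(int nodeCount) {
        if (nodeCount <= 0) {
            throw new IllegalArgumentException("nodeCount must be positive");
        }
        this.nodeCount = nodeCount;
    }

    // Maps (keyed attribute, key value) to a logical node index in [0, nodeCount).
    int nodeFor(String keyAttribute, String keyValue) {
        int hash = (keyAttribute + "=" + keyValue).hashCode();
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(hash, nodeCount);
    }
}
```

Every event with the same key value lands on the same node, so the PE instance for that value can keep its state in local memory.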
Adapter
● “The events being fed to the S4 cluster for processing need to be translated into S4 compatible events and similarly the events received from an S4 cluster have to be made understandable to the client. The Client I/O Stub solves this purpose [e.g. JSON or other conversion] whereas the Adapter injects events into the S4 cluster and receives from it via the Communication Layer.”
  – http://jayatiatblogs.blogspot.com/2011/02/introduction-to-s4.html
Adapter
http://docs.s4.io/manual/client_adapter.html
Sample application
● TwitterTopicCount
  – “This sample application listens to the Twitter Spritzer and keeps track of the top 10 hash tags”
  – TwitterFeedListener: converts JSON to Java events for S4 to use
  – TopicExtractorPE: pulls hash tags out of each tweet and creates one new event per hash tag (key)
  – TopicCountAndReportPE (one per hash tag): counts the number of times its hash tag has been seen, emitting a new event with the hash tag and the count
  – single TopNTopicPE: consumes all hash tag counts and keeps a sorted list of the top 10
● It has to be fixed before it works! (TwitterFeedListener.java)
  – https is needed instead of http in the Twitter urlString
  – log4j basic configuration is needed in main(): org.apache.log4j.BasicConfigurator.configure();
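The last stage of the pipeline, a single PE that keeps the top N hash tags, can be sketched as follows. TopNTopics is a hypothetical class written for this illustration and is not the TopNTopicPE from the sample application; the alphabetical tie-break is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a top-N consumer; not the sample application's code.
final class TopNTopics {
    private final int n;
    private final Map<String, Long> latestCounts = new HashMap<>();

    TopNTopics(int n) {
        this.n = n;
    }

    // Consumes a (hash tag, count) event such as a per-tag counting PE might emit.
    void onCount(String hashTag, long count) {
        latestCounts.put(hashTag, count);
    }

    // Returns up to n hash tags, highest count first; ties are broken alphabetically.
    List<String> top() {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(latestCounts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Long.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : entries.subList(0, Math.min(n, entries.size()))) {
            result.add(e.getKey());
        }
        return result;
    }
}
```

Since every per-tag counter sends its running total, the consumer only has to remember the latest count for each tag and sort when a report is needed.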
Installation, running the sample
● See http://docs.s4.io/tutorials/getting_started.html
● gradlew build system
● E.g. installed in the ~/s4 directory:
  – cd ~/s4/s4/build/s4-image/
  – export S4_IMAGE=`pwd`
  – cd ~/s4/twittertopiccount/build/install/twitter_feed_listener
  – export TWIT_LISTENER=`pwd`
  – $S4_IMAGE/scripts/start-s4.sh -r client-adapter &   [start S4]
  – $S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 \
      -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml &   [start adapter]
  – $TWIT_LISTENER/bin/twitter_feed_listener \
      < > <>   [start event loading]
  – cat /tmp/top_n_hashtags
  – tail -f ~/s4/s4/build/s4-image/s4-core/logs/s4-core/s4-core_<>.log
  – kill `ps x|grep s4|awk '{ print $1 }'`
Benchmark
● Benchmark #1 from the S4 paper:
  – Streaming click-through rate (CTR) computation
    ● estimate of the probability that a user will click on an item
    ● sliding window of 24 hours, split into 1-hour / 5-minute slots, aggregated and further processed
    ● if a PN fails, state is lost and the aggregate is substituted with long-term estimates
    ● 250 000 users / day, 2 weeks, peak event rate 1600/sec, 16 servers, each with four 32-bit processors & 2 GB RAM
    ● 3% improvement in CTR computation (detecting low quality ads quickly and filtering them out)
    ● + offline stress test ...
● Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)
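The slotted sliding window behind the CTR computation can be illustrated with a small sketch. SlidingWindowCtr is a hypothetical class, it assumes that timestamps arrive in non-decreasing order, and it is not the implementation used in the paper's benchmark; a 24-hour window of 5-minute slots would correspond to slotMillis = 300000 and maxSlots = 288.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of a slotted sliding window; not the paper's code.
final class SlidingWindowCtr {
    private static final class Slot {
        final long index; // slot number: timestamp divided by the slot length
        long impressions;
        long clicks;

        Slot(long index) {
            this.index = index;
        }
    }

    private final long slotMillis;
    private final int maxSlots;
    private final Deque<Slot> slots = new ArrayDeque<>();

    SlidingWindowCtr(long slotMillis, int maxSlots) {
        this.slotMillis = slotMillis;
        this.maxSlots = maxSlots;
    }

    // Records one impression and whether it was clicked; timestamps must not decrease.
    void record(long timestampMillis, boolean clicked) {
        long index = timestampMillis / slotMillis;
        Slot last = slots.peekLast();
        if (last == null || last.index < index) {
            last = new Slot(index);
            slots.addLast(last);
        }
        last.impressions++;
        if (clicked) {
            last.clicks++;
        }
        // Drop slots that have slid out of the window.
        while (!slots.isEmpty() && slots.peekFirst().index <= index - maxSlots) {
            slots.removeFirst();
        }
    }

    // Clicks divided by impressions over the slots still inside the window.
    double ctr() {
        long impressions = 0;
        long clicks = 0;
        for (Slot s : slots) {
            impressions += s.impressions;
            clicks += s.clicks;
        }
        return impressions == 0 ? 0.0 : (double) clicks / impressions;
    }
}
```

Aggregating per slot keeps memory bounded by the number of slots rather than by the number of events, which matters when the data itself is never stored.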
Benchmark
● Benchmark #2 from the S4 paper:
  – On-line parameter optimization
    ● process: measurement (for the duration of a slot), comparator (significant differences are marked), optimizer (adaptation strategy)
    ● 200 000 users / day, 2 weeks
    ● revenue increased by 0.25% and click yield by 1.4%
● Neumeyer et al.: “S4: Distributed Stream Computing Platform” (paper)
Own experience
● So far only on a single machine, with a single node
● Without ZooKeeper (“red-button” mode)
● Running the sample applications
● ... to be continued...
Summary - impressions
● Looks promising at first sight
● A new paradigm, but not entirely
● Still an alpha version
● Yahoo is behind it
● I have not yet found many concrete references to it or applications of it
● Keep in mind that “lossy failover is acceptable”
  – i.e. when a node fails or a PE is released, short-term data may be lost (this probably does not affect the general trend)
● Trying it out: so far I have got as far as the sample application
  – installing it on a single machine was easy and trouble-free
  – the bugs in the sample application had to be fixed
● It should be experimented with further, possibly on several machines, later with a concrete big-data task, etc.
The end.
Thank you for your attention!