# Storm & RedStorm
# Distributed Realtime Computation in Ruby
# Jason Morrison
# December 11, 2012

---
# Data processing systems
# Presenter Notes
* design of data processing systems
* interesting case: 1 processing machine is not sufficient
* use cases like web/engagement analytics, app logging and performance analytics logging, financial/ecommerce information, physical sensor data
* could be for many reasons: high ingest volume, expensive computation, or tight latency requirements
* even if your processing system isn't large, storm does a very good job of documenting and exposing the clean primitives and abstractions at play in these systems, which are valuable to understand
---
# DFS + map-reduce
* Queries are functions over all data
* Queries typically executed in batch mode, offline
* Results are high-latency
# Presenter Notes
* One popular approach for large-scale data processing systems is to dump the
incoming data into a distributed filesystem like HDFS, run offline map-reduce
queries over it, and place the results into a data store so your apps can read
from it.
* If you need to store all of your data and you need to execute queries which
span large time-frames, or you don't know the queries up front, then batch
mode is a great fit.
However, there are plenty of use cases where these parameters don't quite fit.
---
# Design considerations
* Value of low latency
* Data retention needs
* Ad-hoc vs. known queries
* Timeframe of queries
# Presenter Notes
Consider, when:
* low latency is valuable
* you know queries ahead of time
* query domain covers small time windows
then a stream processing model can get you your answers in a much faster and
cheaper way.
* Instead of storing data and then executing batch queries over it, what if we
persist the query and run the data through it?
---
# Queues & workers

# Presenter Notes
* First, I want to examine a typical approach to assembling a realtime system:
Hands up, ever written a system made of queues+workers to process incoming data?
* architecture here is
* an incoming stream of data, from a web frontend or work queue or subscription to streaming API
* cluster of queues persisting that data as jobs
* cluster of workers taking jobs off the queue, working them, maybe persisting out to some store,
and then emitting their processing results forward into the next queue layer
* then more workers, maybe more persistence
Then you'll know there are some challenges...
* data processing guarantees & fault tolerance
* queues impose latency between layers
* when one worker layer sends to another, a queue sits as a third party between them, and every message has to go to disk
* without another layer of abstraction, coding is tedious - you spend your time thinking about where to write messages to, where to read them from, how to serialize them, etc.
* this is the kind of system that was in place in the product (Backtype) that became Twitter analytics
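The queues-and-workers baseline described above can be sketched with Ruby's built-in Thread and Queue. The stage names here are hypothetical; a real system would use durable brokers (Kestrel, RabbitMQ, etc.) rather than in-process queues:

```ruby
# Two hand-wired worker layers: a splitter stage feeding a counter stage.
# Each Queue stands in for a durable broker; each Thread for a worker pool.
sentences = Queue.new
words     = Queue.new
counts    = Hash.new(0)

splitter = Thread.new do
  while (sentence = sentences.pop) != :done
    sentence.split.each { |w| words << w }
  end
  words << :done   # propagate shutdown to the next layer
end

counter = Thread.new do
  while (word = words.pop) != :done
    counts[word] += 1
  end
end

["the quick fox", "the lazy dog"].each { |s| sentences << s }
sentences << :done
[splitter, counter].each(&:join)

counts["the"]  # => 2
```

Even in this toy version you can see the pain points: the queues add a hop between every layer, and nothing here retries a message if a worker dies mid-count.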
---
# Twitter analytics

---
# Storm

# Presenter Notes
* released september 2011 at StrangeLoop
* >4,000 stars, >500 forks. Most starred Java project on GitHub.
* Used by: Groupon, The Weather Channel, Alibaba, Taobao (eBay-like, Alexa #11)
---
# Design goals. Storm should:
* Guarantee data processing
* Provide for horizontal scalability
* Be fault tolerant
* Be easy to use
# Presenter Notes
* Guaranteed data processing: choose whether, in the face of failure, data can be lost, must be processed at least once, or must be processed exactly once
* Horizontal scalability: Distribute your processing across a cluster, tweak parallelization of computations and the allocation of cluster resources to match your workload
* Fault tolerance: If there are any errors or when nodes fail, system should handle this gracefully and keep running
* Easy to use: As the application developer, focus on writing your computation, not infrastructure like message serialization, routing, and retrying.
* So, how is it put together?
---
# Storm primitives
* Streams
* _Components_
* Spouts
* Bolts
* Topologies
---
# Stream

# Presenter Notes
* core abstraction of storm
* unbounded sequences of tuples
* tuples are named lists of values
* dynamically typed; use any type by providing a serializer
* streams are how the parts of your computation talk to one another - they are the message passing
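A tuple can be pictured as a named list of values. This tiny stand-in (not Storm's actual Tuple class) shows field-by-name access, similar in spirit to the Java API's getValueByField:

```ruby
# Minimal model of a Storm tuple: an ordered list of values with field names.
Tuple = Struct.new(:fields, :values) do
  # Look up a value by its field name.
  def [](name)
    values[fields.index(name)]
  end
end

t = Tuple.new([:word, :count], ["storm", 3])
t[:word]   # => "storm"
t[:count]  # => 3
```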
---
# Spout

# Presenter Notes
* sources of streams
* typically:
* read from a pubsub/kestrel/kafka queue
* connect to a streaming API
* emits any number of output streams
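In spirit, a spout adapts an external source into a stream, and Storm polls it for the next tuple. The names below are illustrative, not the RedStorm API:

```ruby
# A spout wraps a source (here an in-memory Queue) and hands the
# framework one tuple at a time when polled.
class QueueSpout
  def initialize(queue)
    @queue = queue
  end

  # Emit the next tuple, or nil when the source has nothing ready.
  def next_tuple
    @queue.empty? ? nil : [@queue.pop]
  end
end

source = Queue.new
source << "an event"
spout = QueueSpout.new(source)
spout.next_tuple  # => ["an event"]
spout.next_tuple  # => nil
```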
---
# Bolt

# Presenter Notes
* process any # input streams
* produce any # output streams
* where computation happens
* functions, filters, aggregations, streaming joins, talk to DBs...
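A bolt is just per-tuple code with outputs. A hypothetical filter bolt (again, these names are not the real RedStorm API):

```ruby
# A bolt consumes input tuples and emits zero or more output tuples.
class LongWordFilterBolt
  # Called once per input tuple; returns the list of tuples to emit.
  def execute(tuple)
    word = tuple.first
    word.length >= 4 ? [[word]] : []
  end
end

bolt = LongWordFilterBolt.new
bolt.execute(["storm"])  # => [["storm"]]
bolt.execute(["a"])      # => []
```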
---
# Topology

# Presenter Notes
* network of spouts and bolts, connected by streams
* describe parallelism and field groupings
---
# Parallelism: _logical view_

# Presenter Notes
* nothing to do with hardware
* 4 components: 1 spout, 3 bolts; a number of tasks per component
---
# Parallelism: _physical view_

# Presenter Notes
* 3 worker nodes
* multiple worker processes on each node (JVM process)
* tasks run as threads inside the worker processes; many tasks per worker
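A round-robin sketch of how 12 tasks might land on 3 workers (illustrative only; Storm's scheduler does the real assignment):

```ruby
# Distribute task ids 0..11 across 3 workers, round-robin.
tasks   = (0...12).to_a
workers = 3
assignment = tasks.group_by { |task| task % workers }

assignment[0]  # => [0, 3, 6, 9]
assignment[1]  # => [1, 4, 7, 10]
```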
---
# Stream grouping

* Shuffle grouping
* Fields grouping
* All grouping
* Global grouping
# Presenter Notes
* since spouts and bolts are parallel, when a tuple is emitted on a stream, which tasks does that tuple go to?
* decides how to partition the stream of messages
* shuffle grouping: pick a random task, evenly distribute
* all grouping: send to all tasks
* global grouping: pick task with lowest id (all tuples go to same task)
* fields grouping: for persistent/stateful algorithms, send similar data to the same worker process.
mod hashing on a subset of tuple fields
* let's take a look at some code...
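The routing decision can be sketched in a few lines. Fields grouping hashes the chosen fields mod the task count, so equal values always reach the same task (a simplification of Storm's actual partitioner):

```ruby
NUM_TASKS = 4

# Shuffle grouping: pick a task at random for even distribution.
def shuffle_grouping
  rand(NUM_TASKS)
end

# Fields grouping: hash the selected fields so that tuples with equal
# values for those fields always go to the same task.
def fields_grouping(tuple, field_indexes)
  tuple.values_at(*field_indexes).hash % NUM_TASKS
end

# Every tuple whose first field is "the" is routed identically:
fields_grouping(["the", 1], [0]) == fields_grouping(["the", 9], [0])  # => true
```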
---
# RedStorm

# Presenter Notes
* Storm is written for the JVM in Java and Clojure
* there are several ways to integrate across languages (namely: JVM-native, Thrift for topologies, and "multilang components" - JSON-over-stdio shelling for spouts/bolts)
* RedStorm lets you write topologies and components in JRuby
---
# Example time!

# Presenter Notes
* RandomSentenceSpout
* SplitSentenceBolt
* WordCountBolt
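Stripped of the Storm plumbing, the logic those three components carry looks like this (plain Ruby stand-ins, not the actual RedStorm classes):

```ruby
# SplitSentenceBolt's core: one single-field tuple per word.
def split_sentence(sentence)
  sentence.split.map { |word| [word] }
end

# WordCountBolt's core: keep a running count, emit a [word, count] tuple.
def word_count(counts, word_tuple)
  word = word_tuple.first
  counts[word] += 1
  [word, counts[word]]
end

counts = Hash.new(0)
split_sentence("the cow jumped over the moon").each { |t| word_count(counts, t) }
counts["the"]  # => 2
```

In the real topology, a fields grouping on `word` is what lets WordCountBolt keep its counts in a plain in-memory hash.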
---
# Data processing guarantees
Tuple tree:

# Presenter Notes
* now that we've seen...
* tuple tree
* rooted at the spout, with a single tuple
* as bolts emit new tuples, they are anchored to the input tuple
* ack
* after a bolt finishes, it acks its input tuple
* after a whole tree is acked, the root tuple is considered processed
* this provides at-least-once semantics
* you can build exactly-once processing semantics on top using transactions
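The at-least-once tracking is cheap: Storm's acker keeps one value per spout tuple and XORs in each tuple id twice, once when the tuple is anchored and once when it is acked. XOR is self-inverse, so the value returns to zero exactly when the whole tree is acked. A minimal sketch:

```ruby
# Track completion of a whole tuple tree in constant memory via XOR.
class Acker
  def initialize
    @ack_val = 0
  end

  def anchored(tuple_id)   # a new tuple joins the tree
    @ack_val ^= tuple_id
  end

  def acked(tuple_id)      # a bolt finished processing a tuple
    @ack_val ^= tuple_id
  end

  def tree_complete?
    @ack_val.zero?
  end
end

acker = Acker.new
root, child = 0xCAFE, 0xBEEF
acker.anchored(root)
acker.anchored(child)    # bolt emits child anchored to root
acker.acked(root)
acker.tree_complete?     # => false: child still outstanding
acker.acked(child)
acker.tree_complete?     # => true
```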
---
# Cluster deployment

# Presenter Notes
* local mode vs cluster mode
* 3 sets of nodes
* Nimbus node: master node, similar to hadoop jobtracker
* upload computations here
* launches and coordinates workers, &
* moves them around when worker nodes fail
* (not HA yet, but nimbus failure doesn't affect workers, so low-pri. HA is on roadmap.)
* ZooKeeper nodes:
* separate apache project
* cluster coordination
* store configuration, serve as distributed lock service for master election, etc.
* Supervisor nodes:
* talk to nimbus via ZK to decide what to run on the machine
* start/stop worker processes as necessary, as dictated by nimbus
* visibility: storm-ui
---
# Storm UI

# Presenter Notes
* what topologies are running on your cluster
* error logs
* detailed statistics about each topology
* for every spout tuple, what's the avg latency until whole tuple tree is completed
* for every bolt, avg processing latency
* for every component, throughput
* deployment: storm-deploy to automate deployment and provisioning on EC2
---
# storm-deploy
!yaml
# /path/to/storm-deploy/conf/clusters.yaml
nimbus.image: "us-east-1/ami-08f40561" # 64-bit ubuntu
nimbus.hardware: "m1.large"
supervisor.count: 2
supervisor.image: "us-east-1/ami-08f40561" # 64-bit ubuntu
supervisor.hardware: "c1.xlarge"
#supervisor.spot.price: 1.60
zookeeper.count: 1
zookeeper.image: "us-east-1/ami-08f40561" # 64-bit ubuntu
zookeeper.hardware: "m1.large"
# Presenter Notes
* 1-click deploy tool for deploying clusters on AWS
* configure your cluster
* provides nice things like
* spot pricing
* ganglia for resource usage monitoring
* then configure your AWS settings
---
# storm-deploy
!clojure
; ~/.pallet/config.clj
(defpallet
  :services
  {:default
   {:blobstore-provider "aws-s3"
    :provider "aws-ec2"
    :environment {:user {:username "storm"
                         :private-key-path "~/.ec2/k.pem"
                         :public-key-path "~/.ec2/k.pem.pub"}
                  :aws-user-id "1234-5678-9999"}
    :identity "AKIA1111111111111111"
    :credential "abCDEFghijklmnpOPQRSTuvWXyz1234567890123"
    :jclouds.regions "us-east-1"}})
---
# storm-deploy
!bash
# start cluster
$ lein run :deploy --start --name mycluster --release 0.8.1
# attach to the cluster
$ lein run :deploy --attach --name mycluster
# stop cluster
$ lein run :deploy --stop --name mycluster
---
# Running in production
!bash
# deploy your topology
$ redstorm jar examples/simple
$ redstorm cluster --1.9 examples/simple/word_count_topology.rb
# monitor with storm-ui and ganglia
$ open http://{nimbus-ip}:8080
$ open http://{nimbus-ip}/ganglia
# kill topology
$ storm kill word_count
# Presenter Notes
* jar it up, then submit the topology in the jar to the cluster
* monitor with storm-ui and ganglia
* Specifying other dependencies for deploy (ami, system packages, jars, gems)
* jclouds/pallet, apache ivy, bundler
---
# Bigger example: Tweitgeist
* [Live example](http://tweitgeist.colinsurprenant.com)
* [GitHub source](https://github.com/colinsurprenant/tweitgeist)

---
# Bigger example: Tweitgeist
* [Talk: "Twitter Big Data"](http://www.slideshare.net/colinsurprenant/twitter-big-data)

---
# But wait, there's more...
* Runtime rebalancing and swapping
* DRPC: Ad-hoc computations
* Trident: state, transactions, exactly-once semantics
* Lambda architecture: blend streaming and batch modes
# Presenter Notes
* change parallelization and migrate topologies on the fly
* drpc
* abstraction built on top of storm primitives
* design distributed computations as topologies
* some function you want to execute on-the-fly, across a cluster
* drpc server acts as a spout, emits function invocations
* trident
* also built atop storm primitives
* exactly-once semantics with fault tolerance
* stateful source/sink
* lambda
* not part of storm,
* but an approach to combining both realtime and batch in your architecture
* discussed in 2012 strange loop talk
* and in book "big data" http://www.manning.com/marz/
---
# Questions!
---
# Resources: Getting started
* [storm-starter](https://github.com/nathanmarz/storm-starter)
* [redstorm examples](https://github.com/colinsurprenant/redstorm/tree/master/examples)
---
# Resources: Software
* [RedStorm](https://github.com/colinsurprenant/redstorm)
* [storm](http://storm-project.net/)
* [storm-contrib](https://github.com/nathanmarz/storm-contrib)
* [storm-deploy](https://github.com/nathanmarz/storm-deploy)
* [storm-mesos](https://github.com/nathanmarz/storm-mesos)
* [storm-starter](https://github.com/nathanmarz/storm-starter)
---
# Resources: Documentation
* [Storm wiki](https://github.com/nathanmarz/storm/wiki)
* ~40,000 words of documentation
* [storm-user Google group](https://groups.google.com/group/storm-user)
---
# Resources: Talks
* [ETE 2012 Talk](http://vimeo.com/40972420)
* "Storm: Distributed and fault-tolerant realtime computation" - April 2012
* [Runaway complexity in Big Data](http://www.infoq.com/presentations/Complexity-Big-Data)
* "Common sources of complexity in data systems and a design for a fundamentally better data system" - October 2012
---
# Resources: Book
* [Big Data](http://manning.com/marz/)
* Early access book by Nathan Marz
* "Principles and best practices of scalable realtime data systems"

---
# Resources: Other ESP/CEP resources
* [Wikipedia: Event Stream Processing](http://en.wikipedia.org/wiki/Event_stream_processing)
* [Event Stream Processor Matrix](http://blog.sematext.com/2011/09/26/event-stream-processor-matrix/)
* [Quora: Are there any open-source CEP tools?](http://www.quora.com/Complex-Event-Processing-CEP/Are-there-any-open-source-CEP-tools)
* [Ilya Grigorik's "StreamSQL: Event Processing with Esper"](http://www.igvita.com/2011/05/27/streamsql-event-processing-with-esper/)
---
# Thanks!
_Diagrams from Nathan Marz' "Storm: Distributed and fault-tolerant realtime computation" [talk](http://chariotsolutions.com/presentations/storm-distributed-and-fault-tolerant-realtime-comp)._