一 storm的静态模型
storm的静态模型比较好理解,弄清楚Topology,Spout,Bolt,Stream的含义大概就明白了,不清楚的可以看看strom文档中的Concepts部分。
但是storm是个并行执行的框架,运行状态下的模型是怎么样的呢?
二 storm运行时基本模型
storm运行时的模型,要弄清楚worker,executor,task的区别,解释的最好的是storm的这篇文档 Understanding the parallelism of a Storm topology,里面的两张图画的很好:
worker是个jvm进程,可以通过配置里的setNumWorkers设置一个Topology消耗的进程数;tasks是一个spout/bolt的执行实体,在topologyBuilder的setBolt、setSpout的时候用setNumTasks来知道task数;但是在setBolt,setSpout的时候,还会填写一个并发度的数字,比如setBolt("my bolt", new MyBolt(), 6) ,6就是并发度,也就是6个executor,每个executor是一个线程,这6个线程可以分布到一个或多个进程(worker)里。
下面说个比较复杂的地方,一个executor里可以有一个或多个task,比如这么设置:setBolt("my bolt", new MyBolt(), 6).setNuTasks(12),这时候每个executor里有2个task。也就是说虚拟并发度是12,实际并发度是6。为什么要这样呢?线程里的程序是顺序执行的,一个executor里有2个task,这2个task是先后执行的,没有真正“并发”的效果。这么分的原因有2个:
第一点: spout,bolt的实际并发度,executor的数目可以动态调整,通过storm rebanlance命令可以不重启topology的情况下动态调整。为了实现这种动态调整,就有了上面的虚拟并发数-task数。比如本来一个storm集群有20台物理机,现在10台坏了,就可以用rebalance暂时把并发度降低,等新机器换上后,并发数再调上来。
第二点:方便测试,比如你在单机storm上想测30个并发的执行情况,可是你的硬件资源只能启动2个executor了,这个时候你仍然可以设置30个task来测试。
三 storm的运行时模式是如何建立起来的
1 bolt、spout被创建在你提交任务的机器上,可能这个机器不是storm集群的一部分
2 submit后,上面创建的object被序列化,然后通过网络传到Nimbus。
3 Nimbus把接收到的序列化后的topology存在本地
4 当一个supervisor集群被分配来执行这个topology后,它从Nimbus上下载序列化后的topology存到supervisor机器上。
5 根据配置要启动多个task,启动方式就是读取序列化后的文件然后反序列化。
6 task反序列化后,会先执行prepare函数,然后一切就绪,开始不停地执行execut函数。
四 用java写Spout,Bolt类,类里的数据变量如何共享,要加同步锁吗?
上面都清楚以后,这一步就好理解了,类的static变量是在类装载时候生成,每个类一份,所以这种变量在storm运行模型中,是每个worker一份。
类的成员变量,属于每个Object,所以是每个task一份。由于并发度不会超过task数(executor number <= task number) ,所以成员变量不用加同步锁。
注意在分布式环境下,每个物理机、worker、task随时会失败,然后被重启,所以关键的状态数据要保持到第三方系统里,比如hbase,memcached
参考资料:
http://storm.apache.org/releases/1.0.2/Concepts.html
http://storm.apache.org/releases/1.0.2/Understanding-the-parallelism-of-a-Storm-topology.html
http://stackoverflow.com/questions/17257448/what-is-the-task-in-storm-parallelism
https://groups.google.com/forum/#!topic/storm-user/N4xoNJIUHiw