前言
Storm
是一个流式处理框架(你可以把它当成一种消息队列),开发人员开发出特定的项目,然后通过storm这个渠道下发各种任务,从而达到任务执行的效果。
Storm有两个比较重要的组件:nimbus
和supervision
,其中nimbus
主要是承接任务和分配任务用,而每一个supervision
可以有若干worker
(视服务器硬件而定),而supervison
的主要任务就是监控对应的worker
,一旦worker
死了,supervision
就会把他们唤醒。
本次试验是用的是金山云服务器,storm的版本是1.0.2
,配置是1个nimbus,三个supervision
,每一个worker上只执行一个任务,总共三个任务。
准备工作
安装storm
之前需要在storm
里新安装一套zookeeper
,因为storm
是需要一个zk集群的,nimbus
和每一个supervisior
是通过zk的心跳来传递存活的信息,于是我们就在每一个supervision
里面安装一个zookeeper
,并且启动zookeeper
的server
端,安装zookeeper
的方法可以移步http://chenx1242.blog.51cto.com/10430133/1889715 。
上面这段话用图来说就是这样子:
启动zookeeper
之后,就需要在nimbus
和supervisior
里安装storm
,上面说过本次安装的storm
是1.0.2
版本,路径直接是/storm/apache-storm-1.0.2
。
将storm
安装完之后,需要在nimbus
和supervisior
里更改/etc/hosts
文件,改成如下的格式:
1
2
3
4
5
6
7
8
9
10
11127.0.0.1 localhost
nimbus的内网IP online-nimbus-001
supervision1的内网IP supervision-001
supervision2的内网IP supervision-002
supervision3的内网IP supervision-003
zookeeper的内网IP zookeeper的名称 #注意,这里的zk是给模块拉取配置的zk
storm的zk1的内网IP storm的zk1 #这里的zk就是给storm集群用的zk
storm的zk2的内网IP storm的zk2 #如果storm的zk是standalone模式,这里就不要写了。
storm的zk3的内网IP storm的zk3 #如果storm的zk是standalone模式,这里就不要写了。
保存完/etc/hosts
之后,还有一个比较重要的步骤,就是在/etc/ld.so.conf.d/
这个路径里面建立一个ffmped.conf
这个文件,文件的内容如下:
1
2/storm/apache-storm-1.0.2/lib
/storm/apache-storm-1.0.2/lib/3rd
注意,/storm/apache-storm-1.0.2
是我的storm路径,在实际情况下需要根据自己的路径进行更改。
把这个ffmped.conf
建立成功之后,我们可以测试一下,如果输入ldconfig
的话,会出现如下的内容,就证明达到了我们的效果:
storm
本身的bin
目录夹里也有很多命令可以直接使用,为了调用storm list
方便,我们需要把bin/storm
这个可执行文件作一个软连接,方法就是先cd /usr/local/bin/
,然后ln -s /storm/apache-storm-1.0.2/bin/storm storm
。这样的话,我们就可以直接使用storm list
来查看任务列表了。
Storm的具体配置
安装了storm
,调整了命令行,同时也搞定了ffmpeg.conf
,下面就是调整storm
的配置文件了,nimbus
和supervisior
都要修改。
storm的配置文件叫storm.yaml
,路径位于storm
文件夹下的/conf/
文件夹,我们需要在这个文件里面输入如下的内容:
下面对配置文件作一个简单的解释:
1)storm.zookeeper.port:zk的默认端口2181;
2)storm.cluster.mode:storm的集群运行模式,这里我们也是采用默认的distributed(分布式);
3)storm.local.dir:storm使用的本地文件的目录,这个目录必须存在而且storm进程可读写;
4)supervisor.slots.ports:这个地方在nimbus里可以不用管,但是在supervisior里是需要改的,如果你只打开6700,那么就只放开了6700端口,即只有一个worker,如果你打开了6700、6701、6702三个端口,那么就意味这个supervisior将有三个worker在工作,由于这次试验里我们每一个supervisor只开启一个任务,所以在supervisior的storm.yaml里这个节点就只保留6700,其他的就全部注释掉;
5)nimbus.task.launch.secs:task启动时的一个特殊超时设置.在启动后第一次心跳前会使用该值来临时替代nimbus.task.timeout.secs;
6)worker.childopts:设定每个worker (JVM任务)的最小和最大内存;
更改完了storm.yaml
之后,就要在nimbus
里面安装zkclient
。直接复制粘贴过来就好了。
如果你不喜欢storm
自带的日志格式,想更改一下日志的内容,那么就要在/storm/apache-storm-1.0.2/log4j2
文件夹里面修改worker.xml
,不过在这里善意的提醒,最好在修改之前先备份原有的worker.xml
。
连接具体任务
这次的实验包用的是我所在的公司开发内部使用的包,先把这个包的内容复制到/storm/
文件夹下,同时mkdir install
和makir properties
这两个文件夹,在install
文件夹里有开发写的任务的jar
包和启动程序,如下:
而在properties
文件夹里,应该有这个任务的配置文件,如下:
由于我们已经事前在/etc/hosts
里指定了zkclient需要访问的zk的ip地址了,那么如果zk项配置正确,zkclient这个时候是可以成功启动的。同时在install
文件夹里./update_stormserver_config.sh
也应该是反应正确的。
然后我们就可以启动storm了。
启动nimbus和supervision
启动storm
要先启动nimbus
,在/storm/apache-storm-1.0.2/bin
里面启动run_nimbus.sh
,然后等一下会有一大片东西出现,再jps
一下就能看到nimbus
已经启动了,如图:
从上图我们可以看到,18141的进程就是zkclient
,只不过在jps
里它名字叫AppServerDaemon
,而zkServer
在jps
里叫QuorumPeerMain
。
如果 storm
出现Did you specify a valid list of nimbus hosts for config nimbus.seeds?
的错误提示,那么就是nimbus
没有启动的缘故。
启动了nimbus
之后,就可以在supervisor
的机器里去效仿着启动supervisor
,但是这里要注意,如果你开启了一个supervisior
,那么按照我们上面的配置文件,就启动了一个6700端口的worker
,这个时候在nimbus
执行下派一个任务的命令,nimbus
就会下派这个任务给这个worker
。
下派命令的例子如下:
1
storm jar storm-starter-0.9.2-incubating-jar-with-dependencies.jar com.lechange.recordshare.RecordShareTopology 1
这样就启动了一个叫videoshare
的任务,这个任务只用1个worker
。
如果在命令行里反馈这样的错误:
1
Error: Could not find or load main class storm.starter.recordshare.RecordShareTopology
或者exception in thread main java.lang.NoClassDefFoundError
这样的错误,那就要检查jar包和路径。
而如果你再打开一个supervisor
,在nimbus
端又下发了一个任务,那么这个任务就会给刚刚新启动的supervisor
。这样,启动一个下发一个,就会对每一个worker
具体干的任务情况有一个比较清晰的了解。
在nimbus上执行storm list
,就可以获得上图的样子,可以看出,我在nimbus端下发了三个任务,就是topology_name
这一栏,他们的状态也是active
,而workers
数量都是1,也就是说在那三台supervisor
里都在工作。而跑到supervisor
一看日志,也是对应有各自的任务日志。
至此整个storm和具体的模块工作的搭建就完成了。
补充
如果你事前一口气把三个supervisor
都打开了,即开启了3个worker
,然后一口气在nimbus
端,一口气输入了三个下发任务的命令,那么这三个命令会随机的到这三个worker
里,没有任何顺序而言,你只能通过日志的关键词来判断具体的worker
做哪些任务。
而如果你的worker数量少于nimbus
下发任务的数量,会有什么反应呢?
答案就是任务根本没有worker
去干,在storm list
里,多余的任务对应的num_workers
的数字是0,而如果这个时候你新增一个supervisor
到这个storm
集群,那么这个任务就会吭哧吭哧开始工作了。