- A+
所属分类:Storm
在stormUI上有三个字段,分别为Emitted、Transferred和Executed,都用来标识当前组件发出去的数据记录。
(1)Emitted表示当前Spout或Bolt发出去的记录条数,即OutputCollecter类中emit方法被调用的次数。
(2)Transferred表示当前Spout或Bolt发出去的记录一共被转发的次数,因为每个Spout或Bolt下游都可能接多个Bolt,所以有时候虽然emit一条记录,但是会把这条记录分别转发给下游的各个Bolt处理。
(3)Bolt中还有个字段为Executed,表示这个Bolt接收并处理的数据记录数。
Emitted和Transferred之间的倍数关系与多个因素有关:
(1)下游有几条线的Bolt,即有几个Bolt都从当前Spout或Bolt取数据;
(2)下游Bolt接收数据的分组方式,即采用field grouping、all grouping等;
(3)下游Bolt的task数,此因素是否生效还取决于(2)中的分组方式;
举例说明,假设A为Spout,B和C分别为Bolt,且A中的数据分别发给B和C,B的task数为3,C的task数为5,B读A中数据的分组方式All Grouping,C读A中数据的分组方式为Shuffle Grouping,若AEmitted 1000条数据,则存在如下对应关系:
Transferred = 1000(C的5个Bolt平分这1000条) + 1000 * 3(B的3个Bolt各1份) = 4000条
一般情况下,若Spout或Bolt Emit数据后,下游没有任何Bolt去读,Transferred为0,但也特殊情况,就是在UI中我们看到的Transferred不为0,主要原因是Storm调用了ack方法,相当于系统的默认AckerBolt去读了这个数据。
补充:
(1)emit()方法传一个参数表示不开启ack机制,即collector.emit(new Value(id)),传两个参数表示需要ack校验,即collector.emit(new Value(id), new Value(id)),其中第二个参数表示messageId,重要的数据可以采用后者。
(2)如果需要在同一个Spout发送给不同的Bolt不同的数据,可以定义不同的流,如:
// 默认default stream
Values value1 = new Value(id, time);collector.emit(value1, value1);// stream2
Values value2 = new Value(pid, state);collector.emit("stream2", value2, value2);// stream3
Values value3 = new Value(sid);collector.emit("stream3", value3, value3);
同时在方法中声明
@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "date")); // 默认default streamdeclarer.declareStream("stream2", new Fields("pid", "state")); // stream2declarer.declareStream("stream3", new Fields("sid")); // stream3}