Storm中Emitted、Transferred和Executed区别

  • A+
所属分类:Storm

在stormUI上有三个字段,分别为EmittedTransferredExecuted,都用来标识当前组件发出去的数据记录。

Storm中Emitted、Transferred和Executed区别

(1)Emitted表示当前Spout或Bolt发出去的记录条数,即OutputCollecter类中emit方法被调用的次数。
(2)Transferred表示当前Spout或Bolt发出去的记录一共被转发的次数,因为每个Spout或Bolt下游都可能接多个Bolt,所以有时候虽然emit一条记录,但是会把这条记录分别转发给下游的各个Bolt处理。
(3)Bolt中还有个字段为Executed,表示这个Bolt接收并处理的数据记录数。
Emitted和Transferred之间的倍数关系与多个因素有关:
(1)下游有几条线的Bolt,即有几个Bolt都从当前Spout或Bolt取数据;
(2)下游Bolt接收数据的分组方式,即采用field grouping、all grouping等;
(3)下游Bolt的task数,此因素是否生效还取决于(2)中的分组方式;

举例说明,假设A为Spout,B和C分别为Bolt,且A中的数据分别发给B和C,B的task数为3,C的task数为5,B读A中数据的分组方式All Grouping,C读A中数据的分组方式为Shuffle Grouping,若AEmitted 1000条数据,则存在如下对应关系:
 Transferred = 1000(C的5个Bolt平分这1000条) + 1000 * 3(B的3个Bolt各1份) = 4000条

Storm中Emitted、Transferred和Executed区别

一般情况下,若Spout或Bolt Emit数据后,下游没有任何Bolt去读,Transferred为0,但也特殊情况,就是在UI中我们看到的Transferred不为0,主要原因是Storm调用了ack方法,相当于系统的默认AckerBolt去读了这个数据。

   

补充:
(1)emit()方法传一个参数表示不开启ack机制,即collector.emit(new Value(id)),传两个参数表示需要ack校验,即collector.emit(new Value(id), new Value(id)),其中第二个参数表示messageId,重要的数据可以采用后者。

(2)如果需要在同一个Spout发送给不同的Bolt不同的数据,可以定义不同的流,如:

    // 默认default stream

    Values value1 = new Value(id, time);
    collector.emit(value1, value1);
    // stream2

    Values value2 = new Value(pid, state);
    collector.emit("stream2", value2, value2);

    // stream3

    Values value3 = new Value(sid);
    collector.emit("stream3", value3, value3);

同时在方法中声明
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("id", "date"));  // 默认default stream
    declarer.declareStream("stream2", new Fields("pid", "state"));  // stream2
    declarer.declareStream("stream3", new Fields("sid"));  // stream3
}

 

圈里圈外

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: