博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm Trident示例CombinerAggregator
阅读量:5160 次
发布时间:2019-06-13

本文共 2674 字,大约阅读时间需要 8 分钟。

CombinerAggregator首先在每个分区上运行partitionAggregate,在每个partition内先聚合,然后运行全局重新分区(global)操作以合并同一批次的所有分区到一个单独的分区,即把前面每个partition聚合的结果,再放到一个单独的partition进行聚合。 这里的网络传输与其他两个聚合器相比较少。 因此,CombinerAggregator的总体性能比Aggregator和ReduceAggregator好。

省略部分代码,省略部分可参考:https://blog.csdn.net/nickta/article/details/79666918

FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,                    new Values("nickt1", 4),                   new Values("nickt2", 7),                    new Values("nickt3", 8),                   new Values("nickt4", 9),                    new Values("nickt5", 7),                   new Values("nickt6", 11),                   new Values("nickt7", 5)                   );           spout.setCycle(false);           TridentTopology topology = new TridentTopology();           topology.newStream("spout1", spout)                   .shuffle()                   .each(new Fields("user", "score"),new Debug("shuffle print:"))                  .parallelismHint(5)                  .aggregate(new Fields("score"), new CombinerAggregator
() { //partition当中的每个tuple调用 1次 public Integer init(TridentTuple tuple) { return tuple.getInteger(0); } //聚合结果 //第1次调用时,val1值为zero返回的值,之后的调用为上次调用 combine的返回值 //val2为每次init返回的值 public Integer combine(Integer val1, Integer val2) { return val1+val2; } //如果partition如此没有tuple,也会调用 public Integer zero() { return 0; } }, new Fields("sum")) .each(new Fields("sum"),new Debug("sum print:")) .parallelismHint(5);

输出:

[partition0-Thread-58-b-0-executor[33 33]]> DEBUG(shuffle print:): [nickt3, 8]

[partition1-Thread-126-b-0-executor[34 34]]> DEBUG(shuffle print:): [nickt2, 7]
[partition2-Thread-60-b-0-executor[35 35]]> DEBUG(shuffle print:): [nickt1, 4]
[partition1-Thread-70-b-1-executor[39 39]]> DEBUG(sum print:): [19]
[partition4-Thread-146-b-0-executor[37 37]]> DEBUG(shuffle print:): [nickt4, 9]
[partition4-Thread-146-b-0-executor[37 37]]> DEBUG(shuffle print:): [nickt6, 11]
[partition0-Thread-58-b-0-executor[33 33]]> DEBUG(shuffle print:): [nickt5, 7]
[partition2-Thread-62-b-1-executor[40 40]]> DEBUG(sum print:): [27]
[partition0-Thread-58-b-0-executor[33 33]]> DEBUG(shuffle print:): [nickt7, 5]
[partition3-Thread-39-b-1-executor[41 41]]> DEBUG(sum print:): [5]

转载于:https://www.cnblogs.com/nickt/p/8641513.html

你可能感兴趣的文章
中文论文-LaTex模板
查看>>
P3538 [POI2012]OKR-A Horrible Poem
查看>>
CUDA高性能编程中文实战11章例子中多设备的例子编译提示问题
查看>>
Centos下安装软件的常用方法
查看>>
微信公众平台开发——为何不能在网页调用微信jsapi?
查看>>
emacs设置代理访问插件仓库
查看>>
wireshark自动化之tshark命令行
查看>>
Linux中more命令的实现
查看>>
【递归】二叉树的深度
查看>>
工程源码github地址
查看>>
MediaRecord与AudioRecord
查看>>
高性能网站架构设计之缓存篇(5)- Redis 集群(上)
查看>>
个人项目最终总结
查看>>
day13_先沃联盟定时任务
查看>>
day09_mysql——AB复制
查看>>
上传下载---上传
查看>>
Vue 子路由 与 单页面多路由 的区别
查看>>
JAVA里面的关键字"extends" &"implement"有什么区别
查看>>
图的存储(Java)以及遍历
查看>>
HDU2059龟兔赛跑(加油站)
查看>>