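TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("a", new SpoutA());
builder.setSpout("b", new SpoutB());
builder.setSpout("c", new SpoutC());
builder.setSpout("d", new SpoutD());
// Groupings are declared on bolts, not spouts, and each grouping names the component it subscribes to.
// BoltE and the "d" source are assumptions; the original line did not say which component is shuffle-grouped.
builder.setBolt("e", new BoltE()).globalGrouping("c").shuffleGrouping("d");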
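4. Global grouping
The entire stream goes to a single task of the receiving bolt; specifically, the task with the lowest id.
This makes it a good fit for merging the results of concurrent tasks into one summary.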
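The "lowest task id" rule can be sketched in plain Java. This is only an illustration of the rule described above, not Storm's own code; the class name GlobalGroupingSketch and the task ids are hypothetical.

```java
import java.util.Collections;
import java.util.List;

// Plain-Java sketch of the global-grouping rule; not Storm's implementation.
final class GlobalGroupingSketch {
    // Every tuple goes to the same task: the one with the lowest id.
    static int chooseTask(List<Integer> targetTaskIds) {
        return Collections.min(targetTaskIds);
    }

    public static void main(String[] args) {
        List<Integer> tasks = List.of(7, 3, 9); // hypothetical task ids of one bolt
        System.out.println(chooseTask(tasks));  // prints 3
    }
}
```

Because the choice depends only on the task list, every tuple of the stream lands on the same task no matter which upstream task emitted it.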
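5. None grouping
This grouping means the stream does not care how it is grouped. At present it behaves the same as shuffle grouping.
The one difference is that Storm intends eventually to run a bolt that uses none grouping in the same thread as the bolt or spout it subscribes to, when possible.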
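Since none grouping currently behaves like shuffle grouping, it may help to see one simple way of spreading tuples randomly but evenly over tasks. The sketch below is plain Java written for illustration, under the assumption that "shuffle" means a random order with an even share per task; the class name ShuffleSketch and the seed parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Plain-Java sketch of a shuffle-style task choice; not Storm's implementation.
final class ShuffleSketch {
    private final List<Integer> order;
    private final Random random;
    private int next = 0;

    ShuffleSketch(List<Integer> targetTaskIds, long seed) {
        this.order = new ArrayList<>(targetTaskIds);
        this.random = new Random(seed);
        Collections.shuffle(order, random);
    }

    // Walk through the shuffled list; reshuffle once every task has been used.
    int chooseTask() {
        if (next == order.size()) {
            Collections.shuffle(order, random);
            next = 0;
        }
        return order.get(next++);
    }

    public static void main(String[] args) {
        ShuffleSketch sketch = new ShuffleSketch(List.of(4, 5, 6), 42L);
        for (int i = 0; i < 6; i++) {
            System.out.println(sketch.chooseTask());
        }
    }
}
```

Each pass over the list hands every task exactly one tuple, so the load stays balanced while the order within a pass is random.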
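6. Direct grouping
The producer of a tuple decides which task of the consumer receives it. This grouping can only be used on streams that have been declared as direct streams, and tuples on such a stream must be sent with one of the emitDirect methods.
The producer can look up the task ids of its consumers through the TopologyContext.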
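// Assumes the bolt declares two fields: List<Integer> targetTasks and java.util.Random random.
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // The second argument (true) marks "directStream" as a direct stream.
    declarer.declareStream("directStream", true, new Fields("fields"));
}
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    // getComponentTasks takes a component id and returns that component's task ids.
    this.targetTasks = context.getComponentTasks("my-stream");
    this.collector = collector;
}
public void execute(Tuple input) {
    // Pick one of the consumer's real task ids and emit on the direct stream.
    int taskId = targetTasks.get(random.nextInt(targetTasks.size()));
    collector.emitDirect(taskId, "directStream", process(input));
}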
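The value passed to emitDirect has to be an actual task id of the consuming component, not an arbitrary small number. A minimal plain-Java sketch of choosing such an id follows; the class name DirectEmitSketch, the method pickTask, and the task ids are hypothetical, and the random choice is just one possible policy.

```java
import java.util.List;
import java.util.Random;

// Plain-Java sketch: pick a consumer task id to use as a direct-emit target.
final class DirectEmitSketch {
    // Choose a random position in the list, then return the task id stored there.
    static int pickTask(List<Integer> consumerTaskIds, Random random) {
        return consumerTaskIds.get(random.nextInt(consumerTaskIds.size()));
    }

    public static void main(String[] args) {
        List<Integer> consumerTaskIds = List.of(21, 22, 23); // hypothetical task ids
        System.out.println(pickTask(consumerTaskIds, new Random()));
    }
}
```

A producer that needs sticky routing could replace the random index with one derived from a tuple field, as long as the result is still looked up in the consumer's task list.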
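7. Local or shuffle grouping
If the target bolt has one or more tasks in the same worker process as the sender, tuples are shuffled among those in-process tasks only; otherwise this behaves like a normal shuffle grouping.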
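The selection rule for this grouping can be sketched in plain Java. The names LocalOrShuffleSketch, candidates, and localTaskIds are hypothetical, and Storm's own implementation differs in detail; the sketch only shows which tasks are eligible before the shuffle step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Plain-Java sketch of the local-or-shuffle candidate rule; not Storm's implementation.
final class LocalOrShuffleSketch {
    // Prefer target tasks running in the sender's own worker process.
    // If there are none, fall back to all target tasks (plain shuffle).
    static List<Integer> candidates(List<Integer> targetTaskIds, Set<Integer> localTaskIds) {
        List<Integer> local = new ArrayList<>();
        for (int id : targetTaskIds) {
            if (localTaskIds.contains(id)) {
                local.add(id);
            }
        }
        return local.isEmpty() ? targetTaskIds : local;
    }

    public static void main(String[] args) {
        System.out.println(candidates(List.of(1, 2, 3, 4), Set.of(2, 4))); // prints [2, 4]
        System.out.println(candidates(List.of(1, 2), Set.of(9)));          // prints [1, 2]
    }
}
```

Keeping tuples inside one worker process when possible avoids a network hop between workers.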
Custom grouping: implement the CustomStreamGrouping interface to define your own grouping logic.
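// Package names assume Storm 1.0 or later; older releases use backtype.storm.* instead.
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.task.WorkerTopologyContext;

public class CategoryGrouping implements CustomStreamGrouping, Serializable {
    // Mapping of category to integer values for grouping
    private static final Map<String, Integer> categories = ImmutableMap.of(
        "Financial", 0,
        "Medical", 1,
        "FMCG", 2,
        "Electronics", 3
    );
    // Task ids of the consuming bolt; initialized in prepare()
    private List<Integer> targetTasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
            List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // The first tuple field is expected to hold one of the categories above
        String category = (String) values.get(0);
        // chooseTasks must return real task ids, so use the category value
        // as an index into targetTasks rather than returning it directly
        int index = categories.get(category) % targetTasks.size();
        return ImmutableList.of(targetTasks.get(index));
    }
}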
builder.setSpout("a", new SpoutA());
builder.setBolt("b", (IRichBolt)new BoltB())
.customGrouping("a", new CategoryGrouping());
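To check the category routing without a running cluster, the same mapping can be exercised in plain Java. This is a sketch under assumptions: the class name CategoryRoutingSketch and the task ids are hypothetical, and sending unknown categories to index 0 is a choice made here, not something the original grouping specifies.

```java
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the category-to-task mapping used by a custom grouping.
final class CategoryRoutingSketch {
    private static final Map<String, Integer> CATEGORIES = Map.of(
        "Financial", 0,
        "Medical", 1,
        "FMCG", 2,
        "Electronics", 3
    );

    // Map the category to an index, wrap it to the number of tasks,
    // then return the task id stored at that index.
    static int chooseTask(String category, List<Integer> targetTaskIds) {
        // Assumption: unknown categories fall back to index 0.
        int index = CATEGORIES.getOrDefault(category, 0) % targetTaskIds.size();
        return targetTaskIds.get(index);
    }

    public static void main(String[] args) {
        List<Integer> tasks = List.of(11, 12, 13); // hypothetical task ids
        System.out.println(chooseTask("Medical", tasks));     // prints 12
        System.out.println(chooseTask("Electronics", tasks)); // prints 11
    }
}
```

With three tasks and four categories, "Electronics" wraps around and shares a task with "Financial", which shows why the index is taken modulo the task count before the lookup.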