Grouping策略,并行度,消息的可靠处理机制
配置并行度works jvm:在一个节点可以运行多个jvm进程,一个topology可以包含一个或者多个worker并行的泡在不同的machine,所以一个work progress就是执行一个topology的子集
并且一个worker只能对应一个toplogy
exectors在一个worker可以包含一个或者多个tasks,但默认每个excutor只执行一个task,一个worker包含多个exectors,每个component(spout和bolt)至少对应一个executor
tasks(bolt/spout instance) task就是具体的处理对象,每一个spout和bolt会被当做很多task在集群里面执行,每一个task对应一个线程,而stream grouping则是定义怎么从一堆
task发射tuple到另一堆task,可以调用ToplogyBuilder.setSpout和TopBuilder.setBolt来设置并行度,也就是多个task
配置并行度
对于并行度的配置,在storm可以在多个地方进行配置, 优先级为
defaults.yaml<storm.yaml<topology-specific configuration<internal component-specific configuration<external componnet -specific configuration
work process 的数目,可以通过配置文件和代码中的配置,work就是执行进程,所以考虑并发的效果,数目
至少应该大于machines数目
executor数目 component的并发线程数, 只能在代码中配置通过setbolt和setspout的参数,列如 setbolt("green-bolt",new GreenBolt(),2)
tasks数目,可以不配置,默认和executor1:1,也可以通过设置setNumTask()配置
配置并行度
Topology 的worker数通过config设置,也就是执行该toplogy 的work进程数,他可以通过strom
rebalance 命令任意调整
Config conf=new Config();
stream Grouping ,告诉topology如何在两个组件之间发送tuple
定义一个topology的其中一个定义每个bolt接收什么样的流作为输入。stream grouping 就是用来stream应该stream应该
如果分配数据给bolts上面的多个tasks
列如:当:boltA 的一个task要发送一个tuple给bolt B , 他应该发送
storm里面有7种 stream grouping
1.shuffle grouping
2.fields grouping
3.all grouping
4.global grouping
5.none grouping
6.direct grouping
7local or shuffle grouping
通过cuomstreamgrouping 接口来实现分流
1.shuffle grouping
随机分组,
随机派发stream里面得tuple,保证每个bolt接收到的tuple数目大致相同
splitsentence对于句子里面得每个单词发射一个新的tuple,wordconut里面维护一个单词-》次数的mapping,wordcount每接收一个单词就跟新他的统计
状态RandomSentenceSpout和splitSentence之间就是用的shuffle grouping shuffle grouping则是定义怎么从一堆对
每个task的tuple分配比较均匀
TopologyBuilder buider=new TopologyBuilder();
buider.setSpout(1, new RandomSentenceSpout(),5);
buider.setBolt(2, new SplitSentence(),8).shuffleGrouping();
buider.setBolt(3, new WordCount(),12).FieldsGrouping(2,new Fields("words") );
2.fields grouping
按字段分组,比如按user-id这个字段来分组,那么具有同样userid的tuple会被分到相同的bolts里面得一个task
而不同的userid会被分到不同的task
splitsentence 和wordcount之间使用fields grouping ,这种机制保证相同field值得tuple会同一个task,这对于wordcount很关键
如果同一个单词不去同一个task,那么统计出来的次数就不对了
TopologyBuilder buider=new TopologyBuilder();
buider.setSpout(1, new TweetSpout());
buider.setSpout("signal",new SignalSpout());
buider.setBolt(2, new TweetCounter()).FieldsGrouping(1,new Fields("username")).allGrouping("sigals");
3.all grouping广播发送,对于每一个tuple,所有的bolts都会受到
all grouping的一个常见用途就是发送信号给bolts,比如
你对stream进行了一些过滤, 你必须把filter的参数传给所有的bolts,发送这些参数就可以通过all
grouping 来实现
这里我们为tweetcounter的所有task订阅signal,所以我们可以使用
sigalspout发送不同的信号到tweetcounter这个bolt
TopologyBuilder buider=new TopologyBuilder();
buider.setSpout("a", new SpoutA());
buider.setSpout("b", new SpoutB());
buider.setSpout("c", new SpoutC());
buider.setSpout("d", new SpoutD());
buider.setSpout("e", new SpoutE()).globalGrouping ("c").shuffleGrouping();
4.global grouping
全局分组,整个stream被分配到storm中的一个bolt的其中一个task,再具体一点就是分配给id值最低的那个task
比较适合并发汇总
5.none grouping
不分组,这个分组的意思就是说stream不关心怎么样分组,目前这种分组和shuffle grouping是一种效果,有点不一样的是storm
会使用none grouping的这一个bolt放到这个bolt的订阅者同一个线程里面去执行
6直接分组消息发送者指定消息接收者的哪个task处理这个消息,只有被表名direct stream的消息流可以处理这个方法
消息处理者可以通过topologyContext来获取处理它消息的task id
public void declareOutputFieleds(OutputFieldsDeclarer declare){
declare.declareStream("directStream",true,new Fields("fields"));
}
public void prepare(Map stormConf,TopologyContext context,OutputCollector collector){
this.numOfTasks=context.getComponentTasks("my-stream");
this.collector=collector;
}
public void execute(Tuple input ){
collector.emitDirect(new Random().nextInt(3),process(input));
}
7local or shuffle grouping
custom grouping实现customStreamGrouping接口来自定义分组
public class CategoryGrouping implements CustomStreamGrouping,
Serializable {
// Mapping of category to integer values for grouping
private static final Map<String, Integer> categories =
ImmutableMap.of
(
"Financial", 0,
"Medical", 1,
"FMCG", 2,
"Electronics", 3
);
// number of tasks, this is initialized in prepare method
private int tasks = 0;
public void prepare(WorkerTopologyContext context,
GlobalStreamId stream, List<Integer> targetTasks)
{
// initialize the number of tasks
tasks = targetTasks.size();
}
public List<Integer> chooseTasks(int taskId, List<Object>
values) {
// return the taskId for a given category
String category = (String) values.get(0);
return ImmutableList.of(categories.get(category) % tasks);
}
}
builder.setSpout("a", new SpoutA());
builder.setBolt("b", (IRichBolt)new BoltB())
.customGrouping("a", new CategoryGrouping());
消息可靠处理机制
在什么情况下人为一个spout发送出来的消息被完全处理答案是下面的条件同时被满足
1.tuple tree不再增长
2.树中的任何消息被标识为已处理
如果在指定时间内,一个消息被衍生出来的tuple tree 未被完全处理成功,则人为该消息未被完成
这个超时值可以通过任务级参数Config.Toplogy_message_timeout_sec进行配置,默认30秒
消息的生命周期
spout应该实现的接口Ispout extends serialble
假设我们从kestrel读取消息,spout会将这个消息设置ID作为消息的message id向spoutOutputCollector
接下来,这些消息会被发送到后续处理的bolts,并且storm会跟踪由此消息衍生出来的新消息,当检测到一个
消息产生出来的tuple tree被完整处理后,并将此消息的messageid作为参数传入,同理消息如果超时就调用fail
可靠性相关api
当在tuple tree创建新节点, 要通知storm
当处理完一个单独消息,需要告诉storm这个消息树变化情况
为tuple tree增加一个节点,我们称之为峁定
package spouts;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class SplitSentence extends BaseRichBolt {
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector=collector;
}
public void execute(Tuple tuple) {
String sentence=tuple.getString(0);
for(String word:sentence.split(" ")){
collector.emit(tuple, new Values(word));
}
collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
多重铆钉
List<Tuple> archor=new LinkedList<Tuple>();
archor.add(Tuple1);
archor.add(Tuple2);
collector.emit(archor, new Values(1,2,3));
或者以下功能一样,但是BasicOutputCollector会被自动锚定到输入消息,当exexute执行完消息会自动的应答输入消息
package spouts;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class SplitSentence2 extends BaseBasicBolt {
public void execute(Tuple input, BasicOutputCollector collector) {
String sentence=input.getString(0);
for(String word:sentence.split(" ")){
collector.emit( new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
调整可靠性
将参数config.Topology_ackers设置为0,通过此方法,当spout发送此消息的时候,他的ack会被立即调用
2.发送消息时,可以不指定消息messageid,当需要关闭可靠性的时候,可以使用此 方法
3.如果你不介意衍生出来的子孙消息可靠性,则派生出来出来的消息在发送时候不要铆钉,及在emit中不指定
消息,因为这些消息没有锚定到任何tuple tree中,他们的失败不会引起任何spout重新发送消息
文档来源:开源中国社区https://my.oschina.net/goudingcheng/blog/614326
页:
[1]