Apache Storm - A Real-Time Stream Computing System

Project Overview

Apache Storm is a free, open-source distributed real-time computation system designed for processing unbounded streams of data. Storm was created by Nathan Marz at BackType; after Twitter acquired BackType the project was open-sourced, and it eventually became a top-level project of the Apache Software Foundation.

Storm's design goal is a real-time stream processing framework that is easy to use and highly fault-tolerant. It can process millions of messages per second with low latency and high reliability, and it is widely used for real-time analytics, online machine learning, continuous computation, and similar workloads.

Key Features

  • Real-time processing: stream processing with millisecond-level latency
  • Scalability: scales horizontally to hundreds of nodes
  • Fault tolerance: automatic failure detection and recovery
  • Ease of use: a simple programming model and API
  • Multi-language support: Java, Python, Ruby, and more
  • Reliability guarantee: at-least-once processing semantics

Core Principles

Core Concepts

Topology

  • The logical unit of a Storm application
  • A directed acyclic graph of Spouts and Bolts
  • Defines the stream processing logic

Spout

  • The source of a data stream
  • Reads data from external systems
  • Emits tuples into the topology

Bolt

  • A data processing unit
  • Receives tuples and processes them
  • May emit new tuples

Tuple

  • The unit of data in Storm
  • A list of named fields
  • The basic data structure flowing through a topology

Stream

  • An unbounded sequence of tuples
  • Connects Spouts and Bolts
  • Defines how data flows

Cluster Architecture

Storm Cluster Architecture
├── Nimbus (master node)
│   ├── Topology management
│   ├── Task assignment
│   └── Failure monitoring
├── Supervisor (worker node)
│   ├── Worker process management
│   ├── Executor execution
│   └── Task execution
├── ZooKeeper
│   ├── Cluster coordination
│   ├── Configuration management
│   └── State storage
└── UI (web interface)
    ├── Topology monitoring
    ├── Performance metrics
    └── Log viewing

Processing Semantics

At-least-once

  • Every message is guaranteed to be processed at least once
  • Duplicate processing is possible
  • Implemented via the ACK mechanism

At-most-once

  • Messages may be lost but are never duplicated
  • Achieved by turning the reliability mechanism off
  • Offers higher performance

Exactly-once

  • Achieved through the Trident API
  • Combines state management with transactions
  • More complex, but more reliable
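The difference between these guarantees is easiest to see in a toy model. The sketch below is plain Java with no Storm dependency (the queue and the simulated failure are our own illustration, not Storm internals): a message stays "pending" until it is acked, so a failure before the ack leads to redelivery — which is exactly why at-least-once permits duplicates but never loss.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Minimal at-least-once simulation: a message stays queued ("pending")
// until it is acked, so a failure before the ack causes redelivery.
public class AtLeastOnceDemo {
    public static List<String> run() {
        Deque<String> queue = new ArrayDeque<>(List.of("m1", "m2", "m3"));
        List<String> processed = new ArrayList<>();
        boolean failedOnce = false;

        while (!queue.isEmpty()) {
            String msg = queue.peek();   // deliver without removing: still pending
            processed.add(msg);          // the side effect happens before the ack
            if (msg.equals("m2") && !failedOnce) {
                failedOnce = true;       // simulated crash before the ack -> fail()
                continue;                // message stays queued and is redelivered
            }
            queue.poll();                // ack(): only now is the message removed
        }
        return processed;
    }

    public static void main(String[] args) {
        // m2 appears twice: duplicates are possible, loss is not
        System.out.println(run());       // [m1, m2, m2, m3]
    }
}
```

At-most-once is the same loop with the `poll()` before processing; exactly-once additionally needs transactional state, which is what Trident layers on top.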

Use Cases

1. Real-Time Analytics

Real-time aggregation, computation, and analysis over streaming data.

2. Real-Time Recommendation

Updating recommendation results based on users' real-time behavior.

3. Real-Time Monitoring and Alerting

Monitoring system metrics, detecting anomalies in real time, and sending alerts.

4. Real-Time ETL

Real-time cleansing, transformation, and loading of streaming data.

5. Complex Event Processing

Detecting and handling complex event patterns and sequences.

Worked Examples

Case 1: Real-Time Word Count

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Spout that emits random sentences
public class RandomSentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Random random;

    private static final String[] sentences = {
        "Apache Storm is a powerful stream processing system",
        "Real-time data processing with Storm is amazing",
        "Storm provides fault-tolerant distributed computing",
        "Big data stream processing made easy with Storm",
        "Apache Storm handles millions of tuples per second"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        // Pick a random sentence
        String sentence = sentences[random.nextInt(sentences.length)];

        // Emit it as a tuple
        collector.emit(new Values(sentence));

        // Throttle emission a little
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}

// Bolt that splits sentences into words
public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            String sentence = tuple.getStringByField("sentence");
            String[] words = sentence.toLowerCase().split("\\s+");

            for (String word : words) {
                if (!word.trim().isEmpty()) {
                    // Strip punctuation
                    word = word.replaceAll("[^a-zA-Z0-9]", "");
                    if (!word.isEmpty()) {
                        collector.emit(tuple, new Values(word));
                    }
                }
            }

            // Acknowledge successful processing
            collector.ack(tuple);

        } catch (Exception e) {
            // Processing failed; trigger a replay
            collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

// Bolt that counts word frequencies
public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Map<String, Integer> wordCounts;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.wordCounts = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            String word = tuple.getStringByField("word");

            // Update the count
            Integer count = wordCounts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            wordCounts.put(word, count);

            // Emit the updated count
            collector.emit(tuple, new Values(word, count));

            // Log every 100th occurrence of a word
            if (count % 100 == 0) {
                System.out.println("Word count update: " + word + " -> " + count);
            }

            collector.ack(tuple);

        } catch (Exception e) {
            collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

// Build and run the topology
public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Add the spout
        builder.setSpout("sentence-spout", new RandomSentenceSpout(), 1);

        // Add the splitting bolt
        builder.setBolt("split-bolt", new SplitSentenceBolt(), 2)
            .shuffleGrouping("sentence-spout");

        // Add the counting bolt; fieldsGrouping keeps each word on one task
        builder.setBolt("count-bolt", new WordCountBolt(), 2)
            .fieldsGrouping("split-bolt", new Fields("word"));

        // Configuration
        Config config = new Config();
        config.setDebug(true);
        config.setNumWorkers(2);

        if (args != null && args.length > 0) {
            // Submit to a cluster
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // Run in local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", config, builder.createTopology());

            Thread.sleep(30000); // run for 30 seconds

            cluster.shutdown();
        }
    }
}
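The per-tuple logic of the two bolts (tokenize, strip punctuation, count) can be exercised without a cluster. The standalone sketch below mirrors the bolt code above as plain Java (the `WordCountCore` class name is ours, for illustration only):

```java
import java.util.HashMap;
import java.util.Map;

// The split + count logic from the bolts above, runnable without Storm
public class WordCountCore {
    // Mirrors SplitSentenceBolt followed by WordCountBolt
    public static Map<String, Integer> count(String... sentences) {
        Map<String, Integer> counts = new HashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.toLowerCase().split("\\s+")) {
                word = word.replaceAll("[^a-zA-Z0-9]", "");   // strip punctuation
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);      // count-bolt update
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("Apache Storm is great", "Storm is fast!"));
    }
}
```

Keeping the core logic this small is what makes the bolts easy to parallelize: all the distribution concerns live in the topology wiring, not in the per-tuple code.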

Case 2: Real-Time Log Analysis

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Spout that reads log lines
public class LogSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private BufferedReader reader;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            // Read logs from a file (or Kafka in production)
            this.reader = new BufferedReader(new FileReader("/var/log/access.log"));
        } catch (IOException e) {
            throw new RuntimeException("Unable to open log file", e);
        }
    }

    @Override
    public void nextTuple() {
        try {
            String logLine = reader.readLine();
            if (logLine != null) {
                collector.emit(new Values(logLine, System.currentTimeMillis()));
            } else {
                Thread.sleep(50); // wait for new log lines
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("logline", "timestamp"));
    }
}

// Bolt that parses raw log lines
public class LogParserBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Pattern logPattern;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Apache access log format
        String regex = "^(\\S+) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) \\S+\" (\\d{3}) (\\d+)";
        this.logPattern = Pattern.compile(regex);
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            String logLine = tuple.getStringByField("logline");
            long timestamp = tuple.getLongByField("timestamp");

            Matcher matcher = logPattern.matcher(logLine);

            if (matcher.find()) {
                String clientIP = matcher.group(1);
                String method = matcher.group(3);
                String url = matcher.group(4);
                int statusCode = Integer.parseInt(matcher.group(5));
                long responseSize = Long.parseLong(matcher.group(6));

                // Emit the parsed fields
                collector.emit(tuple, new Values(
                    clientIP, method, url, statusCode, responseSize, timestamp
                ));
            }

            collector.ack(tuple);

        } catch (Exception e) {
            collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("client_ip", "method", "url", "status_code", "response_size", "timestamp"));
    }
}

// Bolt that detects HTTP errors and tracks per-IP error rates
public class ErrorDetectionBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Map<String, AtomicInteger> errorCounts;
    private ScheduledExecutorService scheduler;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.errorCounts = new ConcurrentHashMap<>();
        this.scheduler = Executors.newScheduledThreadPool(1);

        // Check error rates once per minute
        scheduler.scheduleAtFixedRate(this::checkErrorRates, 1, 1, TimeUnit.MINUTES);
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            String clientIP = tuple.getStringByField("client_ip");
            int statusCode = tuple.getIntegerByField("status_code");

            // Treat 4xx and 5xx responses as errors
            if (statusCode >= 400) {
                errorCounts.computeIfAbsent(clientIP, k -> new AtomicInteger(0))
                    .incrementAndGet();

                // Emit an error event
                collector.emit(tuple, new Values(
                    clientIP, statusCode, "ERROR_DETECTED", System.currentTimeMillis()
                ));
            }

            collector.ack(tuple);

        } catch (Exception e) {
            collector.fail(tuple);
        }
    }

    private void checkErrorRates() {
        for (Map.Entry<String, AtomicInteger> entry : errorCounts.entrySet()) {
            String ip = entry.getKey();
            int errorCount = entry.getValue().getAndSet(0); // reset the counter

            if (errorCount > 10) { // more than 10 errors per minute
                System.out.println("High error rate: IP " + ip + " produced " + errorCount + " errors in the last minute");

                // Forward the alert to an external system
                sendAlert(ip, errorCount);
            }
        }
    }

    private void sendAlert(String ip, int errorCount) {
        // Send the alert to a monitoring system, email, SMS, etc.
        System.out.println("Sending alert: IP " + ip + ", errors: " + errorCount);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("client_ip", "status_code", "event_type", "timestamp"));
    }
}

// Bolt that aggregates request statistics
public class StatsBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Map<String, Long> urlCounts;
    private Map<String, Long> ipCounts;
    private AtomicLong totalRequests;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.urlCounts = new ConcurrentHashMap<>();
        this.ipCounts = new ConcurrentHashMap<>();
        this.totalRequests = new AtomicLong(0);

        // Print statistics periodically
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(this::printStats, 30, 30, TimeUnit.SECONDS);
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            String clientIP = tuple.getStringByField("client_ip");
            String url = tuple.getStringByField("url");

            // Update counters
            urlCounts.merge(url, 1L, Long::sum);
            ipCounts.merge(clientIP, 1L, Long::sum);
            totalRequests.incrementAndGet();

            collector.ack(tuple);

        } catch (Exception e) {
            collector.fail(tuple);
        }
    }

    private void printStats() {
        System.out.println("=== Real-Time Stats ===");
        System.out.println("Total requests: " + totalRequests.get());

        // Top 10 URLs
        System.out.println("Top 10 URLs:");
        urlCounts.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
            .limit(10)
            .forEach(entry -> System.out.println("  " + entry.getKey() + ": " + entry.getValue()));

        // Top 10 client IPs
        System.out.println("Top 10 client IPs:");
        ipCounts.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
            .limit(10)
            .forEach(entry -> System.out.println("  " + entry.getKey() + ": " + entry.getValue()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No output fields
    }
}
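The access-log regex used by LogParserBolt can be checked in isolation before deploying the topology. The standalone sketch below applies the same pattern to a sample line (the sample line and the `LogRegexDemo` class are illustrative, not part of the topology):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone check of the access-log regex used by LogParserBolt
public class LogRegexDemo {
    static final Pattern LOG = Pattern.compile(
        "^(\\S+) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) \\S+\" (\\d{3}) (\\d+)");

    // Returns {clientIP, method, url, status, size}, or null if unparseable
    public static String[] parse(String line) {
        Matcher m = LOG.matcher(line);
        if (!m.find()) {
            return null;
        }
        return new String[] { m.group(1), m.group(3), m.group(4), m.group(5), m.group(6) };
    }

    public static void main(String[] args) {
        // A typical combined-log-format line (made up for illustration)
        String line = "192.168.1.10 - - [10/Oct/2023:13:55:36 +0000] \"GET /index.html HTTP/1.1\" 200 2326";
        String[] f = parse(line);
        System.out.println(f[0] + " " + f[1] + " " + f[2] + " " + f[3] + " " + f[4]);
        // 192.168.1.10 GET /index.html 200 2326
    }
}
```

Returning null for unparseable lines mirrors the bolt's behavior of acking but emitting nothing when the pattern does not match.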

Case 3: Micro-Batch Processing with the Trident API

import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.operation.builtin.MapGet;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Micro-batch word count with Trident
public class TridentWordCount {

    public static void main(String[] args) {
        TridentTopology topology = new TridentTopology();

        // Build the counting stream; persistentAggregate returns the
        // TridentState that the DRPC stream below queries
        TridentState wordCounts = topology.newStream("sentences",
                new FixedBatchSpout(new Fields("sentence"), 3,
                    new Values("Apache Storm is great"),
                    new Values("Storm provides reliable processing"),
                    new Values("Trident offers exactly-once semantics"),
                    new Values("Real-time processing with Storm"),
                    new Values("Big data stream processing")))

            // Split into words
            .each(new Fields("sentence"), new Split(), new Fields("word"))

            // Group by word and keep a persistent count
            .groupBy(new Fields("word"))
            .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

        // DRPC query stream over the word-count state
        topology.newDRPCStream("words")
            .each(new Fields("args"), new Split(), new Fields("word"))
            .groupBy(new Fields("word"))
            .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
            .each(new Fields("word", "count"), new FilterNull())
            .project(new Fields("word", "count"));
    }

    // Tokenizer function
    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.toLowerCase().split("\\s+")) {
                if (!word.trim().isEmpty()) {
                    collector.emit(new Values(word.replaceAll("[^a-zA-Z0-9]", "")));
                }
            }
        }
    }

    // Drops words that have no recorded count
    public static class FilterNull extends BaseFilter {
        @Override
        public boolean isKeep(TridentTuple tuple) {
            return tuple.get(1) != null;
        }
    }
}

// Custom aggregator: sums the first field across a batch
public class SumAggregator extends BaseAggregator<SumAggregator.State> {

    static class State {
        long sum = 0;
    }

    @Override
    public State init(Object batchId, TridentCollector collector) {
        return new State();
    }

    @Override
    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
        state.sum += tuple.getLong(0);
    }

    @Override
    public void complete(State state, TridentCollector collector) {
        collector.emit(new Values(state.sum));
    }
}
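The aggregator lifecycle above (init once per batch, aggregate once per tuple, complete emits the result) can be mimicked in plain Java to see how one micro-batch produces exactly one emitted value. The `runBatch` harness below is our own illustration of the call sequence, not the Trident API itself:

```java
import java.util.List;

// Plain-Java mimic of Trident's aggregator lifecycle over one batch:
// init() once per batch, aggregate() per tuple, complete() emits the result.
public class BatchSumDemo {
    static class State {
        long sum = 0;
    }

    public static long runBatch(List<Long> batch) {
        State state = new State();        // init(batchId, collector)
        for (long value : batch) {
            state.sum += value;           // aggregate(state, tuple, collector)
        }
        return state.sum;                 // complete(state, collector) emits the sum
    }

    public static void main(String[] args) {
        System.out.println(runBatch(List.of(1L, 2L, 3L, 4L)));  // 10
    }
}
```

Because the emitted value is a pure function of the batch, replaying a failed batch yields the same result, which is the foundation of Trident's exactly-once semantics.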

Case 4: Kafka Integration

import java.util.Map;
import java.util.Properties;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Kafka-to-Kafka processing topology
public class KafkaStormIntegration {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Configure the Kafka spout
        KafkaSpoutConfig<String, String> kafkaConfig = KafkaSpoutConfig
            .builder("localhost:9092", "input-topic")
            .setGroupId("storm-consumer-group")
            .setMaxUncommittedOffsets(250)
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
            .build();

        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaConfig);

        // Configure the Kafka bolt
        KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>()
            .withTopicSelector(new DefaultTopicSelector("output-topic"))
            .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>())
            .withProducerProperties(getKafkaProducerProps());

        // Build the topology
        builder.setSpout("kafka-spout", kafkaSpout, 1);

        builder.setBolt("processing-bolt", new MessageProcessingBolt(), 2)
            .shuffleGrouping("kafka-spout");

        builder.setBolt("kafka-bolt", kafkaBolt, 1)
            .shuffleGrouping("processing-bolt");

        // Configure and submit
        Config config = new Config();
        config.setNumWorkers(3);

        StormSubmitter.submitTopology("kafka-storm-topology", config, builder.createTopology());
    }

    private static Properties getKafkaProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");
        props.put("retries", 3);
        return props;
    }
}

// Bolt that processes Kafka messages
public class MessageProcessingBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            String message = tuple.getStringByField("value");

            // Process the message (e.g. JSON parsing, data transformation)
            String processedMessage = processMessage(message);

            // Emit for the downstream Kafka bolt
            collector.emit(tuple, new Values("processed-key", processedMessage));
            collector.ack(tuple);

        } catch (Exception e) {
            collector.fail(tuple);
            e.printStackTrace();
        }
    }

    private String processMessage(String message) {
        // The actual processing logic would go here:
        // data cleansing, format conversion, business rules, etc.
        return message.toUpperCase() + " - PROCESSED";
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("key", "value"));
    }
}

Case 5: Performance Monitoring and Tuning

# storm.yaml configuration
storm.zookeeper.servers:
    - "zk1.example.com"
    - "zk2.example.com"
    - "zk3.example.com"

nimbus.seeds: ["nimbus1.example.com", "nimbus2.example.com"]

# Performance tuning
topology.workers: 4
topology.acker.executors: 2
topology.max.spout.pending: 1000
topology.message.timeout.secs: 30

# Memory settings
worker.heap.memory.mb: 2048
supervisor.memory.capacity.mb: 8192

# Messaging (network) settings
storm.messaging.transport: "org.apache.storm.messaging.netty.Context"
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 16384

# Kryo serialization registrations
topology.kryo.register:
    - "com.example.MyClass"
    - "com.example.MyOtherClass"
import java.util.HashMap;
import java.util.Map;

// Simple performance monitoring helper
public class StormMonitor {

    public void monitorTopology(String topologyName) {
        // Fetch topology information
        Map<String, Object> topologyInfo = getTopologyInfo(topologyName);

        // Key metrics
        double throughput = getThroughput(topologyInfo);
        double latency = getLatency(topologyInfo);
        double capacity = getCapacity(topologyInfo);

        System.out.println("Topology performance metrics:");
        System.out.println("  Throughput: " + throughput + " tuples/sec");
        System.out.println("  Latency: " + latency + " ms");
        System.out.println("  Capacity: " + capacity);

        // Threshold-based alerts
        if (capacity > 0.8) {
            System.out.println("Warning: capacity usage is high (" + capacity + ")");
        }

        if (latency > 1000) {
            System.out.println("Warning: processing latency is high (" + latency + " ms)");
        }
    }

    private Map<String, Object> getTopologyInfo(String topologyName) {
        // Fetch topology details via the Storm UI REST API;
        // simplified to a stub for this example
        return new HashMap<>();
    }

    private double getThroughput(Map<String, Object> topologyInfo) {
        // Compute throughput
        return 0.0;
    }

    private double getLatency(Map<String, Object> topologyInfo) {
        // Compute average latency
        return 0.0;
    }

    private double getCapacity(Map<String, Object> topologyInfo) {
        // Compute capacity usage
        return 0.0;
    }
}

Performance Tuning Tips

1. Parallelism Configuration

// Set parallelism per component to match each stage's cost
builder.setSpout("spout", new MySpout(), 2);
builder.setBolt("bolt1", new MyBolt1(), 4)
    .shuffleGrouping("spout");
builder.setBolt("bolt2", new MyBolt2(), 8)
    .fieldsGrouping("bolt1", new Fields("key"));
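fieldsGrouping routes each tuple by a hash of the grouping fields, so every tuple with the same key reaches the same bolt task, which is what lets per-key state (like the word counts) stay local. The routing idea can be sketched in plain Java (Storm's actual hashing differs in detail; `Math.floorMod(hashCode, numTasks)` is our simplification):

```java
// Sketch of fields-grouping routing: same key -> same task index
public class FieldsGroupingDemo {
    public static int taskFor(String key, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int tasks = 8;
        // Repeated keys always map to the same task, so per-key state stays local
        System.out.println("storm -> task " + taskFor("storm", tasks));
        System.out.println("storm -> task " + taskFor("storm", tasks));
        System.out.println("kafka -> task " + taskFor("kafka", tasks));
    }
}
```

shuffleGrouping, by contrast, distributes tuples round-robin, which balances load but gives no key locality.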

2. Memory Tuning

Config config = new Config();
config.put(Config.WORKER_HEAP_MEMORY_MB, 2048);
config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 512);

3. Batching

// Batch tuples to amortize per-tuple overhead (e.g., one DB write per batch)
public class BatchingBolt extends BaseRichBolt {
    private OutputCollector collector;
    private final List<Tuple> batch = new ArrayList<>();
    private static final int BATCH_SIZE = 100;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        batch.add(tuple);

        if (batch.size() >= BATCH_SIZE) {
            processBatch(batch);
            // Ack only after the whole batch has been processed
            for (Tuple t : batch) {
                collector.ack(t);
            }
            batch.clear();
        }
        // Note: a production version should also flush on tick tuples,
        // otherwise a partially filled batch can hit the message timeout
    }

    private void processBatch(List<Tuple> tuples) {
        // Write the batch to an external system in a single call
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No output fields
    }
}

As a pioneer of real-time stream processing, Apache Storm combines an approachable programming model with strong fault-tolerance mechanisms, making it a solid choice for processing real-time data streams. Although newer stream processing frameworks now exist, Storm remains valuable in specific scenarios, particularly applications that demand low latency and high reliability.
