Apache ActiveMQ - 企业级消息中间件

项目简介

Apache ActiveMQ是Apache软件基金会开发的开源消息中间件,实现了Java消息服务(JMS)规范。ActiveMQ提供了一个可靠、高性能的消息传递平台,支持多种编程语言和传输协议,被广泛用于构建分布式应用系统。

ActiveMQ支持多种消息模式,包括点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)模式,可以处理大量并发连接和消息。它提供了丰富的企业级功能,如消息持久化、事务支持、集群部署等。

主要特性

  • JMS规范实现:完全支持JMS 1.1和2.0规范
  • 多协议支持:OpenWire、STOMP、AMQP、MQTT等
  • 消息持久化:支持多种持久化存储方式
  • 集群支持:主从复制和网络集群
  • 安全机制:身份认证和授权控制
  • 管理监控:Web控制台和JMX监控

项目原理

核心架构

ActiveMQ采用基于代理(Broker)的架构:

1
2
3
4
5
6
7
8
9
ActiveMQ架构
├── Broker (消息代理)
│ ├── Transport Connectors (传输连接器)
│ ├── Network Connectors (网络连接器)
│ ├── Persistence Adapter (持久化适配器)
│ └── Security Plugin (安全插件)
├── Producer (消息生产者)
├── Consumer (消息消费者)
└── Admin Tools (管理工具)

消息传递模式

点对点模式(Queue)

  • 消息发送到队列
  • 每条消息只能被一个消费者处理
  • 支持负载均衡

发布/订阅模式(Topic)

  • 消息发布到主题
  • 所有订阅者都能收到消息
  • 支持消息广播

消息持久化

ActiveMQ支持多种持久化方式:

KahaDB

  • 默认持久化存储
  • 基于文件的快速存储
  • 支持事务和恢复

AMQ Message Store

  • 传统的消息存储方式
  • 基于日志和引用存储

JDBC Store

  • 基于数据库的存储
  • 支持集群共享存储

LevelDB Store

  • 高性能的键值存储
  • 支持快速读写操作

使用场景

1. 系统解耦

通过消息队列实现系统间的松耦合,提高系统的可维护性和扩展性。

2. 异步处理

将耗时操作异步化,提高系统的响应速度和吞吐量。

3. 流量削峰

在高并发场景下,通过消息队列平滑处理请求峰值。

4. 数据同步

在分布式系统中同步数据变更,保证数据一致性。

5. 事件驱动架构

构建基于事件的微服务架构,实现服务间的异步通信。

具体案例

案例1:基本消息发送和接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

// 消息生产者
public class MessageProducer {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "test.queue";

public void sendMessage(String messageText) throws JMSException {
// 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

// 创建连接
Connection connection = connectionFactory.createConnection();
connection.start();

try {
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 创建目标队列
Destination destination = session.createQueue(QUEUE_NAME);

// 创建生产者
javax.jms.MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

// 创建消息
TextMessage message = session.createTextMessage(messageText);
message.setStringProperty("sender", "ProducerApp");
message.setLongProperty("timestamp", System.currentTimeMillis());

// 发送消息
producer.send(message);
System.out.println("消息已发送: " + messageText);

} finally {
connection.close();
}
}

public static void main(String[] args) throws JMSException {
MessageProducer producer = new MessageProducer();

for (int i = 1; i <= 10; i++) {
producer.sendMessage("Hello ActiveMQ " + i);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// 消息消费者
public class MessageConsumer {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "test.queue";

public void receiveMessages() throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();

try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);

// 创建消费者
javax.jms.MessageConsumer consumer = session.createConsumer(destination);

// 设置消息监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String messageText = textMessage.getText();
String sender = textMessage.getStringProperty("sender");
long timestamp = textMessage.getLongProperty("timestamp");

System.out.printf("接收消息: %s (发送者: %s, 时间: %d)%n",
messageText, sender, timestamp);

// 模拟消息处理
processMessage(messageText);

}
} catch (JMSException e) {
e.printStackTrace();
}
}

private void processMessage(String message) {
// 业务逻辑处理
try {
Thread.sleep(1000); // 模拟处理时间
System.out.println("消息处理完成: " + message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

// 保持连接以接收消息
System.out.println("等待消息中... 按Enter键退出");
System.in.read();

} catch (Exception e) {
e.printStackTrace();
} finally {
connection.close();
}
}

public static void main(String[] args) throws JMSException {
MessageConsumer consumer = new MessageConsumer();
consumer.receiveMessages();
}
}

案例2:发布/订阅模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 主题发布者
public class TopicPublisher {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String TOPIC_NAME = "news.topic";

public void publishNews(String category, String content) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();

try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);

javax.jms.MessageProducer producer = session.createProducer(topic);

// 创建消息
TextMessage message = session.createTextMessage(content);
message.setStringProperty("category", category);
message.setStringProperty("publisher", "NewsService");
message.setLongProperty("publishTime", System.currentTimeMillis());

// 发布消息
producer.send(message);
System.out.println("新闻已发布 [" + category + "]: " + content);

} finally {
connection.close();
}
}

public static void main(String[] args) throws JMSException {
TopicPublisher publisher = new TopicPublisher();

publisher.publishNews("科技", "AI技术取得重大突破");
publisher.publishNews("体育", "世界杯决赛即将开始");
publisher.publishNews("财经", "股市创新高");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// 主题订阅者
public class TopicSubscriber {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String TOPIC_NAME = "news.topic";

public void subscribeNews(String subscriberName, String categoryFilter) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.setClientID(subscriberName); // 设置持久订阅需要的客户端ID
connection.start();

try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);

// 创建消息选择器
String messageSelector = categoryFilter != null ?
"category = '" + categoryFilter + "'" : null;

// 创建持久订阅者
TopicSubscriber subscriber = session.createDurableSubscriber(
topic, subscriberName + "_subscription", messageSelector, false);

subscriber.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String content = textMessage.getText();
String category = textMessage.getStringProperty("category");
String publisher = textMessage.getStringProperty("publisher");

System.out.printf("[%s] 收到新闻 [%s]: %s (来源: %s)%n",
subscriberName, category, content, publisher);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});

System.out.println(subscriberName + " 开始订阅新闻... 按Enter键退出");
System.in.read();

} catch (Exception e) {
e.printStackTrace();
} finally {
connection.close();
}
}

public static void main(String[] args) throws JMSException {
// 启动多个订阅者实例
new Thread(() -> {
try {
new TopicSubscriber().subscribeNews("科技爱好者", "科技");
} catch (JMSException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try {
new TopicSubscriber().subscribeNews("体育迷", "体育");
} catch (JMSException e) {
e.printStackTrace();
}
}).start();

// 全量订阅者
new TopicSubscriber().subscribeNews("新闻编辑", null);
}
}

案例3:事务处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public class TransactionalMessageExample {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "transaction.queue";

// 事务性消息发送
public void sendTransactionalMessages() throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();

try {
// 创建事务性会话
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(QUEUE_NAME);
javax.jms.MessageProducer producer = session.createProducer(queue);

try {
// 发送多条消息
for (int i = 1; i <= 5; i++) {
TextMessage message = session.createTextMessage("事务消息 " + i);
message.setIntProperty("messageId", i);
producer.send(message);
System.out.println("发送消息: " + i);
}

// 模拟业务逻辑
if (performBusinessLogic()) {
// 提交事务
session.commit();
System.out.println("事务提交成功");
} else {
// 回滚事务
session.rollback();
System.out.println("事务回滚");
}

} catch (Exception e) {
// 异常时回滚事务
session.rollback();
System.out.println("发生异常,事务回滚: " + e.getMessage());
}

} finally {
connection.close();
}
}

// 事务性消息消费
public void receiveTransactionalMessages() throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();

try {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(QUEUE_NAME);
javax.jms.MessageConsumer consumer = session.createConsumer(queue);

Message message;
while ((message = consumer.receive(5000)) != null) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String content = textMessage.getText();
int messageId = textMessage.getIntProperty("messageId");

System.out.println("处理消息: " + content);

// 处理业务逻辑
if (processMessage(messageId, content)) {
session.commit();
System.out.println("消息处理成功,事务提交");
} else {
session.rollback();
System.out.println("消息处理失败,事务回滚");
}
}
} catch (Exception e) {
session.rollback();
System.out.println("处理异常,事务回滚: " + e.getMessage());
}
}

} finally {
connection.close();
}
}

private boolean performBusinessLogic() {
// 模拟业务逻辑,可能成功或失败
return Math.random() > 0.3; // 70% 成功率
}

private boolean processMessage(int messageId, String content) {
// 模拟消息处理逻辑
try {
Thread.sleep(100);
return messageId % 4 != 0; // 除了4的倍数都成功
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
}

案例4:集群配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
<!-- activemq.xml 主节点配置 -->
<beans xmlns="http://www.springframework.org/schema/beans">
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="master"
dataDirectory="${activemq.data}">

<!-- 持久化配置 -->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

<!-- 传输连接器 -->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883"/>
</transportConnectors>

<!-- 网络连接器 - 连接到从节点 -->
<networkConnectors>
<networkConnector uri="static:(tcp://slave1:61616,tcp://slave2:61616)"
duplex="false"
decreaseNetworkConsumerPriority="true"/>
</networkConnectors>

<!-- JMX管理 -->
<managementContext>
<managementContext createConnector="true" connectorPort="1099"/>
</managementContext>

<!-- 安全配置 -->
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="admin" password="admin123" groups="admins"/>
<authenticationUser username="user" password="user123" groups="users"/>
</users>
</simpleAuthenticationPlugin>

<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" write="admins" read="admins" admin="admins"/>
<authorizationEntry topic=">" write="admins" read="admins" admin="admins"/>
<authorizationEntry queue="user.>" write="users" read="users"/>
<authorizationEntry topic="user.>" write="users" read="users"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>

</broker>
</beans>

案例5:Spring集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<!-- Spring配置文件 -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:jms="http://www.springframework.org/schema/jms">

<!-- 连接工厂 -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
<property name="userName" value="admin"/>
<property name="password" value="admin123"/>
</bean>

<!-- 缓存连接工厂 -->
<bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="connectionFactory"/>
<property name="sessionCacheSize" value="10"/>
</bean>

<!-- JMS模板 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="defaultDestinationName" value="spring.queue"/>
</bean>

<!-- 消息监听容器 -->
<bean id="messageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destinationName" value="spring.queue"/>
<property name="messageListener" ref="messageHandler"/>
<property name="concurrentConsumers" value="3"/>
<property name="maxConcurrentConsumers" value="10"/>
</bean>

<!-- 消息处理器 -->
<bean id="messageHandler" class="com.example.SpringMessageHandler"/>

</beans>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Spring消息服务
@Service
public class SpringMessageService {

@Autowired
private JmsTemplate jmsTemplate;

// 发送消息
public void sendMessage(String queueName, Object message) {
jmsTemplate.convertAndSend(queueName, message);
}

// 发送消息并设置属性
public void sendMessageWithProperties(String queueName, Object message,
Map<String, Object> properties) {
jmsTemplate.send(queueName, session -> {
Message msg = jmsTemplate.getMessageConverter().toMessage(message, session);
for (Map.Entry<String, Object> entry : properties.entrySet()) {
msg.setObjectProperty(entry.getKey(), entry.getValue());
}
return msg;
});
}

// 接收消息
public Object receiveMessage(String queueName) {
return jmsTemplate.receiveAndConvert(queueName);
}
}

// 消息监听器
@Component
public class SpringMessageHandler implements MessageListener {

@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
String content = ((TextMessage) message).getText();
System.out.println("Spring接收消息: " + content);

// 处理业务逻辑
processMessage(content);
}
} catch (JMSException e) {
e.printStackTrace();
}
}

private void processMessage(String content) {
// 业务处理逻辑
System.out.println("处理消息: " + content);
}
}

性能优化建议

1. 连接池配置

1
2
3
4
5
// 连接池配置
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(new ActiveMQConnectionFactory(BROKER_URL));
pooledConnectionFactory.setMaxConnections(20);
pooledConnectionFactory.setMaximumActiveSessionPerConnection(50);

2. 内存优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!-- activemq.xml 内存配置 -->
<broker>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="20 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="5 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
</broker>

3. JVM参数调优

1
2
# 启动参数
ACTIVEMQ_OPTS="-Xms2G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200"

Apache ActiveMQ作为成熟的企业级消息中间件,其丰富的功能特性和稳定的性能使其成为构建可靠消息传递系统的理想选择。通过合理的配置和优化,ActiveMQ可以为企业应用提供高效、安全的异步通信能力。

版权所有,如有侵权请联系我