MQ API
I. Overview
The Oinone Message Queue module provides unified API interfaces supporting three message middleware: RocketMQ/Kafka/RabbitMQ, decoupling production and consumption through NotifyProducer
and NotifyConsumer
. Key features:
- Consistent API: One set of interfaces adapts to three message middleware
- Flexible Configuration: Dynamically switch message queue types via YAML configuration
- Message Types: Supports normal/ordered/transactional messages
- Extensibility: Provides send/consume interceptor mechanisms
II. Dependencies and YAML Configuration
(Ⅰ) Maven Dependencies
Add corresponding dependencies as needed based on the message queue used in the actual business.
<!-- RocketMQ -->
<dependency>
<groupId>pro.shushi.pamirs.framework</groupId>
<artifactId>pamirs-connectors-event-rocketmq</artifactId>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>pro.shushi.pamirs.framework</groupId>
<artifactId>pamirs-connectors-event-kafka</artifactId>
</dependency>
<!-- RabbitMQ -->
<dependency>
<groupId>pro.shushi.pamirs.framework</groupId>
<artifactId>pamirs-connectors-event-rabbitmq</artifactId>
</dependency>
(Ⅱ) YAML Configuration
Documentation related to this topic can be found in Event Configuration.
1. Basic Configuration
pamirs:
event:
enabled: true
topic-prefix: oinone
notify-map:
system: ROCKETMQ # System message type
biz: KAFKA # Business message type
logger: RABBITMQ # Log message type
2. Middleware Configuration
# RocketMQ
spring:
rocketmq:
name-server: 127.0.0.1:9876
producer:
enableMsgTrace: true
# Kafka
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: ${spring.application.name}
# RabbitMQ
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: oinone
password: oinone
III. Core Interfaces
(Ⅰ) NotifyProducer Interface
public interface NotifyProducer<TEMPLATE> {
// Send normal message
<T> NotifySendResult send(String topic, String tag, T msg);
// Send transactional message (RocketMQ specific)
<T> NotifySendResult sendTx(String topic, String tag, String txGroup, T msg, Object extArg);
// Send ordered message
<T> NotifySendResult sendOrderly(String topic, String tag, T msg, String hashKey);
}
Parameter Description:
Parameter | Type | Required | Description |
---|---|---|---|
topic | String | Yes | Message topic |
tag | String | No | Message tag |
msg | Object | Yes | Message body (needs to implement Serializable) |
hashKey | String | Required for ordered messages | Partition key |
Return value NotifySendResult
structure:
public class NotifySendResult {
private boolean success; // Sending status
private Object notifyResult; // Message sending result
private Throwable error; // Exception information
}
(Ⅱ) NotifyConsumer Interface
@FunctionalInterface
public interface NotifyConsumer<T extends Serializable> {
void consume(Message<T> event);
}
IV. Usage Examples
(Ⅰ) Producer Examples
1. Producer Acquisition Methods
Original Implementation (Hardcoding Approach)
@Autowired
private RocketMQNotifyProducer rocketMQNotifyProducer;
@Autowired
private RabbitMQNotifyProducer rabbitMQNotifyProducer;
@Autowired
private KafkaNotifyProducer kafkaNotifyProducer;
public void sendNormalMessage() {
OrderMessage msg = new OrderMessage("ORDER_001");
// Method 1:
// Problem: The message queue type is hardcoded in the code, resulting in high coupling and lack of extensibility
// Directly using specific implementation classes requires modifying code logic when changing message middleware
//NotifySendResult result = rocketMQNotifyProducer.send("oinone-trade", "CREATE", msg);
//NotifySendResult result = rabbitMQNotifyProducer.send("oinone-trade", "CREATE", msg);
NotifySendResult result = kafkaNotifyProducer.send("oinone-trade", "CREATE", msg);
}
Note: This approach has issues
- Strong Coupling: Message queue implementation classes (RocketMQ/RabbitMQ/Kafka) are directly injected into business code, binding to specific middleware
- Hardcoding: Message queue types are hardcoded through variable names or comments, unable to switch dynamically
- Poor Extensibility: Adding new message middleware requires modifying injection code and sending logic, violating the open-closed principle
Optimized Implementation (Decoupled Dynamic Solution)
public void sendNormalMessage() {
// Method 2: Dynamically obtain the corresponding producer based on business type (recommended)
// Obtain the adapted producer instance based on the business key defined in EventConstants
NotifySendResult result = EventEngine.get(EventConstants.EVENT_SYS_BIZ_KEY).send("oinone-trade", "CREATE", msg);
// Method 3: Obtain a universal producer through the business context
// Suitable for scenarios requiring flexible specification of business types
NotifySendResult result = EventEngine.bizNotifyProducer().send("oinone-trade", "CREATE", msg);
}
Core Advantages:
- Middleware Decoupling:
- Eliminates direct dependency on specific
RocketMQNotifyProducer
/RabbitMQNotifyProducer
- Manages producer instances uniformly through
EventEngine
, decoupling business code from middleware
- Eliminates direct dependency on specific
- Dynamic Adaptation:
- Supports dynamic matching of producers through business identifiers like
EventConstants.EVENT_SYS_BIZ_KEY
- When adding new middleware, only need to extend
EventEngine
configuration without modifying business logic
- Supports dynamic matching of producers through business identifiers like
- Unified Interface:
- Provides a unified sending interface
send(String topic, String event, T message)
- Hides API differences of different middleware, reducing learning costs
- Provides a unified sending interface
2. Ordered Message Sending
public void sendOrderlyMessage() {
PaymentMessage payment = new PaymentMessage("PAY_202312");
producer.sendOrderly("oinone-payment", "PAY", payment, payment.getOrderId());
}
3. Transactional Message Sending (RocketMQ)
@TransactionListener("txGroup")
public class TransactionListenerImpl implements NotifyTransactionListener {
@Override
public void executeLocalTransaction(Message msg, Object arg) {
// Local transaction execution
}
@Override
public boolean checkLocalTransaction(MessageExt msg) {
// Transaction status check
}
}
public void sendTransactionMessage() {
producer.sendTx("oinone-account", "DEDUCT", "txGroup", accountDTO, null);
}
4. Sending with @Notify Annotation
Annotation Definition
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Notify {
String notifyBizType() default "biz"; // Business type
String topic(); // Message topic
String tags() default ""; // Message tags
Class<? extends NotifySendCallback> sendCallback() default NotifySendCallback.class; // Sending callback
Class<? extends NotifyQueueSelector> querySelector() default NotifyQueueSelector.class; // Queue selector
Class<? extends NotifyTagsGenerator> tagsGenerator() default NotifyTagsGenerator.class; // Tag generator
}
Attribute | Type | Required | Default Value | Description |
---|---|---|---|---|
notifyBizType | String | No | biz | Message business type (system/biz/logger) |
topic | String | Yes | - | Message topic name |
tags | String | No | "" | Static message tags |
sendCallback | Class | No | NotifySendCallback.class | Sending result callback class |
querySelector | Class | No | NotifyQueueSelector.class | Ordered message queue selector |
tagsGenerator | Class | No | NotifyTagsGenerator.class | Dynamic tag generator |
Normal Message Sending
// Normal usage
@Notify(
topic = "order_created",
tags = "PAYMENT",
notifyBizType = "biz"
)
public Order createOrder(OrderRequest request) {
// Order creation business logic
return orderService.create(request);
}
Dynamic Tag Generation
// Custom tag generator
public class OrderTagGenerator implements NotifyTagsGenerator {
@Override
public String tagsGenerator(Object result) {
if (result instanceof Order order) {
return order.getStatus().name();
}
return "UNKNOWN";
}
}
// Usage example
@Notify(
topic = "order_status_update",
tagsGenerator = OrderTagGenerator.class
)
public void updateOrderStatus(String orderId, OrderStatus status) {
// Status update logic
}
Ordered Message Sending
// Custom queue selector
public class OrderQueueSelector implements NotifyQueueSelector {
@Override
public String hashing(Object result) {
if (result instanceof Order order) {
return order.getUserId();
}
return "0";
}
}
// Usage example
@Notify(
topic = "order_sequence",
querySelector = OrderQueueSelector.class
)
public void processOrderSequence(Order order) {
// Ordered processing logic
}
(Ⅱ) Consumer Examples
1. @NotifyListener Annotation Definition
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface NotifyListener {
String topic(); // Required, listening topic
String tags() default "*"; // Message tag filtering
String group() default ""; // Consumer group
ConsumerType consumerType() default ConsumerType.CONCURRENTLY; // Consumption mode
}
Parameter | Type | Default Value | Description |
---|---|---|---|
topic | String | - | Listened Topic name (supports wildcards) |
tags | String | * | Tag filtering expression (RocketMQ specific) |
group | String | - | Consumer group ID (Kafka mandatory) |
consumerType | enum | CONCURRENTLY | Consumption mode: - CONCURRENTLY : Concurrent consumption - ORDERLY : Ordered consumption |
2. Normal Consumption Example
@Bean
@NotifyListener(
topic = "oinone-trade",
tags = "CREATE",
consumerType = ConsumerType.CONCURRENTLY
)
public NotifyConsumer<OrderMessage> orderCreateConsumer() {
return message -> {
OrderMessage order = message.getPayload();
// Process order creation logic
};
}
3. Idempotent Consumption Handling Example
@Bean
@NotifyListener(topic = "oinone-trade",tags = "CREATE")
public NotifyConsumer<OrderMessage> orderCreateConsumer() {
return message -> {
String msgId = message.getHeaders().getId().toString();
if (redis.exists(msgId)) {
return; // Already processed
}
// Business processing
redis.setex(msgId, 3600);
OrderMessage order = message.getPayload();
// Process order creation logic
};
}
V. Advanced Features
(Ⅰ) Message Interceptors
// Pre-sending processing
@Component
public class AuthCheckSendBefore implements NotifySendBefore {
@Override
public Message<?> sendBefore(Message<?> message) {
message.getHeaders().put("auth-token", getToken());
return message;
}
}
// Post-consumption processing
@Component
public class MetricsCollector implements NotifyConsumeAfter {
@Override
public void consumeAfter(Message<?> message) {
metrics.increment("msg.processed");
}
}
VI. Differences Handling Among Different Middleware
Feature | RocketMQ | Kafka | RabbitMQ |
---|---|---|---|
Transactional Message | Supported | Not supported | Not supported |
Ordered Message | Strict order | Partition order | Queue order |
Message Backtracking | Supported | Time offset | Not supported |
Performance | High throughput | Extremely high throughput | Medium |
VII. Common Questions
Q: How to switch message middleware?
A: Modify the pamirs.event.notify-map
configuration and replace the corresponding dependencies, no need to modify business code.
Q: How to ensure ordered messages?
A: Use the sendOrderly
method, messages with the same hashKey will be routed to the same queue.
Q: What is the principle of transactional message implementation?
A: RocketMQ uses two-phase commit, first sending a prepared message, and committing after the local transaction executes successfully.