配置
依赖的jar
org.apache.rocketmq spring-boot-starter-rocketmq 1.0.0-SNAPSHOT
property配置
消息发送端这两个必须有
# 消费端可以只配置一个nameserver,nameserver spring.rocketmq.nameServer=10.201.9.65:9876;10.201.9.67:9876# 发送端需要提供这个,消费端根据topic和group去消费,集群模式,同一个group只会被消费一次spring.rocketmq.producer.group=imassbank-demo-web-group# 发送端发送失败重试的次数,默认两次,可能会带来消息重复发送spring.rocketmq.producer.retryTimesWhenSendFailed=2# 异步发送模式,同上spring.rocketmq.producer.retryTimesWhenSendAsyncFailed=2#发送消息超时时间,单位毫秒,默认 3000(3秒钟)spring.rocketmq.producer.sendMsgTimeout=300000# 消息大小超过 4k,则被压缩,默认大小4kspring.rocketmq.producer.compressMsgBodyOverHowmuch=4096# 最大消息允许4Mspring.rocketmq.producer.maxMessageSize=4194304# 消息存储失败,是否允许发送换brokerspring.rocketmq.producer.retryAnotherBrokerWhenNotStoreOk=false
否则,不能实例DefaultMQProducer
@Bean @ConditionalOnClass(DefaultMQProducer.class) @ConditionalOnMissingBean(DefaultMQProducer.class) @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"nameServer", "producer.group"}) @Order(1) public DefaultMQProducer mqProducer(RocketMQProperties rocketMQProperties) { RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer(); String groupName = producerConfig.getGroup(); Assert.hasText(groupName, "[spring.rocketmq.producer.group] must not be null"); DefaultMQProducer producer = new DefaultMQProducer(producerConfig.getGroup()); producer.setNamesrvAddr(rocketMQProperties.getNameServer()); producer.setSendMsgTimeout(producerConfig.getSendMsgTimeout()); producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed()); producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed()); producer.setMaxMessageSize(producerConfig.getMaxMessageSize()); producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMsgBodyOverHowmuch()); producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryAnotherBrokerWhenNotStoreOk()); return producer; }
Simple消息发送示例
定义一个公共的实体类,必须实现序列化接口
public class RepayStatusChangeVo implements Serializable { /** * */ private static final long serialVersionUID = 1924468458546695791L; private String loanOrderId; private Integer loanNumber; private Integer status; ………省略getter setter}
Producer发送端
@Component("msgSender")public class MsgSender { private static final Logger logger = LoggerFactory.getLogger(MsgSender.class); private final String REPAY_STATUS_TOPIC = "REPAY_STATUS_TOPIC"; @Resource private RocketMQTemplate rocketMQTemplate; public boolean repaySuccessNotify(RepayStatusChangeVo repay){ try { if(repay == null){ repay = new RepayStatusChangeVo(); } repay.setLoanOrderId("asdfasfadasdfas"); repay.setLoanNumber(1); repay.setStatus(1); logger.info("发送还款状态:{}",JSONObject.toJSONString(repay)); SendResult result = rocketMQTemplate.syncSend(this.REPAY_STATUS_TOPIC, JSONObject.toJSONString(repay)); if(result != null && result.getSendStatus().compareTo(SendStatus.SEND_OK)==0){ logger.info("发送消息成功,TOPIC:{},消息ID:{}",this.REPAY_STATUS_TOPIC,result.getMsgId()); }else{ logger.error("发送消息失败,TOPIC:{},消息内容:{}",this.REPAY_STATUS_TOPIC,JSONObject.toJSONString(repay)); } } catch (Exception e) { logger.info("发送还款状态失败:{},异常:{}",JSONObject.toJSONString(repay),e); } return true; } }
Consumer消费端
@Component("msgReceiver")@RocketMQMessageListener(topic = "REPAY_STATUS_TOPIC", consumerGroup = "imassbank-demo-group")public class MsgReceiver implements RocketMQListener{ private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class); @Override public void onMessage(RepayStatusChangeVo repay) { logger.info("接受到还款状态 repay: {}", repay); }}
事务消息示例
消息发送端
producer
package com.imassbank.demo.web.message.transaction;import java.util.HashMap;import java.util.Map;import javax.annotation.Resource;import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSONObject;/** * @author fengpuchao * @date 2018年11月21日 */@Component("msgTransactionSender")public class MsgTransactionSender { private static final Logger logger = LoggerFactory.getLogger(MsgTransactionSender.class); private final String REPAY_STATUS_TOPIC = "REPAY_STATUS_TOPIC_TRANSACTION"; private final String REPAY_STATUS_TXGROUP = "REPAY_STATUS_TXGROUP"; @Resource private RocketMQTemplate rocketMQTemplate; public boolean repaySuccessNotify(){ Mapparam = new HashMap (); param.put("loanOrderId", "adfasdfasdfasdf"); param.put("loanNumber", 5); param.put("status", 1); try { Message msg = MessageBuilder.withPayload(JSONObject.toJSONString(param)).build(); logger.info("发送事务消息,消息内容:{}",JSONObject.toJSONString(param)); rocketMQTemplate.sendMessageInTransaction(REPAY_STATUS_TXGROUP, REPAY_STATUS_TOPIC, msg, null); } catch (Exception e) { logger.info("发送还款状态失败:{},异常:{}",JSONObject.toJSONString(param),e); return false; } return true; }}
listener
package com.imassbank.demo.web.message.transaction;import java.io.UnsupportedEncodingException;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.atomic.AtomicInteger;import org.apache.rocketmq.client.producer.LocalTransactionState;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.starter.annotation.RocketMQTransactionListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSONObject;/** * @author fengpuchao * @date 2018年11月21日 */@Component("msgTransactionListener")@RocketMQTransactionListener(txProducerGroup="REPAY_STATUS_TXGROUP")public class MsgTransactionListener implements TransactionListener { private static final Logger logger = LoggerFactory.getLogger(MsgTransactionListener.class); private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMaplocalTrans = new ConcurrentHashMap<>(); /** * 在你的prepare消息到达broker的时候调用这个方法 * 此时消息并没有被被消费者消费 * 如果本方法执行成功(commit)或者失败(rollback),则不会执行checkLocalTransaction方法 * UNKNOW,才会调用checkLocalTransaction回查,回查频率间隔1分钟 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { logger.info("消息id:{}",msg.getTransactionId()); try { String msgStr = new String(msg.getBody(),"utf-8"); logger.info("消息id:{},消息内容为:{}",msg.getTransactionId(),msgStr); JSONObject msgBody = JSONObject.parseObject(msgStr); return LocalTransactionState.COMMIT_MESSAGE; } catch (UnsupportedEncodingException e) { e.printStackTrace(); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { logger.info("checkLocalTransaction 消息id:{},消息内容为:{}",msg.getTransactionId(),new String(msg.getBody())); Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; default: return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; }}
consumer
package com.imassbank.demo.web.message.transaction;import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.starter.core.RocketMQListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import com.imassbank.demo.bo.RepayStatusChangeVo;/** * @author fengpuchao * @date 2018年11月15日 */@Component("msgTransactionReceiver")@RocketMQMessageListener(topic = "REPAY_STATUS_TOPIC_TRANSACTION", consumerGroup = "REPAY_STATUS_TXGROUP")public class MsgTransactionReceiver implements RocketMQListener{ private static final Logger logger = LoggerFactory.getLogger(MsgTransactionReceiver.class); @Override public void onMessage(String repay) { logger.info("接受到还款状态 repay: {}", repay); }}