博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Springboot2.0整合RocketMQ示例
阅读量:5794 次
发布时间:2019-06-18

本文共 8767 字,大约阅读时间需要 29 分钟。

hot3.png

配置

依赖的jar

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>spring-boot-starter-rocketmq</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

property配置

消息发送端必须配置 spring.rocketmq.nameServer 和 spring.rocketmq.producer.group 这两项:

# nameserver 地址,多个地址用分号分隔;消费端可以只配置一个 nameserver
spring.rocketmq.nameServer=10.201.9.65:9876;10.201.9.67:9876
# 发送端需要提供这个;消费端根据 topic 和 group 去消费,集群模式下同一个 group 只会消费一次
spring.rocketmq.producer.group=imassbank-demo-web-group
# 发送端发送失败重试的次数,默认两次,可能会带来消息重复发送
spring.rocketmq.producer.retryTimesWhenSendFailed=2
# 异步发送模式下的失败重试次数,同上
spring.rocketmq.producer.retryTimesWhenSendAsyncFailed=2
# 发送消息超时时间,单位毫秒,默认 3000(3 秒钟);此处设置为 300000(300 秒)
spring.rocketmq.producer.sendMsgTimeout=300000
# 消息大小超过 4k 则被压缩,默认阈值 4k
spring.rocketmq.producer.compressMsgBodyOverHowmuch=4096
# 允许的最大消息为 4M
spring.rocketmq.producer.maxMessageSize=4194304
# 消息存储失败时,是否换一个 broker 重新发送
spring.rocketmq.producer.retryAnotherBrokerWhenNotStoreOk=false

否则,不能实例化 DefaultMQProducer

/**
 * Creates the {@link DefaultMQProducer} bean from the {@code spring.rocketmq.*} settings.
 *
 * <p>The bean is only registered when {@code spring.rocketmq.nameServer} and
 * {@code spring.rocketmq.producer.group} are both configured and no other
 * {@link DefaultMQProducer} bean exists.
 *
 * @param rocketMQProperties bound RocketMQ configuration properties
 * @return a producer configured with the name server address and producer settings
 */
@Bean
@ConditionalOnClass(DefaultMQProducer.class)
@ConditionalOnMissingBean(DefaultMQProducer.class)
@ConditionalOnProperty(prefix = "spring.rocketmq", value = {"nameServer", "producer.group"})
@Order(1)
public DefaultMQProducer mqProducer(RocketMQProperties rocketMQProperties) {
    final RocketMQProperties.Producer settings = rocketMQProperties.getProducer();

    // Fail fast when the producer group is blank.
    final String group = settings.getGroup();
    Assert.hasText(group, "[spring.rocketmq.producer.group] must not be null");

    final DefaultMQProducer mqProducer = new DefaultMQProducer(group);
    mqProducer.setNamesrvAddr(rocketMQProperties.getNameServer());

    // Copy the remaining producer tuning values one-to-one from the properties.
    mqProducer.setSendMsgTimeout(settings.getSendMsgTimeout());
    mqProducer.setRetryTimesWhenSendFailed(settings.getRetryTimesWhenSendFailed());
    mqProducer.setRetryTimesWhenSendAsyncFailed(settings.getRetryTimesWhenSendAsyncFailed());
    mqProducer.setMaxMessageSize(settings.getMaxMessageSize());
    mqProducer.setCompressMsgBodyOverHowmuch(settings.getCompressMsgBodyOverHowmuch());
    mqProducer.setRetryAnotherBrokerWhenNotStoreOK(settings.isRetryAnotherBrokerWhenNotStoreOk());
    return mqProducer;
}

Simple消息发送示例

定义一个公共的实体类,必须实现序列化接口

/**
 * Message payload describing a repayment status change.
 *
 * <p>Implements {@link Serializable} as required by the article for the shared entity class.
 */
public class RepayStatusChangeVo implements Serializable {

    private static final long serialVersionUID = 1924468458546695791L;

    private String loanOrderId;

    private Integer loanNumber;

    private Integer status;

    /** @return the loan order id, or {@code null} if not set */
    public String getLoanOrderId() {
        return loanOrderId;
    }

    /** @param loanOrderId the loan order id */
    public void setLoanOrderId(String loanOrderId) {
        this.loanOrderId = loanOrderId;
    }

    /** @return the loan number, or {@code null} if not set */
    public Integer getLoanNumber() {
        return loanNumber;
    }

    /** @param loanNumber the loan number */
    public void setLoanNumber(Integer loanNumber) {
        this.loanNumber = loanNumber;
    }

    /** @return the status value, or {@code null} if not set */
    public Integer getStatus() {
        return status;
    }

    /** @param status the status value */
    public void setStatus(Integer status) {
        this.status = status;
    }

    /** Readable form so that logging the object shows its fields. */
    @Override
    public String toString() {
        return "RepayStatusChangeVo{loanOrderId=" + loanOrderId
                + ", loanNumber=" + loanNumber
                + ", status=" + status + '}';
    }
}

Producer发送端

/**
 * Demo producer that publishes repayment status changes to RocketMQ with a synchronous send.
 */
@Component("msgSender")
public class MsgSender {
    private static final Logger logger = LoggerFactory.getLogger(MsgSender.class);

    /** Topic the repayment status messages are sent to. */
    private static final String REPAY_STATUS_TOPIC = "REPAY_STATUS_TOPIC";

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Serializes {@code repay} to JSON and sends it synchronously to {@code REPAY_STATUS_TOPIC}.
     *
     * @param repay the status change to publish; when {@code null}, a sample payload with
     *              fixed demo values is sent instead
     * @return {@code true} only when the send result reports {@code SEND_OK};
     *         {@code false} when the send fails or throws
     */
    public boolean repaySuccessNotify(RepayStatusChangeVo repay) {
        String payload = null;
        try {
            if (repay == null) {
                // Only fall back to the demo values when the caller supplied nothing,
                // so a real payload is never overwritten.
                repay = new RepayStatusChangeVo();
                repay.setLoanOrderId("asdfasfadasdfas");
                repay.setLoanNumber(1);
                repay.setStatus(1);
            }

            // Serialize once and reuse the same text for logging and sending.
            payload = JSONObject.toJSONString(repay);
            logger.info("发送还款状态:{}", payload);

            SendResult result = rocketMQTemplate.syncSend(REPAY_STATUS_TOPIC, payload);
            if (result != null && result.getSendStatus() == SendStatus.SEND_OK) {
                logger.info("发送消息成功,TOPIC:{},消息ID:{}", REPAY_STATUS_TOPIC, result.getMsgId());
                return true;
            }

            logger.error("发送消息失败,TOPIC:{},消息内容:{}", REPAY_STATUS_TOPIC, payload);
            return false;
        } catch (Exception e) {
            // Failures are reported at error level and surfaced to the caller.
            logger.error("发送还款状态失败:{},异常:{}", payload, e);
            return false;
        }
    }
}

Consumer消费端

/**
 * Consumer for {@code REPAY_STATUS_TOPIC} in consumer group {@code imassbank-demo-group}.
 *
 * <p>The listener is parameterized with {@link RepayStatusChangeVo}; without the type
 * argument the {@code @Override} of {@code onMessage(RepayStatusChangeVo)} does not compile.
 */
@Component("msgReceiver")
@RocketMQMessageListener(topic = "REPAY_STATUS_TOPIC", consumerGroup = "imassbank-demo-group")
public class MsgReceiver implements RocketMQListener<RepayStatusChangeVo> {
    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);

    /**
     * Logs the received repayment status.
     *
     * @param repay the received message payload
     */
    @Override
    public void onMessage(RepayStatusChangeVo repay) {
        logger.info("接受到还款状态 repay: {}", repay);
    }
}

事务消息示例

消息发送端

producer

package com.imassbank.demo.web.message.transaction;

import java.util.HashMap;
import java.util.Map;

import javax.annotation.Resource;

import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

/**
 * Demo producer that sends a repayment status message as a RocketMQ transactional message.
 *
 * @author fengpuchao
 * @date 2018-11-21
 */
@Component("msgTransactionSender")
public class MsgTransactionSender {
    private static final Logger logger = LoggerFactory.getLogger(MsgTransactionSender.class);

    /** Destination topic of the transactional message. */
    private static final String REPAY_STATUS_TOPIC = "REPAY_STATUS_TOPIC_TRANSACTION";

    /** Transactional producer group passed to the template. */
    private static final String REPAY_STATUS_TXGROUP = "REPAY_STATUS_TXGROUP";

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Builds a sample payload, serializes it to JSON and sends it in a transaction.
     *
     * @return {@code true} when the send call completes without throwing,
     *         {@code false} when it throws
     */
    public boolean repaySuccessNotify() {
        Map<String, Object> param = new HashMap<>();
        param.put("loanOrderId", "adfasdfasdfasdf");
        param.put("loanNumber", 5);
        param.put("status", 1);

        // Serialize once; the same text is the payload and the log content.
        String payload = JSONObject.toJSONString(param);
        try {
            Message<String> msg = MessageBuilder.withPayload(payload).build();
            logger.info("发送事务消息,消息内容:{}", payload);
            rocketMQTemplate.sendMessageInTransaction(REPAY_STATUS_TXGROUP, REPAY_STATUS_TOPIC, msg, null);
        } catch (Exception e) {
            // A failed send is an error, not an informational event.
            logger.error("发送还款状态失败:{},异常:{}", payload, e);
            return false;
        }
        return true;
    }
}

listener

package com.imassbank.demo.web.message.transaction;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.starter.annotation.RocketMQTransactionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

/**
 * Transaction listener for the {@code REPAY_STATUS_TXGROUP} transactional producer group.
 *
 * @author fengpuchao
 * @date 2018-11-21
 */
@Component("msgTransactionListener")
@RocketMQTransactionListener(txProducerGroup = "REPAY_STATUS_TXGROUP")
public class MsgTransactionListener implements TransactionListener {
    private static final Logger logger = LoggerFactory.getLogger(MsgTransactionListener.class);

    // NOTE(review): not read or written anywhere in this class.
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    // Transaction id -> local status (0 = unknown, 1 = commit, 2 = rollback).
    // NOTE(review): nothing in this class writes to this map, so checkLocalTransaction
    // currently always falls through to COMMIT_MESSAGE.
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * Called once the prepare (half) message has reached the broker; at that point the
     * message has not yet been consumed. When this method returns commit or rollback,
     * {@link #checkLocalTransaction(MessageExt)} is not invoked; only {@code UNKNOW}
     * triggers a check-back (the original author notes an interval of about one minute).
     *
     * @param msg the half message
     * @param arg the argument passed by the sender
     * @return {@code COMMIT_MESSAGE} when the body is decoded and parsed as JSON,
     *         {@code ROLLBACK_MESSAGE} when decoding or parsing fails
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        logger.info("消息id:{}", msg.getTransactionId());
        try {
            String msgStr = new String(msg.getBody(), StandardCharsets.UTF_8);
            logger.info("消息id:{},消息内容为:{}", msg.getTransactionId(), msgStr);
            // Parse only to validate the payload; the parsed object is not used here.
            JSONObject.parseObject(msgStr);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (RuntimeException e) {
            // A body that cannot be decoded or parsed is rolled back and logged
            // through the logger rather than printed to stderr.
            logger.error("消息id:{}", msg.getTransactionId(), e);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    /**
     * Resolves the state of a transaction from the recorded local status.
     *
     * @param msg the message being checked
     * @return the state mapped from the recorded status; {@code COMMIT_MESSAGE} when no
     *         status is recorded or the status value is not recognized
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // Decode with the same charset as executeLocalTransaction, not the platform default.
        logger.info("checkLocalTransaction 消息id:{},消息内容为:{}", msg.getTransactionId(),
                new String(msg.getBody(), StandardCharsets.UTF_8));

        Integer status = localTrans.get(msg.getTransactionId());
        if (status == null) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        switch (status) {
            case 0:
                return LocalTransactionState.UNKNOW;
            case 2:
                return LocalTransactionState.ROLLBACK_MESSAGE;
            case 1:
            default:
                return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}

consumer

package com.imassbank.demo.web.message.transaction;import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.starter.core.RocketMQListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import com.imassbank.demo.bo.RepayStatusChangeVo;/** * @author fengpuchao * @date 2018年11月15日 */@Component("msgTransactionReceiver")@RocketMQMessageListener(topic = "REPAY_STATUS_TOPIC_TRANSACTION", consumerGroup = "REPAY_STATUS_TXGROUP")public class MsgTransactionReceiver implements RocketMQListener
{ private static final Logger logger = LoggerFactory.getLogger(MsgTransactionReceiver.class); @Override public void onMessage(String repay) { logger.info("接受到还款状态 repay: {}", repay); }}

转载于:https://my.oschina.net/liangxiao/blog/2885832

你可能感兴趣的文章
Macaca 极简教程
查看>>
Git用法初探
查看>>
Java 分支结构 - if…else/switch
查看>>
深入浅出Java内存模型这篇给你解决
查看>>
Android App 上架流程汇总
查看>>
全局变量污染什么的最讨厌了
查看>>
备忘录模式
查看>>
零 bug 策略
查看>>
python的random模块
查看>>
Program Ability
查看>>
大话爬虫的实践技巧
查看>>
探索Redis设计与实现7:Redis内部数据结构详解——intset
查看>>
如何在GitHub上大显身手?
查看>>
Javascript 修改 input 验证提示
查看>>
Django前后端分离实践之DRF--09
查看>>
损失函数详解
查看>>
Google AI 系统 DeepMind 高中数学考试不及格
查看>>
启迪之星刘博:优秀的创业者=家国情怀+学者智慧+商业思维+江湖行动
查看>>
QuasarRAT-windows下远程控制工具
查看>>
了解了解
查看>>