源码地址
1.
2.pom
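<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zbb.mq.app</groupId>
    <artifactId>AliMq</artifactId>
    <packaging>war</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>AliMq Maven Webapp</name>
    <url>http://maven.apache.org</url>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.7.8.Final</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>AliMq</finalName>
    </build>
</project>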
3.
package com.zbb.alimq.app;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point: boots the Spring context using this class as the
 * primary configuration source.
 */
@SpringBootApplication
public class AppAliMq {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, passed through unchanged to
     *             {@link SpringApplication#run(Class, String...)}
     */
    public static void main(String[] args) {
        SpringApplication.run(AppAliMq.class, args);
    }
}
4.
package com.zbb.alimq.app.config;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.zbb.alimq.app.util.AliMQConsumerListener;

/**
 * Spring configuration that declares the Aliyun MQ producer and consumer beans.
 *
 * <p>All fields are injected from {@code aliyunMq.*} properties. Only some of
 * them are consumed by the beans defined in this class; the rest
 * ({@code jconsumerId}, {@code flightDelayConsumerId}, {@code flightDelayTopic},
 * {@code tagArr}, {@code tagFlightDelay}, {@code suspendTimeMillis},
 * {@code maxReconsumeTimes}) are injected but not referenced here.
 */
@Configuration
public class AliMqConfig {

    @Value("${aliyunMq.producerId}")
    public String producerId;

    @Value("${aliyunMq.consumerId}")
    public String consumerId;

    @Value("${aliyunMq.jconsumerId}")
    public String jconsumerId;

    @Value("${aliyunMq.flightDelayConsumerId}")
    public String flightDelayConsumerId;

    @Value("${aliyunMq.accessKey}")
    public String accessKey;

    @Value("${aliyunMq.secretKey}")
    public String secretKey;

    @Value("${aliyunMq.topic}")
    public String topic;

    @Value("${aliyunMq.flightDelayTopic}")
    public String flightDelayTopic;

    @Value("${aliyunMq.tagDep}")
    public String tagDep;

    @Value("${aliyunMq.tagArr}")
    public String tagArr;

    @Value("${aliyunMq.tagFlightDelay}")
    public String tagFlightDelay;

    @Value("${aliyunMq.onsAddr}")
    public String onsAddr;

    // Timeout settings
    @Value("${aliyunMq.sendMsgTimeoutMillis}")
    public String sendMsgTimeoutMillis;

    @Value("${aliyunMq.suspendTimeMillis}")
    public String suspendTimeMillis;

    @Value("${aliyunMq.maxReconsumeTimes}")
    public String maxReconsumeTimes;

    /**
     * Builds the producer bean. Spring calls {@code start} after creation and
     * {@code shutdown} on context close, as declared on the {@code @Bean}.
     *
     * @return a producer configured with the producer id, credentials, send
     *         timeout and ONS address
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean getProducer() {
        Properties properties = connectionProperties();
        properties.setProperty(PropertyKeyConst.ProducerId, producerId);
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, sendMsgTimeoutMillis);

        ProducerBean producerBean = new ProducerBean();
        producerBean.setProperties(properties);
        return producerBean;
    }

    /**
     * Builds the consumer bean, subscribed to {@code topic} filtered by the
     * {@code tagDep} expression and handled by a new
     * {@link AliMQConsumerListener}. Spring calls {@code start} after creation
     * and {@code shutdown} on context close.
     *
     * @return a consumer configured with the consumer id, credentials, ONS
     *         address and a single subscription
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean getConsumer() {
        Properties properties = connectionProperties();
        properties.setProperty(PropertyKeyConst.ConsumerId, consumerId);

        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        subscription.setExpression(tagDep);

        // Explicit type arguments: the raw declaration in the original was not valid Java.
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        subscriptionTable.put(subscription, new AliMQConsumerListener());

        ConsumerBean consumerBean = new ConsumerBean();
        consumerBean.setProperties(properties);
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }

    /** Creates a fresh property set holding the settings shared by producer and consumer. */
    private Properties connectionProperties() {
        Properties properties = new Properties();
        // AccessKey: Aliyun credential, created in the Aliyun management console.
        properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        // SecretKey: Aliyun credential, created in the Aliyun management console.
        properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        properties.setProperty(PropertyKeyConst.ONSAddr, onsAddr);
        return properties;
    }
}
5.
package com.zbb.alimq.app.util;

import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;

/**
 * Message listener that decodes each message body as UTF-8 text and logs it.
 */
public class AliMQConsumerListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(AliMQConsumerListener.class);

    /**
     * Handles one delivered message.
     *
     * @param message the delivered message; its body is decoded as UTF-8
     * @param context the consume context (not used by this implementation)
     * @return {@link Action#CommitMessage} when the body was decoded and logged,
     *         {@link Action#ReconsumeLater} when any exception occurred
     */
    @Override
    public Action consume(Message message, ConsumeContext context) {
        String msg = "";
        try {
            // Business handling of the message would go here.
            msg = new String(message.getBody(), StandardCharsets.UTF_8);
            logger.info("订阅消息:" + msg);
            return Action.CommitMessage;
        } catch (Exception e) {
            // Consumption failed: log the cause so it is not lost, then request redelivery.
            logger.error("消费失败:" + msg, e);
            return Action.ReconsumeLater;
        }
    }
}
6.
package com.zbb.alimq.app.util;

import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.zbb.alimq.app.config.AliMqConfig;

/**
 * REST controller mapped under {@code mq} that publishes a fixed test message
 * to the configured topic and tag.
 */
@Component
@RestController
@RequestMapping("mq")
public class AliMQUtil {

    private static final Logger logger = LoggerFactory.getLogger(AliMQUtil.class);

    @Autowired
    private AliMqConfig aliMQConfig;

    @Value("${aliyunMq.topic}")
    public String topic;

    @Value("${aliyunMq.tagDep}")
    public String tagDep;

    /**
     * Sends a message whose body is the text {@code "123"} to {@code topic}
     * with tag {@code tagDep}, and logs the outcome. A send failure is logged
     * and not rethrown.
     */
    @RequestMapping("sendMessage")
    public void sendMessage() {
        // NOTE(review): relies on Spring's @Configuration method interception so that this
        // call returns the managed, already-started producer bean — confirm.
        Producer producer = aliMQConfig.getProducer();

        // Encode explicitly as UTF-8, matching how the listener decodes the body.
        byte[] body = "123".getBytes(StandardCharsets.UTF_8);
        Message msg = new Message(topic, tagDep, body);

        try {
            SendResult sendResult = producer.send(msg);
            if (sendResult != null) {
                logger.info("消息发送成功:" + sendResult.toString());
            }
        } catch (ONSClientException e) {
            // An exception means the send failed; to avoid losing the message,
            // cache it and retry.
            logger.error("消息发送失败:", e);
        }
    }
}
7.application.yml
server:
  port: 9080
  servlet:
    context-path: /
  tomcat:
    uri-encoding: UTF-8
    max-threads: 1000
    min-spare-threads: 30
aliyunMq:
  producerId: *
  consumerId: *
  jconsumerId: *
  accessKey: *
  secretKey: *
  tagDep: *
  tagArr: *
  topic: *
  onsAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
  sendMsgTimeoutMillis: 3000
  suspendTimeMillis: 100
  maxReconsumeTimes: 20
  flightDelayConsumerId: *
  tagFlightDelay: *
  flightDelayTopic: *