博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
zbb20190430 springboot 配置alimq
阅读量:5050 次
发布时间:2019-06-12

本文共 7564 字,大约阅读时间需要 25 分钟。

 源码地址

 

 

1.

2.pom

4.0.0
com.zbb.mq.app
AliMq
war
0.0.1-SNAPSHOT
AliMq Maven Webapp
http://maven.apache.org
org.springframework.boot
spring-boot-starter-parent
2.0.5.RELEASE
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
com.aliyun.openservices
ons-client
1.7.8.Final
AliMq

3.

package com.zbb.alimq.app;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class AppAliMq {    public static void main(String[] args) {        SpringApplication.run(AppAliMq.class, args);    }}

4.

package com.zbb.alimq.app.config;import java.util.HashMap;import java.util.Map;import java.util.Properties;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import com.aliyun.openservices.ons.api.MessageListener;import com.aliyun.openservices.ons.api.PropertyKeyConst;import com.aliyun.openservices.ons.api.bean.ConsumerBean;import com.aliyun.openservices.ons.api.bean.ProducerBean;import com.aliyun.openservices.ons.api.bean.Subscription;import com.zbb.alimq.app.util.AliMQConsumerListener;@Configurationpublic class AliMqConfig {    @Value("${aliyunMq.producerId}")    public String producerId;    @Value("${aliyunMq.consumerId}")    public String consumerId;    @Value("${aliyunMq.jconsumerId}")    public String jconsumerId;    @Value("${aliyunMq.flightDelayConsumerId}")    public String flightDelayConsumerId;    @Value("${aliyunMq.accessKey}")    public String accessKey;    @Value("${aliyunMq.secretKey}")    public String secretKey;    @Value("${aliyunMq.topic}")    public String topic;    @Value("${aliyunMq.flightDelayTopic}")    public String flightDelayTopic;    @Value("${aliyunMq.tagDep}")    public String tagDep;    @Value("${aliyunMq.tagArr}")    public String tagArr;    @Value("${aliyunMq.tagFlightDelay}")    public String tagFlightDelay;    @Value("${aliyunMq.onsAddr}")    public String onsAddr;    // 超时时间    @Value("${aliyunMq.sendMsgTimeoutMillis}")    public String sendMsgTimeoutMillis;    @Value("${aliyunMq.suspendTimeMillis}")    public String suspendTimeMillis;    @Value("${aliyunMq.maxReconsumeTimes}")    public String maxReconsumeTimes;    @Bean(initMethod = "start", destroyMethod = "shutdown")    public ProducerBean getProducer() {        ProducerBean producerBean = new ProducerBean();        Properties properties = new Properties();        properties.put(PropertyKeyConst.ProducerId, producerId);        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建        properties.put(PropertyKeyConst.AccessKey, accessKey);        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建        properties.put(PropertyKeyConst.SecretKey, secretKey);        properties.put(PropertyKeyConst.SendMsgTimeoutMillis, sendMsgTimeoutMillis);        properties.put(PropertyKeyConst.ONSAddr, onsAddr);        producerBean.setProperties(properties);        return producerBean;    }    @Bean(initMethod = "start", destroyMethod = "shutdown")    public ConsumerBean getConsumer() {        ConsumerBean consumerBean = new ConsumerBean();        Properties properties = new Properties();        properties.put(PropertyKeyConst.ConsumerId, consumerId);        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建        properties.put(PropertyKeyConst.AccessKey, accessKey);        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建        properties.put(PropertyKeyConst.SecretKey, secretKey);        properties.put(PropertyKeyConst.ONSAddr, onsAddr);        consumerBean.setProperties(properties);        Subscription subscription = new Subscription();        subscription.setTopic(topic);        subscription.setExpression(tagDep);        Map
map = new HashMap(); map.put(subscription, new AliMQConsumerListener()); consumerBean.setSubscriptionTable(map); return consumerBean; }}

5.

package com.zbb.alimq.app.util;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.aliyun.openservices.ons.api.Action;import com.aliyun.openservices.ons.api.ConsumeContext;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.MessageListener;public class AliMQConsumerListener implements MessageListener {    private static final Logger logger = LoggerFactory.getLogger(AliMQConsumerListener.class);    @Override    public Action consume(Message message, ConsumeContext context) {        String msg = "";        try { // do something..            msg = new String(message.getBody(), "UTF-8");            logger.info("订阅消息:" + msg);            return Action.CommitMessage;        } catch (Exception e) { // 消费失败            logger.info("消费失败:" + msg);            return Action.ReconsumeLater;        }    }}

6.

package com.zbb.alimq.app.util;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.Producer;import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.exception.ONSClientException;import com.zbb.alimq.app.config.AliMqConfig;@Component@RestController@RequestMapping("mq")public class AliMQUtil {    private static final Logger logger = LoggerFactory.getLogger(AliMQUtil.class);    @Autowired    private AliMqConfig aliMQConfig;    @Value("${aliyunMq.topic}")    public String topic; // 发送消息    @Value("${aliyunMq.tagDep}")    public String tagDep;    @RequestMapping("sendMessage")    public void sendMessage() {        Producer producer = aliMQConfig.getProducer();        byte[] body = "123".getBytes();        Message msg = new Message(topic, tagDep, body); // msg.setKey(key);        try {            SendResult sendResult = producer.send(msg);            if (sendResult != null) {                logger.info("消息发送成功:" + sendResult.toString());            }        } catch (ONSClientException e) {            logger.info("消息发送失败:", e);            // 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。        }    }}

7.application.yml

server:  port: 9080  servlet:    context-path: /  tomcat:    uri-encoding: UTF-8    max-threads: 1000    min-spare-threads: 30aliyunMq:   producerId: *  consumerId: *  jconsumerId: *  accessKey: *  secretKey: *  tagDep: *  tagArr: *  topic: *  onsAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet  sendMsgTimeoutMillis: 3000  suspendTimeMillis: 100  maxReconsumeTimes: 20  flightDelayConsumerId: *  tagFlightDelay: *  flightDelayTopic: *

 

转载于:https://www.cnblogs.com/super-admin/p/10794937.html

你可能感兴趣的文章
Velocity脚本简明教程
查看>>
虚拟机类加载机制
查看>>
RTSP流媒体数据传输的两种方式(TCP和UDP)
查看>>
大数n!
查看>>
LPC-LINK 2 LPC4370 简化线路图
查看>>
【模板】关于vector的lower_bound和upper_bound以及vector基本用法 STL
查看>>
linux c动态库编译好了,不能用。有些方法报(undefined reference)错误。
查看>>
在CentOS 6.5 中安装JDK 1.7 + Eclipse并配置opencv的java开发环境(二)
查看>>
docker 安装与卸载
查看>>
“搜狐微博零估值”用意何在
查看>>
如何区分 OpenStack Neutron Extension 和 Plugin
查看>>
简述人工智能发展的先决条件
查看>>
AWS API 2.0签名规范
查看>>
MVC3 系统列讲解
查看>>
很开心
查看>>
Codeforces 388 D. Fox and Perfect Sets
查看>>
货币计算程序
查看>>
在析构函数中关闭 SqlConnection 连接
查看>>
对于C#中的一些点滴你真的理解了吗?
查看>>
结对编程项目--电梯调度
查看>>