您现在的位置是:首页 > 正文

RabbitMQ:RabbitMQ + Spring配置文件rabbit标签

2024-02-01 03:19:04阅读 2

RabbitMQ:RabbitMQ + Spring配置文件rabbit标签

1.消费者配置文件和启动类:

 【Consumer.xml】:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- 连接服务配置  -->
    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" username="caoxia"
                               password="caoxia123456" port="5672" virtual-host="/"  channel-cache-size="5" />

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- queue 队列声明-->
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag"/>


    <!-- exchange queue binging key 绑定 -->
    <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <bean id="receiveMessageListener"
          class="com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml.ReceiveMessageListener" />

    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
        <rabbit:listener queues="spring.queue.tag" ref="receiveMessageListener" />
    </rabbit:listener-container>

    <!--&lt;!&ndash; spring template声明 &ndash;&gt;-->
    <!--<rabbit:template id="amqpTemplate" exchange="spring.queue.exchange"  routing-key="spring.queue.tag.key"-->
                     <!--connection-factory="connectionFactory" message-converter="jsonMessageConverter" />-->

    <!--<bean id="jsonMessageConverter"-->
          <!--class="com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml.Gson2JsonMessageConverter" />-->
</beans>

【ConsumerMain.java】:

package com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml;

/**
 * Created by nazi on 2018/7/30.
 */
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ConsumerMain {

    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("spring-rabbit-label/Consumer.xml");

    }
}

2.生产者配置文件和启动类: 

【Producer.xml】:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- 连接服务配置 -->
    <rabbit:connection-factory id="connectionFactory"
                               host="127.0.0.1" username="caoxia" password="caoxia123456" port="5672"
                               virtual-host="/" channel-cache-size="5" />

    <rabbit:admin connection-factory="connectionFactory" />

    <!-- queue 队列声明 -->
    <rabbit:queue  durable="true"
                   auto-delete="false" exclusive="false" name="spring.queue.tag" />

    <!-- 转发器类型标签:rabbit:fanout-exchange、rabbit:direct-exchange、rabbit:topic-exchange、rabbit:headers-exchange-->
    <!-- exchange queue binging key 绑定 -->
    <rabbit:direct-exchange name="spring.queue.exchange"
                            durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于Gson的速度快于jackson,这里替换为Gson的一个实现 -->
    <bean id="jsonMessageConverter"
          class="com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml.Gson2JsonMessageConverter" />

    <!-- spring template声明 -->
    <rabbit:template id="amqpTemplate" exchange="spring.queue.exchange"  routing-key="spring.queue.tag.key"
                     connection-factory="connectionFactory" message-converter="jsonMessageConverter" />

    <!-- 创建rabbitTemplate 消息模板类 -->
    <!--<bean id="rabbitTemplate"-->
          <!--class="org.springframework.amqp.rabbit.core.RabbitTemplate">-->
        <!--<constructor-arg ref="connectionFactory"></constructor-arg>-->
        <!--<property name="queue" value="spring.queue.tag"></property>-->
        <!--<property name="routingKey" value="spring.queue.tag.key"></property>-->
    <!--</bean>-->
</beans>

 【ProducerMain.java】:

package com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml;

/**
 * Created by nazi on 2018/7/30.
 */

import com.caox.sharding.entity.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ProducerMain {

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-rabbit-label/Producer.xml");
        AmqpTemplate amqpTemplate = context.getBean(RabbitTemplate.class);
        User user = new User();
        user.setName("niuniu");
        amqpTemplate.convertAndSend(user);
    }
}

3.实现一个消息监听器ReceiveMessageListener.java

package com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml;

import org.springframework.amqp.core.Message;

/**
 * Listener interface to receive asynchronous delivery of Amqp Messages.
 *
 * Created by nazi on 2018/7/30.
 */
public interface MessageListener {

    void onMessage(Message message);

}

4.messageconver插件实现(Gson)

spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于Gson的速度快于jackson,这里替换为Gson的一个实现

【pom.xml引入jar包】:

<dependency>
       <groupId>com.google.code.gson</groupId>
       <artifactId>gson</artifactId>
       <version>2.3</version>
</dependency>

【Gson2JsonMessageConverter.java 】:

package com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;

import com.google.gson.Gson;

/**
 * Created by nazi on 2018/7/30.
 * 由于考虑到效率,如下使用Gson实现消息转换。
 */
public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {

    private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);

    private static  ClassMapper classMapper =  new DefaultClassMapper();

    private static Gson gson = new Gson();

    public Gson2JsonMessageConverter() {
        super();
    }

    @Override
    protected Message createMessage(Object object,
                                    MessageProperties messageProperties) {
        byte[] bytes = null;
        try {
            String jsonString = gson.toJson(object);
            bytes = jsonString.getBytes(getDefaultCharset());
        }
        catch (IOException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        classMapper.fromClass(object.getClass(),messageProperties);
        return new Message(bytes, messageProperties);
    }

    @Override
    public Object fromMessage(Message message)
            throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = getDefaultCharset();
                }
                try {
                    Class<?> targetClass = getClassMapper().toClass(
                            message.getMessageProperties());
                    content = convertBytesToObject(message.getBody(),
                            encoding, targetClass);
                }
                catch (IOException e) {
                    throw new MessageConversionException(
                            "Failed to convert Message content", e);
                }
            }
            else {
                log.warn("Could not convert incoming message with content-type ["
                        + contentType + "]");
            }
        }
        if (content == null) {
            content = message.getBody();
        }
        return content;
    }

    private Object convertBytesToObject(byte[] body, String encoding,
                                        Class<?> clazz) throws UnsupportedEncodingException {
        String contentAsString = new String(body, encoding);
        return gson.fromJson(contentAsString, clazz);
    }
}

 

 

网站文章

  • 3.1 TensorFlow计算模型 --- 计算图

      TensorFlow程序一般可以分为两个阶段。在第一个阶段需要定义计算图中的所有的计算。第二个阶段为执行计算。   在TensorFlow程序中,系统会自动维护一个默认的计算图,通过tf.get_default_graph函数可以获取当前默认的计算图。   除去使用默认的计算图,TensorFlow支持通过tf.Graph函数生成新的计算图。不同计算图上的张量和运算都不...

    2024-02-01 03:18:47
  • 阿里云数据库白名单导致的数据库链接不上问题

    百度搜索 IP 获取外网IP。修改白名单

    2024-02-01 03:18:17
  • MyEclipse设置代码提示功能 热门推荐

    MyEclipse设置代码提示功能 热门推荐

    写代码的时候为什么能给我们提示?你知道提示分为手动提示和自动提示么?今天我们就以MyEclipse为例讲一下MyEclipse手动提示和自动提示如何设置。在开始这篇博客之前,我们先来说一下代码的手动提...

    2024-02-01 03:18:11
  • 互联网随想(二): 带宽与拥塞控制

    由于北方在冬季蔬菜资源紧缺,所以在过冬前需要储存大量蔬菜,在整个冬天就吃这些蔬菜,这就好比是下载,其过程非常贪婪,大多数人家整个屋子里堆满了大白菜,大葱,土豆,如果不精打细算,相当大一部分要损耗掉,很...

    2024-02-01 03:18:04
  • 计算机学科所预见未来,四电四邮:全方位培养中国IT界人才的摇篮,毕业生名企抢着要!...

    计算机学科所预见未来,四电四邮:全方位培养中国IT界人才的摇篮,毕业生名企抢着要!...

    IT,即信息技术,自有IT专业以来,它就一直是最热门的专业之一,并且可以预见未来也必将会经久不衰。IT类专业主要包含四大学科,即:电子科学与技术、信息与通信工程、控制科学与工程、计算机科学与技术。目前...

    2024-02-01 03:17:35
  • ipa打包

    http://blog.csdn.net/hengshujiyi/article/details/9159531

    2024-02-01 03:17:30
  • swagger2.x升级swagger3.0

    swagger的2.x版本都是需要导入两个依赖包的,而3.0仅需要导入一个依赖,据说这是swagger社区整合的,但是个人感觉还不错,因为2.X已经停更了。 所以本人就对2.X系列进行升级,过程还是挺简单的。 首先注释掉原本的两个依赖,然后导入新的依赖。 io.springfox springfox-boot-starter

    2024-02-01 03:17:22
  • 求是否有整数解

    点击打开链接 Problem Description 有二个整数,它们加起来等于某个整数,乘起来又等于另一个整数,它们到底是真还是假,也就是这种整数到底存不存在,实在有点吃不准,你能快速回答吗?看来只能通过编程。 例如: x + y = 9,x * y = 15 ? 找不到这样的整数x和y 1+4=5,1*4=4,所以,加起来等于5,乘起来等于4的二个整数为1和4 7+(-8)=-

    2024-02-01 03:17:17
  • Markdown基本用法

    Markdown基本用法

    在想要变为斜体的两端加上‘*’使之变为斜体文字,例如 *hello world。最多有六级标题,在文字前打# ,例###### hello world。在两端加上ESC下的波浪键‘~’,例:~~hel...

    2024-02-01 03:16:49
  • 监控利器:普罗米修斯介绍和安装

    监控利器:普罗米修斯介绍和安装

    我们的程序想要稳定的运行,或者说当出现问题时能第一时间知道,这就离不开监控,目前比较主流的就是 Prometheus(普罗米修斯)+ Grafana 的组合。准备用三篇文章来介绍怎么使用:1、基本介绍...

    2024-02-01 03:16:42