RabbitMQ Quick Start

27 days ago

Installation

docker run --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
--network hm-net \
--hostname my-rabbit \
-e RABBITMQ_DEFAULT_USER=user \
-e RABBITMQ_DEFAULT_PASS=password \
-v mq_plugins:/plugins \
-d rabbitmq:3.8-management

Spring Boot Integration

Example: springboot-middlewave-example/springboot-rabbitmq at master · liyown/springboot-middlewave-example (github.com)

Publisher

  1. Publisher configuration (asynchronous confirms, retry on connection timeout)
spring:  
  application:  
    name: publisher  
  rabbitmq:  
    host: 192.168.208.128  
    port: 5672  
    username: user  
    password: password  
    virtual-host: /hmall  
    # Publisher confirm mechanism
    publisher-confirm-type: correlated  
    publisher-returns: true  
    # Retry mechanism
    template:  
      retry:  
        enabled: true  
        max-attempts: 3  
        initial-interval: 1000  
        max-interval: 10000  
        multiplier: 2  
server:  
  port: 7081
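
To make the retry values above concrete, here is a minimal sketch in plain Java of how an exponential backoff with a cap produces the waits between attempts. The class and method names are hypothetical and not part of Spring; the sketch also assumes that max-attempts counts the first try, so three attempts imply two waits.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Spring: lists the waits between retry attempts. */
final class RetryBackoffSketch {

    /**
     * @param maxAttempts total number of attempts, counting the first try (assumption)
     * @return the waits in milliseconds between consecutive attempts
     */
    static List<Long> waits(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        long interval = initialInterval;
        // One wait sits between each pair of attempts, so there are maxAttempts - 1 waits.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min(interval, maxInterval));
            // Grow the interval, but never beyond the configured cap.
            interval = (long) Math.min(interval * multiplier, (double) maxInterval);
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values from the YAML above: 1000 ms initial, x2 multiplier, 10000 ms cap, 3 attempts.
        System.out.println(waits(1000, 2.0, 10000, 3)); // prints [1000, 2000]
    }
}
```

Because the template's retry runs on the calling thread, a send that keeps failing with these values would hold the caller for about 3 seconds of waiting on top of the connection timeouts themselves.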
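  2. Configuration in code:
  • Configure a message converter. The default is JDK serialization, which is hard to read, takes more space, and has poor compatibility with non-Java consumers.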
@Bean  
public MessageConverter messageConverter() {  
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();  
    jackson2JsonMessageConverter.setCreateMessageIds(true);  
    return  jackson2JsonMessageConverter;  
}
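  • Set the publisher callbacks. There are two cases: a confirm is returned as soon as the message reaches the exchange; if the message then cannot be routed to any queue, ReturnsCallback is invoked as well. An unroutable message usually points to an operations-level problem rather than a bug in the sending code.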
@PostConstruct // assumption: the original snippet omits the annotation that triggers init()
public void init() {
    rabbitTemplate.setReturnsCallback(  
            (ReturnedMessage returned) -> {  
                System.out.println("Message lost: " + returned.getMessage());
            }  
    );  
    rabbitTemplate.setBeforePublishPostProcessors((message) -> {  
        MessageProperties messageProperties = message.getMessageProperties();  
        String messageId = messageProperties.getMessageId();  
        System.out.println("messageId: " + messageId);  
        return message;  
    });  
  
}
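
The two callbacks can be pictured with a small decision table. The sketch below is a simplified model in plain Java, not Spring AMQP code; the enum and method names are hypothetical, and it assumes that a message sent to a missing exchange ends in a negative confirm.

```java
/** Simplified model of the two publisher callbacks; every name here is hypothetical. */
final class PublishOutcomeSketch {

    enum Outcome {
        ACK,             // reached the exchange and was routed to at least one queue
        ACK_WITH_RETURN, // reached the exchange but matched no queue: returned, then acked
        NACK             // never accepted by an exchange (for example, it does not exist)
    }

    static Outcome classify(boolean exchangeExists, boolean routedToQueue) {
        if (!exchangeExists) {
            return Outcome.NACK;
        }
        return routedToQueue ? Outcome.ACK : Outcome.ACK_WITH_RETURN;
    }

    public static void main(String[] args) {
        System.out.println(classify(true, true));
        System.out.println(classify(true, false));
        System.out.println(classify(false, false));
    }
}
```

The middle case is the one that tends to surprise: the confirm still reports success, so a lost message is only visible through ReturnsCallback.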

Using the Java API

  1. Sending with a confirm callback:
public void testRabiitStart() throws InterruptedException {  
  
    CorrelationData  correlationData = new CorrelationData("1");  
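    // Set the confirm callback. A RabbitTemplate accepts only one ConfirmCallback,
    // so in real code set it once during initialization rather than on every send.
    rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {
        System.out.println("cause: " + cause);
        System.out.println("correlationData: " + correlationData1);
        if (ack) {
            System.out.println("Message sent successfully");
        } else {
            System.out.println("Message sending failed");
        }
    });
    // Routing key (here identical to the queue name)
    String queueName = "queue.lyw";

    // Send the message. Sending does not create the queue by itself; the exchange, queue,
    // and binding are declared from the @Bean definitions (see the consumer section).
    rabbitTemplate.convertAndSend("direct.lyw", queueName, "hello rabbitmq", correlationData);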
  
}
  2. Sending a delayed message:
public void testRabiitMqDelayMessage() {  
    CorrelationData  correlationData = new CorrelationData("1");  
    rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "hello rabbitmq delay",  
                                  (Message message) -> {  
                                      message.getMessageProperties().setDelayLong(10000L);  
                                      return message;  
                                  }  
                                  , correlationData);  
}

Consumer

Spring configuration

spring:  
  application:  
    name: consumer
  rabbitmq:  
    host: 192.168.208.128  
    port: 5672  
    username: user  
    password: password  
    virtual-host: /hmall  
    listener:  
      simple:  
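        # Automatic acknowledgment: if the listener throws, delivery is retried; once retries are
        # exhausted, the message is handed to a recovery strategy (one of three), e.g. an error queue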
        acknowledge-mode: auto  
        retry:  
          enabled: true  
          max-attempts: 3  
          initial-interval: 1000  
          max-interval: 10000  
          multiplier: 2  
          stateless: true  
server:  
  port: 7082

Declaring the exchange, queue, and binding

@Bean  
public DirectExchange directExchange() {  
    return new DirectExchange("direct.lyw");  
}  
      
@Bean  
public Queue queue() {  
    return new Queue("queue.lyw");  
}  
      
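@Bean
public Binding binding(Queue queue, DirectExchange directExchange) {
    // Bind to the DirectExchange declared above (not a FanoutExchange) and use the
    // routing key the publisher sends with.
    return BindingBuilder.bind(queue).to(directExchange).with("queue.lyw");
}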
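
The publisher earlier sends to direct.lyw with routing key queue.lyw. A direct exchange delivers only to queues bound with exactly that key, which the following toy model in plain Java illustrates. It is a sketch of the routing rule only, with hypothetical names, not broker or Spring AMQP code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of direct-exchange routing; hypothetical names, not broker code. */
final class DirectExchangeSketch {

    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new TreeSet<>()).add(queue);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Set.of());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("queue.lyw", "queue.lyw");
        System.out.println(exchange.route("queue.lyw")); // prints [queue.lyw]
        System.out.println(exchange.route("other.key")); // prints []
    }
}
```

An empty result corresponds to the unroutable case on the publisher side, where ReturnsCallback fires.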

Creating a message listener

@RabbitListener(queues = "work.queue1")  
public void onMessageWorkerQueue2(String message) throws InterruptedException {  
    Thread.sleep(200);  
    Thread thread = Thread.currentThread();  
    log.info("Work queue 2 received message: {}, time: {}, thread: {}", message, LocalDateTime.now(), thread.getName());
}

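Setting up the dead-letter (error) queue. Once the retry limit is reached, a MessageRecoverer decides what happens to the message. There are three strategies:

  • RejectAndDontRequeueRecoverer: reject the message without requeueing it (the default).
  • ImmediateRequeueMessageRecoverer: return a nack so the message is requeued.
  • RepublishMessageRecoverer: publish the failed message to a designated exchange, as in the bean below.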

@Bean  
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){  
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");  
}
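
The interplay of local retry and recovery can be sketched in plain Java: the listener is invoked up to a maximum number of times, and only after the last failure is the message handed to a recoverer. This is an illustrative model with hypothetical names, not Spring's actual retry interceptor, and it leaves out the backoff waits.

```java
import java.util.function.Consumer;

/** Illustrative model of "retry locally, then recover"; hypothetical names throughout. */
final class RetryThenRecoverSketch {

    /** Stand-in for a recovery strategy such as republishing to an error exchange. */
    interface Recoverer {
        void recover(String message, Throwable cause);
    }

    /** Invokes the listener up to maxAttempts times; returns the number of attempts made. */
    static int deliver(String message, Consumer<String> listener, int maxAttempts, Recoverer recoverer) {
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(message);
                return attempt; // success: no recovery needed
            } catch (RuntimeException e) {
                lastFailure = e; // a real retry would wait here according to the backoff settings
            }
        }
        // Retries exhausted: hand the message and the last failure to the recovery strategy.
        recoverer.recover(message, lastFailure);
        return maxAttempts;
    }

    public static void main(String[] args) {
        int attempts = deliver("hello",
                msg -> { throw new IllegalStateException("listener failed"); },
                3,
                (msg, cause) -> System.out.println("recovered " + msg + " after: " + cause.getMessage()));
        System.out.println(attempts);
    }
}
```

With a republishing recoverer, the recover step is where the message would be sent to error.direct with routing key error.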

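Using delayed queues

Install the plugin from GitHub: rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (github.com). Copy the plugin file into the plugins directory (the mq_plugins volume mounted at /plugins in the docker run command above), then enable it: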

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Declaring the exchange, queue, and binding

@Bean  
public Queue delayQueue() {  
    return new Queue("delay.queue");  
}  
      
@Bean  
public DirectExchange delayExchange() {  
    return ExchangeBuilder.directExchange("delay.direct").delayed().durable(true).build();  
}  
      
@Bean
public Binding delayBinding(Queue delayQueue, DirectExchange delayExchange) {
    return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.queue");  
}

Sending messages

@Test  
public void testRabiitMqDelayMessage() {  
    CorrelationData  correlationData = new CorrelationData("1");  
    rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "hello rabbitmq delay",  
                                  (Message message) -> {  
                                      message.getMessageProperties().setDelayLong(10000L);  
                                      return message;  
                                  }  
                                  , correlationData);  
}  
      
@Test  
public void testRabiitMqDelayMessage2() {  
    rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "1812793267355439105", message -> {  
        message.getMessageProperties().setDelayLong(10000L);  
        return message;  
    });  
}
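
Conceptually, the delayed exchange holds each message until its delay has elapsed and only then routes it to the bound queue. The plain-Java sketch below models that behavior with a logical clock so it stays deterministic. It is an assumption-laden illustration with hypothetical names, not how the plugin is implemented.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of a delayed exchange driven by a logical clock; hypothetical names. */
final class DelayedExchangeSketch {

    private static final class Held {
        final String body;
        final long releaseAt;

        Held(String body, long releaseAt) {
            this.body = body;
            this.releaseAt = releaseAt;
        }
    }

    // Messages waiting for their delay to elapse, earliest release time first.
    private final PriorityQueue<Held> held =
            new PriorityQueue<>(Comparator.comparingLong((Held h) -> h.releaseAt));

    void publish(String body, long nowMillis, long delayMillis) {
        held.add(new Held(body, nowMillis + delayMillis));
    }

    /** Returns the messages whose delay has elapsed at the given time, in release order. */
    List<String> releaseDue(long nowMillis) {
        List<String> due = new ArrayList<>();
        while (!held.isEmpty() && held.peek().releaseAt <= nowMillis) {
            due.add(held.poll().body);
        }
        return due;
    }

    public static void main(String[] args) {
        DelayedExchangeSketch exchange = new DelayedExchangeSketch();
        exchange.publish("hello rabbitmq delay", 0, 10000);
        System.out.println(exchange.releaseDue(9999));  // prints []
        System.out.println(exchange.releaseDue(10000)); // prints [hello rabbitmq delay]
    }
}
```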

Consuming delayed messages: identical to consuming from an ordinary queue.
