RabbitMQ is the most widely deployed open source message broker.

RabbitMQ是一款非常受欢迎的开源消息队列中间件,在全世界的大小企业中超过35000个产品在使用。

RabbitMQ是轻量级的并且易于部署的。提供了多种消息协议、异步消息机制、多语言实现、分布式部署、多样化工具和插件以及管理与监控等丰富的功能。

接下来我们看下如何使用RabbitMQ进行消息的发布与订阅。

一、主要网址

RabbitMQ下载:http://www.rabbitmq.com/download.html
erlang:http://www.erlang.org/downloads
erlang支持版本表:http://www.rabbitmq.com/which-erlang.html
RabbitMQ配置项:http://www.rabbitmq.com/configure.html

二、安装

环境

本文演示使用window环境安装。

前提条件

下载RabbitMQ后安装之前必须安装支持当前RabbitMQ版本的Erlang。否则会提示你去下载。

RabbitMQ官网也提供了erlang版本的要求页面http://www.rabbitmq.com/which-erlang.html,页面上可以查看RabbitMQ与erlang的各个版本支持关系。

我准备使用的RabbitMQ版本是3.7.7,图中可以看出19.3.6.4以前的Erlang/OTP版本目前还不支持该版本。官网推荐最好选择GA稳定的erlang版本下载。接下来就是去erlang官网上下载我们要求的版本了。根据RabbitMQ官网推荐的我选择20.3的GA文档版本。

安装完erlang,RabbitMQ已经可以安装了。

三、运行与管理RabbitMQ

安装完RabbitMQ后会自动运行,可以通过命令services.msc在电脑的服务列表看到一个RabbitMQ的服务正在运行。

将安装目录下的sbin目录添加到环境变量下可以进行命令行管理,停止、查看状态等

四、使用java创建客户端进行收/发消息

RabbitMQ需要需要200M的硬盘容量,否则可能发送/接受不了消息。检查RabbitMQ日志文件(logfile)看当前配置信息,必要时减少限制。可以通过网站 http://www.rabbitmq.com/configure.html (disk_free_limit项)查看如何配置。

  1. 导入依赖
     <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>4.0.2</version>
     </dependency>
    
  2. 创建发送消息类
     package com.huangtl.rabbitmq;
    
     import com.rabbitmq.client.Channel;
     import com.rabbitmq.client.Connection;
     import com.rabbitmq.client.ConnectionFactory;
    
     import java.io.IOException;
     import java.util.concurrent.TimeoutException;
    
     /**
      * 发布消息测试
      */
     public class SendTest {
    
         private final static String queue = "test";
    
         public static void main(String[] args) throws IOException, TimeoutException {
             ConnectionFactory factory = new ConnectionFactory();
             factory.setHost("localhost");
             Connection conn = factory.newConnection();
    
             //创建一个通道
             Channel channel = conn.createChannel();
    
             //定义一个队列queue,我们可以向队列发布消息:
             //声明一个队列是等幂的(只会在不存在的时候创建),消息是一个字节数组,你可以发送任何内容
             channel.queueDeclare(queue,false,false,false,null);
    
             String message = "hello world";
             channel.basicPublish("",queue,null,message.getBytes());
             System.out.println("sent '"+message+"'");
    
             //关闭通道和连接
             channel.close();
             conn.close();
    
         }
     }
    
  3. 创建消费者类接收消息
     package com.huangtl.rabbitmq;
    
     import com.rabbitmq.client.*;
    
     import java.io.IOException;
     import java.util.concurrent.TimeoutException;
    
     /**
      * 接收者/消费者(consumer),监听队列消息
      */
     public class ReciveTest {
    
         private final static String queue = "test";
    
         public static void main(String[] args) throws IOException, TimeoutException {
             ConnectionFactory factory = new ConnectionFactory();
             factory.setHost("localhost");
             Connection conn = factory.newConnection();
    
             Channel channe = conn.createChannel();
             //因为实际中消费者可能先与生产者启动,有可能生产者还没创建队列,所以使用前要确保队列存在,没有则会创建
             channe.queueDeclare(queue,false,false,false,null);
             System.out.println("waiting for messsage.");
    
             //定义一个消费者,并重写接收消息方法
             Consumer consumer = new DefaultConsumer(channe) {
    
                 @Override
                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                     String message = new String(body,"UTF-8");
                     System.out.println("接收到消息:"+message);
                 }
             };
    
             channe.basicConsume(queue,true,consumer);
         }
     }
    

查看结果

运行ReciveTest.java的main方法,监听等待消息。
运行SendTest.java的main方法每次都会在监听ReciveTest.java的程序控制台打印一次消息。
以下是运行三次SendTest的main方法后控制台截图。

本教程参考官方网站教程:http://www.rabbitmq.com/tutorials/tutorial-one-java.html