RabbitMQ is the most widely deployed open source message broker.
RabbitMQ是一款非常受欢迎的开源消息队列中间件,在全世界的大小企业中超过35000个产品在使用。
RabbitMQ是轻量级的并且易于部署的。提供了多种消息协议、异步消息机制、多语言实现、分布式部署、多样化工具和插件以及管理与监控等丰富的功能。
接下来我们看下如何使用RabbitMQ进行消息的发布与订阅。
一、主要网址
RabbitMQ下载:http://www.rabbitmq.com/download.html
erlang:http://www.erlang.org/downloads
erlang支持版本表:http://www.rabbitmq.com/which-erlang.html
RabbitMQ配置项:http://www.rabbitmq.com/configure.html
二、安装
环境
本文演示使用window环境安装。
前提条件
下载RabbitMQ后安装之前必须安装支持当前RabbitMQ版本的Erlang。否则会提示你去下载。
RabbitMQ官网也提供了erlang版本的要求页面http://www.rabbitmq.com/which-erlang.html,页面上可以查看RabbitMQ与erlang的各个版本支持关系。
我准备使用的RabbitMQ版本是3.7.7,图中可以看出19.3.6.4以前的Erlang/OTP版本目前还不支持该版本。官网推荐最好选择GA稳定的erlang版本下载。接下来就是去erlang官网上下载我们要求的版本了。根据RabbitMQ官网推荐的我选择20.3的GA文档版本。
安装完erlang,RabbitMQ已经可以安装了。
三、运行与管理RabbitMQ
安装完RabbitMQ后会自动运行,可以通过命令services.msc在电脑的服务列表看到一个RabbitMQ的服务正在运行。
将安装目录下的sbin目录添加到环境变量下可以进行命令行管理,停止、查看状态等
- 查看帮助:
rabbitmqctl help
- 查看状态:
rabbitmqctl status
- 停止:
rabbitmqctl stop
或者rabbitmqctl shutdown
停止命令短暂停止后马上又自动启动了,可以在电脑服务列表手动停止
四、使用java创建客户端进行收/发消息
RabbitMQ需要需要200M的硬盘容量,否则可能发送/接受不了消息。检查RabbitMQ日志文件(logfile)看当前配置信息,必要时减少限制。可以通过网站 http://www.rabbitmq.com/configure.html (disk_free_limit项)查看如何配置。
- 导入依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.2</version> </dependency>
- 创建发送消息类
package com.huangtl.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 发布消息测试 */ public class SendTest { private final static String queue = "test"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection conn = factory.newConnection(); //创建一个通道 Channel channel = conn.createChannel(); //定义一个队列queue,我们可以向队列发布消息: //声明一个队列是等幂的(只会在不存在的时候创建),消息是一个字节数组,你可以发送任何内容 channel.queueDeclare(queue,false,false,false,null); String message = "hello world"; channel.basicPublish("",queue,null,message.getBytes()); System.out.println("sent '"+message+"'"); //关闭通道和连接 channel.close(); conn.close(); } }
- 创建消费者类接收消息
package com.huangtl.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 接收者/消费者(consumer),监听队列消息 */ public class ReciveTest { private final static String queue = "test"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection conn = factory.newConnection(); Channel channe = conn.createChannel(); //因为实际中消费者可能先与生产者启动,有可能生产者还没创建队列,所以使用前要确保队列存在,没有则会创建 channe.queueDeclare(queue,false,false,false,null); System.out.println("waiting for messsage."); //定义一个消费者,并重写接收消息方法 Consumer consumer = new DefaultConsumer(channe) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("接收到消息:"+message); } }; channe.basicConsume(queue,true,consumer); } }
查看结果
运行ReciveTest.java的main方法,监听等待消息。
运行SendTest.java的main方法每次都会在监听ReciveTest.java的程序控制台打印一次消息。
以下是运行三次SendTest的main方法后控制台截图。
本教程参考官方网站教程:http://www.rabbitmq.com/tutorials/tutorial-one-java.html