DotNET 环境下使用开源消息中间件RabbitMQ
使用DotNet开发的应用软件,经常会用到消息推送,广播等,比如网站中实时聊天,实时接收推送过来的消息或者实时发送消息到其它平台或应用系统中,诸如此类的等等,这里最好使用消息中间件来实现,以下就 教大家在.NET环境下使用RabbitMQ,如有讲解不到位的地方,希望批评指正!
一、RabbitMQ服务器安装(windows环境)
1.1、安装Erlang
下载地址:
https://www.erlang.org/downloads,本文选择OTP 23.2 Windows 64-bit Binary File (109672744)
设置环境变量,新建ERLANG_HOME
修改环境变量path,增加Erlang变量至path,%ERLANG_HOME%\bin;
打开cmd命令框,输入erl
OK,到这里,Erlang 安装就完成了。
1.2、安装rabbitmq
下载地址:
http://www.rabbitmq.com/download.html
exe安装地址:
http://www.rabbitmq.com/install-windows.html
解压缩安装地址:
http://www.rabbitmq.com/install-windows-manual.html
本文选择解压缩安装
rabbitmq-server-windows-3.8.14.zip
将
rabbitmq-server-windows-3.8.14.zip解压缩至D:\Program Files目录下
设置环境变量,新建RABBITMQ_SERVER
修改环境变量path,增加rabbitmq变量至path,%RABBITMQ_SERVER%\sbin;
打开cmd命令框,切换至D:\Program Files\rabbitmq_server-3.8.14\sbin目录下,输入rabbitmqctl status
如果rabbmitmq未启动,先启动服务。
安装插件,命令:rabbitmq-plugins.bat enable rabbitmq_management,出现:
输入命令:rabbitmq-server.bat
看日志,说明已经启动过了。
rabbitmq启动成功,浏览器中http://localhost:15672
打开cmd,再次输入命令:rabbitmqctl status
到这里,rabbitMQ安装部署完成。
二、RabbitMQ服务器开启Websocket插件
使用命令行:rabbitmq-plugins enable rabbitmq_web_stomp
开启示例:rabbitmq-plugins enable
rabbitmq_web_stomp_examples
示例地址: http://127.0.0.1:15670/
默认Websocket采用15674端口
三、RabbitMQ主要概念及工作原理
RabbitMQ是一个开源的消息代理软件。它接收生产者发布的消息并发送给消费者。它扮演中间商的角色,可以用来降低web服务器因发送消息带来的负载以及延时。
RabbitMQ如何工作的?
我们来简单看看RabbitMQ是如何工作的。
首先来看看RabbitMQ里的几个重要概念:
- 生产者(Producer):发送消息的应用。
- 消费者(Consumer):接收消息的应用。
- 队列(Queue):存储消息的缓存。
- 消息(Message):由生产者通过RabbitMQ发送给消费者的信息。
- 连接(Connection):连接RabbitMQ和应用服务器的TCP连接。
- 通道(Channel):连接里的一个虚拟通道。当你通过消息队列发送或者接收消息时,这个操作都是通过通道进行的。
- 交换机(Exchange):交换机负责从生产者那里接收消息,并根据交换类型分发到对应的消息列队里。要实现消息的接收,一个队列必须到绑定一个交换机。
- 绑定(Binding):绑定是队列和交换机的一个关联连接。
- 路由键(Routing Key):路由键是供交换机查看并根据键来决定如何分发消息到列队的一个键。路由键可以说是消息的目的地址。
生产者(Producer)发送/发布消息到代理->消费者(Consumer)从代理那里接收消息。哪怕生产者和消费者运行在不同的机器上,RabbitMQ也能扮演代理中间件的角色。
当生产者发送消息时,它并不是直接把消息发送到队列里的,而是使用交换机(Exchange)来发送。下面的设计图简单展示了这三个主要的组件之间是如何连接起来的。
交换机代理(exchange agent)负责把消息分发到不同的队列里。这样的话,消息就能够从生产者发送到交换机,然后被分发到消息队列里。这就是常见的“发布”方法。
然后,消息会被消费者从队列里读取并消费,这就是“消费”。
往多个队列里发送消息
对一个复杂的应用而言,往往会有多个消息队列,所以消息也会被发往多个队列。
给带有多个队列的交换机发送的消息是通过绑定和路由键来进行分发的。绑定是你设置的用来连接一个队列和交换机的连接。路由键是消息的一个属性。交换机会根据路由键来决定消息分发到哪个队列里(取决于交换机的类型)。
交换机(Exchange)
消息并不是直接发布到队里里的,而是被生产者发送到一个交换机上。交换机负责把消息发布到不同的队列里。交换机从生产者应用上接收消息,然后根据绑定和路由键将消息发送到对应的队列里。绑定是交换机和队列之间的一个关系连接。
RabbitMQ里的消息流程
- 生产者(producer)把消息发送给交换机。当你创建交换机的时候,你需要指定类型。交换机的类型接下来会讲到。
- 交换机(exchange)接收消息并且负责对消息进行路由。根据交换机的类型,消息的多个属性会被使用,例如路由键。
- 绑定(binding)需要从交换机到队列的这种方式来进行创建。在这个例子里,我们可以看到交换机有到两个不同队列的绑定。交换机根据消息的属性来把消息分发到不同的队列上。
- 消息(message)消息会一直留在队列里直到被消费。
- 消费者(consumer)处理消息。
交换机类型
- 直接(Direct):直接交换机通过消息上的路由键直接对消息进行分发。
- 扇出(Fanout):一个扇出交换机会将消息发送到所有和它进行绑定的队列上。
- 主题(Topic):这个交换机会将路由键和绑定上的模式进行通配符匹配。
- 消息头(Headers):消息头交换机使用消息头的属性进行消息路由。
RabbitMQ主要概念
- 生产者(Producer):发送消息的应用。
- 消费者(Consumer):接收消息的应用。
- 队列(Queue):存储消息的缓存。
- 消息(Message):由生产者通过RabbitMQ发送给消费者的信息。
- 连接(Connection):连接RabbitMQ和应用服务器的TCP连接。
- 通道(Channel):连接里的一个虚拟通道。当你通过消息队列发送或者接收消息时,这个操作都是通过通道进行的。
- 交换机(Exchange):从生产者那里接收消息,并根据交换类型分发到对应的消息列队里。要实现消息的接收,一个队列必须绑定一个交换机。
- 绑定(Binding):绑定是队列和交换机的一个链接。
- 路由键(Routing Key):路由键是供交换机查看并根据键的值来决定如何分发消息到列队的一个键。路由键可以说是消息的目的地址。
- AMQP:AMQP(高级消息队列协议Advanced Message Queuing Protocol)是RabbitMQ使用的消息协议。
- 用户(Users):在RabbitMQ里,是可以通过指定的用户名和密码来进行连接的。每个用户可以分配不同的权限,例如读权限,写权限以及在实例里进行配置的权限。
四、JS代码
网页中JS使用RabbitMQ,可以开启Websocket,采用Websocket,直接使用已有的stomp.js
var ws = new WebSocket('ws://127.0.0.1:15674/ws'); var client = Stomp.over(ws); var on_connect = function() { console.log('connected'); $("#incomming").html('成功连接服务器....'); client.subscribe("/queue/user1", function(data) { var msg = data.body; console.log("收到数据:" + msg); $("#incomming").html('收到消息....' + msg); }); client.subscribe("/exchange/egood.all", function(data) { var msg = data.body; console.log("收到广播数据:" + msg); $("#incomming").html('收到广播消息....' + msg); }); }; var on_error = function() { console.log('error'); $("#incomming").html('发生错误......'); }; client.connect('egood', 'egood', on_connect, on_error, '/'); function send_msg() { if (client === undefined) { alert("websocket还未连接") return; } client.send('/queue/user2', {}, $("#sendText").val()); } |
五、.NET代码
.NET使用RabbitMQ,需要 RabbitMQ.Client,支持 .net Framework 2.0 及以上
5.1发送消息到服务器-生产者(Producer)
//发送广播消息 using RabbitMQ.Client; using RabbitMQ.Client.Events; var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "egood"; factory.Password = "egood"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { string message = txt_bo.Text; //消息内容 channel.ExchangeDeclare("egood.all", "fanout"); //channel.QueueDeclare("uservv", true, false, false, null); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("egood.all", "", null, body); } } |
//发送到消息队列 using RabbitMQ.Client; using RabbitMQ.Client.Events; var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "egood"; factory.Password = "egood"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { string message = txt_dot.Text; channel.QueueDeclare("user1", true, false, false, null); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 1; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", "user1", properties, body); } } |
5.2接收消息-消费者(Consumer)
using RabbitMQ.Client; using RabbitMQ.Client.Events; //使用线程接收广播消息 Thread bt = new Thread(() => { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "egood"; factory.Password = "egood"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare("egood.all", "fanout");//广播 QueueDeclareOk queueOk = channel.QueueDeclare(); string queueName = queueOk.QueueName; channel.QueueBind(queueName, "egood.all", ""); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queueName, true, consumer); while (true) { var ev = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ev.Body; //消息内容 var message = Encoding.UTF8.GetString(body); var dots = message.Split('.').Length - 1 ; Thread.Sleep(dots * 1000); Show(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + " -> " + message + "\r\n"); } } } }); bt.Start(); |
using RabbitMQ.Client; using RabbitMQ.Client.Events; //接收消息... Thread t = new Thread(() => { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.UserName = "egood"; factory.Password = "egood"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare("user2", true, false, false, null); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume("user2", true, consumer); while (true) { var ev = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ev.Body; //内容 var message = Encoding.UTF8.GetString(body); var dots = message.Split('.').Length - 1 ; Thread.Sleep(dots * 1000); Show(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + " -> " + message + "\r\n"); } } } }); t.Start(); |