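4. RabbitMQ Message Patterns
RabbitMQ offers seven message patterns in total.
Pattern 1: Direct mode (simple queue)
Add a virtual host.
Grant the user access to the virtual host.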
Message producer

```java
package com.saxon;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {

    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // Configure the connection to the broker
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.44.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("ems");
        connectionFactory.setUsername("saxon");
        connectionFactory.setPassword("saxon");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
        channel.queueDeclare("hello", false, false, false, null);
        // Publish through the default exchange (""); the routing key is the queue name
        channel.basicPublish("", "hello", null, "hello world".getBytes());

        // Release the channel and the connection once the message has been sent
        channel.close();
        connection.close();
    }
}
```
Sending a message:

```java
/**
 * @see com.rabbitmq.client.AMQP.Basic.Publish
 * @see <a href="https://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
 * @param exchange the exchange to publish the message to
 * @param routingKey the routing key
 * @param props other properties for the message - routing headers etc
 * @param body the message body
 * @throws java.io.IOException if an error is encountered
 */
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

channel.basicPublish("", "hello", null, "hello world".getBytes());
```
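The signature above makes one thing clear: a message can be delivered to different queues, and which one it reaches depends on the second parameter, routingKey. With the default exchange (""), the routing key is simply the name of the target queue.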
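To make the role of the routing key concrete, here is a minimal in-memory sketch of how the default exchange picks a queue. It is a toy model, not the RabbitMQ client API; the class `DefaultExchangeSketch`, its methods, and the second queue name `orders` are assumptions made only for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of the default exchange; not the RabbitMQ client API.
class DefaultExchangeSketch {
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Mirrors queueDeclare: create the queue if it does not exist yet.
    void queueDeclare(String name) {
        queues.computeIfAbsent(name, k -> new ArrayDeque<>());
    }

    // Mirrors basicPublish("", routingKey, ...): the routing key names the queue.
    // Returns false when no queue has that name, in which case the message is dropped.
    boolean publish(String routingKey, String body) {
        Queue<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(body);
        return true;
    }

    // Number of messages currently waiting in the named queue.
    int size(String name) {
        Queue<String> queue = queues.get(name);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.queueDeclare("hello");
        broker.queueDeclare("orders"); // hypothetical second queue
        broker.publish("hello", "hello world");
        broker.publish("orders", "order 1");
        System.out.println("hello=" + broker.size("hello") + ", orders=" + broker.size("orders"));
    }
}
```

Changing only the routing key changes which queue receives the message; the exchange argument stays the empty string in both calls.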
Message consumer

```java
package com.saxon;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    public static void testGetMessage() throws IOException, TimeoutException {
        // Same connection settings as the producer
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.44.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("ems");
        connectionFactory.setUsername("saxon");
        connectionFactory.setPassword("saxon");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // The declaration must use the same arguments as the producer's
        channel.queueDeclare("hello", false, false, false, null);
        // autoAck = true: a message counts as acknowledged as soon as it is delivered.
        // The two-argument basicConsume uses manual acks, which would leave the
        // messages unacknowledged because this callback never calls basicAck.
        channel.basicConsume("hello", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收的消息" + new String(body));
            }
        });
        // The channel and connection stay open so the consumer keeps listening
    }
}
```
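Because of how JUnit works (a test run ends as soon as the test method returns, before the consumer callback has a chance to fire), the consumer is started from a main method instead:

```java
package com.saxon;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQ {

    public static void main(String[] args) throws IOException, TimeoutException {
        // Start the consumer; the connection it opens is never closed, so it keeps listening
        Consumer.testGetMessage();
    }
}
```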
Result:

```
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html
接收的消息hello world
接收的消息hello world
```
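Pattern 2: Work queues
When producers publish messages faster than a single consumer can process them, messages pile up in the queue. RabbitMQ's work-queue pattern addresses this: several consumers attach to the same queue and share the work of consuming its messages.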
The code is as follows.
Provider:

```java
package com.saxon.workquene;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.saxon.utils.RabbitMQUtils;

import java.util.Objects;

public class WorkProvider {

    public static void main(String[] args) throws Exception {
        Connection connect = RabbitMQUtils.getConnect();
        Channel channel = Objects.requireNonNull(connect).createChannel();
        channel.queueDeclare("hello", false, false, false, null);
        // Publish ten numbered messages to the "hello" queue.
        // PERSISTENT_TEXT_PLAIN marks the messages as persistent; note that the
        // queue itself is declared non-durable here.
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN,
                    (i + "hello world").getBytes());
        }
        // Release resources so the JVM can exit
        RabbitMQUtils.closeConnectAndChannel(connect, channel);
    }
}
```
Consumer 1:

```java
package com.saxon.workquene;

import com.rabbitmq.client.*;
import com.saxon.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Objects;

public class WorkConsumer01 {

    public static void main(String[] args) throws Exception {
        Connection connect = RabbitMQUtils.getConnect();
        Channel channel = Objects.requireNonNull(connect).createChannel();
        channel.queueDeclare("hello", false, false, false, null);
        // autoAck = true: messages are acknowledged automatically on delivery
        channel.basicConsume("hello", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者一:" + new String(body));
            }
        });
    }
}
```
Consumer 2:

```java
package com.saxon.workquene;

import com.rabbitmq.client.*;
import com.saxon.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Objects;

public class WorkConsumer02 {

    public static void main(String[] args) throws Exception {
        Connection connect = RabbitMQUtils.getConnect();
        Channel channel = Objects.requireNonNull(connect).createChannel();
        channel.queueDeclare("hello", false, false, false, null);
        // autoAck = true: messages are acknowledged automatically on delivery
        channel.basicConsume("hello", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者二:" + new String(body));
            }
        });
    }
}
```
Result:
Consumer 1:

```
消费者一:0hello world
消费者一:2hello world
消费者一:4hello world
消费者一:6hello world
消费者一:8hello world
```

Consumer 2:

```
消费者二:1hello world
消费者二:3hello world
消费者二:5hello world
消费者二:7hello world
消费者二:9hello world
```
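As the output shows, when several consumers share one queue, the messages are handed out evenly in round-robin order. The split does not change with how quickly each consumer processes its messages: every consumer receives the same share.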
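The even split can be reproduced with a small in-memory sketch of round-robin dispatch. This is a toy model rather than RabbitMQ client code; the class name `RoundRobinSketch` and its `dispatch` method are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of round-robin dispatch; not the RabbitMQ client API.
class RoundRobinSketch {

    // Hands message i to consumer (i mod consumerCount), regardless of
    // how fast each consumer works.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add(i + "hello world");
        }
        List<List<String>> received = dispatch(messages, 2);
        System.out.println("consumer 1: " + received.get(0));
        System.out.println("consumer 2: " + received.get(1));
    }
}
```

With ten messages and two consumers, the first consumer ends up with the even-numbered messages and the second with the odd-numbered ones, matching the output above.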
Fair dispatch (distribution on demand)

```java
package com.saxon.workquene;

import com.rabbitmq.client.*;
import com.saxon.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Objects;

public class WorkConsumer01 {

    public static void main(String[] args) throws Exception {
        Connection connect = RabbitMQUtils.getConnect();
        Channel channel = Objects.requireNonNull(connect).createChannel();
        channel.queueDeclare("hello", false, false, false, null);
        // Deliver at most one unacknowledged message to this consumer at a time
        channel.basicQos(1);
        // autoAck = false: the consumer acknowledges each message itself
        channel.basicConsume("hello", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者一:" + new String(body));
                // Acknowledge only this delivery (multiple = false)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```
Key code:

```java
// Deliver at most one unacknowledged message to this consumer at a time
channel.basicQos(1);
// autoAck = false: the consumer acknowledges each message itself
channel.basicConsume("hello", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者一:" + new String(body));
        // Acknowledge only this delivery (multiple = false)
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
```
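With basicQos(1), the broker delivers at most one unacknowledged message to a consumer at a time. The consumer acknowledges each message once it has handled it and only then receives the next one, so a slow consumer no longer builds up a backlog and each message goes to whichever consumer is free. This is distribution on demand.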
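The effect of a prefetch limit of one can be approximated with a simple virtual-time simulation. This is a simplified sketch, not RabbitMQ client code: the class `FairDispatchSketch`, the `dispatch` method, and the per-message costs are all assumptions chosen for illustration.

```java
import java.util.Arrays;

// Toy virtual-time model of prefetch = 1 with manual acks; not the RabbitMQ client API.
class FairDispatchSketch {

    // costs[c] is the time consumer c needs per message (hypothetical values).
    // Each message goes to the consumer that becomes free (has acked) first.
    // Returns how many messages each consumer handled.
    static int[] dispatch(int messageCount, int[] costs) {
        int[] handled = new int[costs.length];
        long[] freeAt = new long[costs.length]; // time at which each consumer sends its ack
        for (int m = 0; m < messageCount; m++) {
            int next = 0;
            for (int c = 1; c < costs.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c; // ties go to the lower index
                }
            }
            freeAt[next] += costs[next]; // busy until the ack is sent
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // Consumer 1 needs 1 time unit per message, consumer 2 needs 4.
        System.out.println(Arrays.toString(dispatch(10, new int[]{1, 4})));
    }
}
```

In this model the faster consumer handles most of the ten messages, whereas two consumers with equal costs still split them evenly.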
Pattern 3: Fanout mode
Broadcast mode: a message published once is received by every subscriber.
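As the diagram shows:
Consumers deal with an exchange rather than a shared queue; one exchange corresponds to several queues.
Each consumer has its own queue.
The producer can only send messages to the exchange; it cannot decide which queue they end up in.
The exchange type must be set to fanout when the exchange is created; otherwise an error is reported, because an existing exchange cannot be redeclared with a different type.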
Sender:

```java
package com.saxon.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.saxon.utils.RabbitMQUtils;

import java.util.Objects;

public class Provider {

    public static void main(String[] args) throws Exception {
        // Keep a reference to the connection so that it can be closed at the end
        Connection connect = Objects.requireNonNull(RabbitMQUtils.getConnect());
        Channel channel = connect.createChannel();
        // Declare a fanout exchange named "log"
        channel.exchangeDeclare("log", "fanout");
        // A fanout exchange ignores the routing key, so it is left empty
        channel.basicPublish("log", "", null, "fanout message".getBytes());
        RabbitMQUtils.closeConnectAndChannel(connect, channel);
    }
}
```
Receiver:

```java
package com.saxon.fanout;

import com.rabbitmq.client.*;
import com.saxon.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Objects;

public class Consumer1 {

    public static void main(String[] args) throws Exception {
        Connection connect = RabbitMQUtils.getConnect();
        Channel channel = Objects.requireNonNull(connect).createChannel();
        channel.exchangeDeclare("log", "fanout");
        // Server-named, exclusive, auto-delete queue that belongs to this consumer
        String queue = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange; fanout ignores the routing key
        channel.queueBind(queue, "log", "");
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者一:" + new String(body));
                // Acknowledge only this delivery (multiple = false)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```
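To run several consumers, write additional consumer classes in the same way; each one declares its own temporary queue and binds it to the exchange.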
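The broadcast behaviour can be illustrated with a small in-memory sketch in which every bound queue receives its own copy of a published message. It is a toy model, not the RabbitMQ client API; the class `FanoutSketch`, its methods, and the queue names `q1` and `q2` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a fanout exchange; not the RabbitMQ client API.
class FanoutSketch {
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Mirrors queueDeclare + queueBind: each consumer binds its own queue.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Mirrors basicPublish(exchange, "", ...): every bound queue gets a copy.
    void publish(String body) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(body);
        }
    }

    // Messages held by the named queue; empty if the queue was never bound.
    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("q1"); // hypothetical queue of consumer 1
        exchange.bind("q2"); // hypothetical queue of consumer 2
        exchange.publish("fanout message");
        System.out.println("q1: " + exchange.messagesIn("q1"));
        System.out.println("q2: " + exchange.messagesIn("q2"));
    }
}
```

Adding a consumer corresponds to one more `bind` call; the publisher's code does not change.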
Pattern 4: Routing
Diagram notes:
P: producer
X: exchange
Messages whose routing key is error are received by C1.
Messages whose routing key is info, error, or warning are received by C2.
1. Routing with a direct exchange
Sender:

```java
package com.saxon.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.saxon.utils.RabbitMQUtils;

public class Provider {

    public static void main(String[] args) throws Exception {
        Connection connect = RabbitMQUtils.getConnect();
        Channel channel = connect.createChannel();
        // Declare a direct exchange named "route_direct"
        channel.exchangeDeclare("route_direct", "direct");
        // The routing key decides which bound queues receive the message
        String routingKey = "info";
        channel.basicPublish("route_direct", routingKey, null,
                ("发送消息 key=" + routingKey).getBytes());
        System.out.println("关闭");
        RabbitMQUtils.closeConnectAndChannel(connect, channel);
    }
}
```
Receiver:

```java
package com.saxon.routing;

import com.rabbitmq.client.*;
import com.saxon.utils.RabbitMQUtils;

import java.io.IOException;

public class InfoConsumer {

    public static void main(String[] args) throws Exception {
        Connection connect = RabbitMQUtils.getConnect();
        Channel channel = connect.createChannel();
        // Temporary queue owned by this consumer
        String queue = channel.queueDeclare().getQueue();
        channel.exchangeDeclare("route_direct", "direct");
        // Receive only messages published with the routing key "info"
        channel.queueBind(queue, "route_direct", "info");
        // autoAck = true, because this callback never calls basicAck
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者:" + new String(body));
            }
        });
    }
}
```
Key code:

```java
channel.exchangeDeclare("route_direct", "direct");
channel.queueBind(queue, "route_direct", "info");
```
The queue receives only messages whose routing key exactly matches its binding key.
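The C1/C2 example from the diagram notes can be sketched in memory as a map from binding key to queue names. This is a toy model, not the RabbitMQ client API; the class `DirectExchangeSketch` and its methods are assumptions made for illustration, and C1 and C2 stand in for the two consumers' queues.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy in-memory model of a direct exchange; not the RabbitMQ client API.
class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    // Mirrors queueBind(queue, exchange, bindingKey).
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new TreeSet<>()).add(queueName);
    }

    // Returns the queues whose binding key equals the routing key exactly.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("C1", "error");
        exchange.bind("C2", "info");
        exchange.bind("C2", "error");
        exchange.bind("C2", "warning");
        System.out.println("error -> " + exchange.route("error"));
        System.out.println("info  -> " + exchange.route("info"));
        System.out.println("debug -> " + exchange.route("debug"));
    }
}
```

A queue can be bound with several keys, which is how C2 receives all three severities while C1 sees only errors; a key that nobody bound matches no queue.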
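Pattern 5: Topic mode
When messages carry many different routing keys, binding every key individually makes the code highly redundant. A topic exchange lets a binding use wildcards instead: a message is delivered if its routing key matches the binding pattern and is skipped otherwise.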
Sender:

```java
package com.saxon.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.saxon.utils.RabbitMQUtils;

public class Provider {

    public static void main(String[] args) throws Exception {
        Connection connect = RabbitMQUtils.getConnect();
        Channel channel = connect.createChannel();
        // Declare a topic exchange named "topic"
        channel.exchangeDeclare("topic", "topic");
        // Routing keys are dot-separated words
        String key = "user.id";
        channel.basicPublish("topic", key, null, "发送topic消息".getBytes());
        RabbitMQUtils.closeConnectAndChannel(connect, channel);
    }
}
```
Receiver:

```java
package com.saxon.topic;

import com.rabbitmq.client.*;
import com.saxon.utils.RabbitMQUtils;

import java.io.IOException;

public class Consumer {

    public static void main(String[] args) throws Exception {
        Connection connect = RabbitMQUtils.getConnect();
        Channel channel = connect.createChannel();
        // Declare the exchange here as well, so that binding does not fail
        // when the consumer starts before the producer
        channel.exchangeDeclare("topic", "topic");
        String queue = channel.queueDeclare().getQueue();
        // "user.*" matches routing keys made of "user" plus exactly one more word
        channel.queueBind(queue, "topic", "user.*");
        // autoAck = true, because this callback never calls basicAck
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收的消息" + new String(body));
            }
        });
    }
}
```
Key code:

```java
channel.exchangeDeclare("topic", "topic");
```

```java
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, "topic", "user.*");
```
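About wildcards (routing keys are split into words at each dot):
#: matches zero or more words, like any number of *
*: matches exactly one word, acting as a placeholder
A wildcard may appear at any position in the binding key.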
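The wildcard rules can be written down as a short recursive matcher. This is a minimal sketch of the matching semantics only, not how the broker implements it; the class `TopicMatchSketch` and the method `matches` are assumptions made for illustration.

```java
// Toy matcher for topic binding keys; not the RabbitMQ client API.
class TopicMatchSketch {

    // Returns true if the routing key satisfies the binding key, where
    // "*" stands for exactly one word and "#" for zero or more words.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: all words must be used up too
        }
        if (pattern[p].equals("#")) {
            // "#" either absorbs no word, or absorbs one word and is tried again
            return match(pattern, p + 1, words, w)
                    || (w < words.length && match(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false; // pattern expects another word, but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("user.*", "user.id"));
        System.out.println(matches("user.*", "user.id.name"));
        System.out.println(matches("user.#", "user.id.name"));
        System.out.println(matches("*.id", "user.id"));
    }
}
```

The binding key `user.*` accepts `user.id` but rejects `user.id.name`, because `*` covers a single word; `user.#` accepts both, and also the bare key `user`.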