四、RabbitMQ的消息模式

image-20210302195101777

image-20210302195125054

image-20210302195142276

一共有7种消息模式

第一种:直连模式

image-20210302205514504

添加虚拟主机

image-20210302203224876

授权:

image-20210302203655607

消息发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.saxon;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* RabbitMQ 消息提供者
*
* @author saxon
*/
public class Provider {
@Test
public void testSendMessage () throws IOException, TimeoutException {
//获得连接MQ的工厂
ConnectionFactory connectionFactory = new ConnectionFactory ();
//设置连接的主机
connectionFactory.setHost ("192.168.44.128");
//设置端口号
connectionFactory.setPort (5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost ("ems");
//设置用户和密码
connectionFactory.setUsername ("saxon");
connectionFactory.setPassword ("saxon");
//获得连接对象
Connection connection = connectionFactory.newConnection ();
//获得连接的通道
Channel channel = connection.createChannel ();
//绑定消息队列
//第一个参数:消息队列的名字,如果没有就自动创建一个
//第二个参数:用来确定数据是否持久化
//第三个参数;是否要独占队列
//第四个参数:是否要在消息完成后自动删除队列
//第五个参数:额外的参数
channel.queueDeclare ("hello", false, false, false, null);
//发布消息
//参数一:交换机名称
//参数二:消息队列名称
//参数三:传递消息的额外配置
//参数四:消息的具体内容
channel.basicPublish ("","hello",null,"hello world".getBytes ());
}
}

发送消息:

image-20210302205349942

1
2
3
4
5
6
7
8
9
10
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @see <a href="https://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
channel.basicPublish ("","hello",null,"hello world".getBytes ());

从上面我们可以明白一件事就是,消息队列可以发送给不同的人,并且取决于第二个参数的设置

消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.saxon;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author saxon
* 消费者
*/
public class Consumer {

public static void testGetMessage () throws IOException, TimeoutException {
//获得连接MQ的工厂
ConnectionFactory connectionFactory = new ConnectionFactory ();
//设置连接的主机
connectionFactory.setHost ("192.168.44.128");
//设置端口号
connectionFactory.setPort (5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost ("ems");
//设置用户和密码
connectionFactory.setUsername ("saxon");
connectionFactory.setPassword ("saxon");
//获得连接对象
Connection connection = connectionFactory.newConnection ();
//获得连接的通道
Channel channel = connection.createChannel ();
//绑定消息队列
//第一个参数:消息队列的名字,如果没有就自动创建一个
//第二个参数:用来确定数据是否持久化
//第三个参数;是否要独占队列
//第四个参数:是否要在消息完成后自动删除队列
//第五个参数:额外的参数
channel.queueDeclare ("hello", false, false, false, null);
//发布消息
//参数一:交换机名称
//参数二:消息队列名称
//参数三:传递消息的额外配置
//参数四:消息的具体内容
channel.basicConsume ("hello", new DefaultConsumer (channel) {
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println ("接收的消息" + new String (body));
}
});
}
}

由于Junit的性质:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.saxon;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* RabbitMQ 测试类
*
* @author saxon
*/
public class RabbitMQ {
public static void main (String[] args) throws IOException, TimeoutException {
Consumer.testGetMessage ();
}
}

结果:

1
2
3
4
5
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
接收的消息hello world
接收的消息hello world

第二种、工作模型

为了解决我们的消息队列产生消息快,但是消费者消费慢,导致消息队列被阻塞的问题;RabbitMQ就有一种模式叫做工作模式,多个消费者连接同一个消息队列消费消息;

代码如下:

provider:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.saxon.workquene;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.saxon.utils.RabbitMQUtils;

import java.util.Objects;

/**
* @author saxon
* @version 1.0
* @date 2021/3/3 13:47
*/
public class WorkProvider {
public static void main (String[] args) throws Exception{
//获得连接的通道
Connection connect = RabbitMQUtils.getConnect ();
Channel channel = Objects.requireNonNull (connect).createChannel ();
channel.queueDeclare ("hello", false, false, false, null);
for (int i = 0; i < 10; i++) {
channel.basicPublish ("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, (i+"hello world").getBytes ());
}
}
}

消费者一号:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.saxon.workquene;

import com.rabbitmq.client.*;
import com.saxon.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Objects;
/**
* @author saxon
* @version 1.0
* @date 2021/3/3 13:47
*/
public class WorkConsumer01 {
public static void main (String[] args) throws Exception{
//获得连接的通道
Connection connect = RabbitMQUtils.getConnect ();
Channel channel = Objects.requireNonNull (connect).createChannel ();
channel.queueDeclare ("hello", false, false, false, null);
channel.basicConsume ("hello",new DefaultConsumer (channel){
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println ("消费者一:"+new String (body));
}
});
}
}

消费者二号:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.saxon.workquene;

import com.rabbitmq.client.*;
import com.saxon.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Objects;
/**
* @author saxon
* @version 1.0
* @date 2021/3/3 13:47
*/
public class WorkConsumer02 {
public static void main (String[] args) throws Exception{
//获得连接的通道
Connection connect = RabbitMQUtils.getConnect ();
Channel channel = Objects.requireNonNull (connect).createChannel ();
channel.queueDeclare ("hello", false, false, false, null);
channel.basicConsume ("hello",new DefaultConsumer (channel){
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println ("消费者二:"+new String (body));
}
});
}
}

结果:

消费者一:

1
2
3
4
5
消费者一:0hello world
消费者一:2hello world
消费者一:4hello world
消费者一:6hello world
消费者一:8hello world

消费者二:

1
2
3
4
5
消费者二:1hello world
消费者二:3hello world
消费者二:5hello world
消费者二:7hello world
消费者二:9hello world

从上面我们可以看出来我们的消费者存在多个的情况下,我们的消息的消费是平均消费的;不会随着消费者处理消息的快慢而产生变化,绝对的平均分配

按需分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.saxon.workquene;

import com.rabbitmq.client.*;
import com.saxon.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Objects;

/**
* @author saxon
* @version 1.0
* @date 2021/3/3 13:47
*/
public class WorkConsumer01 {
public static void main (String[] args) throws Exception {
//获得连接的通道
Connection connect = RabbitMQUtils.getConnect ();
Channel channel = Objects.requireNonNull (connect).createChannel ();
channel.queueDeclare ("hello", false, false, false, null);
//一次只接受一条未确认的消息
channel.basicQos (1);
//关闭自动确认
channel.basicConsume ("hello", false, new DefaultConsumer (channel) {
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println ("消费者一:" + new String (body));
//自动确定消息,那个比较快就哪个先使用,不会造成队列的堵塞
channel.basicAck (envelope.getDeliveryTag (), true);
}
});
}
}

重点代码:

1
2
3
4
5
6
7
8
9
10
11
//一次只接受一条未确认的消息
channel.basicQos (1);
//关闭自动确认
channel.basicConsume ("hello", false, new DefaultConsumer (channel) {
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println ("消费者一:" + new String (body));
//自动确定消息,那个比较快就哪个先使用,不会造成队列的堵塞
channel.basicAck (envelope.getDeliveryTag (), true);
}
});

接收消息,每一次接收消息就确认,所以消息不会堵塞队列;实现按需分配;

第三种、fanout模式

广播模式,一个发送订阅的都可以收到

从图上看出来:

  • 消费者面对的就是交换机,一个交换机对应多个队列
  • 每个消费者都有自己的队列
  • 生产者发送消息,只能发送到交换机,无法决定发到哪个队列

image-20210303194551717

在创建的时候就要在模式哪里变成fanout,不然会报错

发送端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.saxon.fanout;

import com.rabbitmq.client.Channel;
import com.saxon.utils.RabbitMQUtils;

import java.util.Objects;

/**
* @author saxon
* @version 1.0
* @date 2021/3/3 19:37
*/
public class Provider {
public static void main (String[] args) throws Exception {
Channel channel = Objects.requireNonNull (RabbitMQUtils.getConnect ()).createChannel ();
//定义一个交换机,如果没有就创建,如果有的话,那么就必须一样
channel.exchangeDeclare ("log", "fanout");
//向交换机里面发送消息
channel.basicPublish ("log", "", null, "fanout message".getBytes ());
//释放资源
RabbitMQUtils.closeConnectAndChannel (null, channel);
}
}

接收端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.saxon.fanout;

import com.rabbitmq.client.*;
import com.saxon.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Objects;

/**
* @author saxon
* @version 1.0
* @date 2021/3/3 20:09
*/
public class Consumer1 {
public static void main (String[] args) throws Exception {
Connection connect = RabbitMQUtils.getConnect ();
Channel channel = Objects.requireNonNull (connect).createChannel ();
//获得一个交换机
channel.exchangeDeclare ("log", "fanout");
//临时队列
String queue = channel.queueDeclare ().getQueue ();
//绑定交换机和队列
channel.queueBind (queue, "log", "");
//关闭自动确认
channel.basicConsume (queue, false, new DefaultConsumer (channel) {
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println ("消费者一:" + new String (body));
//自动确定消息,那个比较快就哪个先使用,不会造成队列的堵塞
channel.basicAck (envelope.getDeliveryTag (), true);
}
});
}
}

如果要使用多个消费者的话,那么就多写几个消费者;

第四种、routing

图解;

  • P:消费者
  • X:交换机
  • 符合routingkey=error的值会被c1接收
  • 符合routingkey=info,error,warning会被c2接收

1.routing-direct直连模式

发送端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.saxon.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.saxon.utils.RabbitMQUtils;

/**
* @author saxon
* @version 1.0
* @date 2021/3/3 20:33
*/
public class Provider {
public static void main (String[] args) throws Exception {
Connection connect = RabbitMQUtils.getConnect ();
Channel channel = connect.createChannel ();
//定义一个交换机
channel.exchangeDeclare ("route_direct", "direct");
//定义一个routing_key
String routingKey = "info";
//发送消息
channel.basicPublish ("route_direct", routingKey, null, ("发送消息 key=" + routingKey).getBytes ());
System.out.println ("关闭");
RabbitMQUtils.closeConnectAndChannel (connect, channel);
}
}

接收端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.saxon.routing;

import com.rabbitmq.client.*;
import com.saxon.utils.RabbitMQUtils;

import java.io.IOException;

/**
* @author saxon
* @version 1.0
* @date 2021/3/3 20:55
*/
public class InfoConsumer {
public static void main (String[] args) throws Exception {
Connection connect = RabbitMQUtils.getConnect ();
Channel channel = connect.createChannel ();
//获得队列
String queue = channel.queueDeclare ().getQueue ();
//定义交换机
channel.exchangeDeclare ("route_direct", "direct");
//定义消息
channel.queueBind (queue, "route_direct", "info");
channel.basicConsume (queue, false, new DefaultConsumer (channel) {
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println ("消费者:" + new String (body));
}
});
}
}

重要代码:

1
2
3
4
//定义交换机
channel.exchangeDeclare ("route_direct", "direct");
//绑定交换机和队列
channel.queueBind (queue, "route_direct", "info");

它只会接收同一个routingkey的数据

第五种、topic模式

我们如果发送的消息routingkey太多的话,每一个都写的话,代码的冗余量太大;所以我们决定使用一种统配的方式,如果它满足我们的routing的要求那么我们就接收消息,反之则不接受;

发送端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.saxon.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.saxon.utils.RabbitMQUtils;

/**
* @author saxon
* @version 1.0
* @date 2021/3/3 21:27
*/
public class Provider {
public static void main (String[] args) throws Exception {
Connection connect = RabbitMQUtils.getConnect ();
Channel channel = connect.createChannel ();
//指定交换机
channel.exchangeDeclare ("topic", "topic");
String key = "user.id";
channel.basicPublish ("topic", key, null, "发送topic消息".getBytes ());
RabbitMQUtils.closeConnectAndChannel (connect, channel);
}
}

接收端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.saxon.topic;

import com.rabbitmq.client.*;
import com.saxon.utils.RabbitMQUtils;

import java.io.IOException;

/**
* @author saxon
* @version 1.0
* @date 2021/3/3 21:30
*/
public class Consumer {
public static void main (String[] args) throws Exception {
Connection connect = RabbitMQUtils.getConnect ();
Channel channel = connect.createChannel ();
String queue = channel.queueDeclare ().getQueue ();
channel.queueBind (queue, "topic", "user.*");
channel.basicConsume (queue, false, new DefaultConsumer (channel) {
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println ("接收的消息" + new String (body));
}
});
}
}

重要代码:

1
2
//指定交换机,交换机的类型
channel.exchangeDeclare ("topic", "topic");
1
2
String queue = channel.queueDeclare ().getQueue ();
channel.queueBind (queue, "topic", "user.*");

关于通配:

  • *#:匹配多个,相当于多个**
  • *:匹配一个,就相当于一个占位符

通配符的位置在哪都可以