1.1 Project Background: We are building a disaster warning message platform, and the disaster inspection system needs to push messages into it. This is a typical case of message transmission between heterogeneous systems, so we need middleware to act as the message queue. After researching and comparing message middleware such as RabbitMQ, ZeroMQ, ActiveMQ, and Kafka, we chose RabbitMQ for its overall balance of performance, security, and persistence. On the requirements side, we have several types of messages, including urgent pushes and general ones. Under high concurrency, messages back up and need to be prioritized. Priority delivery in a RabbitMQ message queue is therefore the first technical point we need to solve.
1.2 Technical Research: One concept needs explaining first: why we speak of the priority of the message queue rather than the priority of the message. Let's look at how a message queue works.
The producer sends messages to an exchange (if no exchange is specified, they go to the default exchange). The exchange is bound to one or more queues, messages are routed into a queue, and the consumer keeps listening on the queue, consuming as soon as it finds messages there. That is the whole path of a message. A queue is first in, first out (unlike a stack, which is last in, first out): messages line up one after another in arrival order, so an ordinary queue on its own cannot let a message jump the queue or be pushed by priority.
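To make the queue-jumping point concrete, here is a minimal sketch in plain Java with no RabbitMQ involved. The class name, the `Msg` type, and the three sample messages are made up for illustration. It drains the same arrivals once from a FIFO queue and once from a priority-ordered queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

final class FifoVersusPriority {

    /** Hypothetical message type: a body, a priority, and an arrival sequence number. */
    static final class Msg {
        final String body;
        final int priority;
        final int seq;

        Msg(String body, int priority, int seq) {
            this.body = body;
            this.priority = priority;
            this.seq = seq;
        }

        int getPriority() { return priority; }
        int getSeq() { return seq; }
    }

    /** Three messages in arrival order; the urgent one arrives last. */
    static List<Msg> sample() {
        return Arrays.asList(
                new Msg("general-1", 1, 0),
                new Msg("general-2", 1, 1),
                new Msg("urgent", 9, 2));
    }

    /** Drains a plain FIFO queue: output order equals arrival order. */
    static List<String> drainFifo(List<Msg> arrivals) {
        return drain(new ArrayDeque<>(arrivals));
    }

    /** Drains a priority queue: highest priority first, arrival order breaks ties. */
    static List<String> drainByPriority(List<Msg> arrivals) {
        Queue<Msg> queue = new PriorityQueue<>(
                Comparator.comparingInt(Msg::getPriority).reversed()
                        .thenComparingInt(Msg::getSeq));
        queue.addAll(arrivals);
        return drain(queue);
    }

    /** Polls the queue until it is empty and collects the message bodies. */
    private static List<String> drain(Queue<Msg> queue) {
        List<String> out = new ArrayList<>();
        Msg m;
        while ((m = queue.poll()) != null) {
            out.add(m.body);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("FIFO:     " + drainFifo(sample()));
        System.out.println("Priority: " + drainByPriority(sample()));
    }
}
```

In the FIFO case the urgent message comes out last, behind both general messages. In the priority case it comes out first, which is the behaviour the rest of this article sets out to get from RabbitMQ.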
1.2.1 Why the priority is only relative, in a sense
Priority only matters when there is concurrency. If every message could be processed the instant it arrived, there would be no need for priority delivery. Let's see where messages can back up:
1. Queue: under high concurrency, many messages obviously accumulate in the queue.
2. Exchange: under high concurrency, messages can also back up while the producer is sending them to the exchange.
In the first case, because the queue is declared with priority support, messages waiting in the queue are delivered according to their priority instead of strictly in arrival order. In the second case, while messages are backed up at the exchange, there is no way to make higher-priority messages enter the queue first; they are still handled in order. However, because routing from the exchange to the queue is extremely fast, we ignore priority at that stage.
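The claim that priority only shows up when messages back up can be illustrated with a small simulation. This is a sketch in plain Java, not RabbitMQ code: the class name, the tick model (some messages arrive, then the consumer takes one), and the sample priorities are all assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class BacklogDemo {

    /**
     * Simulates a priority queue fed by a producer and drained by one consumer.
     * On each tick, up to arrivalsPerTick messages arrive, then the consumer takes one.
     * Returns the priorities in the order the consumer receives them.
     */
    static List<Integer> deliveryOrder(int[] priorities, int arrivalsPerTick) {
        if (arrivalsPerTick < 1) {
            throw new IllegalArgumentException("arrivalsPerTick must be at least 1");
        }
        // Each entry is {priority, arrival index}: higher priority first, then earlier arrival.
        PriorityQueue<int[]> queue = new PriorityQueue<>(
                Comparator.<int[]>comparingInt(m -> -m[0]).thenComparingInt(m -> m[1]));
        List<Integer> delivered = new ArrayList<>();
        int next = 0;
        while (next < priorities.length || !queue.isEmpty()) {
            // Producer side: enqueue this tick's arrivals.
            for (int k = 0; k < arrivalsPerTick && next < priorities.length; k++, next++) {
                queue.add(new int[] {priorities[next], next});
            }
            // Consumer side: take the best message currently waiting.
            delivered.add(queue.poll()[0]);
        }
        return delivered;
    }

    public static void main(String[] args) {
        int[] priorities = {1, 5, 9, 2, 6, 3};
        System.out.println("consumer keeps up: " + deliveryOrder(priorities, 1));
        System.out.println("backlog builds:    " + deliveryOrder(priorities, 3));
    }
}
```

With one arrival per tick the consumer always finds exactly one message waiting, so the output is simply the arrival order, `[1, 5, 9, 2, 6, 3]`. With three arrivals per tick a backlog forms and the order becomes `[9, 6, 5, 3, 2, 1]`. Note that 6 is delivered before 5 even though 5 arrived earlier; priority is applied only among the messages that are waiting at the moment the consumer asks.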
1.2.3 Code implementation
Before RabbitMQ 3.5, the official release had no queue priority feature, although some plugins that could provide it were available on the forum (links at the end). Here we mainly cover the implementation in version 3.5 and later.
1.2.3.1 Java code
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.HashMap;
import java.util.Map;

// Create the connection (RabbitMQConnectionUtil is the project's own helper) and a channel
Connection conn = RabbitMQConnectionUtil.getRabbitmqConnection();
Channel channel = conn.createChannel();

// Queue arguments: x-max-priority = 10 gives the queue 10 priority levels
Map<String, Object> arg = new HashMap<String, Object>();
arg.put("x-max-priority", 10);

// Declare (create) a durable queue that carries the priority argument
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, arg);

// Message content
String message = "Hello World!";

// First publish: no properties, so no priority is set on the message
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

// Second publish: the message properties declare the priority as 1
BasicProperties prop = new BasicProperties.Builder().priority(1).build();
channel.basicPublish("", QUEUE_NAME, prop, message.getBytes());
System.out.println("[x] Sent '" + message + "'");

// Close the channel and connection
channel.close();
conn.close();
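The Java code above publishes one message with no properties and one with priority 1 to a queue whose maximum is 10. As documented for RabbitMQ priority queues, a message without a priority is treated as priority 0, and a priority above the queue's maximum is treated as the maximum. The sketch below only models that rule in plain Java; the class and method names are hypothetical, and rejecting negative values is an assumption of this sketch, not broker behaviour.

```java
final class EffectivePriority {

    /**
     * Models the priority the broker works with for a message on a priority queue:
     * a missing priority counts as 0, and values above the queue maximum are capped.
     */
    static int effectivePriority(Integer messagePriority, int queueMaxPriority) {
        if (messagePriority == null) {
            return 0; // no priority property on the message
        }
        if (messagePriority < 0) {
            // Assumption of this sketch: negative priorities are treated as a caller error.
            throw new IllegalArgumentException("priority must not be negative");
        }
        return Math.min(messagePriority, queueMaxPriority);
    }

    public static void main(String[] args) {
        int queueMax = 10; // matches x-max-priority in the example above
        System.out.println("no properties -> " + effectivePriority(null, queueMax));
        System.out.println("priority 1    -> " + effectivePriority(1, queueMax));
        System.out.println("priority 25   -> " + effectivePriority(25, queueMax));
    }
}
```

One practical consequence is that publishing with a priority higher than `x-max-priority` gains nothing, so the producer and the queue declaration need to agree on the range.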
Check the result from the client:
1.2.3.2 Implementation combined with Spring:
1.2.3.2.1 XML configuration:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <description>rabbitmq connection service configuration</description>

    <!-- Connection configuration -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbit.ip}" username="${rabbit.username}" password="${rabbit.password}" port="${rabbit.port}" virtual-host="${rabbit.vhost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Spring template declaration -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

    <!-- Declare a queue -->
    <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false">
        <rabbit:queue-arguments>
            <entry key="x-max-priority">
                <!-- This must be an integer; other types do not work -->
                <value type="java.lang.Integer">10</value>
            </entry>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- Listener configuration. queue-names: the queues to listen to, separated by commas (,) if multiple. ref: the listener bean -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queue-names="test_queue_key" ref="queueListenter" method="onMessage"/>
    </rabbit:listener-container>

    <bean id="queueListenter" class="com.DF.spring.springAMQP.QueueListener"/>
</beans>
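The XML comment above stresses that `x-max-priority` must be an integer. The same constraint applies when the arguments map is built in Java, and it can be checked before the queue is declared. The following is a minimal sketch: the class and method names are hypothetical, and the 1 to 255 range check reflects the range RabbitMQ documents for `x-max-priority`.

```java
import java.util.HashMap;
import java.util.Map;

final class PriorityQueueArgs {

    /** Builds queue arguments with x-max-priority stored as an Integer. */
    static Map<String, Object> priorityArgs(int maxPriority) {
        if (maxPriority < 1 || maxPriority > 255) {
            throw new IllegalArgumentException("x-max-priority must be between 1 and 255");
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", Integer.valueOf(maxPriority));
        return args;
    }

    /** Returns true only if x-max-priority is present and is an Integer, not e.g. a String. */
    static boolean hasIntegerMaxPriority(Map<String, Object> args) {
        return args.get("x-max-priority") instanceof Integer;
    }

    public static void main(String[] args) {
        Map<String, Object> good = priorityArgs(10);
        Map<String, Object> bad = new HashMap<>();
        bad.put("x-max-priority", "10"); // a String, as an untyped XML <value> would likely produce
        System.out.println("typed value ok:   " + hasIntegerMaxPriority(good));
        System.out.println("string value ok:  " + hasIntegerMaxPriority(bad));
    }
}
```

A map built this way could be passed as the last argument of `channel.queueDeclare`, in place of the hand-built `arg` map in the plain Java example.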
1.2.3.2.2 Code section:
Producer:
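import java.util.Random;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

// Load the Spring context from the XML configuration
AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:/spring/rabbitmq-contextDemo2.xml");
RabbitTemplate amqpTemplate = ctx.getBean(RabbitTemplate.class);
Random random = new Random();
for (int i = 0; i < 1000; i++) {
    // Random priority between 1 and 10 inclusive
    final int priority = random.nextInt(10 - 1 + 1) + 1;
    amqpTemplate.convertAndSend("test_queue_key", (Object) ("hello world"), new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // Set the priority on the outgoing message
            message.getMessageProperties().setPriority(priority);
            return message;
        }
    });
}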
Consumer:
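import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class QueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            // Print the message body and its priority
            System.out.println("[x] Received message:" + new String(message.getBody(), "utf-8") + "&&&" + "Priority" + message.getMessageProperties().getPriority());
            // Sleep for one second per message, which slows consumption so messages accumulate in the queue
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}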
Let's take a look at the messages in the queue from the client:
We send messages with random priority into the queue and see the messages printed out by the consumer end:
Up to this point, the demo function implementation of rabbitmq combined with spring...