(I) Preface
Why introduce a message queue?
1. Decouple programs
2. Improve performance
3. Reduce the complexity of multi-business logic
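(II) Operating RabbitMQ with Python
For the basic configuration, installation, and usage of RabbitMQ, please refer to the previous article; they are not repeated here.
To operate RabbitMQ from Python, install the pika module directly with pip. Note that the examples below use the pika 0.x call signatures, such as basic_consume(callback, queue=..., no_ack=...); pika 1.0 and later renamed these arguments (on_message_callback, auto_ack) and require a queue argument to queue_declare, so adapt the calls if you install a newer version: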
pip install pika
1. The simplest RabbitMQ producer and consumer exchange:
producer:
    # Author: ywq
    import pika

    auth = pika.PlainCredentials('ywq', 'qwe')  # save auth info
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.0.158', 5672, '/', auth))  # connect to rabbit
    channel = connection.channel()  # create channel
    channel.queue_declare(queue='hello')  # declare queue

    # In RabbitMQ, a message can never be sent directly to the queue;
    # it always needs to go through an exchange.
    # exchange='' selects the default exchange, which routes by queue name.
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')  # the body is the msg content
    print(" [x] Sent 'Hello World!'")
    connection.close()
consumer:
    # Author: ywq
    import pika

    auth = pika.PlainCredentials('ywq', 'qwe')  # save auth info
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.0.158', 5672, '/', auth))  # connect to rabbit
    channel = connection.channel()  # create channel
    channel.queue_declare(queue='hello')  # declare queue

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    # no_ack=True: the consumer does not send acknowledgments
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
While messages are being sent and consumed, you can view the queue and message information in real time on the RabbitMQ web management page.
2. Persistent queues and messages, to prevent messages from being lost in unexpected situations such as crashes.
Add two properties to the producer-side code to make both the queue and the messages persistent; both must be enabled to avoid message loss. The consumer-side logic does not change, but if the consumer also declares the queue, its queue_declare must pass the same durable=True, because redeclaring an existing queue with different properties fails:
    delivery_mode=2  # make msg persistent
    durable=True     # make queue persistent
The following code shows where the properties go (producer side):
    # Author: ywq
    import pika, sys

    auth_info = pika.PlainCredentials('ywq', 'qwe')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.0.158', 5672, '/', auth_info))
    channel = connection.channel()
    channel.queue_declare(queue='test1', durable=True)  # durable=True makes the queue persistent

    # message text comes from the command line, defaulting to 'Hello'
    msg = ' '.join(sys.argv[1:]) or 'Hello'
    channel.basic_publish(
        exchange='',
        routing_key='test1',
        body=msg,
        properties=pika.BasicProperties(
            delivery_mode=2  # make msg persistent
        )
    )
    print('Send done:', msg)
    connection.close()
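Why both properties are needed can be illustrated with a toy in-memory model. This is only a sketch: ToyBroker, its restart method, and surviving_bodies are hypothetical names invented for illustration, not part of pika or RabbitMQ, and the model assumes that after a restart only durable queues remain and, within them, only messages published with delivery_mode=2.

```python
class ToyBroker:
    """Toy in-memory model of queue and message durability (not RabbitMQ itself)."""

    def __init__(self):
        # queue name -> {"durable": bool, "messages": [(body, delivery_mode), ...]}
        self.queues = {}

    def queue_declare(self, queue, durable=False):
        self.queues.setdefault(queue, {"durable": durable, "messages": []})

    def basic_publish(self, routing_key, body, delivery_mode=1):
        self.queues[routing_key]["messages"].append((body, delivery_mode))

    def restart(self):
        """Simulate a restart: keep only durable queues and persistent messages."""
        survivors = {}
        for name, q in self.queues.items():
            if not q["durable"]:
                continue  # a non-durable queue disappears with all its messages
            kept = [(b, m) for (b, m) in q["messages"] if m == 2]
            survivors[name] = {"durable": True, "messages": kept}
        self.queues = survivors


def surviving_bodies(broker, queue):
    """Return the message bodies still in `queue`, or None if the queue is gone."""
    q = broker.queues.get(queue)
    return None if q is None else [body for body, _ in q["messages"]]


broker = ToyBroker()
broker.queue_declare("durable_q", durable=True)
broker.queue_declare("transient_q", durable=False)
broker.basic_publish("durable_q", "persistent msg", delivery_mode=2)
broker.basic_publish("durable_q", "transient msg", delivery_mode=1)
broker.basic_publish("transient_q", "persistent msg", delivery_mode=2)
broker.restart()

print(surviving_bodies(broker, "durable_q"))    # ['persistent msg']
print(surviving_bodies(broker, "transient_q"))  # None
```

Only the combination of a durable queue and a persistent message survives in this model, which is why enabling just one of the two properties is not enough.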
3. Fair dispatch
By default, RabbitMQ distributes messages round-robin when there are multiple consumers, but some consumers process messages faster than others. To balance the load, an acknowledgment mechanism is used: after processing a message, the consumer sends an ack to RabbitMQ. Once a consumer's number of unacknowledged messages reaches the configured limit, RabbitMQ stops sending messages to it and sends them to other consumers instead.
The producer-side code does not need to change; two calls need to be added to the consumer-side code:
    channel.basic_qos(prefetch_count=N)  # N = max number of unacknowledged messages
    channel.basic_ack(delivery_tag=deliver.delivery_tag)  # send ack to rabbitmq
The following code shows where the calls go (consumer side):
    # Author: ywq
    import pika, time

    auth_info = pika.PlainCredentials('ywq', 'qwe')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.0.158', 5672, '/', auth_info))
    channel = connection.channel()
    channel.queue_declare(queue='test2', durable=True)

    def callback(chann, deliver, properties, body):
        print('Recv:', body)
        time.sleep(5)  # simulate slow processing
        chann.basic_ack(delivery_tag=deliver.delivery_tag)  # send ack to rabbit

    # Note: no_ack defaults to False here. no_ack only tells rabbit whether
    # the consumer is expected to return an ack; the ack itself has to be
    # sent in the callback.
    # prefetch_count=1: once this consumer has 1 unacknowledged message, it
    # receives no more messages until it acks. This call needs to be written
    # above channel.basic_consume so the limit is in place before consuming starts.
    channel.basic_qos(prefetch_count=1)

    channel.basic_consume(callback,
                          queue='test2')
    channel.start_consuming()
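The difference between round-robin and acknowledgment-based dispatch can be sketched with a small simulation that needs no broker. The function names round_robin and prefetch_one and the worker speeds are assumptions made up for this illustration; the second function models prefetch_count=1 by always handing the next message to whichever worker becomes free (acks) first.

```python
import heapq


def round_robin(num_messages, seconds_per_message):
    """Assign message i to worker i % n, regardless of how busy the worker is."""
    n = len(seconds_per_message)
    counts = [0] * n
    for i in range(num_messages):
        counts[i % n] += 1
    # each worker processes its share sequentially
    finish = max(c * t for c, t in zip(counts, seconds_per_message))
    return counts, finish


def prefetch_one(num_messages, seconds_per_message):
    """Give each message to the worker that acknowledges (becomes free) first."""
    n = len(seconds_per_message)
    counts = [0] * n
    # heap of (time at which the worker is free, worker index)
    free_at = [(0, w) for w in range(n)]
    heapq.heapify(free_at)
    finish = 0
    for _ in range(num_messages):
        t, w = heapq.heappop(free_at)
        done = t + seconds_per_message[w]
        counts[w] += 1
        finish = max(finish, done)
        heapq.heappush(free_at, (done, w))
    return counts, finish


speeds = [1, 5]  # worker 0 needs 1 s per message, worker 1 needs 5 s
print(round_robin(12, speeds))   # ([6, 6], 30)
print(prefetch_one(12, speeds))  # ([10, 2], 10)
```

In this simplified model the fast worker ends up handling most of the messages, and the whole batch finishes in 10 seconds instead of 30.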
(III) Message publish/subscribe
In the modes above, each message the producer sends is received by exactly one consumer. Can a producer send one message and have several related consumers receive it at the same time? Yes: RabbitMQ supports publish/subscribe, implemented through the exchange component, which forwards messages to queues. There are three modes:
fanout: Every queue bound to the exchange receives the message, similar to broadcast.
direct: The exchange delivers the message only to queues whose binding key exactly equals the message's routing key, and from there it is pushed to the consumers of those queues; similar to multicast.
topic: Queues whose binding key (which may now be a pattern) matches the routing key receive the message, similar to prefix-list matching in routing.
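The routing decision for the first two modes can be sketched in plain Python without a broker. The function route and the queue names below are hypothetical and exist only for this illustration; topic matching is left out to keep the sketch short.

```python
def route(exchange_type, bindings, routing_key):
    """Return the sorted names of the queues a message reaches.

    bindings: list of (queue_name, binding_key) pairs for one exchange.
    """
    if exchange_type == "fanout":
        # fanout ignores the routing key: every bound queue gets a copy
        return sorted({q for q, _ in bindings})
    if exchange_type == "direct":
        # direct requires an exact match between binding key and routing key
        return sorted({q for q, key in bindings if key == routing_key})
    raise ValueError("unsupported exchange type: %s" % exchange_type)


bindings = [
    ("q_notice", "notice"),
    ("q_warning", "warning"),
    ("q_all", "notice"),
    ("q_all", "warning"),
]

print(route("fanout", bindings, "anything"))  # ['q_all', 'q_notice', 'q_warning']
print(route("direct", bindings, "notice"))    # ['q_all', 'q_notice']
print(route("direct", bindings, "error"))     # []
```

A queue may be bound more than once with different keys (q_all above), and a message whose routing key matches no binding is not delivered to any queue.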
1. fanout
Publisher (producer):
    # Author: ywq
    import pika, sys, time

    auth_info = pika.PlainCredentials('ywq', 'qwe')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.0.158', 5672, '/', auth_info))
    channel = connection.channel()
    # declare the exchange instead of a queue
    channel.exchange_declare(exchange='hello',
                             exchange_type='fanout')

    msg = ' '.join(sys.argv[1:]) or 'Hello world %s' % time.time()
    channel.basic_publish(
        exchange='hello',
        routing_key='',  # ignored by a fanout exchange
        body=msg,
        properties=pika.BasicProperties(
            delivery_mode=2
        )
    )
    print('send done')
    connection.close()
Subscriber (consumer):
    # Author: ywq
    import pika

    auth_info = pika.PlainCredentials('ywq', 'qwe')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.0.158', 5672, '/', auth_info))
    channel = connection.channel()
    channel.exchange_declare(exchange='hello',
                             exchange_type='fanout')

    # Let rabbit create a randomly named queue; the queue is deleted and
    # released as soon as the consumer disconnects
    random_num = channel.queue_declare(exclusive=True)
    queue_name = random_num.method.queue

    channel.basic_qos(prefetch_count=1)
    channel.queue_bind(queue=queue_name,
                       exchange='hello')

    def callback(chann, deliver, properties, body):
        print('Recv:', body)
        chann.basic_ack(delivery_tag=deliver.delivery_tag)  # send ack to rabbit

    channel.basic_consume(callback,
                          queue=queue_name)
    channel.start_consuming()
This lets the producer send once and multiple associated consumers receive.
When using exchange mode:
1. The producer no longer declares a queue; it declares the exchange directly.
2. The consumer still needs to bind a queue and specify the exchange in order to receive messages.
3. The consumer should create a random queue, which is released as soon as the consumer disconnects.
The random queue names can be seen on the web management page.
2. direct
With a direct exchange, consumers can receive messages selectively. Each queue is bound with a keyword (routing key); the producer publishes to the exchange with a keyword, the exchange uses that keyword to decide which queues the message goes to, and the consumers on those queues receive it. In effect, this is fanout plus a routing key.
producer:
    # Author: ywq
    import pika, sys

    auth_info = pika.PlainCredentials('ywq', 'qwe')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.0.158', 5672, '/', auth_info))
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_log',
                             exchange_type='direct')

    try:
        while True:
            route_key = input('Input routing key:')
            msg = ' '.join(sys.argv[1:]) or 'Hello'
            channel.basic_publish(
                exchange='direct_log',
                routing_key=route_key,
                body=msg,
                properties=pika.BasicProperties(
                    delivery_mode=2
                )
            )
    except (KeyboardInterrupt, EOFError):
        pass  # leave the loop on CTRL+C or end of input
    finally:
        connection.close()
consumer:
    # Author: ywq
    import pika

    auth_info = pika.PlainCredentials('ywq', 'qwe')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.0.158', 5672, '/', auth_info))
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_log',
                             exchange_type='direct')

    # randomly named queue, deleted when the consumer disconnects
    queue_num = channel.queue_declare(exclusive=True)
    queue_name = queue_num.method.queue

    route_key = input('Input routing key:')
    channel.queue_bind(queue=queue_name,
                       exchange='direct_log',
                       routing_key=route_key)

    def callback(chann, deliver, properties, body):
        print('Recv:[level:%s],[msg:%s]' % (route_key, body))
        chann.basic_ack(delivery_tag=deliver.delivery_tag)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,
                          queue=queue_name)
    channel.start_consuming()
To try it, start several consumers at the same time, for example two bound to notice and two bound to warning; each message then reaches only the consumers whose queues are bound with its routing key.
3. topic
Compared to direct, topic supports fuzzy (pattern) matching, specified at the consumer end: a message is delivered to every bound queue whose binding pattern matches the message's routing key.
RabbitMQ wildcard rules:
The symbol “#” matches zero or more words, and the symbol “*” matches exactly one word. Therefore “abc.#” matches “abc.m.n”, but “abc.*” only matches “abc.m”. “.” is the word delimiter and must be used when matching with wildcards.
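The wildcard rules can be checked with a small matcher written in plain Python. This is a sketch of the rules as commonly documented for topic exchanges (“*” for exactly one word, “#” for zero or more words), not RabbitMQ's actual implementation, and topic_matches is a hypothetical helper name.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches the binding pattern."""
    p_words = pattern.split(".")
    k_words = routing_key.split(".")

    def match(i, j):
        # i indexes pattern words, j indexes routing-key words
        if i == len(p_words):
            return j == len(k_words)
        if p_words[i] == "#":
            # '#' may absorb zero or more words
            return any(match(i + 1, k) for k in range(j, len(k_words) + 1))
        if j == len(k_words):
            return False
        if p_words[i] == "*" or p_words[i] == k_words[j]:
            # '*' consumes exactly one word; a literal must be equal
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("abc.#", "abc.m.n"))  # True
print(topic_matches("abc.*", "abc.m"))    # True
print(topic_matches("abc.*", "abc.m.n"))  # False
print(topic_matches("abc.#", "abc"))      # True
```

The last line shows the zero-word case: under these rules “abc.#” also matches the bare key “abc”, whereas “abc.*” would not.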
producer:
    # Author: ywq
    import pika, sys

    auth_info = pika.PlainCredentials('ywq', 'qwe')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.0.158', 5672, '/', auth_info))
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_log',
                             exchange_type='topic')

    try:
        while True:
            route_key = input('Input routing key:')
            msg = ' '.join(sys.argv[1:]) or 'Hello'
            channel.basic_publish(
                exchange='topic_log',
                routing_key=route_key,
                body=msg,
                properties=pika.BasicProperties(
                    delivery_mode=2
                )
            )
    except (KeyboardInterrupt, EOFError):
        pass  # leave the loop on CTRL+C or end of input
    finally:
        connection.close()
consumer:
    # Author: ywq
    import pika

    auth_info = pika.PlainCredentials('ywq', 'qwe')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.0.158', 5672, '/', auth_info))
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_log',
                             exchange_type='topic')

    # randomly named queue, deleted when the consumer disconnects
    queue_num = channel.queue_declare(exclusive=True)
    queue_name = queue_num.method.queue

    # the binding key may be a pattern such as abc.* or abc.#
    route_key = input('Input routing key:')
    channel.queue_bind(queue=queue_name,
                       exchange='topic_log',
                       routing_key=route_key)

    def callback(chann, deliver, properties, body):
        print('Recv:[type:%s],[msg:%s]' % (route_key, body))
        chann.basic_ack(delivery_tag=deliver.delivery_tag)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,
                          queue=queue_name)
    channel.start_consuming()
This completes the brief introduction to RabbitMQ's three publish/subscribe modes.