博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RABBITMQ队列
阅读量:7089 次
发布时间:2019-06-28

本文共 15156 字,大约阅读时间需要 50 分钟。

 

安装python rabbitMQ module 

pip install pika官网 https://pypi.python.org/pypi/pika

安装rabbit-server服务,centos7系统

这个中间商就是MQ erlang语言支持的安装依赖ERLANGwget https://mirrors.tuna.tsinghua.edu.cn/epel/7/x86_64/Packages/e/epel-release-7-11.noarch.rpmrpm -ivh epel-release-7-11.noarch.rpmyum repolistyum clean allyum install erlang安装依赖SOCATyum - y install socat安装RABBITMQrpm - ivh https: // bintray.com / rabbitmq / rabbitmq - server - rpm / download_file?file_path = rabbitmq - server - 3.6.10 - 1.el7.noarch.rpm/sbin/service rabbitmq - server start端口号 5672

简单队列通信

远程连接rabbit server的话,需要配置远程用户,权限

在rabbitmq server上创建用户

rabbitmqctl add_user joker 123456  

配置权限,允许从外面访问

rabbitmqctl  set_permissions -p "/" joker '.*' '.*' '.*'

set_permissions [-p vhost] {

user} {
conf} {
write} {
read}

vhost

The name of the virtual host to which to grant the user access, defaulting to /.

user

The name of the user to grant access to the specified virtual host.

conf

A regular expression matching resource names for which the user is granted configure permissions.

write

A regular expression matching resource names for which the user is granted write permissions.

read

A regular expression matching resource names for which the user is granted read permissions.

消息队列收发端连接到远程的rabbit-server需要配置认证参数 

credentials = pika.PlainCredentials('joker', 'joker123')  connection = pika.BlockingConnection(pika.ConnectionParameters(    'remoteip',5672,'/',credentials))channel = connection.channel()

send端

import pika connection = pika.BlockingConnection(pika.ConnectionParameters(               'localhost'))    # 建立socketchannel = connection.channel()  # 建立通道 #声明queuechannel.queue_declare(queue='hello') # 通道里面声明队列 #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.channel.basic_publish(exchange='',                      routing_key='hello', # 队列                      body='Hello World!') # 消息print(" [x] Sent 'Hello World!'")connection.close()

receive端 

import pika connection = pika.BlockingConnection(pika.ConnectionParameters(               'localhost'))channel = connection.channel()  #You may ask why we declare the queue again ‒ we have already declared it in our previous code.# We could avoid that if we were sure that the queue already exists. For example if send.py program#was run before. But we're not yet sure which program to run first. In such cases it's a good# practice to repeat declaring the queue in both programs.channel.queue_declare(queue='hello') def callback(ch, method, properties, body):    print(" [x] Received %r" % body) channel.basic_consume(callback,                      queue='hello',                      no_ack=True)    # 这个是不需要接收确认信息,后面会继续说 print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming() # 开始接收消息,这里是阻塞状态,一直接收下去

 

Work Queues

这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多

采用轮训机制,如果有多个消费者,会按照先后顺序发送消息

消费者处理完了告诉生产者处理完了,生产者就会从消息队列删除消息,如果没有处理完宕机了,那消息就没了

no_ack=FALSE

# 当消费者执行任务的时候断掉,那么这个消息就掉了,no_ack注销掉,消费者就必须给服务器信息,是否执行完了,手动给服务器确认 # 判断断掉的机制就是SOCKET断了 

ch.basic_ack(delivery_tag=method.delivery_tag) # 手动跟服务端确认

 

消息持久化

channel.queue_declare(queue='hello', durable=True) # 持久化消息队列名称channel.basic_publish(exchange='',                      routing_key="task_queue",                      body=message,                      properties=pika.BasicProperties(                         delivery_mode = 2, # make message persistent # 消息持久化                      ))

服务器宕机,队列里的消息持久在  

 

消息公平发布 

消息公平分发,谁有本事(性能高)你多发,看队列有多少消息channel.basic_qos(prefetch_count=1),我这里有一条消息就先别给我发

channel.basic_qos(prefetch_count=1)  

 

生产者端

# !/usr/bin/env python# _*_coding:utf-8_*_# Author:Jokerimport pikausername = 'joker'pwd = '123456'user_pwd = pika.PlainCredentials(username, pwd)connection = pika.BlockingConnection(pika.ConnectionParameters(    host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKETchannel = connection.channel() # 建立通道# 管道里面声明queuechannel.queue_declare(queue='hello2',durable=True) # 持久化队列# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.channel.basic_publish(exchange='',                      routing_key='hello2',  # QUEUE名字                      body='Hello World!',                      properties=pika.BasicProperties( # 消息持久化                          delivery_mode=2,  # make message persistent                      )                      )  # 消息内容print(" [x] Sent 'Hello World!'")connection.close()

消费者端

# !/usr/bin/env python# _*_coding:utf-8_*_# Author:Jokerimport pika,timeusername = 'joker'pwd = '123456'user_pwd = pika.PlainCredentials(username, pwd)connection = pika.BlockingConnection(pika.ConnectionParameters(    host='remoteip',port=5672,credentials=user_pwd))channel = connection.channel()# You may ask why we declare the queue again ‒ we have already declared it in our previous code.# We could avoid that if we were sure that the queue already exists. For example if send.py program# was run before. But we're not yet sure which program to run first. In such cases it's a good# practice to repeat declaring the queue in both programs.channel.queue_declare(queue='hello2',durable=True) # 无法保证生产消费谁先运行,为了不出错可以自己声明,durable=True持久化队列def callback(ch, method, properties, body):    print(ch,method,properties)    # 
('60.205.188.107', 5672) params=
>>> #
#
# 通道的内存对象,链接的消息,消息属性见下面 # time.sleep(30) print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 手动跟服务端确认,devlivery_tag是信息标签,一般是第一条是1,第二条是2channel.basic_qos(prefetch_count=1) # 如果我这里还有消息就先别发给我channel.basic_consume(# 消费消息 callback, # 如果收到消息,就调用CALLBACK函数来处理消息 queue='hello2', # 从哪个队列收消息 # no_ack=True) # 不会给服务端发消息,是否处理完 )print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming() # 永远收下去

Propertie

content_type : 消息内容的类型content_encoding: 消息内容的编码格式priority: 消息的优先级correlation_id:关联idreply_to: 用于指定回复的队列的名称expiration: 消息的失效时间message_id: 消息idtimestamp:消息的时间戳type: 类型user_id: 用户idapp_id: 应用程序idcluster_id: 集群id

  

Publish\Subscribe(消息发布\订阅) 

我们在之前学习的是1对1的消息发送接收,也就是消息只能发送到指定的queue,如果你想发送的消息让所有的queue收到,就要用到exchage了

Exchange在定义的时候有类型的,以决定到底是哪些queue符合条件,可以接收消息

 

fanout: 所有bind到此exchange的queue都可以接收消息

EXCHANGE FANDOUT 纯广播,因为广播的原因,不会帮你保留消息,消费者先启动才能收到消息,每个人都能收到

消息publisher

# !/usr/bin/env python# _*_coding:utf-8_*_# Author:Jokerimport pikaimport sysusername = 'joker'pwd = '123456'user_pwd = pika.PlainCredentials(username, pwd)connection = pika.BlockingConnection(pika.ConnectionParameters(    host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKETchannel = connection.channel() # 建立通道channel.exchange_declare(exchange='logs',exchange_type='fanout')   # 广播不需要写Qmessage = ' '.join(sys.argv[1:]) or "info: Hello World!"channel.basic_publish(exchange='logs',                      routing_key='', # Q 名                      body=message)print(" [x] Sent %r" % message)connection.close()

消息subscriber

# !/usr/bin/env python# _*_coding:utf-8_*_# Author:Jokerimport pikausername = 'joker'pwd = '123456'user_pwd = pika.PlainCredentials(username, pwd)connection = pika.BlockingConnection(pika.ConnectionParameters(    host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKETchannel = connection.channel() # 建立通道channel.exchange_declare(exchange='logs',exchange_type='fanout')result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除queue_name = result.method.queueprint(queue_name) # 随机channel.queue_bind(exchange='logs', # 绑定Q到转发器EXCHAGE上                   queue=queue_name) #print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r" % body)channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()
 

有选择的接收消息exchage type=direct

队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列  

publisher

# !/usr/bin/env python# _*_coding:utf-8_*_# Author:Joker import pikaimport sysusername = 'joker'pwd = '123456'user_pwd = pika.PlainCredentials(username, pwd)connection = pika.BlockingConnection(pika.ConnectionParameters(    host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKETchannel = connection.channel() # 建立通道channel.exchange_declare(exchange='direct_logs',                         exchange_type='direct')severity = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ' '.join(sys.argv[2:]) or 'Hello World!'channel.basic_publish(exchange='direct_logs',                      routing_key=severity,                      body=message)print(" [x] Sent %r:%r" % (severity, message))connection.close()# import sys# print(sys.argv[0]) # /Users/liqianlong/Desktop/Django project/kkk/开启py之旅/消息队列/7DIRECT_PUBLISHER.py

subscriber

# !/usr/bin/env python# _*_coding:utf-8_*_# Author:Jokerimport pikaimport sysusername = 'joker'pwd = '123456'user_pwd = pika.PlainCredentials(username, pwd)connection = pika.BlockingConnection(pika.ConnectionParameters(    host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKETchannel = connection.channel() # 建立通道channel.exchange_declare(exchange='direct_logs',                         exchange_type='direct')result = channel.queue_declare(exclusive=True)queue_name = result.method.queueseverities = sys.argv[1:]   # 获取脚本后面跟的参数,WARNING,ERROR,INFO ..if not severities:    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])    sys.exit(1)for severity in severities:    channel.queue_bind(exchange='direct_logs',                       queue=queue_name,                       routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()  

 

更细致的消息过滤exchage type=topic

publisher

# !/usr/bin/env python# _*_coding:utf-8_*_# Author:Jokerimport pikaimport sysusername = 'joker'pwd = '123456'user_pwd = pika.PlainCredentials(username, pwd)connection = pika.BlockingConnection(pika.ConnectionParameters(    host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKETchannel = connection.channel() # 建立通道channel.exchange_declare(exchange='topic_logs',                         exchange_type='topic')routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'message = ' '.join(sys.argv[2:]) or 'Hello World!'channel.basic_publish(exchange='topic_logs',                      routing_key=routing_key,                      body=message)print(" [x] Sent %r:%r" % (routing_key, message))connection.close()

subscriber

# !/usr/bin/env python# _*_coding:utf-8_*_# Author:Jokerimport pikaimport sysusername = 'joker'pwd = '123456'user_pwd = pika.PlainCredentials(username, pwd)connection = pika.BlockingConnection(pika.ConnectionParameters(    host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKETchannel = connection.channel() # 建立通道channel.exchange_declare(exchange='topic_logs',                         exchange_type='topic')result = channel.queue_declare(exclusive=True)queue_name = result.method.queuebinding_keys = sys.argv[1:] # 监测的文件匹配if not binding_keys:    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])    sys.exit(1)for binding_key in binding_keys:    channel.queue_bind(exchange='topic_logs',                       queue=queue_name,                       routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()

To receive all the logs run:

python receive_logs_topic.py "#"  可以收取所有的信息

To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*"

Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical"

You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical"

And to emit a log with a routing key "kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error"

 

Remote procedure call(RPC) 

远程调用方法执行,SNMP简单网络管理协议,发一条执行返回结果就是一个简单的RPC,实现的就是服务器端也是客户端双向

server

#_*_coding:utf-8_*___author__ = 'joker lii'import pikaimport timeconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n):    if n == 0:        return 0    elif n == 1:        return 1    else:        return fib(n-1) + fib(n-2) def on_request(ch, method, props, body):    n = int(body)     print(" [.] fib(%s)" % n)    response = fib(n)     ch.basic_publish(exchange='', # 结果发挥给客户端                     routing_key=props.reply_to, # props指的就是客户端里面的replay_to的q                     properties=pika.BasicProperties(correlation_id = \                                                         props.correlation_id), # 客户端的correlation_id                     body=str(response))    ch.basic_ack(delivery_tag = method.delivery_tag) # 确保消息消费了 channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests")channel.start_consuming()

client

import pikaimport uuid import time class FibonacciRpcClient(object):    def __init__(self):        self.connection = pika.BlockingConnection(pika.ConnectionParameters(                host='localhost'))  # socket         self.channel = self.connection.channel() # 通道          result = self.channel.queue_declare(exclusive=True)        self.callback_queue = result.method.queue # 随机q         self.channel.basic_consume(self.on_response, no_ack=True,  # on_response 回调函数                                   queue=self.callback_queue)      # 随机q     def on_response(self, ch, method, props, body): # 消息收到后做了些什么        if self.corr_id == props.correlation_id: # 先判断自己当前的id和服务器端的id是否一样,保证消息一对一            self.response = body # 这里将收到的消息赋值response,看是不是none     def call(self, n):        self.response = None # response,谁会将这个none改为true,因为none就会一直收        self.corr_id = str(uuid.uuid4()) # 唯一的随机一串数字         self.channel.basic_publish(exchange='', # 发消息                                   routing_key='rpc_queue', # rpc_queue                                   properties=pika.BasicProperties( # 消息持久化                                         reply_to = self.callback_queue, # 让服务器端执行完命令返回这个q里面                                         correlation_id = self.corr_id, # 唯一的随机一串数字                                         ),                                   body=str(n)) # 发的消息         while self.response is None: # response为none就会一直收            self.connection.process_data_events() # 非阻塞版的start_consuming(),收到消息触发回调函数,没有收到继续往下走        print('no msg...')             time.sleep(0.5)         return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)")response = fibonacci_rpc.call(30) # 调用call方法print(" [.] Got %r" % response)

 

 

转载于:https://www.cnblogs.com/jokerbj/p/9436910.html

你可能感兴趣的文章
SpringBoot四大神器之Actuator
查看>>
html复习之标签整理
查看>>
Yii2 使用 faker 生成假数据(转)
查看>>
Consul安装使用
查看>>
tomcat事件处理机制
查看>>
JS BUG 传递数字过大,数据值会变化
查看>>
橡皮筋进度条ElasticProgressBar
查看>>
spring boot引入json,jsonobject,需要指定jdk15
查看>>
企业架构 - 涉众管理(Stakeholder Management)
查看>>
Ubuntu11.10 解决rar文件解压错误
查看>>
sqlplus: error while loading shared libraries: /u01/app/lib/libclntsh.so.11.1
查看>>
ORACLE等待事件:enq: TX - row lock contention
查看>>
使用Fiddler2录制HTTP操作脚本
查看>>
响应activex事件
查看>>
Winform 进程之间通讯的几种方法
查看>>
Google LOGO现代舞舞蹈动画
查看>>
有人3见解
查看>>
[python]decimal常用操作和需要注意的地方
查看>>
Ubuntu 网卡信息2
查看>>
android 没有main函数,怎么找到程序执行入口呢?以及activity主要生命周期的方法说明...
查看>>