python 进程通信 延时_收发定时和延时消息_收发定时和延时消息_开源 Python SDK 接入说明_TCP 协议(社区版)_SDK 参考_消息队列 RocketMQ 版 - 阿里云...

本文提供使用 TCP 协议下的开源 Python SDK 来收发定时和延时消息的示例代码供您参考。

概念介绍

定时消息:Producer 将消息发送到消息队列 RocketMQ 版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息。

延时消息:Producer 将消息发送到消息队列 RocketMQ 版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。

定时消息与延时消息在代码配置上存在一些差异,但是最终达到的效果相同。消息在发送到阿里云 RocketMQ 服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。详情请参见定时和延时消息。

注意 开源版本的 Apache RocketMQ 支持延时消息,但不支持定时消息,因此没有专门的定时消息接口。阿里云 RocketMQ 的延时消息是通过设置定时时间来实现的。如需使用云上定时消息,请参照以下步骤。

前提条件

您已完成准备工作。详情请参见准备工作。

发送定时消息

发送定时消息的示例代码如下。

# -*- coding: utf-8 -*-

from rocketmq.client import Producer, Message

import time

# 发送消息时请设置您在阿里云 RocketMQ 控制台上申请的 Topic。

topic = 'XXXXXX'

# 您在阿里云 RocketMQ 控制台上申请的 GID。

gid = 'GID_XXXXX'

# 设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取。

name_srv = 'http://XXXX.aliyuncs.com:80'

# 您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证。

ak = 'AK'

# 您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证。

sk = 'SK'

# 用户渠道,默认值为:ALIYUN。

channel = 'ALIYUN'

def delay_times_from_now(n):

t = time.time()

return n * 1000 + round(t * 1000)

def create_message():

msg = Message(topic)

msg.set_keys('YourKey')

msg.set_tags('YourTags')

msg.set_body('Hello RocketMQ, This is a Python Delay Message.')

# 设置需要发送或者延迟的时间,此处以延迟 2s 为例。

# 单位毫秒(ms),在指定时间戳(当前时间之后)进行投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。

msg.set_property('__STARTDELIVERTIME', str(delay_times_from_now(2)))

return msg

def send_message_delay(count):

producer = Producer(gid)

producer.set_name_server_address(name_srv)

producer.set_session_credentials(ak, sk, channel)

producer.start()

for n in range(count):

msg = create_message()

ret = producer.send_sync(msg)

print ('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)

print ('send delay message done')

producer.shutdown()

if __name__ == '__main__':

send_message_delay(10)

消费定时消息

消费定时消息的示例代码如下。

# -*- coding: utf-8 -*-

from rocketmq.client import PushConsumer, ConsumeStatus

import time

# 消费消息时请设置您在阿里云 RocketMQ 控制台上申请的 Topic。

topic = 'XXXXXX'

# 您在阿里云 RocketMQ 控制台上申请的 GID。

gid = 'GID_XXXXX'

# 设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取。

name_srv = 'http://XXXX.aliyuncs.com:80'

# 您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证。

ak = 'AK'

# 您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证。

sk = 'SK'

# 用户渠道,默认值为:ALIYUN。

channel = 'ALIYUN'

def callback(msg):

print(msg.id, msg.body)

# 消费成功回复 CONSUME_SUCCESS,消费失败回复 RECONSUME_LATER。此时会触发消费重试。

return ConsumeStatus.CONSUME_SUCCESS

def start_consume_message():

consumer = PushConsumer(gid)

consumer.set_name_server_address(name_srv)

consumer.set_session_credentials(ak, sk, channel)

consumer.subscribe(topic, callback)

# ********************************************

# 1. 确保订阅关系的设置在启动之前完成。

# 2. 确保相同 GID 下面的消费者的订阅关系一致。

# *********************************************

print ('start consume message')

consumer.start()

# 请保持消费者一直处于运行状态。

while True:

time.sleep(3600)

if __name__ == '__main__':

start_consume_message()


版权声明:本文为weixin_39761422原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。