我们提供消息推送系统招投标所需的全套资料,包括消息推送系统介绍PPT、消息推送系统产品解决方案、
消息推送系统产品技术参数,以及对应的标书参考文件,详情请联系客服。

import asyncio
class MessageCenter:
    """In-memory FIFO message queue with an async producer and consumer.

    Messages are kept in the plain list ``self.queue``. The sleeps only
    simulate work; their durations can be tuned through the constructor.
    """

    def __init__(self, *, add_delay=1, process_delay=2, idle_delay=0.5):
        """Create an empty message center.

        Args:
            add_delay: Seconds ``add_message`` sleeps before enqueueing.
            process_delay: Seconds ``process_messages`` sleeps after taking
                a message off the queue.
            idle_delay: Seconds ``process_messages`` sleeps before checking
                again when the queue is empty.
        """
        self.queue = []
        self.add_delay = add_delay
        self.process_delay = process_delay
        self.idle_delay = idle_delay

    async def add_message(self, message):
        """Announce ``message``, wait ``add_delay`` seconds, then enqueue it."""
        print(f"Adding message: {message}")
        await asyncio.sleep(self.add_delay)  # simulate a slow operation
        self.queue.append(message)

    async def process_messages(self):
        """Consume queued messages in FIFO order, forever.

        This coroutine never returns on its own; cancel its task to stop it.
        """
        while True:
            if self.queue:
                msg = self.queue.pop(0)
                print(f"Processing message: {msg}")
                await asyncio.sleep(self.process_delay)  # simulate processing time
            else:
                # Nothing to do yet: yield to other tasks before polling again.
                await asyncio.sleep(self.idle_delay)
async def main():
    """Demo: enqueue two messages while a consumer task processes them.

    ``MessageCenter.process_messages`` loops forever, so this coroutine does
    not return on its own; it runs until it is cancelled (e.g. Ctrl+C).
    """
    mc = MessageCenter()
    tasks = [
        asyncio.create_task(mc.add_message("Hello")),
        asyncio.create_task(mc.add_message("World")),
        asyncio.create_task(mc.process_messages()),
    ]
    await asyncio.gather(*tasks)
if __name__ == "__main__":
    # Start the demo only when run as a script, not when imported.
    asyncio.run(main())
首先安装 aioredis 依赖:
pip install aioredis
然后修改代码如下:
import aioredis
import asyncio
class RedisMessageCenter:
    """Publish/subscribe helper backed by Redis through ``aioredis``.

    ``connect_redis`` must be awaited before ``add_message`` or
    ``listen_messages`` is used.
    """

    def __init__(self, redis_url):
        """Remember ``redis_url``; no connection is opened here."""
        self.redis_url = redis_url
        # Assigned by connect_redis(); defined here so the attribute always
        # exists on the instance.
        self.redis = None

    async def connect_redis(self):
        """Create the client for ``self.redis_url`` and store it on ``self.redis``."""
        self.redis = await aioredis.from_url(self.redis_url)

    async def add_message(self, channel, message):
        """Publish ``message`` on ``channel``."""
        await self.redis.publish(channel, message)

    async def listen_messages(self, channel):
        """Subscribe to ``channel`` and print the data of each published message.

        Runs for as long as the pub/sub iterator keeps yielding.
        """
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(channel)
        async for message in pubsub.listen():
            # listen() also yields subscribe/unsubscribe confirmations whose
            # "data" is a subscription count, not a payload; skip those.
            if message["type"] != "message":
                continue
            print(f"Received message: {message['data']}")
async def main():
    """Demo: subscribe to ``channel1``, then publish two messages to it.

    The listener keeps waiting for further messages, so this coroutine runs
    until it is cancelled (e.g. Ctrl+C).
    """
    rmc = RedisMessageCenter("redis://localhost")
    await rmc.connect_redis()

    # Redis pub/sub does not buffer: anything published before the
    # subscription exists is dropped, so the listener is started first.
    listener = asyncio.create_task(rmc.listen_messages("channel1"))
    # listen_messages gives no signal once its subscribe has completed, so a
    # short pause is used as a heuristic to let it register.
    await asyncio.sleep(0.1)

    await asyncio.gather(
        rmc.add_message("channel1", "Hello"),
        rmc.add_message("channel1", "World"),
    )
    await listener
if __name__ == "__main__":
    # Start the demo only when run as a script, not when imported.
    asyncio.run(main())
