我们提供消息推送系统招投标所需全套资料,包括消息推送系统介绍PPT、消息推送系统产品解决方案、
消息推送系统产品技术参数,以及对应的标书参考文件,详情请联系客服。
import asyncio
from collections import deque


class MessageCenter:
    """In-memory FIFO message center with simulated asynchronous latency.

    Messages are appended by :meth:`add_message` and consumed in arrival
    order by :meth:`process_messages`.

    Args:
        add_delay: Seconds to sleep before a message is enqueued
            (simulates a slow producer). Defaults to 1.
        process_delay: Seconds to sleep after a message is dequeued
            (simulates processing time). Defaults to 2.
        idle_delay: Seconds to sleep between polls while the queue is
            empty. Defaults to 0.5.
    """

    def __init__(self, *, add_delay=1, process_delay=2, idle_delay=0.5):
        # A deque gives O(1) pops from the left; list.pop(0) is O(n).
        self.queue = deque()
        self.add_delay = add_delay
        self.process_delay = process_delay
        self.idle_delay = idle_delay

    async def add_message(self, message):
        """Announce *message*, wait ``add_delay`` seconds, then enqueue it."""
        print(f"Adding message: {message}")
        await asyncio.sleep(self.add_delay)  # simulate a time-consuming operation
        self.queue.append(message)

    async def process_messages(self, limit=None):
        """Consume queued messages in FIFO order.

        Args:
            limit: Maximum number of messages to process before returning.
                ``None`` (the default) keeps polling forever.
        """
        processed = 0
        while limit is None or processed < limit:
            if not self.queue:
                # Nothing to do yet: yield to the event loop and poll again.
                await asyncio.sleep(self.idle_delay)
                continue
            msg = self.queue.popleft()
            print(f"Processing message: {msg}")
            await asyncio.sleep(self.process_delay)  # simulate processing time
            processed += 1


async def main():
    """Run two producers and one consumer concurrently.

    The consumer is started without a limit, so this coroutine does not
    return on its own; stop it by cancelling (e.g. Ctrl-C).
    """
    mc = MessageCenter()
    tasks = [
        asyncio.create_task(mc.add_message("Hello")),
        asyncio.create_task(mc.add_message("World")),
        asyncio.create_task(mc.process_messages()),
    ]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
如需改用基于 Redis 发布/订阅的实现,请先安装 aioredis:
pip install aioredis
然后修改代码如下:
import asyncio

import aioredis


class RedisMessageCenter:
    """Publish/subscribe message center backed by Redis through ``aioredis``.

    Call :meth:`connect_redis` before publishing or listening.

    Args:
        redis_url: Redis connection URL, e.g. ``"redis://localhost"``.
    """

    def __init__(self, redis_url):
        self.redis_url = redis_url
        # Populated by connect_redis(); defined here so the attribute
        # always exists on the instance.
        self.redis = None

    async def connect_redis(self):
        """Create the Redis client for ``redis_url``."""
        self.redis = await aioredis.from_url(self.redis_url)

    async def close(self):
        """Close the Redis client if one was created."""
        if self.redis is not None:
            await self.redis.close()
            self.redis = None

    async def add_message(self, channel, message):
        """Publish *message* on *channel*."""
        await self.redis.publish(channel, message)

    async def listen_messages(self, channel, ready=None):
        """Subscribe to *channel* and print every message received on it.

        Args:
            channel: Name of the channel to subscribe to.
            ready: Optional ``asyncio.Event`` that is set once the
                subscription has been confirmed, so that publishers can
                wait until the listener is actually subscribed.
        """
        # The context manager releases the pub/sub connection on exit,
        # including when this coroutine is cancelled.
        async with self.redis.pubsub() as pubsub:
            await pubsub.subscribe(channel)
            async for message in pubsub.listen():
                kind = message["type"]
                if kind == "subscribe":
                    # Subscription confirmation, not a published payload.
                    if ready is not None:
                        ready.set()
                elif kind == "message":
                    print(f"Received message: {message['data']}")


async def main():
    """Start a listener, publish two messages, then keep listening.

    Publishing waits until the listener's subscription is confirmed;
    publishing earlier could lose the messages because nothing is
    subscribed yet. The listener has no stop condition, so this coroutine
    runs until cancelled (e.g. Ctrl-C).
    """
    rmc = RedisMessageCenter("redis://localhost")
    await rmc.connect_redis()

    subscribed = asyncio.Event()
    listener = asyncio.create_task(
        rmc.listen_messages("channel1", ready=subscribed)
    )
    ready_waiter = asyncio.create_task(subscribed.wait())
    try:
        # Wake up on whichever happens first: the subscription is confirmed,
        # or the listener fails (e.g. the server is unreachable).
        await asyncio.wait(
            {listener, ready_waiter}, return_when=asyncio.FIRST_COMPLETED
        )
        if subscribed.is_set():
            await asyncio.gather(
                rmc.add_message("channel1", "Hello"),
                rmc.add_message("channel1", "World"),
            )
        # Re-raises the listener's error if it failed; otherwise blocks
        # while messages keep arriving.
        await listener
    finally:
        listener.cancel()
        ready_waiter.cancel()
        await asyncio.gather(listener, ready_waiter, return_exceptions=True)
        await rmc.close()


if __name__ == "__main__":
    asyncio.run(main())