我们提供消息推送系统招投标所需的全套资料,包括消息推送系统介绍 PPT、消息推送系统产品解决方案、
			消息推送系统产品技术参数,以及对应的标书参考文件。详情请联系客服。
		
 
	 
		 
		 
		 
		
import asyncio
class MessageCenter:
    """In-memory FIFO message center driven by asyncio coroutines.

    Producers enqueue messages with :meth:`add_message`; a long-running
    :meth:`process_messages` coroutine removes and prints them in arrival
    order.
    """

    def __init__(self):
        """Create a message center with an empty queue."""
        # Imported here so the class does not depend on a module-level import.
        from collections import deque

        # A deque pops from the left in O(1); list.pop(0) has to shift every
        # remaining element on each dequeue.
        self.queue = deque()

    async def add_message(self, message):
        """Announce *message*, wait one second, then append it to the queue."""
        print(f"Adding message: {message}")
        await asyncio.sleep(1)  # simulate a slow operation
        self.queue.append(message)

    async def process_messages(self):
        """Consume queued messages forever, oldest first.

        This coroutine never returns on its own; cancel the task running it
        to stop processing.
        """
        while True:
            if not self.queue:
                # Nothing to do yet: poll again shortly.
                await asyncio.sleep(0.5)
                continue
            msg = self.queue.popleft()
            print(f"Processing message: {msg}")
            await asyncio.sleep(2)  # simulate processing time
async def main():
    """Run the in-memory demo: two producers feeding one consumer.

    The consumer coroutine loops indefinitely, so awaiting this function
    does not complete by itself.
    """
    center = MessageCenter()
    # Producers are scheduled first, in the order listed, then the consumer.
    producers = [
        asyncio.create_task(center.add_message(text))
        for text in ("Hello", "World")
    ]
    consumer = asyncio.create_task(center.process_messages())
    await asyncio.gather(*producers, consumer)
# Start the demo only when this file is executed directly, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
如需基于 Redis 的发布/订阅功能实现消息中心,请先安装 aioredis:
pip install aioredis
然后将代码修改如下:
import aioredis
import asyncio
class RedisMessageCenter:
    """Message center backed by Redis publish/subscribe via ``aioredis``.

    :meth:`connect_redis` must be awaited once before publishing or
    listening.
    """

    def __init__(self, redis_url):
        """Remember *redis_url*; no connection is opened here."""
        self.redis_url = redis_url
        # Defined up front so that use-before-connect can be reported clearly
        # instead of surfacing as an AttributeError.
        self.redis = None

    async def connect_redis(self):
        """Create the client for ``self.redis_url`` and store it on ``self.redis``."""
        self.redis = await aioredis.from_url(self.redis_url)

    def _client(self):
        """Return the connected client, or raise if not connected yet.

        Raises:
            RuntimeError: if :meth:`connect_redis` has not been awaited.
        """
        if self.redis is None:
            raise RuntimeError(
                "Redis is not connected; await connect_redis() first"
            )
        return self.redis

    async def add_message(self, channel, message):
        """Publish *message* on *channel*.

        Raises:
            RuntimeError: if :meth:`connect_redis` has not been awaited.
        """
        await self._client().publish(channel, message)

    async def listen_messages(self, channel):
        """Subscribe to *channel* and print the payload of each published message.

        Runs until the listen stream ends or the surrounding task is
        cancelled.

        Raises:
            RuntimeError: if :meth:`connect_redis` has not been awaited.
        """
        # The context manager releases the subscription when the loop exits,
        # including when the task is cancelled.
        async with self._client().pubsub() as pubsub:
            await pubsub.subscribe(channel)
            async for message in pubsub.listen():
                # listen() also yields control events such as the subscribe
                # confirmation; only real published messages are printed.
                if message["type"] != "message":
                    continue
                print(f"Received message: {message['data']}")
async def main():
    """Run the Redis demo: subscribe to ``channel1``, then publish two greetings.

    The listener keeps waiting for further messages, so awaiting this
    function does not complete by itself.
    """
    rmc = RedisMessageCenter("redis://localhost")
    await rmc.connect_redis()

    # Start the subscriber before publishing: Redis pub/sub does not buffer,
    # so a message published before the subscription exists is dropped.
    listener = asyncio.create_task(rmc.listen_messages("channel1"))

    # listen_messages exposes no "subscribed" signal, so give it a short,
    # best-effort head start to register the subscription.
    await asyncio.sleep(0.1)

    # Publish sequentially so the delivery order is deterministic.
    await rmc.add_message("channel1", "Hello")
    await rmc.add_message("channel1", "World")

    await listener
# Start the Redis demo only when this file is executed directly.
if __name__ == "__main__":
    asyncio.run(main())
