import asyncio import pickle from urllib.parse import urlparse try: import aioredis except ImportError: aioredis = None from .asyncio_pubsub_manager import AsyncPubSubManager def _parse_redis_url(url): p = urlparse(url) if p.scheme not in {'redis', 'rediss'}: raise ValueError('Invalid redis url') ssl = p.scheme == 'rediss' host = p.hostname or 'localhost' port = p.port or 6379 password = p.password if p.path: db = int(p.path[1:]) else: db = 0 return host, port, password, db, ssl class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover """Redis based client manager for asyncio servers. This class implements a Redis backend for event sharing across multiple processes. Only kept here as one more example of how to build a custom backend, since the kombu backend is perfectly adequate to support a Redis message queue. To use a Redis backend, initialize the :class:`Server` instance as follows:: server = socketio.Server(client_manager=socketio.AsyncRedisManager( 'redis://hostname:port/0')) :param url: The connection URL for the Redis server. For a default Redis store running on the same host, use ``redis://``. To use an SSL connection, use ``rediss://``. :param channel: The channel name on which the server sends and receives notifications. Must be the same in all the servers. :param write_only: If set to ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting and receiving. """ name = 'aioredis' def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False, logger=None): if aioredis is None: raise RuntimeError('Redis package is not installed ' '(Run "pip install aioredis" in your ' 'virtualenv).') ( self.host, self.port, self.password, self.db, self.ssl ) = _parse_redis_url(url) self.pub = None self.sub = None super().__init__(channel=channel, write_only=write_only, logger=logger) async def _publish(self, data): retry = True while True: try: if self.pub is None: self.pub = await aioredis.create_redis( (self.host, self.port), db=self.db, password=self.password, ssl=self.ssl ) return await self.pub.publish(self.channel, pickle.dumps(data)) except (aioredis.RedisError, OSError): if retry: self._get_logger().error('Cannot publish to redis... ' 'retrying') self.pub = None retry = False else: self._get_logger().error('Cannot publish to redis... ' 'giving up') break async def _listen(self): retry_sleep = 1 while True: try: if self.sub is None: self.sub = await aioredis.create_redis( (self.host, self.port), db=self.db, password=self.password, ssl=self.ssl ) self.ch = (await self.sub.subscribe(self.channel))[0] return await self.ch.get() except (aioredis.RedisError, OSError): self._get_logger().error('Cannot receive from redis... ' 'retrying in ' '{} secs'.format(retry_sleep)) self.sub = None await asyncio.sleep(retry_sleep) retry_sleep *= 2 if retry_sleep > 60: retry_sleep = 60