classRedisMixin(object): """Mixin class to implement reading urls from a redis queue.""" redis_key = None redis_batch_size = None redis_encoding = None
# Redis client placeholder. server = None
defstart_requests(self): """Returns a batch of start requests from redis.""" return self.next_requests()
defsetup_redis(self, crawler=None): """Setup redis connection and idle signal. This should be called after the spider has set its crawler object. """ if self.server isnotNone: return
if crawler isNone: # We allow optional crawler argument to keep backwards # compatibility. # XXX: Raise a deprecation warning. crawler = getattr(self, 'crawler', None)
if crawler isNone: raise ValueError("crawler is required")
settings = crawler.settings
if self.redis_key isNone: self.redis_key = settings.get( 'REDIS_START_URLS_KEY', defaults.START_URLS_KEY, )
self.server = connection.from_settings(crawler.settings) # The idle signal is called when the spider has no requests left, # that's when we will schedule new requests from redis queue crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
defnext_requests(self): """Returns a request to be scheduled or none.""" use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET) fetch_one = self.server.spop if use_set else self.server.lpop # XXX: Do we need to use a timeout here? found = 0 # TODO: Use redis pipeline execution. while found < self.redis_batch_size: data = fetch_one(self.redis_key) ifnot data: # Queue empty. break req = self.make_request_from_data(data) if req: yield req found += 1 else: self.logger.debug("Request not made from data: %r", data)
if found: self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
defmake_request_from_data(self, data): """Returns a Request instance from data coming from Redis. By default, ``data`` is an encoded URL. You can override this method to provide your own message decoding. Parameters ---------- data : bytes Message from redis. """ url = bytes_to_str(data, self.redis_encoding) return self.make_requests_from_url(url)
defschedule_next_requests(self): """Schedules a request if available""" # TODO: While there is capacity, schedule a batch of redis requests. for req in self.next_requests(): self.crawler.engine.crawl(req, spider=self)
defspider_idle(self): """Schedules a request if available, otherwise waits.""" # XXX: Handle a sentinel to close the spider. self.schedule_next_requests() raise DontCloseSpider
classRedisCrawlSpider(RedisMixin, CrawlSpider): """Spider that reads urls from redis queue when idle. Attributes ---------- redis_key : str (default: REDIS_START_URLS_KEY) Redis key where to fetch start URLs from.. redis_batch_size : int (default: CONCURRENT_REQUESTS) Number of messages to fetch from redis on each attempt. redis_encoding : str (default: REDIS_ENCODING) Encoding to use when decoding messages from redis queue. Settings -------- REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls") Default Redis key where to fetch start URLs from.. REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS) Default number of messages to fetch from redis on each attempt. REDIS_START_URLS_AS_SET : bool (default: True) Use SET operations to retrieve messages from the redis queue. REDIS_ENCODING : str (default: "utf-8") Default encoding to use when decoding messages from redis queue. """