More flexible Rate Controller (#716)

Move InstaloaderContext's rate controlling logic into a class
RateController with fine-grained methods to enable easily changing
Instaloader's behavior regarding rate limits.
This commit is contained in:
Alexander Graf 2020-07-11 13:54:31 +02:00 committed by GitHub
parent fa62025ea3
commit fbe05a1add
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 133 additions and 95 deletions

View File

@ -229,5 +229,16 @@ Exceptions
``InstaloaderContext`` (Low-level functions) ``InstaloaderContext`` (Low-level functions)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
``InstaloaderContext``
""""""""""""""""""""""
.. autoclass:: InstaloaderContext .. autoclass:: InstaloaderContext
:no-show-inheritance: :no-show-inheritance:
``RateController``
""""""""""""""""""
.. autoclass:: RateController
:no-show-inheritance:
.. versionadded:: 4.5

View File

@ -14,6 +14,6 @@ else:
from .exceptions import * from .exceptions import *
from .instaloader import Instaloader from .instaloader import Instaloader
from .instaloadercontext import InstaloaderContext from .instaloadercontext import InstaloaderContext, RateController
from .structures import (Hashtag, Highlight, Post, PostSidecarNode, PostComment, PostCommentAnswer, PostLocation, from .structures import (Hashtag, Highlight, Post, PostSidecarNode, PostComment, PostCommentAnswer, PostLocation,
Profile, Story, StoryItem, TopSearchResults, load_structure_from_file, save_structure_to_file) Profile, Story, StoryItem, TopSearchResults, load_structure_from_file, save_structure_to_file)

View File

@ -19,7 +19,7 @@ import requests
import urllib3 # type: ignore import urllib3 # type: ignore
from .exceptions import * from .exceptions import *
from .instaloadercontext import InstaloaderContext from .instaloadercontext import InstaloaderContext, RateController
from .structures import (Hashtag, Highlight, JsonExportable, Post, PostLocation, Profile, Story, StoryItem, from .structures import (Hashtag, Highlight, JsonExportable, Post, PostLocation, Profile, Story, StoryItem,
save_structure_to_file) save_structure_to_file)
@ -153,6 +153,7 @@ class Instaloader:
:param storyitem_metadata_txt_pattern: :option:`--storyitem-metadata-txt`, default is empty (=none) :param storyitem_metadata_txt_pattern: :option:`--storyitem-metadata-txt`, default is empty (=none)
:param max_connection_attempts: :option:`--max-connection-attempts` :param max_connection_attempts: :option:`--max-connection-attempts`
:param request_timeout: :option:`--request-timeout`, set per-request timeout (seconds) :param request_timeout: :option:`--request-timeout`, set per-request timeout (seconds)
:param rate_controller: Factory (a callable taking the :class:`InstaloaderContext`) that returns a :class:`RateController`, to override rate controlling behavior
.. attribute:: context .. attribute:: context
@ -175,9 +176,11 @@ class Instaloader:
post_metadata_txt_pattern: str = None, post_metadata_txt_pattern: str = None,
storyitem_metadata_txt_pattern: str = None, storyitem_metadata_txt_pattern: str = None,
max_connection_attempts: int = 3, max_connection_attempts: int = 3,
request_timeout: Optional[float] = None): request_timeout: Optional[float] = None,
rate_controller: Optional[Callable[[InstaloaderContext], RateController]] = None):
self.context = InstaloaderContext(sleep, quiet, user_agent, max_connection_attempts, request_timeout) self.context = InstaloaderContext(sleep, quiet, user_agent, max_connection_attempts,
request_timeout, rate_controller)
# configuration parameters # configuration parameters
self.dirname_pattern = dirname_pattern or "{target}" self.dirname_pattern = dirname_pattern or "{target}"

View File

@ -53,7 +53,8 @@ class InstaloaderContext:
""" """
def __init__(self, sleep: bool = True, quiet: bool = False, user_agent: Optional[str] = None, def __init__(self, sleep: bool = True, quiet: bool = False, user_agent: Optional[str] = None,
max_connection_attempts: int = 3, request_timeout: Optional[float] = None): max_connection_attempts: int = 3, request_timeout: Optional[float] = None,
rate_controller: Optional[Callable[["InstaloaderContext"], "RateController"]] = None):
self.user_agent = user_agent if user_agent is not None else default_user_agent() self.user_agent = user_agent if user_agent is not None else default_user_agent()
self.request_timeout = request_timeout self.request_timeout = request_timeout
@ -69,9 +70,7 @@ class InstaloaderContext:
# error log, filled with error() and printed at the end of Instaloader.main() # error log, filled with error() and printed at the end of Instaloader.main()
self.error_log = [] # type: List[str] self.error_log = [] # type: List[str]
# For the adaption of sleep intervals (rate control) self._rate_controller = rate_controller(self) if rate_controller is not None else RateController(self)
self._graphql_query_timestamps = dict() # type: Dict[str, List[float]]
self._graphql_earliest_next_request_time = 0.0
# Can be set to True for testing, disables suppression of InstaloaderContext._error_catcher # Can be set to True for testing, disables suppression of InstaloaderContext._error_catcher
self.raise_all_errors = False self.raise_all_errors = False
@ -295,77 +294,6 @@ class InstaloaderContext:
if self.sleep: if self.sleep:
time.sleep(min(random.expovariate(0.7), 5.0)) time.sleep(min(random.expovariate(0.7), 5.0))
def _dump_query_timestamps(self, current_time: float):
"""Output the number of GraphQL queries grouped by their query_hash within the last time.

For every query_hash with recorded timestamps, one line per window is printed to stderr.

:param current_time: Reference timestamp from which the windows are measured backwards.
"""
# Window lengths; multiplied by 60 below and labelled as minutes in the output
windows = [10, 11, 15, 20, 30, 60]
print("GraphQL requests:", file=sys.stderr)
for query_hash, times in self._graphql_query_timestamps.items():
print(" {}".format(query_hash), file=sys.stderr)
for window in windows:
# Count the recorded timestamps that are newer than the start of this window
reqs_in_sliding_window = sum(t > current_time - window * 60 for t in times)
print(" last {} minutes: {} requests".format(window, reqs_in_sliding_window), file=sys.stderr)
def _graphql_request_count_per_sliding_window(self, query_hash: str) -> int:
"""Return how many GraphQL requests can be done within the sliding window.

:param query_hash: The query_hash of the query, or one of the pseudo types 'iphone' / 'other'.
:return: The limit configured for query_hash, or the smallest configured limit if it is not listed.
"""
# Separate limit tables are used depending on whether the session is logged in
if self.is_logged_in:
max_reqs = {'1cb6ec562846122743b61e492c85999f': 20,
'33ba35852cb50da46f5b5e889df7d159': 20,
'iphone': 100,
'other': 100}
else:
max_reqs = {'1cb6ec562846122743b61e492c85999f': 200,
'33ba35852cb50da46f5b5e889df7d159': 200,
'other': 200}
# Unknown query hashes fall back to the most restrictive limit in the table
return max_reqs.get(query_hash) or min(max_reqs.values())
def _graphql_query_waittime(self, query_hash: str, current_time: float, untracked_queries: bool = False) -> float:
"""Calculate time needed to wait before GraphQL query can be executed.

:param query_hash: Key under which the timestamps of this kind of query are tracked.
:param current_time: Reference timestamp, compared against the recorded timestamps.
:param untracked_queries: If True, the window limit is treated as reached regardless of the recorded count,
   and the computed time is stored as earliest next request time.
:return: Number of seconds to wait.
"""
# Length of the sliding window, in the same unit as the timestamps
sliding_window = 660
if query_hash not in self._graphql_query_timestamps:
self._graphql_query_timestamps[query_hash] = []
# Drop timestamps older than 60 * 60 (one hour, given seconds)
self._graphql_query_timestamps[query_hash] = list(filter(lambda t: t > current_time - 60 * 60,
self._graphql_query_timestamps[query_hash]))
reqs_in_sliding_window = list(filter(lambda t: t > current_time - sliding_window,
self._graphql_query_timestamps[query_hash]))
count_per_sliding_window = self._graphql_request_count_per_sliding_window(query_hash)
# Below the limit: only a previously stored earliest next request time can force a wait
if len(reqs_in_sliding_window) < count_per_sliding_window and not untracked_queries:
return max(0, self._graphql_earliest_next_request_time - current_time)
# The next request is possible once the oldest request has left the window (plus 6 extra)
# NOTE(review): min() raises ValueError if reqs_in_sliding_window is empty, which looks reachable when
# untracked_queries is True and nothing was recorded within the window -- confirm against callers.
next_request_time = min(reqs_in_sliding_window) + sliding_window + 6
if untracked_queries:
self._graphql_earliest_next_request_time = next_request_time
return round(max(next_request_time, self._graphql_earliest_next_request_time) - current_time)
def _ratecontrol_graphql_query(self, query_hash: str, untracked_queries: bool = False):
"""Called before a GraphQL query is made in order to stay within Instagram's rate limits.
:param query_hash: The query_hash parameter of the query.
:param untracked_queries: True, if 429 has been returned to apply 429 logic.
"""
# NOTE(review): indentation is not preserved in this view, so the exact nesting of the statements below
# (e.g. which statements belong to the `if waittime > 10` bodies) cannot be read off here.
# Regular path: wait as long as computed, then record the timestamp of this query
if not untracked_queries:
waittime = self._graphql_query_waittime(query_hash, time.monotonic(), untracked_queries)
assert waittime >= 0
if waittime > 10:
self.log('\nToo many queries in the last time. Need to wait {} seconds, until {:%H:%M}.'
.format(waittime, datetime.now() + timedelta(seconds=waittime)))
time.sleep(waittime)
if query_hash not in self._graphql_query_timestamps:
self._graphql_query_timestamps[query_hash] = [time.monotonic()]
else:
self._graphql_query_timestamps[query_hash].append(time.monotonic())
# 429 path: warn on stderr, compute the wait time with untracked_queries set, then sleep
else:
text_for_429 = ("HTTP error code 429 was returned because too many queries occurred in the last time. "
"Please do not use Instagram in your browser or run multiple instances of Instaloader "
"in parallel.")
print(textwrap.fill(text_for_429), file=sys.stderr)
current_time = time.monotonic()
waittime = self._graphql_query_waittime(query_hash, current_time, untracked_queries)
assert waittime >= 0
if waittime > 10:
self.log('The request will be retried in {} seconds, at {:%H:%M}.'
.format(waittime, datetime.now() + timedelta(seconds=waittime)))
self._dump_query_timestamps(current_time)
time.sleep(waittime)
def get_json(self, path: str, params: Dict[str, Any], host: str = 'www.instagram.com', def get_json(self, path: str, params: Dict[str, Any], host: str = 'www.instagram.com',
session: Optional[requests.Session] = None, _attempt=1) -> Dict[str, Any]: session: Optional[requests.Session] = None, _attempt=1) -> Dict[str, Any]:
"""JSON request to Instagram. """JSON request to Instagram.
@ -386,11 +314,11 @@ class InstaloaderContext:
try: try:
self.do_sleep() self.do_sleep()
if is_graphql_query: if is_graphql_query:
self._ratecontrol_graphql_query(params['query_hash']) self._rate_controller.wait_before_query(params['query_hash'])
if is_iphone_query: if is_iphone_query:
self._ratecontrol_graphql_query('iphone') self._rate_controller.wait_before_query('iphone')
if is_other_query: if is_other_query:
self._ratecontrol_graphql_query('other') self._rate_controller.wait_before_query('other')
resp = sess.get('https://{0}/{1}'.format(host, path), params=params, allow_redirects=False) resp = sess.get('https://{0}/{1}'.format(host, path), params=params, allow_redirects=False)
while resp.is_redirect: while resp.is_redirect:
redirect_url = resp.headers['location'] redirect_url = resp.headers['location']
@ -446,12 +374,13 @@ class InstaloaderContext:
raise ConnectionException(error_string) from err raise ConnectionException(error_string) from err
self.error(error_string + " [retrying; skip with ^C]", repeat_at_end=False) self.error(error_string + " [retrying; skip with ^C]", repeat_at_end=False)
try: try:
if is_graphql_query and isinstance(err, TooManyRequestsException): if isinstance(err, TooManyRequestsException):
self._ratecontrol_graphql_query(params['query_hash'], untracked_queries=True) if is_graphql_query:
if is_iphone_query and isinstance(err, TooManyRequestsException): self._rate_controller.handle_429(params['query_hash'])
self._ratecontrol_graphql_query('iphone', untracked_queries=True) if is_iphone_query:
if is_other_query and isinstance(err, TooManyRequestsException): self._rate_controller.handle_429('iphone')
self._ratecontrol_graphql_query('other', untracked_queries=True) if is_other_query:
self._rate_controller.handle_429('other')
return self.get_json(path=path, params=params, host=host, session=sess, _attempt=_attempt + 1) return self.get_json(path=path, params=params, host=host, session=sess, _attempt=_attempt + 1)
except KeyboardInterrupt: except KeyboardInterrupt:
self.error("[skipped by user]", repeat_at_end=False) self.error("[skipped by user]", repeat_at_end=False)
@ -595,3 +524,97 @@ class InstaloaderContext:
if self._root_rhx_gis is None: if self._root_rhx_gis is None:
self._root_rhx_gis = self.get_json('', {}).get('rhx_gis', '') self._root_rhx_gis = self.get_json('', {}).get('rhx_gis', '')
return self._root_rhx_gis or None return self._root_rhx_gis or None
class RateController:
"""
Class providing request tracking and rate controlling to stay within rate limits.
It can be overridden to change Instaloader's behavior regarding rate limits, for example to raise a custom
exception when the rate limit is hit::
import instaloader
class MyRateController(instaloader.RateController):
def sleep(self, secs):
raise MyCustomException()
L = instaloader.Instaloader(rate_controller=lambda ctx: MyRateController(ctx))
"""
def __init__(self, context: InstaloaderContext):
self._context = context
self._graphql_query_timestamps = dict() # type: Dict[str, List[float]]
self._graphql_earliest_next_request_time = 0.0
def sleep(self, secs: float):
"""Wait given number of seconds."""
# Not static, to allow for the behavior of this method to depend on context-inherent properties, such as
# whether we are logged in.
# pylint:disable=no-self-use
time.sleep(secs)
def _dump_query_timestamps(self, current_time: float):
windows = [10, 11, 15, 20, 30, 60]
print("GraphQL requests:", file=sys.stderr)
for query_hash, times in self._graphql_query_timestamps.items():
print(" {}".format(query_hash), file=sys.stderr)
for window in windows:
reqs_in_sliding_window = sum(t > current_time - window * 60 for t in times)
print(" last {} minutes: {} requests".format(window, reqs_in_sliding_window), file=sys.stderr)
def count_per_sliding_window(self, query_type: str) -> int:
"""Return how many GraphQL requests can be done within the sliding window."""
# Not static, to allow for the count_per_sliding_window to depend on context-inherent properties, such as
# whether we are logged in.
# pylint:disable=no-self-use,unused-argument
return 200
def query_waittime(self, query_type: str, current_time: float, untracked_queries: bool = False) -> float:
"""Calculate time needed to wait before GraphQL query can be executed."""
sliding_window = 660
if query_type not in self._graphql_query_timestamps:
self._graphql_query_timestamps[query_type] = []
self._graphql_query_timestamps[query_type] = list(filter(lambda t: t > current_time - 60 * 60,
self._graphql_query_timestamps[query_type]))
reqs_in_sliding_window = list(filter(lambda t: t > current_time - sliding_window,
self._graphql_query_timestamps[query_type]))
count_per_sliding_window = self.count_per_sliding_window(query_type)
if len(reqs_in_sliding_window) < count_per_sliding_window and not untracked_queries:
return max(0.0, self._graphql_earliest_next_request_time - current_time)
next_request_time = min(reqs_in_sliding_window) + sliding_window + 6
if untracked_queries:
self._graphql_earliest_next_request_time = next_request_time
return max(next_request_time, self._graphql_earliest_next_request_time) - current_time
def wait_before_query(self, query_type: str) -> None:
"""This method is called before a query to Instagram. It calls :meth:`RateController.sleep` to wait
until the request can be made."""
waittime = self.query_waittime(query_type, time.monotonic(), False)
assert waittime >= 0
if waittime > 15:
self._context.log("\nToo many queries in the last time. Need to wait {} seconds, until {:%H:%M}."
.format(round(waittime), datetime.now() + timedelta(seconds=waittime)))
if waittime > 0:
self.sleep(waittime)
if query_type not in self._graphql_query_timestamps:
self._graphql_query_timestamps[query_type] = [time.monotonic()]
else:
self._graphql_query_timestamps[query_type].append(time.monotonic())
def handle_429(self, query_type: str) -> None:
"""This method is called to handle a 429 Too Many Requests response. It calls :meth:`RateController.sleep` to
wait until we can repeat the same request."""
text_for_429 = ("HTTP error code 429 was returned because too many queries occurred in the last time. "
"Please do not use Instagram in your browser or run multiple instances of Instaloader "
"in parallel.")
print(textwrap.fill(text_for_429), file=sys.stderr)
current_time = time.monotonic()
waittime = self.query_waittime(query_type, current_time, True)
assert waittime >= 0
if waittime > 15:
self._context.log("The request will be retried in {} seconds, at {:%H:%M}."
.format(round(waittime), datetime.now() + timedelta(seconds=waittime)))
self._dump_query_timestamps(current_time)
if waittime > 0:
self.sleep(waittime)

View File

@ -5,7 +5,7 @@ import shutil
import tempfile import tempfile
import unittest import unittest
from itertools import islice from itertools import islice
from typing import Dict, List from typing import Optional
import instaloader import instaloader
@ -23,8 +23,7 @@ PRIVATE_PROFILE_ID = 1706625676
EMPTY_PROFILE = "not_public" EMPTY_PROFILE = "not_public"
EMPTY_PROFILE_ID = 1928659031 EMPTY_PROFILE_ID = 1928659031
# Preserve query timestamps (rate control) between tests to not get rate limited ratecontroller = None # type: Optional[instaloader.RateController]
instaloadercontext_query_timestamps = dict() # type: Dict[str, List[float]]
class TestInstaloaderAnonymously(unittest.TestCase): class TestInstaloaderAnonymously(unittest.TestCase):
@ -37,13 +36,15 @@ class TestInstaloaderAnonymously(unittest.TestCase):
download_comments=True, download_comments=True,
save_metadata=True) save_metadata=True)
self.L.context.raise_all_errors = True self.L.context.raise_all_errors = True
# pylint:disable=protected-access if ratecontroller is not None:
self.L.context._graphql_query_timestamps = instaloadercontext_query_timestamps.copy() # pylint:disable=protected-access
ratecontroller._context = self.L.context
self.L.context._rate_controller = ratecontroller
def tearDown(self): def tearDown(self):
# pylint:disable=global-statement,protected-access # pylint:disable=global-statement,protected-access
global instaloadercontext_query_timestamps global ratecontroller
instaloadercontext_query_timestamps = self.L.context._graphql_query_timestamps.copy() ratecontroller = self.L.context._rate_controller
self.L.close() self.L.close()
os.chdir('/') os.chdir('/')
print("Removing {}".format(self.dir)) print("Removing {}".format(self.dir))