diff --git a/instaloader.py b/instaloader.py index 09ead30..fe86d15 100755 --- a/instaloader.py +++ b/instaloader.py @@ -24,7 +24,7 @@ from datetime import datetime from enum import Enum from functools import wraps from io import BytesIO -from typing import Any, Callable, Dict, Iterator, List, Optional, Union +from typing import Any, Callable, Dict, Iterator, List, Optional import requests import requests.utils @@ -173,14 +173,323 @@ def filterstr_to_filterfunc(filter_str: str, logged_in: bool) -> Callable[['Post return filterfunc +class InstaloaderContext: + """Class providing methods for (error) logging and low-level communication with Instagram. + + It is not thought to be instantiated directly, rather :class:`Instaloader` instances maintain a context + object. + + For logging, it provides :meth:`log`, :meth:`error`, :meth:`error_catcher`. + + It provides low-level communication routines :meth:`get_json`, :meth:`graphql_query`, :meth:`graphql_node_list`, + :meth:`get_and_write_raw` and implements mechanisms for rate controlling and error handling. + + Further, it provides methods for logging in and general session handles, which are used by that routines in + class :class:`Instaloader`. + """ + + def __init__(self, sleep: bool = True, quiet: bool = False, + user_agent: Optional[str] = None, max_connection_attempts: int = 3): + + self.user_agent = user_agent if user_agent is not None else default_user_agent() + self._session = self.get_anonymous_session() + self.username = None + self.sleep = sleep + self.quiet = quiet + self.max_connection_attempts = max_connection_attempts + + # error log, filled with error() and printed at the end of Instaloader.main() + self.error_log = [] + + # For the adaption of sleep intervals (rate control) + self.previous_queries = dict() + + # Can be set to True for testing, disables supression of InstaloaderContext._error_catcher + self.raise_all_errors = False + + @property + def is_logged_in(self) -> bool: + """True, if this Instaloader instance is logged in.""" + return bool(self.username) + + def log(self, *msg, sep='', end='\n', flush=False): + """Log a message to stdout that can be suppressed with --quiet.""" + if not self.quiet: + print(*msg, sep=sep, end=end, flush=flush) + + def error(self, msg, repeat_at_end = True): + """Log a non-fatal error message to stderr, which is repeated at program termination. + + :param repeat_at_end: Set to false if the message should be printed, but not repeated at program termination.""" + print(msg, file=sys.stderr) + if repeat_at_end: + self.error_log.append(msg) + + def close(self): + """Print error log and close session""" + if self.error_log and not self.quiet: + print("\nErrors occured:", file=sys.stderr) + for err in self.error_log: + print(err, file=sys.stderr) + self._session.close() + + @contextmanager + def error_catcher(self, extra_info: Optional[str] = None): + """ + Context manager to catch, print and record InstaloaderExceptions. + + :param extra_info: String to prefix error message with.""" + try: + yield + except InstaloaderException as err: + if extra_info: + self.error('{}: {}'.format(extra_info, err)) + else: + self.error('{}'.format(err)) + if self.raise_all_errors: + raise err + + def _default_http_header(self, empty_session_only: bool = False) -> Dict[str, str]: + """Returns default HTTP header we use for requests.""" + header = {'Accept-Encoding': 'gzip, deflate', + 'Accept-Language': 'en-US,en;q=0.8', + 'Connection': 'keep-alive', + 'Content-Length': '0', + 'Host': 'www.instagram.com', + 'Origin': 'https://www.instagram.com', + 'Referer': 'https://www.instagram.com/', + 'User-Agent': self.user_agent, + 'X-Instagram-AJAX': '1', + 'X-Requested-With': 'XMLHttpRequest'} + if empty_session_only: + del header['Host'] + del header['Origin'] + del header['Referer'] + del header['X-Instagram-AJAX'] + del header['X-Requested-With'] + return header + + def get_anonymous_session(self) -> requests.Session: + """Returns our default anonymous requests.Session object.""" + session = requests.Session() + session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1', + 'ig_vw': '1920', 'csrftoken': '', + 's_network': '', 'ds_user_id': ''}) + session.headers.update(self._default_http_header(empty_session_only=True)) + return session + + def save_session_to_file(self, sessionfile): + pickle.dump(requests.utils.dict_from_cookiejar(self._session.cookies), sessionfile) + + def load_session_from_file(self, username, sessionfile): + session = requests.Session() + session.cookies = requests.utils.cookiejar_from_dict(pickle.load(sessionfile)) + session.headers.update(self._default_http_header()) + session.headers.update({'X-CSRFToken': session.cookies.get_dict()['csrftoken']}) + self._session = session + self.username = username + + def test_login(self) -> Optional[str]: + data = self.graphql_query("d6f4427fbe92d846298cf93df0b937d3", {}) + return data["data"]["user"]["username"] if data["data"]["user"] is not None else None + + def login(self, user, passwd): + session = requests.Session() + session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1', + 'ig_vw': '1920', 'csrftoken': '', + 's_network': '', 'ds_user_id': ''}) + session.headers.update(self._default_http_header()) + self._sleep() + resp = session.get('https://www.instagram.com/') + session.headers.update({'X-CSRFToken': resp.cookies['csrftoken']}) + self._sleep() + login = session.post('https://www.instagram.com/accounts/login/ajax/', + data={'password': passwd, 'username': user}, allow_redirects=True) + session.headers.update({'X-CSRFToken': login.cookies['csrftoken']}) + if login.status_code == 200: + self._session = session + if user == self.test_login(): + self.username = user + else: + self.username = None + self._session = None + raise BadCredentialsException('Login error! Check your credentials!') + else: + raise ConnectionException('Login error! Connection error!') + + def _sleep(self): + """Sleep a short time if self.sleep is set. Called before each request to instagram.com.""" + if self.sleep: + time.sleep(random.uniform(0.5, 3)) + + def get_json(self, path: str, params: Dict[str, Any], host: str = 'www.instagram.com', + session: Optional[requests.Session] = None, _attempt = 1) -> Dict[str, Any]: + """JSON request to Instagram. + + :param path: URL, relative to the given domain which defaults to www.instagram.com/ + :param params: GET parameters + :param host: Domain part of the URL from where to download the requested JSON; defaults to www.instagram.com + :param session: Session to use, or None to use self.session + :return: Decoded response dictionary + :raises QueryReturnedNotFoundException: When the server responds with a 404. + :raises ConnectionException: When query repeatedly failed. + """ + def graphql_query_waittime(query_hash: str, untracked_queries: bool = False) -> int: + sliding_window = 660 + timestamps = self.previous_queries.get(query_hash) + if not timestamps: + return sliding_window if untracked_queries else 0 + current_time = time.monotonic() + timestamps = list(filter(lambda t: t > current_time - sliding_window, timestamps)) + self.previous_queries[query_hash] = timestamps + if len(timestamps) < 100 and not untracked_queries: + return 0 + return round(min(timestamps) + sliding_window - current_time) + 6 + is_graphql_query = 'query_hash' in params and 'graphql/query' in path + if is_graphql_query: + query_hash = params['query_hash'] + waittime = graphql_query_waittime(query_hash) + if waittime > 0: + self.log('\nToo many queries in the last time. Need to wait {} seconds.'.format(waittime)) + time.sleep(waittime) + timestamp_list = self.previous_queries.get(query_hash) + if timestamp_list is not None: + timestamp_list.append(time.monotonic()) + else: + self.previous_queries[query_hash] = [time.monotonic()] + sess = session if session else self._session + try: + self._sleep() + resp = sess.get('https://{0}/{1}'.format(host, path), params=params, allow_redirects=False) + while resp.is_redirect: + redirect_url = resp.headers['location'] + self.log('\nHTTP redirect from https://{0}/{1} to {2}'.format(host, path, redirect_url)) + if redirect_url.index('https://{}/'.format(host)) == 0: + resp = sess.get(redirect_url if redirect_url.endswith('/') else redirect_url + '/', + params=params, allow_redirects=False) + else: + break + if resp.status_code == 404: + raise QueryReturnedNotFoundException("404") + if resp.status_code == 429: + raise TooManyRequestsException("429 - Too Many Requests") + if resp.status_code != 200: + raise ConnectionException("HTTP error code {}.".format(resp.status_code)) + resp_json = resp.json() + if 'status' in resp_json and resp_json['status'] != "ok": + if 'message' in resp_json: + raise ConnectionException("Returned \"{}\" status, message \"{}\".".format(resp_json['status'], + resp_json['message'])) + else: + raise ConnectionException("Returned \"{}\" status.".format(resp_json['status'])) + return resp_json + except (ConnectionException, json.decoder.JSONDecodeError, requests.exceptions.RequestException) as err: + error_string = "JSON Query to {}: {}".format(path, err) + if _attempt == self.max_connection_attempts: + raise ConnectionException(error_string) + self.error(error_string + " [retrying; skip with ^C]", repeat_at_end=False) + text_for_429 = ("HTTP error code 429 was returned because too many queries occured in the last time. " + "Please do not use Instagram in your browser or run multiple instances of Instaloader " + "in parallel.") + try: + if isinstance(err, TooManyRequestsException): + print(textwrap.fill(text_for_429), file=sys.stderr) + if is_graphql_query: + waittime = graphql_query_waittime(query_hash=params['query_hash'], untracked_queries=True) + if waittime > 0: + self.log('The request will be retried in {} seconds.'.format(waittime)) + time.sleep(waittime) + self._sleep() + return self.get_json(path=path, params=params, host=host, session=sess, _attempt=_attempt + 1) + except KeyboardInterrupt: + self.error("[skipped by user]", repeat_at_end=False) + raise ConnectionException(error_string) + + def graphql_query(self, query_hash: str, variables: Dict[str, Any], + referer: Optional[str] = None) -> Dict[str, Any]: + """ + Do a GraphQL Query. + + :param query_hash: Query identifying hash. + :param variables: Variables for the Query. + :param referer: HTTP Referer, or None. + :return: The server's response dictionary. + """ + tmpsession = copy_session(self._session) + tmpsession.headers.update(self._default_http_header(empty_session_only=True)) + del tmpsession.headers['Connection'] + del tmpsession.headers['Content-Length'] + tmpsession.headers['authority'] = 'www.instagram.com' + tmpsession.headers['scheme'] = 'https' + tmpsession.headers['accept'] = '*/*' + if referer is not None: + tmpsession.headers['referer'] = urllib.parse.quote(referer) + resp_json = self.get_json('graphql/query', + params={'query_hash': query_hash, + 'variables': json.dumps(variables, separators=(',', ':'))}, + session=tmpsession) + tmpsession.close() + if 'status' not in resp_json: + self.error("GraphQL response did not contain a \"status\" field.") + return resp_json + + def graphql_node_list(self, query_hash: str, query_variables: Dict[str, Any], + query_referer: Optional[str], + edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]]) -> Iterator[Dict[str, Any]]: + """Retrieve a list of GraphQL nodes.""" + query_variables['first'] = Instaloader.GRAPHQL_PAGE_LENGTH + data = self.graphql_query(query_hash, query_variables, query_referer) + while True: + edge_struct = edge_extractor(data) + yield from [edge['node'] for edge in edge_struct['edges']] + if edge_struct['page_info']['has_next_page']: + query_variables['after'] = edge_struct['page_info']['end_cursor'] + data = self.graphql_query(query_hash, query_variables, query_referer) + else: + break + + def get_and_write_raw(self, url: str, filename: str, _attempt = 1) -> None: + """Downloads raw data. + + :raises QueryReturnedNotFoundException: When the server responds with a 404. + :raises QueryReturnedForbiddenException: When the server responds with a 403. + :raises ConnectionException: When download repeatedly failed.""" + try: + with self.get_anonymous_session() as anonymous_session: + resp = anonymous_session.get(url) + if resp.status_code == 200: + self.log(filename, end=' ', flush=True) + with open(filename, 'wb') as file: + resp.raw.decode_content = True + shutil.copyfileobj(resp.raw, file) + else: + if resp.status_code == 403: + # suspected invalid URL signature + raise QueryReturnedForbiddenException("403 when accessing {}.".format(url)) + if resp.status_code == 404: + # 404 not worth retrying. + raise QueryReturnedNotFoundException("404 when accessing {}.".format(url)) + raise ConnectionException("HTTP error code {}.".format(resp.status_code)) + except (urllib3.exceptions.HTTPError, requests.exceptions.RequestException, ConnectionException) as err: + error_string = "URL {}: {}".format(url, err) + if _attempt == self.max_connection_attempts: + raise ConnectionException(error_string) + self.error(error_string + " [retrying; skip with ^C]", repeat_at_end=False) + try: + self._sleep() + self.get_and_write_raw(url, filename, _attempt + 1) + except KeyboardInterrupt: + self.error("[skipped by user]", repeat_at_end=False) + raise ConnectionException(error_string) + + class Post: """ Structure containing information about an Instagram post. Created by methods :meth:`Profile.get_posts`, :meth:`Instaloader.get_hashtag_posts`, :meth:`Instaloader.get_feed_posts` and :meth:`Profile.get_saved_posts`. - Posts are linked to an :class:`Instaloader` instance which is used for error logging and obtaining of additional - metadata, if required. This class unifies access to the properties associated with a post. It implements == and is + This class unifies access to the properties associated with a post. It implements == and is hashable. The properties defined here are accessible by the filter expressions specified with the :option:`--only-if` @@ -189,31 +498,31 @@ class Post: LOGIN_REQUIRING_PROPERTIES = ["viewer_has_liked"] - def __init__(self, instaloader: 'Instaloader', node: Dict[str, Any], + def __init__(self, context: InstaloaderContext, node: Dict[str, Any], owner_profile: Optional['Profile'] = None): """Create a Post instance from a node structure as returned by Instagram. - :param instaloader: :class:`Instaloader` instance used for additional queries if neccessary. + :param context: :class:`InstaloaderContext` instance used for additional queries if neccessary. :param node: Node structure, as returned by Instagram. :param owner_profile: The Profile of the owner, if already known at creation. """ - self._instaloader = instaloader + self._context = context self._node = node self._owner_profile = owner_profile self._full_metadata_dict = None @classmethod - def from_shortcode(cls, instaloader: 'Instaloader', shortcode: str): + def from_shortcode(cls, context: InstaloaderContext, shortcode: str): """Create a post object from a given shortcode""" # pylint:disable=protected-access - post = cls(instaloader, {'shortcode': shortcode}) + post = cls(context, {'shortcode': shortcode}) post._node = post._full_metadata return post @classmethod - def from_mediaid(cls, instaloader: 'Instaloader', mediaid: int): + def from_mediaid(cls, context: InstaloaderContext, mediaid: int): """Create a post object from a given mediaid""" - return cls.from_shortcode(instaloader, mediaid_to_shortcode(mediaid)) + return cls.from_shortcode(context, mediaid_to_shortcode(mediaid)) @property def shortcode(self) -> str: @@ -239,7 +548,7 @@ class Post: @property def _full_metadata(self) -> Dict[str, Any]: if not self._full_metadata_dict: - pic_json = self._instaloader.get_json("p/{0}/".format(self.shortcode), params={'__a': 1}) + pic_json = self._context.get_json("p/{0}/".format(self.shortcode), params={'__a': 1}) if "graphql" in pic_json: self._full_metadata_dict = pic_json["graphql"]["shortcode_media"] else: @@ -267,9 +576,9 @@ class Post: return self._owner_profile.username.lower() return self._field('owner', 'username').lower() except (InstaloaderException, KeyError, TypeError) as err: - if self._instaloader.raise_all_errors: + if self._context.raise_all_errors: raise err - self._instaloader.error("Get owner name of {}: {} -- using \'UNKNOWN\'.".format(self, err)) + self._context.error("Get owner name of {}: {} -- using \'UNKNOWN\'.".format(self, err)) return 'UNKNOWN' @property @@ -356,7 +665,7 @@ class Post: @property def viewer_has_liked(self) -> Optional[bool]: """Whether the viewer has liked the post, or None if not logged in.""" - if not self._instaloader.is_logged_in: + if not self._context.is_logged_in: return None if 'likes' in self._node and 'viewer_has_liked' in self._node['likes']: return self._node['likes']['viewer_has_liked'] @@ -385,10 +694,10 @@ class Post: if self.comments == len(comment_edges): # If the Post's metadata already contains all comments, don't do GraphQL requests to obtain them yield from (comment['node'] for comment in comment_edges) - yield from self._instaloader.graphql_node_list("33ba35852cb50da46f5b5e889df7d159", - {'shortcode': self.shortcode}, - 'https://www.instagram.com/p/' + self.shortcode + '/', - lambda d: d['data']['shortcode_media']['edge_media_to_comment']) + yield from self._context.graphql_node_list("33ba35852cb50da46f5b5e889df7d159", + {'shortcode': self.shortcode}, + 'https://www.instagram.com/p/' + self.shortcode + '/', + lambda d: d['data']['shortcode_media']['edge_media_to_comment']) def get_likes(self) -> Iterator[Dict[str, Any]]: """Iterate over all likes of the post. @@ -403,16 +712,16 @@ class Post: if self.likes == len(likes_edges): # If the Post's metadata already contains all likes, don't do GraphQL requests to obtain them yield from (like['node'] for like in likes_edges) - yield from self._instaloader.graphql_node_list("1cb6ec562846122743b61e492c85999f", {'shortcode': self.shortcode}, - 'https://www.instagram.com/p/' + self.shortcode + '/', - lambda d: d['data']['shortcode_media']['edge_liked_by']) + yield from self._context.graphql_node_list("1cb6ec562846122743b61e492c85999f", {'shortcode': self.shortcode}, + 'https://www.instagram.com/p/' + self.shortcode + '/', + lambda d: d['data']['shortcode_media']['edge_liked_by']) def get_location(self) -> Optional[Dict[str, str]]: """If the Post has a location, returns a dictionary with fields 'lat' and 'lng'.""" loc_dict = self._field("location") if loc_dict is not None: - location_json = self._instaloader.get_json("explore/locations/{0}/".format(loc_dict["id"]), - params={'__a': 1}) + location_json = self._context.get_json("explore/locations/{0}/".format(loc_dict["id"]), + params={'__a': 1}) return location_json["location"] if "location" in location_json else location_json['graphql']['location'] @staticmethod @@ -440,24 +749,22 @@ class Profile: Provides methods for accessing profile properties, as well as :meth:`Profile.get_posts` and for own profile :meth:`Profile.get_saved_posts`. - Instances are linked to an :class:`Instaloader` instance. This class implements == and is hashable. + This class implements == and is hashable. """ - def __init__(self, instaloader: 'Instaloader', identifier: Union[str, int]): + def __init__(self, context: InstaloaderContext, profile_name: str): """ Lookup Profile information and create Profile instance. - :param instaloader: :class:`Instaloader` instance used for queries etc. - :param identifier: Profile name (string) or Profile ID (integer). + :param context: :class:`InstaloaderContext` instance used for queries etc. + :param identifier: Profile name (string). """ - self._instaloader = instaloader + self._context = context - profile_name = identifier if isinstance(identifier, str) else \ - self._instaloader.get_username_by_id(identifier) try: - metadata = self._instaloader.get_json('{}/'.format(profile_name), params={'__a': 1}) + metadata = self._context.get_json('{}/'.format(profile_name), params={'__a': 1}) self._metadata = metadata['graphql'] if 'graphql' in metadata else metadata except QueryReturnedNotFoundException: - raise ProfileNotExistsException('Profile {} does not exist.'.format(identifier)) + raise ProfileNotExistsException('Profile {} does not exist.'.format(profile_name)) @property def userid(self) -> int: @@ -528,12 +835,12 @@ class Profile: def get_profile_pic_url(self) -> str: """Return URL of profile picture""" try: - with self._instaloader.get_anonymous_session() as anonymous_session: - data = self._instaloader.get_json(path='api/v1/users/{0}/info/'.format(self.userid), params={}, - host='i.instagram.com', session=anonymous_session) + with self._context.get_anonymous_session() as anonymous_session: + data = self._context.get_json(path='api/v1/users/{0}/info/'.format(self.userid), params={}, + host='i.instagram.com', session=anonymous_session) return data["user"]["hd_profile_pic_url_info"]["url"] except (InstaloaderException, KeyError) as err: - self._instaloader.error('{} Unable to fetch high quality profile pic.'.format(err)) + self._context.error('{} Unable to fetch high quality profile pic.'.format(err)) return self._metadata["user"]["profile_pic_url_hd"] if "profile_pic_url_hd" in self._metadata["user"] \ else self._metadata["user"]["profile_pic_url"] @@ -541,25 +848,25 @@ class Profile: """Retrieve all posts from a profile.""" if 'media' in self._metadata['user']: # backwards compatibility with old non-graphql structure - yield from (Post(self._instaloader, node, owner_profile=self) + yield from (Post(self._context, node, owner_profile=self) for node in self._metadata['user']['media']['nodes']) has_next_page = self._metadata['user']['media']['page_info']['has_next_page'] end_cursor = self._metadata['user']['media']['page_info']['end_cursor'] else: - yield from (Post(self._instaloader, edge['node'], owner_profile=self) + yield from (Post(self._context, edge['node'], owner_profile=self) for edge in self._metadata['user']['edge_owner_to_timeline_media']['edges']) has_next_page = self._metadata['user']['edge_owner_to_timeline_media']['page_info']['has_next_page'] end_cursor = self._metadata['user']['edge_owner_to_timeline_media']['page_info']['end_cursor'] while has_next_page: # We do not use self.graphql_node_list() here, because profile_metadata # lets us obtain the first 12 nodes 'for free' - data = self._instaloader.graphql_query("472f257a40c653c64c666ce877d59d2b", - {'id': self.userid, - 'first': Instaloader.GRAPHQL_PAGE_LENGTH, - 'after': end_cursor}, - 'https://www.instagram.com/{0}/'.format(self.username)) + data = self._context.graphql_query("472f257a40c653c64c666ce877d59d2b", + {'id': self.userid, + 'first': Instaloader.GRAPHQL_PAGE_LENGTH, + 'after': end_cursor}, + 'https://www.instagram.com/{0}/'.format(self.username)) media = data['data']['user']['edge_owner_to_timeline_media'] - yield from (Post(self._instaloader, edge['node'], owner_profile=self) + yield from (Post(self._context, edge['node'], owner_profile=self) for edge in media['edges']) has_next_page = media['page_info']['has_next_page'] end_cursor = media['page_info']['end_cursor'] @@ -567,7 +874,7 @@ class Profile: def get_saved_posts(self) -> Iterator[Post]: """Get Posts that are marked as saved by the user.""" - if self.username != self._instaloader.username: + if self.username != self._context.username: raise LoginRequiredException("--login={} required to get that profile's saved posts.".format(self.username)) data = self._metadata @@ -581,15 +888,15 @@ class Profile: saved_media = data["user"]["saved_media"] if is_edge: - yield from (Post(self._instaloader, edge["node"]) for edge in saved_media["edges"]) + yield from (Post(self._context, edge["node"]) for edge in saved_media["edges"]) else: - yield from (Post(self._instaloader, node) for node in saved_media["nodes"]) + yield from (Post(self._context, node) for node in saved_media["nodes"]) if not saved_media["page_info"]["has_next_page"]: break - data = self._instaloader.graphql_query("f883d95537fbcd400f466f63d42bd8a1", - {'id': self.userid, 'first': Instaloader.GRAPHQL_PAGE_LENGTH, - 'after': saved_media["page_info"]["end_cursor"]})['data'] + data = self._context.graphql_query("f883d95537fbcd400f466f63d42bd8a1", + {'id': self.userid, 'first': Instaloader.GRAPHQL_PAGE_LENGTH, + 'after': saved_media["page_info"]["end_cursor"]})['data'] class Tristate(Enum): @@ -613,7 +920,7 @@ def _requires_login(func: Callable) -> Callable: """Decorator to raise an exception if herewith-decorated function is called without being logged in""" @wraps(func) def call(instaloader, *args, **kwargs): - if not instaloader.is_logged_in: + if not instaloader.context.is_logged_in: raise LoginRequiredException("--login=USERNAME required.") return func(instaloader, *args, **kwargs) # pylint:disable=no-member @@ -644,14 +951,10 @@ class Instaloader: download_comments: Tristate = Tristate.no_extra_query, save_metadata: Tristate = Tristate.never, max_connection_attempts: int = 3): - """Instaloader.""" + + self.context = InstaloaderContext(sleep, quiet, user_agent, max_connection_attempts) # configuration parameters - self.user_agent = user_agent if user_agent is not None else default_user_agent() - self.session = self.get_anonymous_session() - self.username = None - self.sleep = sleep - self.quiet = quiet self.dirname_pattern = _PathPattern(dirname_pattern if dirname_pattern is not None else '{target}') if filename_pattern is not None: filename_pattern = re.sub(r"({(?:post\.)?date)([:}])", r"\1_utc\2", filename_pattern) @@ -671,248 +974,37 @@ class Instaloader: self.save_captions = save_captions self.download_comments = download_comments self.save_metadata = save_metadata - self.max_connection_attempts = max_connection_attempts - - # error log, filled with error() and printed at the end of Instaloader.main() - self.error_log = [] - - # For the adaption of sleep intervals (rate control) - self.previous_queries = dict() - - # Can be set to True for testing, disables supression of Instaloader._error_catcher - self.raise_all_errors = False - - @property - def is_logged_in(self) -> bool: - """True, if this Instaloader instance is logged in.""" - return bool(self.username) @contextmanager def anonymous_copy(self): """Yield an anonymous, otherwise equally-configured copy of an Instaloader instance; Then copy its error log.""" - new_loader = Instaloader(self.sleep, self.quiet, self.user_agent, + new_loader = Instaloader(self.context.sleep, self.context.quiet, self.context.user_agent, self.dirname_pattern, self.filename_pattern, self.download_videos, self.download_video_thumbnails, self.download_geotags, self.save_captions, self.download_comments, - self.save_metadata, self.max_connection_attempts) - new_loader.previous_queries = self.previous_queries + self.save_metadata, self.context.max_connection_attempts) + new_loader.context.previous_queries = self.context.previous_queries yield new_loader - self.error_log.extend(new_loader.error_log) - self.previous_queries = new_loader.previous_queries + self.context.error_log.extend(new_loader.context.error_log) + new_loader.context.error_log = [] # avoid double-printing of errors + self.context.previous_queries = new_loader.context.previous_queries + new_loader.close() - def _log(self, *msg, sep='', end='\n', flush=False): - """Log a message to stdout that can be suppressed with --quiet.""" - if not self.quiet: - print(*msg, sep=sep, end=end, flush=flush) + def close(self): + self.context.close() - def error(self, msg, repeat_at_end = True): - """Log a non-fatal error message to stderr, which is repeated at program termination. + def __enter__(self): + return self - :param repeat_at_end: Set to false if the message should be printed, but not repeated at program termination.""" - print(msg, file=sys.stderr) - if repeat_at_end: - self.error_log.append(msg) - - @contextmanager - def _error_catcher(self, extra_info: Optional[str] = None): - """ - Context manager to catch, print and record InstaloaderExceptions. - - :param extra_info: String to prefix error message with.""" - try: - yield - except InstaloaderException as err: - if extra_info: - self.error('{}: {}'.format(extra_info, err)) - else: - self.error('{}'.format(err)) - if self.raise_all_errors: - raise err - - def _sleep(self): - """Sleep a short time if self.sleep is set. Called before each request to instagram.com.""" - if self.sleep: - time.sleep(random.uniform(0.5, 3)) - - def _get_and_write_raw(self, url: str, filename: str, _attempt = 1) -> None: - """Downloads raw data. - - :raises QueryReturnedNotFoundException: When the server responds with a 404. - :raises QueryReturnedForbiddenException: When the server responds with a 403. - :raises ConnectionException: When download repeatedly failed.""" - try: - with self.get_anonymous_session() as anonymous_session: - resp = anonymous_session.get(url) - if resp.status_code == 200: - self._log(filename, end=' ', flush=True) - with open(filename, 'wb') as file: - resp.raw.decode_content = True - shutil.copyfileobj(resp.raw, file) - else: - if resp.status_code == 403: - # suspected invalid URL signature - raise QueryReturnedForbiddenException("403 when accessing {}.".format(url)) - if resp.status_code == 404: - # 404 not worth retrying. - raise QueryReturnedNotFoundException("404 when accessing {}.".format(url)) - raise ConnectionException("HTTP error code {}.".format(resp.status_code)) - except (urllib3.exceptions.HTTPError, requests.exceptions.RequestException, ConnectionException) as err: - error_string = "URL {}: {}".format(url, err) - if _attempt == self.max_connection_attempts: - raise ConnectionException(error_string) - self.error(error_string + " [retrying; skip with ^C]", repeat_at_end=False) - try: - self._sleep() - self._get_and_write_raw(url, filename, _attempt + 1) - except KeyboardInterrupt: - self.error("[skipped by user]", repeat_at_end=False) - raise ConnectionException(error_string) - - def get_json(self, path: str, params: Dict[str, Any], host: str = 'www.instagram.com', - session: Optional[requests.Session] = None, _attempt = 1) -> Dict[str, Any]: - """JSON request to Instagram. - - :param path: URL, relative to the given domain which defaults to www.instagram.com/ - :param params: GET parameters - :param host: Domain part of the URL from where to download the requested JSON; defaults to www.instagram.com - :param session: Session to use, or None to use self.session - :return: Decoded response dictionary - :raises QueryReturnedNotFoundException: When the server responds with a 404. - :raises ConnectionException: When query repeatedly failed. - """ - def graphql_query_waittime(query_hash: str, untracked_queries: bool = False) -> int: - sliding_window = 660 - timestamps = self.previous_queries.get(query_hash) - if not timestamps: - return sliding_window if untracked_queries else 0 - current_time = time.monotonic() - timestamps = list(filter(lambda t: t > current_time - sliding_window, timestamps)) - self.previous_queries[query_hash] = timestamps - if len(timestamps) < 100 and not untracked_queries: - return 0 - return round(min(timestamps) + sliding_window - current_time) + 6 - is_graphql_query = 'query_hash' in params and 'graphql/query' in path - if is_graphql_query: - query_hash = params['query_hash'] - waittime = graphql_query_waittime(query_hash) - if waittime > 0: - self._log('\nToo many queries in the last time. Need to wait {} seconds.'.format(waittime)) - time.sleep(waittime) - timestamp_list = self.previous_queries.get(query_hash) - if timestamp_list is not None: - timestamp_list.append(time.monotonic()) - else: - self.previous_queries[query_hash] = [time.monotonic()] - sess = session if session else self.session - try: - self._sleep() - resp = sess.get('https://{0}/{1}'.format(host, path), params=params, allow_redirects=False) - while resp.is_redirect: - redirect_url = resp.headers['location'] - self._log('\nHTTP redirect from https://{0}/{1} to {2}'.format(host, path, redirect_url)) - if redirect_url.index('https://{}/'.format(host)) == 0: - resp = sess.get(redirect_url if redirect_url.endswith('/') else redirect_url + '/', - params=params, allow_redirects=False) - else: - break - if resp.status_code == 404: - raise QueryReturnedNotFoundException("404") - if resp.status_code == 429: - raise TooManyRequestsException("429 - Too Many Requests") - if resp.status_code != 200: - raise ConnectionException("HTTP error code {}.".format(resp.status_code)) - resp_json = resp.json() - if 'status' in resp_json and resp_json['status'] != "ok": - if 'message' in resp_json: - raise ConnectionException("Returned \"{}\" status, message \"{}\".".format(resp_json['status'], - resp_json['message'])) - else: - raise ConnectionException("Returned \"{}\" status.".format(resp_json['status'])) - return resp_json - except (ConnectionException, json.decoder.JSONDecodeError, requests.exceptions.RequestException) as err: - error_string = "JSON Query to {}: {}".format(path, err) - if _attempt == self.max_connection_attempts: - raise ConnectionException(error_string) - self.error(error_string + " [retrying; skip with ^C]", repeat_at_end=False) - text_for_429 = ("HTTP error code 429 was returned because too many queries occured in the last time. " - "Please do not use Instagram in your browser or run multiple instances of Instaloader " - "in parallel.") - try: - if isinstance(err, TooManyRequestsException): - print(textwrap.fill(text_for_429), file=sys.stderr) - if is_graphql_query: - waittime = graphql_query_waittime(query_hash=params['query_hash'], untracked_queries=True) - if waittime > 0: - self._log('The request will be retried in {} seconds.'.format(waittime)) - time.sleep(waittime) - self._sleep() - return self.get_json(path=path, params=params, host=host, session=sess, _attempt=_attempt + 1) - except KeyboardInterrupt: - self.error("[skipped by user]", repeat_at_end=False) - raise ConnectionException(error_string) - - def _default_http_header(self, empty_session_only: bool = False) -> Dict[str, str]: - """Returns default HTTP header we use for requests.""" - header = {'Accept-Encoding': 'gzip, deflate', - 'Accept-Language': 'en-US,en;q=0.8', - 'Connection': 'keep-alive', - 'Content-Length': '0', - 'Host': 'www.instagram.com', - 'Origin': 'https://www.instagram.com', - 'Referer': 'https://www.instagram.com/', - 'User-Agent': self.user_agent, - 'X-Instagram-AJAX': '1', - 'X-Requested-With': 'XMLHttpRequest'} - if empty_session_only: - del header['Host'] - del header['Origin'] - del header['Referer'] - del header['X-Instagram-AJAX'] - del header['X-Requested-With'] - return header - - def get_anonymous_session(self) -> requests.Session: - """Returns our default anonymous requests.Session object.""" - session = requests.Session() - session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1', - 'ig_vw': '1920', 'csrftoken': '', - 's_network': '', 'ds_user_id': ''}) - session.headers.update(self._default_http_header(empty_session_only=True)) - return session - - def graphql_query(self, query_hash: str, variables: Dict[str, Any], - referer: Optional[str] = None) -> Dict[str, Any]: - """ - Do a GraphQL Query. - - :param query_hash: Query identifying hash. - :param variables: Variables for the Query. - :param referer: HTTP Referer, or None. - :return: The server's response dictionary. - """ - tmpsession = copy_session(self.session) - tmpsession.headers.update(self._default_http_header(empty_session_only=True)) - del tmpsession.headers['Connection'] - del tmpsession.headers['Content-Length'] - tmpsession.headers['authority'] = 'www.instagram.com' - tmpsession.headers['scheme'] = 'https' - tmpsession.headers['accept'] = '*/*' - if referer is not None: - tmpsession.headers['referer'] = urllib.parse.quote(referer) - resp_json = self.get_json('graphql/query', - params={'query_hash': query_hash, - 'variables': json.dumps(variables, separators=(',', ':'))}, - session=tmpsession) - tmpsession.close() - if 'status' not in resp_json: - self.error("GraphQL response did not contain a \"status\" field.") - return resp_json + def __exit__(self, *args): + self.close() def get_username_by_id(self, profile_id: int) -> str: """To get the current username of a profile, given its unique ID, this function can be used.""" - data = self.graphql_query("472f257a40c653c64c666ce877d59d2b", {'id': str(profile_id), 'first': 1})['data']['user'] + data = self.context.graphql_query("472f257a40c653c64c666ce877d59d2b", + {'id': str(profile_id), 'first': 1})['data']['user'] if data: data = data["edge_owner_to_timeline_media"] else: @@ -924,27 +1016,12 @@ class Instaloader: else: raise LoginRequiredException("Login required to determine username (ID: " + str(profile_id) + ").") else: - return Post.from_mediaid(self, int(data['edges'][0]["node"]["id"])).owner_username + return Post.from_mediaid(self.context, int(data['edges'][0]["node"]["id"])).owner_username def get_id_by_username(self, profile: str) -> int: """Each Instagram profile has its own unique ID which stays unmodified even if a user changes his/her username. To get said ID, given the profile's name, you may call this function.""" - return Profile(self, profile).userid - - def graphql_node_list(self, query_hash: str, query_variables: Dict[str, Any], - query_referer: Optional[str], - edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]]) -> Iterator[Dict[str, Any]]: - """Retrieve a list of GraphQL nodes.""" - query_variables['first'] = Instaloader.GRAPHQL_PAGE_LENGTH - data = self.graphql_query(query_hash, query_variables, query_referer) - while True: - edge_struct = edge_extractor(data) - yield from [edge['node'] for edge in edge_struct['edges']] - if edge_struct['page_info']['has_next_page']: - query_variables['after'] = edge_struct['page_info']['end_cursor'] - data = self.graphql_query(query_hash, query_variables, query_referer) - else: - break + return Profile(self.context, profile).userid @_requires_login def get_followers(self, profile: str) -> Iterator[Dict[str, Any]]: @@ -955,10 +1032,10 @@ class Instaloader: :param profile: Name of profile to lookup followers. """ - yield from self.graphql_node_list("37479f2b8209594dde7facb0d904896a", - {'id': str(self.get_id_by_username(profile))}, - 'https://www.instagram.com/' + profile + '/', - lambda d: d['data']['user']['edge_followed_by']) + yield from self.context.graphql_node_list("37479f2b8209594dde7facb0d904896a", + {'id': str(self.get_id_by_username(profile))}, + 'https://www.instagram.com/' + profile + '/', + lambda d: d['data']['user']['edge_followed_by']) @_requires_login def get_followees(self, profile: str) -> Iterator[Dict[str, Any]]: @@ -969,10 +1046,10 @@ class Instaloader: :param profile: Name of profile to lookup followers. """ - yield from self.graphql_node_list("58712303d941c6855d4e888c5f0cd22f", - {'id': str(self.get_id_by_username(profile))}, - 'https://www.instagram.com/' + profile + '/', - lambda d: d['data']['user']['edge_follow']) + yield from self.context.graphql_node_list("58712303d941c6855d4e888c5f0cd22f", + {'id': str(self.get_id_by_username(profile))}, + 'https://www.instagram.com/' + profile + '/', + lambda d: d['data']['user']['edge_follow']) def download_pic(self, filename: str, url: str, mtime: datetime, filename_alt: Optional[str] = None, filename_suffix: Optional[str] = None) -> bool: @@ -986,14 +1063,14 @@ class Instaloader: filename_alt += '_' + filename_suffix filename += '.' + file_extension if os.path.isfile(filename): - self._log(filename + ' exists', end=' ', flush=True) + self.context.log(filename + ' exists', end=' ', flush=True) return False if filename_alt is not None: filename_alt += '.' + file_extension if os.path.isfile(filename_alt): - self._log(filename_alt + 'exists', end=' ', flush=True) + self.context.log(filename_alt + 'exists', end=' ', flush=True) return False - self._get_and_write_raw(url, filename) + self.context.get_and_write_raw(url, filename) os.utime(filename, (datetime.now().timestamp(), mtime.timestamp())) return True @@ -1002,7 +1079,7 @@ class Instaloader: filename += '.json' with open(filename, 'w') as fp: json.dump(post, fp=fp, indent=4, default=Post.json_encoder) - self._log('json', end=' ', flush=True) + self.context.log('json', end=' ', flush=True) def update_comments(self, filename: str, post: Post, filename_alt: Optional[str] = None) -> None: try: @@ -1030,7 +1107,7 @@ class Instaloader: unique_comments_list.append(y) file.write(json.dumps(unique_comments_list, indent=4)) os.rename(filename_current, filename + '_comments.json') - self._log('comments', end=' ', flush=True) + self.context.log('comments', end=' ', flush=True) def save_caption(self, filename: str, mtime: datetime, caption: str, filename_alt: Optional[str] = None) -> None: """Updates picture caption""" @@ -1050,9 +1127,9 @@ class Instaloader: file_caption = file.read() if file_caption.replace(b'\r\n', b'\n') == caption.replace(b'\r\n', b'\n'): try: - self._log(pcaption + ' unchanged', end=' ', flush=True) + self.context.log(pcaption + ' unchanged', end=' ', flush=True) except UnicodeEncodeError: - self._log('txt unchanged', end=' ', flush=True) + self.context.log('txt unchanged', end=' ', flush=True) return None else: def get_filename(file, index): @@ -1072,13 +1149,13 @@ class Instaloader: os.rename(get_filename(filename if file_exists_list[index - 1] % 2 else filename_alt, index - 1), get_filename(filename, index)) try: - self._log(pcaption + ' updated', end=' ', flush=True) + self.context.log(pcaption + ' updated', end=' ', flush=True) except UnicodeEncodeError: - self._log('txt updated', end=' ', flush=True) + self.context.log('txt updated', end=' ', flush=True) try: - self._log(pcaption, end=' ', flush=True) + self.context.log(pcaption, end=' ', flush=True) except UnicodeEncodeError: - self._log('txt', end=' ', flush=True) + self.context.log('txt', end=' ', flush=True) with open(filename, 'wb') as text_file: shutil.copyfileobj(BytesIO(caption), text_file) os.utime(filename, (datetime.now().timestamp(), mtime.timestamp())) @@ -1091,7 +1168,7 @@ class Instaloader: with open(filename, 'wb') as text_file: shutil.copyfileobj(BytesIO(location_string.encode()), text_file) os.utime(filename, (datetime.now().timestamp(), mtime.timestamp())) - self._log('geo', end=' ', flush=True) + self.context.log('geo', end=' ', flush=True) def download_profilepic(self, profile: Profile) -> None: """Downloads and saves profile pic.""" @@ -1100,7 +1177,7 @@ class Instaloader: return epoch.strftime('%Y-%m-%d_%H-%M-%S') profile_pic_url = profile.get_profile_pic_url() - with self.get_anonymous_session() as anonymous_session: + with self.context.get_anonymous_session() as anonymous_session: date_object = datetime.strptime(anonymous_session.head(profile_pic_url).headers["Last-Modified"], '%a, %d %b %Y %H:%M:%S GMT') if ((format_string_contains_key(self.dirname_pattern, 'profile') or @@ -1112,25 +1189,25 @@ class Instaloader: filename = '{0}/{1}_{2}_UTC_profile_pic.{3}'.format(self.dirname_pattern.format(), profile.username.lower(), _epoch_to_string(date_object), profile_pic_url[-3:]) if os.path.isfile(filename): - self._log(filename + ' already exists') + self.context.log(filename + ' already exists') return None - self._get_and_write_raw(profile_pic_url, filename) + self.context.get_and_write_raw(profile_pic_url, filename) os.utime(filename, (datetime.now().timestamp(), date_object.timestamp())) - self._log('') # log output of _get_and_write_raw() does not produce \n + self.context.log('') # log output of _get_and_write_raw() does not produce \n @_requires_login def save_session_to_file(self, filename: Optional[str] = None) -> None: """Saves internally stored :class:`requests.Session` object.""" if filename is None: - filename = get_default_session_filename(self.username) + filename = get_default_session_filename(self.context.username) dirname = os.path.dirname(filename) if dirname != '' and not os.path.exists(dirname): os.makedirs(dirname) os.chmod(dirname, 0o700) with open(filename, 'wb') as sessionfile: os.chmod(filename, 0o600) - pickle.dump(requests.utils.dict_from_cookiejar(self.session.cookies), sessionfile) - self._log("Saved session to %s." % filename) + self.context.save_session_to_file(sessionfile) + self.context.log("Saved session to %s." % filename) def load_session_from_file(self, username: str, filename: Optional[str] = None) -> None: """Internally stores :class:`requests.Session` object loaded from file. @@ -1142,43 +1219,16 @@ class Instaloader: if filename is None: filename = get_default_session_filename(username) with open(filename, 'rb') as sessionfile: - session = requests.Session() - session.cookies = requests.utils.cookiejar_from_dict(pickle.load(sessionfile)) - session.headers.update(self._default_http_header()) - session.headers.update({'X-CSRFToken': session.cookies.get_dict()['csrftoken']}) - self._log("Loaded session from %s." % filename) - self.session = session - self.username = username + self.context.load_session_from_file(username, sessionfile) + self.context.log("Loaded session from %s." % filename) def test_login(self) -> Optional[str]: """Returns the Instagram username to which given :class:`requests.Session` object belongs, or None.""" - data = self.graphql_query("d6f4427fbe92d846298cf93df0b937d3", {}) - return data["data"]["user"]["username"] if data["data"]["user"] is not None else None + return self.context.test_login() def login(self, user: str, passwd: str) -> None: """Log in to instagram with given username and password and internally store session object""" - session = requests.Session() - session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1', - 'ig_vw': '1920', 'csrftoken': '', - 's_network': '', 'ds_user_id': ''}) - session.headers.update(self._default_http_header()) - self._sleep() - resp = session.get('https://www.instagram.com/') - session.headers.update({'X-CSRFToken': resp.cookies['csrftoken']}) - self._sleep() - login = session.post('https://www.instagram.com/accounts/login/ajax/', - data={'password': passwd, 'username': user}, allow_redirects=True) - session.headers.update({'X-CSRFToken': login.cookies['csrftoken']}) - if login.status_code == 200: - self.session = session - if user == self.test_login(): - self.username = user - else: - self.username = None - self.session = None - raise BadCredentialsException('Login error! Check your credentials!') - else: - raise ConnectionException('Login error! Connection error!') + self.context.login(user, passwd) def download_post(self, post: Post, target: str) -> bool: """ @@ -1234,7 +1284,7 @@ class Instaloader: downloaded = self.download_pic(filename=filename, filename_alt=filename_old, url=post.url, mtime=post.date_local) else: - self.error("Warning: {0} has unknown typename: {1}".format(post, post.typename)) + self.context.error("Warning: {0} has unknown typename: {1}".format(post, post.typename)) # Save caption if desired if self.save_captions is not Tristate.never: @@ -1242,7 +1292,7 @@ class Instaloader: self.save_caption(filename=filename, filename_alt=filename_old, mtime=post.date_local, caption=post.caption) else: - self._log("", end=' ', flush=True) + self.context.log("", end=' ', flush=True) # Download video if desired if post.is_video and self.download_videos is Tristate.always: @@ -1264,7 +1314,7 @@ class Instaloader: if self.save_metadata is Tristate.always: self.save_metadata_json(filename, post) - self._log() + self.context.log() return downloaded @_requires_login @@ -1277,13 +1327,14 @@ class Instaloader: """ if userids is None: - data = self.graphql_query("d15efd8c0c5b23f0ef71f18bf363c704", {"only_stories": True})["data"]["user"] + data = self.context.graphql_query("d15efd8c0c5b23f0ef71f18bf363c704", + {"only_stories": True})["data"]["user"] if data is None: raise BadResponseException('Bad stories reel JSON.') userids = list(edge["node"]["id"] for edge in data["feed_reels_tray"]["edge_reels_tray_to_reel"]["edges"]) - stories = self.graphql_query("bf41e22b1c4ba4c9f31b844ebb7d9056", - {"reel_ids": userids, "precomposed_overlay": False})["data"] + stories = self.context.graphql_query("bf41e22b1c4ba4c9f31b844ebb7d9056", + {"reel_ids": userids, "precomposed_overlay": False})["data"] for media in stories["reels_media"]: yield media @@ -1304,7 +1355,7 @@ class Instaloader: """ if not userids: - self._log("Retrieving all visible stories...") + self.context.log("Retrieving all visible stories...") if format_string_contains_key(self.filename_pattern, 'post'): raise InvalidArgumentException("The \"post\" keyword is not supported in the filename pattern when " @@ -1314,13 +1365,13 @@ class Instaloader: if "items" not in user_stories: raise BadResponseException('Bad reel media JSON.') name = user_stories["user"]["username"].lower() - self._log("Retrieving stories from profile {}.".format(name)) + self.context.log("Retrieving stories from profile {}.".format(name)) totalcount = len(user_stories["items"]) count = 1 for item in user_stories["items"]: - self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True) + self.context.log("[%3i/%3i] " % (count, totalcount), end="", flush=True) count += 1 - with self._error_catcher('Download story from user {}'.format(name)): + with self.context.error_catcher('Download story from user {}'.format(name)): downloaded = self.download_story(item, filename_target, name) if fast_update and not downloaded: break @@ -1357,29 +1408,27 @@ class Instaloader: filename_alt=filename_old, url=item["video_resources"][-1]["src"], mtime=date_local) - self._log() + self.context.log() return downloaded @_requires_login def get_feed_posts(self) -> Iterator[Post]: """Get Posts of the user's feed.""" - if not self.is_logged_in: - return - data = self.graphql_query("d6f4427fbe92d846298cf93df0b937d3", {})["data"] + data = self.context.graphql_query("d6f4427fbe92d846298cf93df0b937d3", {})["data"] while True: feed = data["user"]["edge_web_feed_timeline"] - yield from (Post(self, edge["node"]) for edge in feed["edges"] + yield from (Post(self.context, edge["node"]) for edge in feed["edges"] if not edge["node"]["__typename"] == "GraphSuggestedUserFeedUnit") if not feed["page_info"]["has_next_page"]: break - data = self.graphql_query("d6f4427fbe92d846298cf93df0b937d3", - {'fetch_media_item_count': 12, - 'fetch_media_item_cursor': feed["page_info"]["end_cursor"], - 'fetch_comment_count': 4, - 'fetch_like': 10, - 'has_stories': False})["data"] + data = self.context.graphql_query("d6f4427fbe92d846298cf93df0b937d3", + {'fetch_media_item_count': 12, + 'fetch_media_item_cursor': feed["page_info"]["end_cursor"], + 'fetch_comment_count': 4, + 'fetch_like': 10, + 'has_stories': False})["data"] @_requires_login def download_feed_posts(self, max_count: int = None, fast_update: bool = False, @@ -1398,18 +1447,18 @@ class Instaloader: :param fast_update: If true, abort when first already-downloaded picture is encountered :param filter_func: function(post), which returns True if given picture should be downloaded """ - self._log("Retrieving pictures from your feed...") + self.context.log("Retrieving pictures from your feed...") count = 1 for post in self.get_feed_posts(): if max_count is not None and count > max_count: break name = post.owner_username if filter_func is not None and not filter_func(post): - self._log("" % name, flush=True) + self.context.log("" % name, flush=True) continue - self._log("[%3i] %s " % (count, name), end="", flush=True) + self.context.log("[%3i] %s " % (count, name), end="", flush=True) count += 1 - with self._error_catcher('Download feed'): + with self.context.error_catcher('Download feed'): downloaded = self.download_post(post, target=':feed') if fast_update and not downloaded: break @@ -1423,18 +1472,18 @@ class Instaloader: :param fast_update: If true, abort when first already-downloaded picture is encountered :param filter_func: function(post), which returns True if given picture should be downloaded """ - self._log("Retrieving saved posts...") + self.context.log("Retrieving saved posts...") count = 1 - for post in Profile(self, self.username).get_saved_posts(): + for post in Profile(self.context, self.context.username).get_saved_posts(): if max_count is not None and count > max_count: break name = post.owner_username if filter_func is not None and not filter_func(post): - self._log("3}] {} ".format(count, name), end=str(), flush=True) + self.context.log("[{:>3}] {} ".format(count, name), end=str(), flush=True) count += 1 - with self._error_catcher('Download saved posts'): + with self.context.error_catcher('Download saved posts'): downloaded = self.download_post(post, target=':saved') if fast_update and not downloaded: break @@ -1442,16 +1491,19 @@ class Instaloader: @_requires_login def get_explore_posts(self) -> Iterator[Post]: """Get Posts which are worthy of exploring suggested by Instagram.""" - yield from (Post(self, node) for node in self.graphql_node_list("df0dcc250c2b18d9fd27c5581ef33c7c", - {}, 'https://www.instagram.com/explore/', - lambda d: d['data']['user']['edge_web_discover_media'])) + yield from (Post(self.context, node) + for node in self.context.graphql_node_list("df0dcc250c2b18d9fd27c5581ef33c7c", + {}, 'https://www.instagram.com/explore/', + lambda d: d['data']['user']['edge_web_discover_media'])) def get_hashtag_posts(self, hashtag: str) -> Iterator[Post]: """Get Posts associated with a #hashtag.""" - yield from (Post(self, node) for node in - self.graphql_node_list("298b92c8d7cad703f7565aa892ede943", {'tag_name': hashtag}, - 'https://www.instagram.com/explore/tags/{0}/'.format(hashtag), - lambda d: d['data']['hashtag']['edge_hashtag_to_media'])) + yield from (Post(self.context, node) + for node in self.context.graphql_node_list("298b92c8d7cad703f7565aa892ede943", + {'tag_name': hashtag}, + 'https://www.instagram.com/explore/tags/{0}/' + .format(hashtag), + lambda d: d['data']['hashtag']['edge_hashtag_to_media'])) def download_hashtag(self, hashtag: str, max_count: Optional[int] = None, @@ -1470,17 +1522,17 @@ class Instaloader: :param fast_update: If true, abort when first already-downloaded picture is encountered """ hashtag = hashtag.lower() - self._log("Retrieving pictures with hashtag {}...".format(hashtag)) + self.context.log("Retrieving pictures with hashtag {}...".format(hashtag)) count = 1 for post in self.get_hashtag_posts(hashtag): if max_count is not None and count > max_count: break - self._log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True) + self.context.log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True) if filter_func is not None and not filter_func(post): - self._log('') + self.context.log('') continue count += 1 - with self._error_catcher('Download hashtag #{}'.format(hashtag)): + with self.context.error_catcher('Download hashtag #{}'.format(hashtag)): downloaded = self.download_post(post, target='#' + hashtag) if fast_update and not downloaded: break @@ -1507,11 +1559,13 @@ class Instaloader: if (not profile_exists) or \ (profile_id != profile.userid): if profile_exists: - self._log("Profile {0} does not match the stored unique ID {1}.".format(profile_name, profile_id)) + self.context.log("Profile {0} does not match the stored unique ID {1}.".format(profile_name, + profile_id)) else: - self._log("Trying to find profile {0} using its unique ID {1}.".format(profile_name, profile_id)) + self.context.log("Trying to find profile {0} using its unique ID {1}.".format(profile_name, + profile_id)) newname = self.get_username_by_id(profile_id) - self._log("Profile {0} has changed its name to {1}.".format(profile_name, newname)) + self.context.log("Profile {0} has changed its name to {1}.".format(profile_name, newname)) if ((format_string_contains_key(self.dirname_pattern, 'profile') or format_string_contains_key(self.dirname_pattern, 'target'))): os.rename(self.dirname_pattern.format(profile=profile_name.lower(), @@ -1530,7 +1584,7 @@ class Instaloader: target=profile_name.lower()), exist_ok=True) with open(id_filename, 'w') as text_file: text_file.write(str(profile.userid) + "\n") - self._log("Stored ID {0} for profile {1}.".format(profile.userid, profile_name)) + self.context.log("Stored ID {0} for profile {1}.".format(profile.userid, profile_name)) return profile_name raise ProfileNotExistsException("Profile {0} does not exist.".format(profile_name)) @@ -1547,55 +1601,55 @@ class Instaloader: with suppress(ProfileNotExistsException): # ProfileNotExistsException is raised again later in check_profile_id() when we search the profile, so we # must suppress it here. - profile = Profile(self, profile_name) + profile = Profile(self.context, profile_name) # check if profile does exist or name has changed since last download # and update name and json data if necessary name_updated = self.check_profile_id(profile_name, profile) if name_updated != profile_name: profile_name = name_updated - profile = Profile(self, profile_name) + profile = Profile(self.context, profile_name) - if self.is_logged_in and profile.has_blocked_viewer and not profile.is_private: + if self.context.is_logged_in and profile.has_blocked_viewer and not profile.is_private: # raising ProfileNotExistsException invokes "trying again anonymously" logic raise ProfileNotExistsException("Profile {} has blocked you".format(profile_name)) # Download profile picture if profile_pic or profile_pic_only: - with self._error_catcher('Download profile picture of {}'.format(profile_name)): + with self.context.error_catcher('Download profile picture of {}'.format(profile_name)): self.download_profilepic(profile) if profile_pic_only: return # Catch some errors if profile.is_private: - if not self.is_logged_in: + if not self.context.is_logged_in: raise LoginRequiredException("profile %s requires login" % profile_name) if not profile.followed_by_viewer and \ - self.username != profile.username: + self.context.username != profile.username: raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % profile_name) else: - if self.is_logged_in and not (download_stories or download_stories_only): - self._log("profile %s could also be downloaded anonymously." % profile_name) + if self.context.is_logged_in and not (download_stories or download_stories_only): + self.context.log("profile %s could also be downloaded anonymously." % profile_name) # Download stories, if requested if download_stories or download_stories_only: - with self._error_catcher("Download stories of {}".format(profile_name)): + with self.context.error_catcher("Download stories of {}".format(profile_name)): self.download_stories(userids=[profile.userid], filename_target=profile_name, fast_update=fast_update) if download_stories_only: return # Iterate over pictures and download them - self._log("Retrieving posts from profile {}.".format(profile_name)) + self.context.log("Retrieving posts from profile {}.".format(profile_name)) totalcount = profile.mediacount count = 1 for post in profile.get_posts(): - self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True) + self.context.log("[%3i/%3i] " % (count, totalcount), end="", flush=True) count += 1 if filter_func is not None and not filter_func(post): - self._log('') + self.context.log('') continue - with self._error_catcher('Download profile {}'.format(profile_name)): + with self.context.error_catcher('Download profile {}'.format(profile_name)): downloaded = self.download_post(post, target=profile_name) if fast_update and not downloaded: break @@ -1604,7 +1658,7 @@ class Instaloader: """Logs in and internally stores session, asking user for password interactively. :raises LoginRequiredException: when in quiet mode.""" - if self.quiet: + if self.context.quiet: raise LoginRequiredException("Quiet mode requires given password or valid session file.") password = None while password is None: @@ -1625,7 +1679,7 @@ class Instaloader: # Parse and generate filter function if filter_str is not None: filter_func = filterstr_to_filterfunc(filter_str, username is not None) - self._log('Only download posts with property "{}".'.format(filter_str)) + self.context.log('Only download posts with property "{}".'.format(filter_str)) else: filter_func = None # Login, if desired @@ -1635,13 +1689,13 @@ class Instaloader: except FileNotFoundError as err: if sessionfile is not None: print(err, file=sys.stderr) - self._log("Session file does not exist yet - Logging in.") - if not self.is_logged_in or username != self.test_login(): + self.context.log("Session file does not exist yet - Logging in.") + if not self.context.is_logged_in or username != self.test_login(): if password is not None: self.login(username, password) else: self.interactive_login(username) - self._log("Logged in as %s." % username) + self.context.log("Logged in as %s." % username) # Try block for KeyboardInterrupt (save session on ^C) profiles = set() try: @@ -1649,9 +1703,9 @@ class Instaloader: for target in targetlist: # strip '/' characters to be more shell-autocompletion-friendly target = target.rstrip('/') - with self._error_catcher(target): + with self.context.error_catcher(target): if target[0] == '@': - self._log("Retrieving followees of %s..." % target[1:]) + self.context.log("Retrieving followees of %s..." % target[1:]) profiles.update([followee['username'] for followee in self.get_followees(target[1:])]) elif target[0] == '#': self.download_hashtag(hashtag=target[1:], max_count=max_count, fast_update=fast_update, @@ -1667,19 +1721,19 @@ class Instaloader: else: profiles.add(target) if len(profiles) > 1: - self._log("Downloading {} profiles: {}".format(len(profiles), ','.join(profiles))) + self.context.log("Downloading {} profiles: {}".format(len(profiles), ','.join(profiles))) # Iterate through profiles list and download them for target in profiles: - with self._error_catcher(target): + with self.context.error_catcher(target): try: self.download_profile(target, profile_pic, profile_pic_only, fast_update, stories, stories_only, filter_func=filter_func) except ProfileNotExistsException as err: - if not self.is_logged_in: - self._log(err) - self._log("Trying again anonymously, helps in case you are just blocked.") + if not self.context.is_logged_in: + self.context.log(err) + self.context.log("Trying again anonymously, helps in case you are just blocked.") with self.anonymous_copy() as anonymous_loader: - with self._error_catcher(): + with self.context.error_catcher(): anonymous_loader.download_profile(target, profile_pic, profile_pic_only, fast_update, filter_func=filter_func) else: @@ -1687,20 +1741,16 @@ class Instaloader: except KeyboardInterrupt: print("\nInterrupted by user.", file=sys.stderr) # Save session if it is useful - if self.is_logged_in: + if self.context.is_logged_in: self.save_session_to_file(sessionfile) # User might be confused if Instaloader does nothing if not targetlist: - if self.is_logged_in: + if self.context.is_logged_in: # Instaloader did at least save a session file - self._log("No targets were specified, thus nothing has been downloaded.") + self.context.log("No targets were specified, thus nothing has been downloaded.") else: # Instloader did not do anything - self._log("usage:"+USAGE_STRING) - if self.error_log: - print("\nErrors occured:", file=sys.stderr) - for err in self.error_log: - print(err, file=sys.stderr) + self.context.log("usage:"+USAGE_STRING) def main(): @@ -1856,6 +1906,7 @@ def main(): stories=args.stories, stories_only=args.stories_only, filter_str=args.only_if) + loader.close() except InstaloaderException as err: raise SystemExit("Fatal error: %s" % err) diff --git a/test/instaloader-unittests.py b/test/instaloader-unittests.py index 9a5688c..846200e 100644 --- a/test/instaloader-unittests.py +++ b/test/instaloader-unittests.py @@ -25,10 +25,10 @@ class TestInstaloader(unittest.TestCase): self.L = instaloader.Instaloader(download_geotags=instaloader.Tristate.always, download_comments=instaloader.Tristate.always, save_metadata=instaloader.Tristate.always) - self.L.raise_all_errors = True + self.L.context.raise_all_errors = True def tearDown(self): - self.L.session.close() + self.L.close() os.chdir('/') print("Removing {}".format(self.dir)) shutil.rmtree(self.dir) @@ -40,14 +40,12 @@ class TestInstaloader(unittest.TestCase): @unittest.SkipTest def test_stories_download(self): - if not self.L.is_logged_in: - self.L.load_session_from_file(OWN_USERNAME) + self.L.load_session_from_file(OWN_USERNAME) self.L.download_stories() @unittest.SkipTest def test_private_profile_download(self): - if not self.L.is_logged_in: - self.L.load_session_from_file(OWN_USERNAME) + self.L.load_session_from_file(OWN_USERNAME) self.L.download_profile(PRIVATE_PROFILE, download_stories=True) def test_profile_pic_download(self): @@ -57,45 +55,38 @@ class TestInstaloader(unittest.TestCase): self.L.download_hashtag(HASHTAG, NORMAL_MAX_COUNT) def test_feed_download(self): - if not self.L.is_logged_in: - self.L.load_session_from_file(OWN_USERNAME) + self.L.load_session_from_file(OWN_USERNAME) self.L.download_feed_posts(NORMAL_MAX_COUNT) def test_feed_paging(self): - if not self.L.is_logged_in: - self.L.load_session_from_file(OWN_USERNAME) + self.L.load_session_from_file(OWN_USERNAME) for count, post in enumerate(self.L.get_feed_posts()): print(post) if count == PAGING_MAX_COUNT: break def test_saved_download(self): - if not self.L.is_logged_in: - self.L.load_session_from_file(OWN_USERNAME) + self.L.load_session_from_file(OWN_USERNAME) self.L.download_saved_posts(NORMAL_MAX_COUNT) def test_saved_paging(self): - if not self.L.is_logged_in: - self.L.load_session_from_file(OWN_USERNAME) - for count, post in enumerate(instaloader.Profile(self.L, OWN_USERNAME).get_saved_posts()): + self.L.load_session_from_file(OWN_USERNAME) + for count, post in enumerate(instaloader.Profile(self.L.context, OWN_USERNAME).get_saved_posts()): print(post) if count == PAGING_MAX_COUNT: break def test_test_login(self): - if not self.L.is_logged_in: - self.L.load_session_from_file(OWN_USERNAME) + self.L.load_session_from_file(OWN_USERNAME) self.assertEqual(OWN_USERNAME, self.L.test_login()) def test_get_followees(self): - if not self.L.is_logged_in: - self.L.load_session_from_file(OWN_USERNAME) + self.L.load_session_from_file(OWN_USERNAME) for f in self.L.get_followees(OWN_USERNAME): print(f['username']) def test_get_followers(self): - if not self.L.is_logged_in: - self.L.load_session_from_file(OWN_USERNAME) + self.L.load_session_from_file(OWN_USERNAME) for f in self.L.get_followers(OWN_USERNAME): print(f['username']) @@ -106,16 +97,15 @@ class TestInstaloader(unittest.TestCase): self.assertEqual(PUBLIC_PROFILE_ID, self.L.get_id_by_username(PUBLIC_PROFILE)) def test_get_likes(self): - if not self.L.is_logged_in: - self.L.load_session_from_file(OWN_USERNAME) - for post in instaloader.Profile(self.L, OWN_USERNAME).get_posts(): + self.L.load_session_from_file(OWN_USERNAME) + for post in instaloader.Profile(self.L.context, OWN_USERNAME).get_posts(): for like in post.get_likes(): print(like['username']) break def test_post_from_mediaid(self): - for post in instaloader.Profile(self.L, PUBLIC_PROFILE).get_posts(): - post2 = instaloader.Post.from_mediaid(self.L, post.mediaid) + for post in instaloader.Profile(self.L.context, PUBLIC_PROFILE).get_posts(): + post2 = instaloader.Post.from_mediaid(self.L.context, post.mediaid) self.assertEqual(post, post2) break