From b1edaddb3101882f6dd188679d89df686d6324eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Koch-Kramer?= Date: Tue, 13 Mar 2018 12:02:45 +0100 Subject: [PATCH] Support for new target :saved If logged in, Instaloader is now able to download posts which are marked as saved. This feature was suggested in #78. --- instaloader.py | 69 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 64 insertions(+), 5 deletions(-) diff --git a/instaloader.py b/instaloader.py index 872c249..51aec45 100755 --- a/instaloader.py +++ b/instaloader.py @@ -22,7 +22,7 @@ from datetime import datetime from enum import Enum from io import BytesIO -from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple +from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union import requests import requests.utils @@ -651,12 +651,12 @@ class Instaloader: session.headers.update(self._default_http_header(empty_session_only=True)) return session - def graphql_query(self, query_id: int, variables: Dict[str, Any], + def graphql_query(self, query_identifier: Union[int, str], variables: Dict[str, Any], referer: Optional[str] = None) -> Dict[str, Any]: """ Do a GraphQL Query. - :param query_id: Query ID. + :param query_identifier: Query ID or Hash. :param variables: Variables for the Query. :param referer: HTTP Referer, or None. :return: The server's response dictionary. @@ -670,8 +670,9 @@ class Instaloader: tmpsession.headers['accept'] = '*/*' if referer is not None: tmpsession.headers['referer'] = urllib.parse.quote(referer) - resp_json = self.get_json('graphql/query', params={'query_id': query_id, - 'variables': json.dumps(variables, separators=(',', ':'))}, + resp_json = self.get_json('graphql/query', + params={'query_id' if isinstance(query_identifier, int) else 'query_hash': query_identifier, + 'variables': json.dumps(variables, separators=(',', ':'))}, session=tmpsession) if 'status' not in resp_json: self.error("GraphQL response did not contain a \"status\" field.") @@ -1214,6 +1215,56 @@ class Instaloader: if fast_update and not downloaded: break + def get_saved_posts(self) -> Iterator[Post]: + """Get Posts that are marked as saved by the user.""" + + data = self.get_profile_metadata(self.username) + user_id = data["user"]["id"] + + while True: + if "graphql" in data: + is_edge = True + saved_media = data["graphql"]["user"]["edge_saved_media"] + elif "data" in data: + is_edge = True + saved_media = data["data"]["user"]["edge_saved_media"] + else: + is_edge = False + saved_media = data["user"]["saved_media"] + + if is_edge: + yield from (Post(self, edge["node"]) for edge in saved_media["edges"]) + else: + yield from (Post(self, node) for node in saved_media["nodes"]) + + if not saved_media["page_info"]["has_next_page"]: + break + data = self.graphql_query("f883d95537fbcd400f466f63d42bd8a1", + {'id': user_id, 'first': 200, 'after': saved_media["page_info"]["end_cursor"]}) + + def download_saved_posts(self, max_count: int = None, fast_update: bool = False, + filter_func: Optional[Callable[[Post], bool]] = None) -> None: + """Download user's saved pictures. + + :param max_count: Maximum count of pictures to download + :param fast_update: If true, abort when first already-downloaded picture is encountered + :param filter_func: function(post), which returns True if given picture should be downloaded + """ + count = 1 + for post in self.get_saved_posts(): + if max_count is not None and count > max_count: + break + name = post.owner_username + if filter_func is not None and not filter_func(post): + self._log("3}] {} ".format(count, name), end=str(), flush=True) + count += 1 + with self._error_catcher('Download saved posts'): + downloaded = self.download_post(post, target=':saved') + if fast_update and not downloaded: + break + def get_hashtag_posts(self, hashtag: str) -> Iterator[Post]: """Get Posts associated with a #hashtag.""" yield from (Post(self, node) for node in @@ -1471,6 +1522,14 @@ class Instaloader: self.download_stories(fast_update=fast_update) else: self.error("--login=USERNAME required to download {}.".format(pentry)) + elif pentry == ":saved": + if username is not None: + self._log("Retrieving saved posts...") + with self._error_catcher(): + self.download_saved_posts(fast_update=fast_update, max_count=max_count, + filter_func=filter_func) + else: + self.error("--login=USERNAME required to download {}.".format(pentry)) else: targets.add(pentry) if len(targets) > 1: