From 73ec884ea49cf099570b6b2dd4c6f1f751ae4b96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Koch-Kramer?= Date: Wed, 11 Apr 2018 14:19:24 +0200 Subject: [PATCH] Fix anonymous GraphQL queries Closes #94. --- instaloader.py | 73 +++++++++++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/instaloader.py b/instaloader.py index e39ae4b..d53c30e 100755 --- a/instaloader.py +++ b/instaloader.py @@ -3,6 +3,7 @@ """Download pictures (or videos) along with their captions and other metadata from Instagram.""" import ast import getpass +import hashlib import json import os import pickle @@ -201,6 +202,7 @@ class Post: self._profile = profile self._profile_id = profile_id self._full_metadata_dict = None + self._rhx_gis = None @classmethod def from_shortcode(cls, instaloader: 'Instaloader', shortcode: str): @@ -239,11 +241,9 @@ class Post: @property def _full_metadata(self) -> Dict[str, Any]: if not self._full_metadata_dict: - pic_json = self._instaloader.get_json("p/{0}/".format(self.shortcode), params={'__a': 1}) - if "graphql" in pic_json: - self._full_metadata_dict = pic_json["graphql"]["shortcode_media"] - else: - self._full_metadata_dict = pic_json["media"] + pic_json = self._instaloader.get_json("p/{0}/".format(self.shortcode), params={}) + self._full_metadata_dict = pic_json['entry_data']['PostPage'][0]['graphql']['shortcode_media'] + self._rhx_gis = pic_json['rhx_gis'] return self._full_metadata_dict def _field(self, *keys) -> Any: @@ -387,7 +387,8 @@ class Post: return yield from self._instaloader.graphql_node_list(17852405266163336, {'shortcode': self.shortcode}, 'https://www.instagram.com/p/' + self.shortcode + '/', - lambda d: d['data']['shortcode_media']['edge_media_to_comment']) + lambda d: d['data']['shortcode_media']['edge_media_to_comment'], + rhx_gis=self._rhx_gis) def get_likes(self) -> Iterator[Dict[str, Any]]: """Iterate over all likes of the post. @@ -405,7 +406,8 @@ class Post: return yield from self._instaloader.graphql_node_list("1cb6ec562846122743b61e492c85999f", {'shortcode': self.shortcode}, 'https://www.instagram.com/p/' + self.shortcode + '/', - lambda d: d['data']['shortcode_media']['edge_liked_by']) + lambda d: d['data']['shortcode_media']['edge_liked_by'], + rhx_gis=self._rhx_gis) def get_location(self) -> Optional[Dict[str, str]]: """If the Post has a location, returns a dictionary with fields 'lat' and 'lng'.""" @@ -602,7 +604,7 @@ class Instaloader: :raises QueryReturnedNotFoundException: When the server responds with a 404. :raises ConnectionException: When query repeatedly failed. """ - def graphql_query_waittime(query_id: int, untracked_queries: bool = False) -> int: + def graphql_query_waittime(query_id: Union[int, str], untracked_queries: bool = False) -> int: sliding_window = 660 timestamps = self.previous_queries.get(query_id) if not timestamps: @@ -613,9 +615,9 @@ class Instaloader: if len(timestamps) < 100 and not untracked_queries: return 0 return round(min(timestamps) + sliding_window - current_time) + 6 - is_graphql_query = 'query_id' in params and 'graphql/query' in path + is_graphql_query = 'graphql/query' in path if is_graphql_query: - query_id = params['query_id'] + query_id = params['query_id'] if 'query_id' in params else params['query_hash'] waittime = graphql_query_waittime(query_id) if waittime > 0: self._log('\nToo many queries in the last time. Need to wait {} seconds.'.format(waittime)) @@ -635,7 +637,13 @@ class Instaloader: raise TooManyRequests("429 - Too Many Requests") if resp.status_code != 200: raise ConnectionException("HTTP error code {}.".format(resp.status_code)) - resp_json = resp.json() + if not is_graphql_query and not "__a" in params and host == "www.instagram.com": + match = re.search(r'window\._sharedData = (.*);', resp.text) + if match is None: + raise ConnectionException("Could not find \"window._sharedData\" in html response.") + return json.loads(match.group(1)) + else: + resp_json = resp.json() if 'status' in resp_json and resp_json['status'] != "ok": if 'message' in resp_json: raise ConnectionException("Returned \"{}\" status, message \"{}\".".format(resp_json['status'], @@ -695,7 +703,7 @@ class Instaloader: return session def graphql_query(self, query_identifier: Union[int, str], variables: Dict[str, Any], - referer: Optional[str] = None) -> Dict[str, Any]: + referer: Optional[str] = None, rhx_gis: Optional[str] = None) -> Dict[str, Any]: """ Do a GraphQL Query. @@ -713,9 +721,18 @@ class Instaloader: tmpsession.headers['accept'] = '*/*' if referer is not None: tmpsession.headers['referer'] = urllib.parse.quote(referer) + + variables_json = json.dumps(variables, separators=(',', ':')) + + if rhx_gis: + values = "{}:{}:{}:{}".format(rhx_gis, tmpsession.cookies['csrftoken'], self.user_agent, variables_json) + x_instagram_gis = hashlib.md5(values.encode()).hexdigest() + tmpsession.cookies.set('ig_pr', '2') + tmpsession.headers['x-instagram-gis'] = x_instagram_gis + resp_json = self.get_json('graphql/query', params={'query_id' if isinstance(query_identifier, int) else 'query_hash': query_identifier, - 'variables': json.dumps(variables, separators=(',', ':'))}, + 'variables': variables_json}, session=tmpsession) if 'status' not in resp_json: self.error("GraphQL response did not contain a \"status\" field.") @@ -740,20 +757,21 @@ class Instaloader: def get_id_by_username(self, profile: str) -> int: """Each Instagram profile has its own unique ID which stays unmodified even if a user changes his/her username. To get said ID, given the profile's name, you may call this function.""" - return int(self.get_profile_metadata(profile)['user']['id']) + return int(self.get_profile_metadata(profile)[0]['user']['id']) def graphql_node_list(self, query_identifier: Union[int, str], query_variables: Dict[str, Any], query_referer: Optional[str], - edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]]) -> Iterator[Dict[str, Any]]: + edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]], + rhx_gis: Optional[str] = None) -> Iterator[Dict[str, Any]]: """Retrieve a list of GraphQL nodes.""" query_variables['first'] = Instaloader.GRAPHQL_PAGE_LENGTH - data = self.graphql_query(query_identifier, query_variables, query_referer) + data = self.graphql_query(query_identifier, query_variables, query_referer, rhx_gis) while True: edge_struct = edge_extractor(data) yield from [edge['node'] for edge in edge_struct['edges']] if edge_struct['page_info']['has_next_page']: query_variables['after'] = edge_struct['page_info']['end_cursor'] - data = self.graphql_query(query_identifier, query_variables, query_referer) + data = self.graphql_query(query_identifier, query_variables, query_referer, rhx_gis) else: break @@ -1257,7 +1275,7 @@ class Instaloader: if not self.is_logged_in: return - data = self.get_profile_metadata(self.username) + data, full_metadata = self.get_profile_metadata(self.username) user_id = data["user"]["id"] while True: @@ -1277,7 +1295,8 @@ class Instaloader: break data = self.graphql_query("f883d95537fbcd400f466f63d42bd8a1", {'id': user_id, 'first': Instaloader.GRAPHQL_PAGE_LENGTH, - 'after': saved_media["page_info"]["end_cursor"]})['data'] + 'after': saved_media["page_info"]["end_cursor"]}, + rhx_gis=full_metadata['rhx_gis'])['data'] def download_saved_posts(self, max_count: int = None, fast_update: bool = False, filter_func: Optional[Callable[[Post], bool]] = None) -> None: @@ -1403,15 +1422,15 @@ class Instaloader: return profile, profile_id raise ProfileNotExistsException("Profile {0} does not exist.".format(profile)) - def get_profile_metadata(self, profile_name: str) -> Dict[str, Any]: + def get_profile_metadata(self, profile_name: str) -> Tuple[Dict[str, Any], Dict[str, Any]]: """Retrieves a profile's metadata, for use with e.g. :meth:`get_profile_posts` and :meth:`check_profile_id`.""" try: - metadata = self.get_json('{}/'.format(profile_name), params={'__a': 1}) - return metadata['graphql'] if 'graphql' in metadata else metadata + metadata = self.get_json('{}/'.format(profile_name), params={}) + return metadata['entry_data']['ProfilePage'][0]['graphql'], metadata except QueryReturnedNotFoundException: raise ProfileNotExistsException('Profile {} does not exist.'.format(profile_name)) - def get_profile_posts(self, profile_metadata: Dict[str, Any]) -> Iterator[Post]: + def get_profile_posts(self, profile_metadata: Dict[str, Any], rhx_gis: str) -> Iterator[Post]: """Retrieve all posts from a profile.""" profile_name = profile_metadata['user']['username'] profile_id = int(profile_metadata['user']['id']) @@ -1432,7 +1451,7 @@ class Instaloader: data = self.graphql_query(17888483320059182, {'id': profile_metadata['user']['id'], 'first': Instaloader.GRAPHQL_PAGE_LENGTH, 'after': end_cursor}, - 'https://www.instagram.com/{0}/'.format(profile_name)) + referer='https://www.instagram.com/{0}/'.format(profile_name), rhx_gis=rhx_gis) media = data['data']['user']['edge_owner_to_timeline_media'] yield from (Post(self, edge['node'], profile=profile_name, profile_id=profile_id) for edge in media['edges']) @@ -1452,14 +1471,14 @@ class Instaloader: with suppress(ProfileNotExistsException): # ProfileNotExistsException is raised again later in check_profile_id() when we search the profile, so we # must suppress it here. - profile_metadata = self.get_profile_metadata(name) + profile_metadata, full_metadata = self.get_profile_metadata(name) # check if profile does exist or name has changed since last download # and update name and json data if necessary name_updated, profile_id = self.check_profile_id(name, profile_metadata) if name_updated != name: name = name_updated - profile_metadata = self.get_profile_metadata(name) + profile_metadata, full_metadata = self.get_profile_metadata(name) # Download profile picture if profile_pic or profile_pic_only: @@ -1494,7 +1513,7 @@ class Instaloader: else: totalcount = profile_metadata["user"]["edge_owner_to_timeline_media"]["count"] count = 1 - for post in self.get_profile_posts(profile_metadata): + for post in self.get_profile_posts(profile_metadata, rhx_gis=full_metadata['rhx_gis']): self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True) count += 1 if filter_func is not None and not filter_func(post):