Post class representing an Instagram Post
This simplifies accessing properties of a Post. Method download_post()
remains to class Instaloader rather than Post, as it fits there better.
Also, since it is now easily possible, all download_*() functions now
have a filter_func parameter. Its meaning has been reverted to be
consistent of how a filter is commonly understood: A post is downloaded
iff filter_func is None or evaluates to True.
Post.get_comments() foreports commit 86fb80d
("Avoid GraphQL queries if
all comments in metadata").
This commit is contained in:
parent
ccdac0305f
commit
0f64768dd8
@ -260,10 +260,7 @@ You could also download your last 20 liked pics with
|
||||
.. code:: python
|
||||
|
||||
loader.download_feed_posts(max_count=20, fast_update=True,
|
||||
filter_func=lambda node:
|
||||
not node["likes"]["viewer_has_liked"]
|
||||
if "likes" in node else
|
||||
not node["viewer_has_liked"])
|
||||
filter_func=lambda post: post.viewer_has_liked)
|
||||
|
||||
To download the last 20 pictures with hashtag #cat, do
|
||||
|
||||
|
345
instaloader.py
345
instaloader.py
@ -123,6 +123,144 @@ def format_string_contains_key(format_string: str, key: str) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
class Post:
|
||||
"""
|
||||
Structure containing information about an Instagram post.
|
||||
|
||||
Created by Instaloader methods get_profile_posts(), get_hashtag_posts(), get_feed_posts(). Posts are linked to
|
||||
an Instaloader instance which is used for error logging and obtaining of additional metadata, if required.
|
||||
This class unifies access to the properties associated with a post. It implements == and is hashable.
|
||||
"""
|
||||
|
||||
def __init__(self, instaloader: 'Instaloader', node: Dict[str, Any], profile: Optional[str] = None):
|
||||
"""Create a Post instance from a node structure as returned by Instagram.
|
||||
|
||||
:param instaloader: Instaloader instance used for additional queries if neccessary.
|
||||
:param node: Node structure.
|
||||
:param profile: The name of the owner, if already known at creation.
|
||||
"""
|
||||
self._instaloader = instaloader
|
||||
self._node = node
|
||||
self._profile = profile
|
||||
self._full_metadata_dict = None
|
||||
|
||||
@classmethod
|
||||
def from_shortcode(cls, instaloader: 'Instaloader', shortcode: str):
|
||||
"""Create a post object from a given shortcode"""
|
||||
# pylint:disable=protected-access
|
||||
post = cls(instaloader, {'shortcode': shortcode})
|
||||
post._node = post._full_metadata
|
||||
return post
|
||||
|
||||
@classmethod
|
||||
def from_mediaid(cls, instaloader: 'Instaloader', mediaid: int):
|
||||
"""Create a post object from a given mediaid"""
|
||||
return cls.from_shortcode(instaloader, mediaid_to_shortcode(mediaid))
|
||||
|
||||
@property
|
||||
def shortcode(self) -> str:
|
||||
return self._node['shortcode'] if 'shortcode' in self._node else self._node['code']
|
||||
|
||||
def __repr__(self):
|
||||
return '<Post {}>'.format(self.shortcode)
|
||||
|
||||
def __eq__(self, o: object) -> bool:
|
||||
if isinstance(o, Post):
|
||||
return self.shortcode == o.shortcode
|
||||
return NotImplemented
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash(self.shortcode)
|
||||
|
||||
@property
|
||||
def _full_metadata(self) -> Dict[str, Any]:
|
||||
if not self._full_metadata_dict:
|
||||
pic_json = self._instaloader.get_json("p/{0}/".format(self.shortcode), params={'__a': 1})
|
||||
if "graphql" in pic_json:
|
||||
self._full_metadata_dict = pic_json["graphql"]["shortcode_media"]
|
||||
else:
|
||||
self._full_metadata_dict = pic_json["media"]
|
||||
return self._full_metadata_dict
|
||||
|
||||
@property
|
||||
def owner_username(self) -> str:
|
||||
"""The Post's lowercase owner name, or 'UNKNOWN'."""
|
||||
try:
|
||||
if self._profile:
|
||||
return self._profile.lower()
|
||||
if 'owner' in self._node and 'username' in self._node['owner']:
|
||||
return self._node['owner']['username'].lower()
|
||||
return self._full_metadata['owner']['username'].lower()
|
||||
except (InstaloaderException, KeyError, TypeError) as err:
|
||||
self._instaloader.error("Get owner name of {}: {} -- using \'UNKNOWN\'.".format(self, err))
|
||||
return 'UNKNOWN'
|
||||
|
||||
@property
|
||||
def date(self) -> datetime:
|
||||
return datetime.fromtimestamp(self._node["date"] if "date" in self._node else self._node["taken_at_timestamp"])
|
||||
|
||||
@property
|
||||
def url(self) -> str:
|
||||
return self._node["display_url"] if "display_url" in self._node else self._node["display_src"]
|
||||
|
||||
@property
|
||||
def typename(self) -> str:
|
||||
"""Type of post, GraphImage, GraphVideo or GraphSidecar"""
|
||||
if '__typename' in self._node:
|
||||
return self._node['__typename']
|
||||
# if __typename is not in node, it is an old image or video
|
||||
return 'GraphImage'
|
||||
|
||||
@property
|
||||
def sidecar_edges(self) -> List[Dict[str, Any]]:
|
||||
return self._full_metadata['edge_sidecar_to_children']['edges']
|
||||
|
||||
@property
|
||||
def caption(self) -> Optional[str]:
|
||||
if "edge_media_to_caption" in self._node and self._node["edge_media_to_caption"]["edges"]:
|
||||
return self._node["edge_media_to_caption"]["edges"][0]["node"]["text"]
|
||||
elif "caption" in self._node:
|
||||
return self._node["caption"]
|
||||
|
||||
@property
|
||||
def is_video(self) -> bool:
|
||||
return self._node['is_video']
|
||||
|
||||
@property
|
||||
def video_url(self) -> str:
|
||||
return self._full_metadata['video_url']
|
||||
|
||||
@property
|
||||
def viewer_has_liked(self) -> bool:
|
||||
"""Whether the viewer has liked the post.
|
||||
|
||||
:raises LoginRequiredException: if not logged in."""
|
||||
if not self._instaloader.is_logged_in:
|
||||
raise LoginRequiredException("Login required to obtain whether viewer has liked {}.".format(self))
|
||||
if 'likes' in self._node:
|
||||
return self._node['likes']['viewer_has_liked']
|
||||
if 'viewer_has_liked' in self._node:
|
||||
return self._node['viewer_has_liked']
|
||||
return self._full_metadata['viewer_has_liked']
|
||||
|
||||
def get_comments(self) -> Iterator[Dict[str, Any]]:
|
||||
comments_in_metadata = self._full_metadata['edge_media_to_comment']
|
||||
if comments_in_metadata['count'] == len(comments_in_metadata['edges']):
|
||||
# If the Post's metadata already contains all comments, don't do GraphQL requests to obtain them
|
||||
yield from (comment['node'] for comment in comments_in_metadata['edges'])
|
||||
yield from self._instaloader.graphql_node_list(17852405266163336, {'shortcode': self.shortcode},
|
||||
'https://www.instagram.com/p/' + self.shortcode + '/',
|
||||
lambda d: d['data']['shortcode_media']['edge_media_to_comment'])
|
||||
|
||||
def get_location(self) -> Optional[Dict[str, str]]:
|
||||
"""If the Post has a location, returns a dictionary with fields 'lat' and 'lng'."""
|
||||
loc_dict = self._full_metadata["location"]
|
||||
if loc_dict is not None:
|
||||
location_json = self._instaloader.get_json("explore/locations/{0}/".format(loc_dict["id"]),
|
||||
params={'__a': 1})
|
||||
return location_json["location"]
|
||||
|
||||
|
||||
class Tristate(Enum):
|
||||
"""Tri-state to encode whether we should save certain information, i.e. videos, captions, comments or geotags.
|
||||
|
||||
@ -161,9 +299,13 @@ class Instaloader:
|
||||
self.download_captions = download_captions
|
||||
self.download_comments = download_comments
|
||||
|
||||
# error log, filled with _error() and printed at the end of Instaloader.main()
|
||||
# error log, filled with error() and printed at the end of Instaloader.main()
|
||||
self.error_log = []
|
||||
|
||||
@property
|
||||
def is_logged_in(self) -> bool:
|
||||
return bool(self.username)
|
||||
|
||||
@contextmanager
|
||||
def anonymous_copy(self):
|
||||
"""Yield an anonymous, otherwise equally-configured copy of an Instaloader instance; Then copy its error log."""
|
||||
@ -179,7 +321,7 @@ class Instaloader:
|
||||
if not self.quiet:
|
||||
print(*msg, sep=sep, end=end, flush=flush)
|
||||
|
||||
def _error(self, msg):
|
||||
def error(self, msg):
|
||||
"""Log a non-fatal error message to stderr, which is repeated at program termination."""
|
||||
print(msg, file=sys.stderr)
|
||||
self.error_log.append(msg)
|
||||
@ -194,9 +336,9 @@ class Instaloader:
|
||||
yield
|
||||
except InstaloaderException as err:
|
||||
if extra_info:
|
||||
self._error('{}: {}'.format(extra_info, err))
|
||||
self.error('{}: {}'.format(extra_info, err))
|
||||
else:
|
||||
self._error('{}'.format(err))
|
||||
self.error('{}'.format(err))
|
||||
|
||||
def _sleep(self):
|
||||
"""Sleep a short, random time if self.sleep is set. Called before each request to the instagram.com."""
|
||||
@ -225,11 +367,11 @@ class Instaloader:
|
||||
if tries <= 1:
|
||||
raise ConnectionException(error_string)
|
||||
else:
|
||||
self._error(error_string + " [retrying]")
|
||||
self.error(error_string + " [retrying]")
|
||||
self._sleep()
|
||||
self._get_and_write_raw(url, filename, tries - 1)
|
||||
|
||||
def _get_json(self, url: str, params: Optional[Dict[str, Any]] = None,
|
||||
def get_json(self, url: str, params: Optional[Dict[str, Any]] = None,
|
||||
session: Optional[requests.Session] = None, tries: int = 3) -> Dict[str, Any]:
|
||||
"""JSON request to Instagram.
|
||||
|
||||
@ -262,9 +404,9 @@ class Instaloader:
|
||||
if tries <= 1:
|
||||
raise ConnectionException(error_string)
|
||||
else:
|
||||
self._error(error_string + " [retrying]")
|
||||
self.error(error_string + " [retrying]")
|
||||
self._sleep()
|
||||
self._get_json(url, params, sess, tries - 1)
|
||||
self.get_json(url, params, sess, tries - 1)
|
||||
|
||||
def _default_http_header(self, empty_session_only: bool = False) -> Dict[str, str]:
|
||||
"""Returns default HTTP header we use for requests."""
|
||||
@ -314,11 +456,11 @@ class Instaloader:
|
||||
tmpsession.headers['accept'] = '*/*'
|
||||
if referer is not None:
|
||||
tmpsession.headers['referer'] = referer
|
||||
resp_json = self._get_json('graphql/query', params={'query_id': query_id,
|
||||
resp_json = self.get_json('graphql/query', params={'query_id': query_id,
|
||||
'variables': json.dumps(variables, separators=(',', ':'))},
|
||||
session=tmpsession)
|
||||
if 'status' not in resp_json:
|
||||
self._error("GraphQL response did not contain a \"status\" field.")
|
||||
self.error("GraphQL response did not contain a \"status\" field.")
|
||||
return resp_json
|
||||
|
||||
def get_username_by_id(self, profile_id: int) -> str:
|
||||
@ -335,8 +477,7 @@ class Instaloader:
|
||||
else:
|
||||
raise LoginRequiredException("Login required to determine username (ID: " + str(profile_id) + ").")
|
||||
else:
|
||||
shortcode = mediaid_to_shortcode(int(data['edges'][0]["node"]["id"]))
|
||||
return self.get_post_metadata(shortcode)['owner']['username']
|
||||
return Post.from_mediaid(self, int(data['edges'][0]["node"]["id"])).owner_username
|
||||
|
||||
def get_id_by_username(self, profile: str) -> int:
|
||||
"""Each Instagram profile has its own unique ID which stays unmodified even if a user changes
|
||||
@ -396,19 +537,13 @@ class Instaloader:
|
||||
os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))
|
||||
return True
|
||||
|
||||
def get_comments(self, shortcode: str) -> Iterator[Dict[str, Any]]:
|
||||
"""Retrieve comments of node with given shortcode."""
|
||||
yield from self.graphql_node_list(17852405266163336, {'shortcode': shortcode},
|
||||
'https://www.instagram.com/p/' + shortcode + '/',
|
||||
lambda d: d['data']['shortcode_media']['edge_media_to_comment'])
|
||||
|
||||
def update_comments(self, filename: str, shortcode: str) -> None:
|
||||
def update_comments(self, filename: str, post: Post) -> None:
|
||||
filename += '_comments.json'
|
||||
try:
|
||||
comments = json.load(open(filename))
|
||||
except FileNotFoundError:
|
||||
comments = list()
|
||||
comments.extend(self.get_comments(shortcode))
|
||||
comments.extend(post.get_comments())
|
||||
if comments:
|
||||
with open(filename, 'w') as file:
|
||||
comments_list = sorted(sorted(list(comments), key=lambda t: t['id']),
|
||||
@ -533,7 +668,7 @@ class Instaloader:
|
||||
def test_login(self, session: Optional[requests.Session]) -> Optional[str]:
|
||||
"""Returns the Instagram username to which given requests.Session object belongs, or None."""
|
||||
if session:
|
||||
data = self._get_json('', params={'__a': 1}, session=session)
|
||||
data = self.get_json('', params={'__a': 1}, session=session)
|
||||
return data['graphql']['user']['username'] if 'graphql' in data else None
|
||||
|
||||
def login(self, user: str, passwd: str) -> None:
|
||||
@ -559,96 +694,63 @@ class Instaloader:
|
||||
else:
|
||||
raise ConnectionException('Login error! Connection error!')
|
||||
|
||||
def get_post_metadata(self, shortcode: str) -> Dict[str, Any]:
|
||||
"""Get full metadata of the post associated with given shortcode."""
|
||||
pic_json = self._get_json("p/{0}/".format(shortcode), params={'__a': 1})
|
||||
media = pic_json["graphql"]["shortcode_media"] if "graphql" in pic_json else pic_json["media"]
|
||||
return media
|
||||
|
||||
def get_location(self, post_metadata: Dict[str, Any]) -> Optional[Dict[str, str]]:
|
||||
if post_metadata["location"] is not None:
|
||||
location_json = self._get_json("explore/locations/{0}/".format(post_metadata["location"]["id"]),
|
||||
params={'__a': 1})
|
||||
return location_json["location"]
|
||||
|
||||
def download_post(self, node: Dict[str, Any], profile: Optional[str], target: str) -> bool:
|
||||
def download_post(self, post: Post, target: str) -> bool:
|
||||
"""
|
||||
Download everything associated with one instagram post node, i.e. picture, caption and video.
|
||||
|
||||
:param node: Node, as from media->nodes list in instagram's JSONs
|
||||
:param profile: Name of profile to which this node belongs
|
||||
:param post: Post to download.
|
||||
:param target: Target name, i.e. profile name, #hashtag, :feed; for filename.
|
||||
:return: True if something was downloaded, False otherwise, i.e. file was already there
|
||||
"""
|
||||
already_has_profilename = profile is not None or ('owner' in node and 'username' in node['owner'])
|
||||
|
||||
# Format dirname and filename. post.owner_username might do an additional request, so only access it, if
|
||||
# {profile} is part of the dirname pattern or filename pattern.
|
||||
needs_profilename = (format_string_contains_key(self.dirname_pattern, 'profile') or
|
||||
format_string_contains_key(self.filename_pattern, 'profile'))
|
||||
shortcode = node['shortcode'] if 'shortcode' in node else node['code']
|
||||
post_metadata = None
|
||||
if needs_profilename:
|
||||
if already_has_profilename:
|
||||
profilename = profile if profile is not None else node['owner']['username']
|
||||
profilename = profilename.lower()
|
||||
else:
|
||||
try:
|
||||
post_metadata = self.get_post_metadata(shortcode)
|
||||
profilename = post_metadata['owner']['username'].lower()
|
||||
except (InstaloaderException, KeyError, TypeError) as err:
|
||||
self._error("Unable to get owner name of post {}: {} -- using \'UNKNOWN\'.".format(shortcode, err))
|
||||
profilename = 'UNKNOWN'
|
||||
else:
|
||||
profilename = None
|
||||
date = datetime.fromtimestamp(node["date"] if "date" in node else node["taken_at_timestamp"])
|
||||
profilename = post.owner_username if needs_profilename else None
|
||||
dirname = self.dirname_pattern.format(profile=profilename, target=target.lower())
|
||||
filename = dirname + '/' + self.filename_pattern.format(profile=profilename, target=target.lower(),
|
||||
date=date,
|
||||
shortcode=shortcode)
|
||||
date=post.date, shortcode=post.shortcode)
|
||||
os.makedirs(os.path.dirname(filename), exist_ok=True)
|
||||
url = node["display_url"] if "display_url" in node else node["display_src"]
|
||||
if '__typename' in node:
|
||||
if node['__typename'] == 'GraphSidecar':
|
||||
if not post_metadata:
|
||||
post_metadata = self.get_post_metadata(shortcode)
|
||||
|
||||
# Download the image(s) / video thumbnail
|
||||
if post.typename == 'GraphSidecar':
|
||||
edge_number = 1
|
||||
downloaded = True
|
||||
for edge in post_metadata['edge_sidecar_to_children']['edges']:
|
||||
for edge in post.sidecar_edges:
|
||||
edge_downloaded = self.download_pic(filename=filename,
|
||||
url=edge['node']['display_url'],
|
||||
mtime=date,
|
||||
mtime=post.date,
|
||||
filename_suffix=str(edge_number))
|
||||
downloaded = downloaded and edge_downloaded
|
||||
edge_number += 1
|
||||
elif node['__typename'] in ['GraphImage', 'GraphVideo']:
|
||||
downloaded = self.download_pic(filename=filename,
|
||||
url=url,
|
||||
mtime=date)
|
||||
elif post.typename in ['GraphImage', 'GraphVideo']:
|
||||
downloaded = self.download_pic(filename=filename, url=post.url, mtime=post.date)
|
||||
else:
|
||||
self._log("Warning: Unknown typename discovered:" + node['__typename'])
|
||||
self.error("Warning: {0} has unknown typename: {1}".format(post, post.typename))
|
||||
downloaded = False
|
||||
else:
|
||||
# Node is an old image or video.
|
||||
downloaded = self.download_pic(filename=filename, url=url, mtime=date)
|
||||
|
||||
# Save caption if desired
|
||||
if self.download_captions is not Tristate.never:
|
||||
if "edge_media_to_caption" in node and node["edge_media_to_caption"]["edges"]:
|
||||
self.save_caption(filename, date, node["edge_media_to_caption"]["edges"][0]["node"]["text"])
|
||||
elif "caption" in node:
|
||||
self.save_caption(filename, date, node["caption"])
|
||||
if post.caption:
|
||||
self.save_caption(filename, post.date, post.caption)
|
||||
else:
|
||||
self._log("<no caption>", end=' ', flush=True)
|
||||
if node["is_video"] and self.download_videos is Tristate.always:
|
||||
if not post_metadata:
|
||||
post_metadata = self.get_post_metadata(shortcode)
|
||||
self.download_pic(filename=filename,
|
||||
url=post_metadata['video_url'],
|
||||
mtime=date)
|
||||
|
||||
# Download video if desired
|
||||
if post.is_video and self.download_videos is Tristate.always:
|
||||
self.download_pic(filename=filename, url=post.video_url, mtime=post.date)
|
||||
|
||||
# Download geotags if desired
|
||||
if self.download_geotags is Tristate.always:
|
||||
if not post_metadata:
|
||||
post_metadata = self.get_post_metadata(shortcode)
|
||||
location = self.get_location(post_metadata)
|
||||
location = post.get_location()
|
||||
if location:
|
||||
self.save_location(filename, location, date)
|
||||
self.save_location(filename, location, post.date)
|
||||
|
||||
# Update comments if desired
|
||||
if self.download_comments is Tristate.always:
|
||||
self.update_comments(filename, shortcode)
|
||||
self.update_comments(filename, post)
|
||||
|
||||
self._log()
|
||||
return downloaded
|
||||
|
||||
@ -701,7 +803,7 @@ class Instaloader:
|
||||
:param filename_target: Replacement for {target} in dirname_pattern and filename_pattern
|
||||
"""
|
||||
|
||||
if self.username is None:
|
||||
if not self.is_logged_in:
|
||||
raise LoginRequiredException('Login required to download stories')
|
||||
|
||||
for user_stories in self.get_stories(userids):
|
||||
@ -759,10 +861,10 @@ class Instaloader:
|
||||
if fast_update and not downloaded:
|
||||
break
|
||||
|
||||
def get_feed_posts(self) -> Iterator[Dict[str, Any]]:
|
||||
def get_feed_posts(self) -> Iterator[Post]:
|
||||
"""Get Posts of the user's feed."""
|
||||
|
||||
data = self._get_json('', params={'__a': 1})
|
||||
data = self.get_json('', params={'__a': 1})
|
||||
|
||||
while True:
|
||||
if "graphql" in data:
|
||||
@ -776,9 +878,9 @@ class Instaloader:
|
||||
feed = data["feed"]["media"]
|
||||
|
||||
if is_edge:
|
||||
yield from [edge["node"] for edge in feed["edges"]]
|
||||
yield from (Post(self, edge["node"]) for edge in feed["edges"])
|
||||
else:
|
||||
yield from [node for node in feed["nodes"]]
|
||||
yield from (Post(self, node) for node in feed["nodes"])
|
||||
|
||||
if not feed["page_info"]["has_next_page"]:
|
||||
break
|
||||
@ -788,7 +890,7 @@ class Instaloader:
|
||||
'fetch_like': 10})
|
||||
|
||||
def download_feed_posts(self, max_count: int = None, fast_update: bool = False,
|
||||
filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None) -> None:
|
||||
filter_func: Optional[Callable[[Post], bool]] = None) -> None:
|
||||
"""
|
||||
Download pictures from the user's feed.
|
||||
|
||||
@ -796,39 +898,37 @@ class Instaloader:
|
||||
>>> loader = Instaloader()
|
||||
>>> loader.load_session_from_file('USER')
|
||||
>>> loader.download_feed_posts(max_count=20, fast_update=True,
|
||||
>>> filter_func=lambda post:
|
||||
>>> not post["likes"]["viewer_has_liked"]
|
||||
>>> if "likes" in post else
|
||||
>>> not post["viewer_has_liked"])
|
||||
>>> filter_func=lambda post: post.viewer_has_liked)
|
||||
|
||||
:param max_count: Maximum count of pictures to download
|
||||
:param fast_update: If true, abort when first already-downloaded picture is encountered
|
||||
:param filter_func: function(post), which returns True if given picture should not be downloaded
|
||||
:param filter_func: function(post), which returns True if given picture should be downloaded
|
||||
"""
|
||||
count = 1
|
||||
for post in self.get_feed_posts():
|
||||
if max_count is not None and count > max_count:
|
||||
break
|
||||
name = post["owner"]["username"]
|
||||
if filter_func is not None and filter_func(post):
|
||||
name = post.owner_username
|
||||
if filter_func is not None and not filter_func(post):
|
||||
self._log("<pic by %s skipped>" % name, flush=True)
|
||||
continue
|
||||
self._log("[%3i] %s " % (count, name), end="", flush=True)
|
||||
count += 1
|
||||
with self._error_catcher('Download feed'):
|
||||
downloaded = self.download_post(post, profile=name, target=':feed')
|
||||
downloaded = self.download_post(post, target=':feed')
|
||||
if fast_update and not downloaded:
|
||||
break
|
||||
|
||||
def get_hashtag_posts(self, hashtag: str) -> Iterator[Dict[str, Any]]:
|
||||
def get_hashtag_posts(self, hashtag: str) -> Iterator[Post]:
|
||||
"""Get Posts associated with a #hashtag."""
|
||||
yield from self.graphql_node_list(17875800862117404, {'tag_name': hashtag},
|
||||
'https://www.instagram.com/explore/tags/' + hashtag + '/',
|
||||
lambda d: d['data']['hashtag']['edge_hashtag_to_media'])
|
||||
yield from (Post(self, node) for node in
|
||||
self.graphql_node_list(17875800862117404, {'tag_name': hashtag},
|
||||
'https://www.instagram.com/explore/tags/{0}/'.format(hashtag),
|
||||
lambda d: d['data']['hashtag']['edge_hashtag_to_media']))
|
||||
|
||||
def download_hashtag(self, hashtag: str,
|
||||
max_count: Optional[int] = None,
|
||||
filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None,
|
||||
filter_func: Optional[Callable[[Post], bool]] = None,
|
||||
fast_update: bool = False) -> None:
|
||||
"""Download pictures of one hashtag.
|
||||
|
||||
@ -838,7 +938,7 @@ class Instaloader:
|
||||
|
||||
:param hashtag: Hashtag to download, without leading '#'
|
||||
:param max_count: Maximum count of pictures to download
|
||||
:param filter_func: function(post), which returns True if given picture should not be downloaded
|
||||
:param filter_func: function(post), which returns True if given picture should be downloaded
|
||||
:param fast_update: If true, abort when first already-downloaded picture is encountered
|
||||
"""
|
||||
count = 1
|
||||
@ -846,12 +946,12 @@ class Instaloader:
|
||||
if max_count is not None and count > max_count:
|
||||
break
|
||||
self._log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True)
|
||||
if filter_func is not None and filter_func(post):
|
||||
if filter_func is not None and not filter_func(post):
|
||||
self._log('<skipped>')
|
||||
continue
|
||||
count += 1
|
||||
with self._error_catcher('Download hashtag #{}'.format(hashtag)):
|
||||
downloaded = self.download_post(node=post, profile=None, target='#' + hashtag)
|
||||
downloaded = self.download_post(post, target='#' + hashtag)
|
||||
if fast_update and not downloaded:
|
||||
break
|
||||
|
||||
@ -908,13 +1008,14 @@ class Instaloader:
|
||||
def get_profile_metadata(self, profile_name: str) -> Dict[str, Any]:
|
||||
"""Retrieves a profile's metadata, for use with e.g. get_profile_posts() and check_profile_id()."""
|
||||
try:
|
||||
return self._get_json('{}/'.format(profile_name), params={'__a': 1})
|
||||
return self.get_json('{}/'.format(profile_name), params={'__a': 1})
|
||||
except QueryReturnedNotFoundException:
|
||||
raise ProfileNotExistsException('Profile {} does not exist.'.format(profile_name))
|
||||
|
||||
def get_profile_posts(self, profile_metadata: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
|
||||
def get_profile_posts(self, profile_metadata: Dict[str, Any]) -> Iterator[Post]:
|
||||
"""Retrieve all posts from a profile."""
|
||||
yield from profile_metadata['user']['media']['nodes']
|
||||
profile_name = profile_metadata['user']['username']
|
||||
yield from (Post(self, node, profile=profile_name) for node in profile_metadata['user']['media']['nodes'])
|
||||
has_next_page = profile_metadata['user']['media']['page_info']['has_next_page']
|
||||
end_cursor = profile_metadata['user']['media']['page_info']['end_cursor']
|
||||
while has_next_page:
|
||||
@ -923,15 +1024,16 @@ class Instaloader:
|
||||
data = self.graphql_query(17888483320059182, {'id': profile_metadata['user']['id'],
|
||||
'first': 500,
|
||||
'after': end_cursor},
|
||||
'https://www.instagram.com/{0}/'.format(profile_metadata['user']['username']))
|
||||
'https://www.instagram.com/{0}/'.format(profile_name))
|
||||
media = data['data']['user']['edge_owner_to_timeline_media']
|
||||
yield from [edge['node'] for edge in media['edges']]
|
||||
yield from (Post(self, edge['node'], profile=profile_name) for edge in media['edges'])
|
||||
has_next_page = media['page_info']['has_next_page']
|
||||
end_cursor = media['page_info']['end_cursor']
|
||||
|
||||
def download_profile(self, name: str,
|
||||
profile_pic_only: bool = False, fast_update: bool = False,
|
||||
download_stories: bool = False, download_stories_only: bool = False) -> None:
|
||||
download_stories: bool = False, download_stories_only: bool = False,
|
||||
filter_func: Optional[Callable[[Post], bool]] = None) -> None:
|
||||
"""Download one profile"""
|
||||
|
||||
# Get profile main page json
|
||||
@ -956,13 +1058,13 @@ class Instaloader:
|
||||
|
||||
# Catch some errors
|
||||
if profile_metadata["user"]["is_private"]:
|
||||
if self.username is None:
|
||||
if not self.is_logged_in:
|
||||
raise LoginRequiredException("profile %s requires login" % name)
|
||||
if not profile_metadata["user"]["followed_by_viewer"] and \
|
||||
self.username != profile_metadata["user"]["username"]:
|
||||
raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % name)
|
||||
else:
|
||||
if self.username is not None and not (download_stories or download_stories_only):
|
||||
if self.is_logged_in and not (download_stories or download_stories_only):
|
||||
self._log("profile %s could also be downloaded anonymously." % name)
|
||||
|
||||
# Download stories, if requested
|
||||
@ -982,8 +1084,11 @@ class Instaloader:
|
||||
for post in self.get_profile_posts(profile_metadata):
|
||||
self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
|
||||
count += 1
|
||||
if filter_func is not None and not filter_func(post):
|
||||
self._log('<skipped>')
|
||||
continue
|
||||
with self._error_catcher('Download profile {}'.format(name)):
|
||||
downloaded = self.download_post(node=post, profile=name, target=name)
|
||||
downloaded = self.download_post(post, target=name)
|
||||
if fast_update and not downloaded:
|
||||
break
|
||||
|
||||
@ -1015,7 +1120,7 @@ class Instaloader:
|
||||
if sessionfile is not None:
|
||||
print(err, file=sys.stderr)
|
||||
self._log("Session file does not exist yet - Logging in.")
|
||||
if self.username is None or username != self.test_login(self.session):
|
||||
if not self.is_logged_in or username != self.test_login(self.session):
|
||||
if password is not None:
|
||||
self.login(username, password)
|
||||
else:
|
||||
@ -1048,13 +1153,9 @@ class Instaloader:
|
||||
elif pentry == ":feed-liked":
|
||||
if username is not None:
|
||||
self._log("Retrieving pictures you liked from your feed...")
|
||||
def liked_filter(node):
|
||||
if "likes" in node:
|
||||
return not node["likes"]["viewer_has_liked"]
|
||||
return not node["viewer_has_liked"]
|
||||
with self._error_catcher():
|
||||
self.download_feed_posts(fast_update=fast_update, max_count=max_count,
|
||||
filter_func=liked_filter)
|
||||
filter_func=lambda post: post.viewer_has_liked)
|
||||
else:
|
||||
print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr)
|
||||
elif pentry == ":stories":
|
||||
|
Loading…
Reference in New Issue
Block a user