
This fetch was done for sidecar posts that contain a video when going through a profile. The fetched information is already present with the new profile query introduced in the last commit, making this full metadata fetch query unnecessary. Instaloader now better evaluates whether that fetch must be done or not. The fetch was also (possibly unnecessarily) made when accessing get_sidecar_nodes() on a Post that has been loaded with load_structure_from_file().
1571 lines
62 KiB
Python
1571 lines
62 KiB
Python
import json
|
|
import lzma
|
|
import re
|
|
from base64 import b64decode, b64encode
|
|
from collections import namedtuple
|
|
from datetime import datetime
|
|
from typing import Any, Dict, Iterator, List, Optional, Union
|
|
|
|
from . import __version__
|
|
from .exceptions import *
|
|
from .instaloadercontext import InstaloaderContext
|
|
from .nodeiterator import FrozenNodeIterator, NodeIterator
|
|
|
|
def _set_field_docs(record_type, **field_docs):
    """Attach a docstring to each named field of the namedtuple *record_type*."""
    for field_name, text in field_docs.items():
        getattr(record_type, field_name).__doc__ = text


# One medium (image or video) of a sidecar post.
PostSidecarNode = namedtuple('PostSidecarNode', ('is_video', 'display_url', 'video_url'))
PostSidecarNode.__doc__ = "Item of a Sidecar Post."
_set_field_docs(
    PostSidecarNode,
    is_video="Whether this node is a video.",
    display_url="URL of image or video thumbnail.",
    video_url="URL of video or None.",
)

# An answer (reply) to a comment.
PostCommentAnswer = namedtuple('PostCommentAnswer', ('id', 'created_at_utc', 'text', 'owner', 'likes_count'))
_set_field_docs(
    PostCommentAnswer,
    id="ID number of comment.",
    created_at_utc=":class:`~datetime.datetime` when comment was created (UTC).",
    text="Comment text.",
    owner="Owner :class:`Profile` of the comment.",
    likes_count="Number of likes on comment.",
)

# A top-level comment: the fields of PostCommentAnswer plus an 'answers' field.
PostComment = namedtuple('PostComment', PostCommentAnswer._fields + ('answers',))  # type: ignore
for field in PostCommentAnswer._fields:
    # The shared fields reuse the docstrings defined on PostCommentAnswer.
    getattr(PostComment, field).__doc__ = getattr(PostCommentAnswer, field).__doc__  # pylint: disable=no-member
PostComment.answers.__doc__ = r"Iterator which yields all :class:`PostCommentAnswer`\ s for the comment."  # type: ignore

# The location attached to a post.
PostLocation = namedtuple('PostLocation', ('id', 'name', 'slug', 'has_public_page', 'lat', 'lng'))
_set_field_docs(
    PostLocation,
    id="ID number of location.",
    name="Location name.",
    slug="URL friendly variant of location name.",
    has_public_page="Whether location has a public page.",
    lat="Latitude (:class:`float`).",
    lng="Longitude (:class:`float`).",
)
|
|
|
|
|
|
class Post:
|
|
"""
|
|
Structure containing information about an Instagram post.
|
|
|
|
Created by methods :meth:`Profile.get_posts`, :meth:`Instaloader.get_hashtag_posts`,
|
|
:meth:`Instaloader.get_feed_posts` and :meth:`Profile.get_saved_posts`, which return iterators of Posts::
|
|
|
|
L = Instaloader()
|
|
for post in L.get_hashtag_posts(HASHTAG):
|
|
L.download_post(post, target='#'+HASHTAG)
|
|
|
|
Might also be created with::
|
|
|
|
post = Post.from_shortcode(L.context, SHORTCODE)
|
|
|
|
This class unifies access to the properties associated with a post. It implements == and is
|
|
hashable.
|
|
|
|
:param context: :attr:`Instaloader.context` used for additional queries if necessary.
|
|
:param node: Node structure, as returned by Instagram.
|
|
:param owner_profile: The Profile of the owner, if already known at creation.
|
|
"""
|
|
|
|
def __init__(self, context: InstaloaderContext, node: Dict[str, Any],
             owner_profile: Optional['Profile'] = None):
    """Store the context and node; no request is made here.

    :param context: Context object used for later queries.
    :param node: Node structure; must contain a 'shortcode' or a 'code' key.
    :param owner_profile: The Profile of the owner, if already known.
    """
    assert 'shortcode' in node or 'code' in node

    self._context = context
    self._node = node
    self._owner_profile = owner_profile
    # Filled by _obtain_metadata() on first use.
    self._full_metadata_dict = None  # type: Optional[Dict[str, Any]]
    # Filled by the location property on first use.
    self._location = None  # type: Optional[PostLocation]
    # Already present if loaded from JSON with load_structure_from_file()
    self._iphone_struct_ = node['iphone_struct'] if 'iphone_struct' in node else None
|
|
|
|
@classmethod
def from_shortcode(cls, context: InstaloaderContext, shortcode: str):
    """Create a post object from a given shortcode.

    The full metadata is fetched immediately and used as the post's node.
    """
    # pylint:disable=protected-access
    instance = cls(context, {'shortcode': shortcode})
    full_node = instance._full_metadata
    instance._node = full_node
    return instance
|
|
|
|
@classmethod
def from_mediaid(cls, context: InstaloaderContext, mediaid: int):
    """Create a post object from a given mediaid.

    The mediaid is converted to its shortcode, then :meth:`from_shortcode` is used.
    """
    shortcode = Post.mediaid_to_shortcode(mediaid)
    return cls.from_shortcode(context, shortcode)
|
|
|
|
@staticmethod
def shortcode_to_mediaid(code: str) -> int:
    """Convert a shortcode to the integer it encodes.

    The code is left-padded with 'A' to 12 characters, decoded as base64 with
    ``-`` and ``_`` as the two extra characters, and read as a big-endian integer.

    :raises InvalidArgumentException: If *code* is longer than 11 characters.
    """
    if len(code) > 11:
        raise InvalidArgumentException('Wrong shortcode "{0}", unable to convert to mediaid.'.format(code))
    padded = code.rjust(12, 'A')
    raw = b64decode(padded.encode(), b'-_')
    return int.from_bytes(raw, 'big')
|
|
|
|
@staticmethod
def mediaid_to_shortcode(mediaid: int) -> str:
    """Convert an integer mediaid to its shortcode.

    The id is written as 9 big-endian bytes, base64-encoded with ``-`` and ``_``
    as the two extra characters, and leading 'A' characters are dropped.

    :raises InvalidArgumentException: If *mediaid* needs more than 64 bits.
    """
    if mediaid.bit_length() > 64:
        raise InvalidArgumentException("Wrong mediaid {0}, unable to convert to shortcode".format(str(mediaid)))
    encoded = b64encode(mediaid.to_bytes(9, 'big'), b'-_').decode()
    # The base64 output never contains whitespace, so stripping leading 'A's
    # directly gives the same result as going through a space placeholder.
    return encoded.lstrip('A')
|
|
|
|
@staticmethod
def supported_graphql_types() -> List[str]:
    """The values of __typename fields that the :class:`Post` class can handle."""
    typenames = ["GraphImage", "GraphVideo", "GraphSidecar"]
    return typenames
|
|
|
|
def _asdict(self):
    """Return a dict representation of this post.

    The result is the post's node, overlaid with the full metadata (if it has
    been fetched), the owner's dict (if the owner profile is known), the
    location (if resolved) and the iPhone structure (if present).

    The node is shallow-copied first, so building the dict does not alter
    ``self._node``.
    """
    node = self._node.copy()
    if self._full_metadata_dict:
        node.update(self._full_metadata_dict)
    if self._owner_profile:
        node['owner'] = self.owner_profile._asdict()
    if self._location:
        node['location'] = self._location._asdict()
    if self._iphone_struct_:
        node['iphone_struct'] = self._iphone_struct_
    return node
|
|
|
|
@property
def shortcode(self) -> str:
    """Media shortcode. URL of the post is instagram.com/p/<shortcode>/."""
    node = self._node
    # The node carries the shortcode under either 'shortcode' or 'code'.
    if 'shortcode' in node:
        return node['shortcode']
    return node['code']
|
|
|
|
@property
def mediaid(self) -> int:
    """The mediaid is a decimal representation of the media shortcode."""
    raw_id = self._node['id']
    return int(raw_id)
|
|
|
|
def __repr__(self):
    """Return ``<Post SHORTCODE>``."""
    code = self.shortcode
    return '<Post {}>'.format(code)

def __eq__(self, o: object) -> bool:
    """Two posts are equal if their shortcodes are equal."""
    if not isinstance(o, Post):
        return NotImplemented
    return self.shortcode == o.shortcode

def __hash__(self) -> int:
    """Hash of the shortcode, consistent with :meth:`__eq__`."""
    code = self.shortcode
    return hash(code)
|
|
|
|
def _obtain_metadata(self):
    """Fetch the full metadata of this post, unless it is already loaded.

    :raises BadResponseException: If the response contains no media structure.
    :raises PostChangedException: If the returned structure has a different
       shortcode; the node is updated with the returned data before raising.
    """
    if self._full_metadata_dict:
        return
    query_variables = {'shortcode': self.shortcode}
    response = self._context.graphql_query('2b0673e0dc4580674a88d426fe00ea90', query_variables)
    self._full_metadata_dict = response['data']['shortcode_media']
    if self._full_metadata_dict is None:
        raise BadResponseException("Fetching Post metadata failed.")
    if self.shortcode != self._full_metadata_dict['shortcode']:
        self._node.update(self._full_metadata_dict)
        raise PostChangedException
|
|
|
|
@property
def _full_metadata(self) -> Dict[str, Any]:
    """The full metadata dict, fetched on first access."""
    self._obtain_metadata()
    full = self._full_metadata_dict
    assert full is not None
    return full
|
|
|
|
@property
def _iphone_struct(self) -> Dict[str, Any]:
    """Media info from the iPhone endpoint, fetched once and then cached.

    :raises LoginRequiredException: If not logged in.
    """
    if not self._context.is_logged_in:
        raise LoginRequiredException("--login required to access iPhone media info endpoint.")
    if self._iphone_struct_:
        return self._iphone_struct_
    endpoint = 'api/v1/media/{}/info/'.format(self.mediaid)
    response = self._context.get_iphone_json(path=endpoint, params={})
    self._iphone_struct_ = response['items'][0]
    return self._iphone_struct_
|
|
|
|
def _field(self, *keys) -> Any:
    """Lookups given fields in _node, and if not found in _full_metadata. Raises KeyError if not found anywhere."""
    def descend(root):
        # Follow the chain of keys down from root.
        current = root
        for key in keys:
            current = current[key]
        return current

    try:
        return descend(self._node)
    except KeyError:
        # Accessing _full_metadata may trigger a fetch.
        return descend(self._full_metadata)
|
|
|
|
@property
def owner_profile(self) -> 'Profile':
    """:class:`Profile` instance of the Post's owner."""
    if self._owner_profile:
        return self._owner_profile
    if 'username' in self._node['owner']:
        owner_struct = self._node['owner']
    else:
        # Sometimes, the 'owner' structure does not contain the username, only the user's ID. In that case,
        # this call triggers downloading of the complete Post metadata struct, where the owner username
        # is contained.
        # Note that we cannot use Profile.from_id() here since that would lead us into a recursion.
        owner_struct = self._full_metadata['owner']
    self._owner_profile = Profile(self._context, owner_struct)
    return self._owner_profile
|
|
|
|
@property
def owner_username(self) -> str:
    """The Post's lowercase owner name."""
    owner = self.owner_profile
    return owner.username
|
|
|
|
@property
def owner_id(self) -> int:
    """The ID of the Post's owner, as an integer."""
    # The ID may already be available, e.g. if the post instance was created
    # from an `hashtag.get_posts()` iterator, so no need to make another
    # http request.
    owner = self._node.get('owner') if 'owner' in self._node else None
    if owner is not None and 'id' in owner:
        # The node may store the ID as a string; convert it so that both
        # branches return an int, as the annotation promises.
        return int(owner['id'])
    return self.owner_profile.userid
|
|
|
|
@property
def date_local(self) -> datetime:
    """Timestamp when the post was created (local time zone)."""
    node = self._node
    # The node carries the timestamp under either 'date' or 'taken_at_timestamp'.
    timestamp = node["date"] if "date" in node else node["taken_at_timestamp"]
    return datetime.fromtimestamp(timestamp)
|
|
|
|
@property
def date_utc(self) -> datetime:
    """Timestamp when the post was created (UTC)."""
    node = self._node
    # The node carries the timestamp under either 'date' or 'taken_at_timestamp'.
    timestamp = node["date"] if "date" in node else node["taken_at_timestamp"]
    return datetime.utcfromtimestamp(timestamp)
|
|
|
|
@property
def date(self) -> datetime:
    """Synonym to :attr:`~Post.date_utc`"""
    created = self.date_utc
    return created

@property
def profile(self) -> str:
    """Synonym to :attr:`~Post.owner_username`"""
    name = self.owner_username
    return name
|
|
|
|
@property
def url(self) -> str:
    """URL of the picture / video thumbnail of the post"""
    use_iphone_version = self.typename == "GraphImage" and self._context.is_logged_in
    if use_iphone_version:
        try:
            candidate_url = self._iphone_struct['image_versions2']['candidates'][0]['url']
            # Remove the 'se' query parameter from the URL.
            return re.sub(r'&se=\d+(&?)', r'\1', candidate_url)
        except (InstaloaderException, KeyError, IndexError) as err:
            # Report the problem and fall back to the URL stored in the node.
            self._context.error('{} Unable to fetch high quality image version of {}.'.format(err, self))
    if "display_url" in self._node:
        return self._node["display_url"]
    return self._node["display_src"]
|
|
|
|
@property
def typename(self) -> str:
    """Type of post, GraphImage, GraphVideo or GraphSidecar"""
    kind = self._field('__typename')
    return kind
|
|
|
|
@property
def mediacount(self) -> int:
    """
    The number of media in a sidecar Post, or 1 if the Post is not a sidecar.

    .. versionadded:: 4.6
    """
    if self.typename != 'GraphSidecar':
        return 1
    return len(self._field('edge_sidecar_to_children', 'edges'))
|
|
|
|
def get_sidecar_nodes(self, start=0, end=-1) -> Iterator[PostSidecarNode]:
    """
    Sidecar nodes of a Post with typename==GraphSidecar.

    Yields nothing for other post types. *start* and *end* are inclusive
    indices; a negative value for either one stands for the last index.

    .. versionchanged:: 4.6
       Added parameters *start* and *end* to specify a slice of sidecar media.
    """
    if self.typename != 'GraphSidecar':
        return
    edges = self._field('edge_sidecar_to_children', 'edges')
    video_url_missing = any(
        child['node']['is_video'] and 'video_url' not in child['node'] for child in edges
    )
    if video_url_missing:
        # video_url is only present in full metadata, issue #558.
        edges = self._full_metadata['edge_sidecar_to_children']['edges']
    last_index = len(edges) - 1
    if end < 0:
        end = last_index
    if start < 0:
        start = last_index
    for idx, edge in enumerate(edges):
        if not start <= idx <= end:
            continue
        node = edge['node']
        is_video = node['is_video']
        display_url = node['display_url']
        if not is_video and self._context.is_logged_in:
            # Try to replace the image URL with the version from the iPhone endpoint.
            try:
                carousel_media = self._iphone_struct['carousel_media']
                orig_url = carousel_media[idx]['image_versions2']['candidates'][0]['url']
                display_url = re.sub(r'&se=\d+(&?)', r'\1', orig_url)
            except (InstaloaderException, KeyError, IndexError) as err:
                self._context.error('{} Unable to fetch high quality image version of {}.'.format(
                    err, self))
        video_url = node['video_url'] if is_video else None
        yield PostSidecarNode(is_video=is_video, display_url=display_url, video_url=video_url)
|
|
|
|
@property
def caption(self) -> Optional[str]:
    """Caption text of the post, or None if the node contains none."""
    node = self._node
    if "edge_media_to_caption" in node and node["edge_media_to_caption"]["edges"]:
        first_edge = node["edge_media_to_caption"]["edges"][0]
        return first_edge["node"]["text"]
    if "caption" in node:
        return node["caption"]
    return None
|
|
|
|
@property
def caption_hashtags(self) -> List[str]:
    """List of all lowercased hashtags (without preceding #) that occur in the Post's caption."""
    text = self.caption
    if not text:
        return []
    # This regular expression is from jStassen, adjusted to use Python's \w to support Unicode
    # http://blog.jstassen.com/2016/03/code-regex-for-instagram-username-and-hashtags/
    hashtag_regex = re.compile(r"(?:#)(\w(?:(?:\w|(?:\.(?!\.))){0,28}(?:\w))?)")
    return hashtag_regex.findall(text.lower())
|
|
|
|
@property
def caption_mentions(self) -> List[str]:
    """List of all lowercased profiles that are mentioned in the Post's caption, without preceding @."""
    text = self.caption
    if not text:
        return []
    # This regular expression is from jStassen, adjusted to use Python's \w to support Unicode
    # http://blog.jstassen.com/2016/03/code-regex-for-instagram-username-and-hashtags/
    mention_regex = re.compile(r"(?:@)(\w(?:(?:\w|(?:\.(?!\.))){0,28}(?:\w))?)")
    return mention_regex.findall(text.lower())
|
|
|
|
@property
def pcaption(self) -> str:
    """Printable caption, useful as a format specifier for --filename-pattern.

    Non-empty lines are joined with spaces and '/' is replaced by U+2215. If the
    result is longer than 31 characters, it is cut to 30 characters followed by
    an ellipsis character. An absent or empty caption gives ''.

    .. versionadded:: 4.2.6"""
    text = self.caption
    if not text:
        return ''
    nonempty_lines = [line.replace('/', '\u2215') for line in text.splitlines() if line]
    printable = ' '.join(nonempty_lines).strip()
    if len(printable) > 31:
        return printable[:30] + u"\u2026"
    return printable
|
|
|
|
@property
def tagged_users(self) -> List[str]:
    """List of all lowercased users that are tagged in the Post."""
    try:
        tag_edges = self._field('edge_media_to_tagged_user', 'edges')
        return [tag_edge['node']['user']['username'].lower() for tag_edge in tag_edges]
    except KeyError:
        # A missing key anywhere in the structure is treated as "no tagged users".
        return []
|
|
|
|
@property
def is_video(self) -> bool:
    """True if the Post is a video."""
    flag = self._node['is_video']
    return flag
|
|
|
|
@property
def video_url(self) -> Optional[str]:
    """URL of the video, or None."""
    if not self.is_video:
        return None
    if self._context.is_logged_in:
        # When logged in, try the video version from the iPhone endpoint first.
        try:
            return self._iphone_struct['video_versions'][0]['url']
        except (InstaloaderException, KeyError, IndexError) as err:
            self._context.error('{} Unable to fetch high quality video version of {}.'.format(err, self))
    return self._field('video_url')
|
|
|
|
@property
def video_view_count(self) -> Optional[int]:
    """View count of the video, or None.

    .. versionadded:: 4.2.6"""
    if not self.is_video:
        return None
    return self._field('video_view_count')

@property
def video_duration(self) -> Optional[float]:
    """Duration of the video in seconds, or None.

    .. versionadded:: 4.2.6"""
    if not self.is_video:
        return None
    return self._field('video_duration')
|
|
|
|
@property
def viewer_has_liked(self) -> Optional[bool]:
    """Whether the viewer has liked the post, or None if not logged in."""
    if not self._context.is_logged_in:
        return None
    node = self._node
    # Use the value nested under 'likes' if the node has it there.
    if 'likes' in node and 'viewer_has_liked' in node['likes']:
        return node['likes']['viewer_has_liked']
    return self._field('viewer_has_liked')
|
|
|
|
@property
def likes(self) -> int:
    """Likes count"""
    like_count = self._field('edge_media_preview_like', 'count')
    return like_count
|
|
|
|
@property
def comments(self) -> int:
    """Comment count including answers"""
    # If the count is already present in `self._node`, do not use `self._field` which could trigger fetching the
    # full metadata dict.
    node_comments = self._node.get('edge_media_to_comment')
    if node_comments and 'count' in node_comments:
        return node_comments['count']
    try:
        total = self._field('edge_media_to_parent_comment', 'count')
    except KeyError:
        # No parent-comment edge available; use the plain comment edge.
        return self._field('edge_media_to_comment', 'count')
    return total
|
|
|
|
def get_comments(self) -> Iterator[PostComment]:
    r"""Iterate over all comments of the post.

    Each comment is represented by a PostComment namedtuple with fields id (int), created_at_utc (datetime),
    text (string), owner (:class:`Profile`), likes_count (int) and answers
    (:class:`~typing.Iterator`\ [:class:`PostCommentAnswer`]) if available.
    """
    def build_answer(node):
        # Convert a raw comment node into a PostCommentAnswer.
        likes_count = node.get('edge_liked_by', {}).get('count', 0)
        return PostCommentAnswer(id=int(node['id']),
                                 created_at_utc=datetime.utcfromtimestamp(node['created_at']),
                                 text=node['text'],
                                 owner=Profile(self._context, node['owner']),
                                 likes_count=likes_count)

    def iter_answers(node):
        # Lazily yield the answers to the comment described by node.
        if 'edge_threaded_comments' not in node:
            return
        threaded = node['edge_threaded_comments']
        answer_count = threaded['count']
        if answer_count == 0:
            # Avoid doing additional requests if there are no comment answers
            return
        answer_edges = threaded['edges']
        if answer_count == len(answer_edges):
            # If the answer's metadata already contains all comments, don't do GraphQL requests to obtain them
            for answer_edge in answer_edges:
                yield build_answer(answer_edge['node'])
            return
        yield from NodeIterator(
            self._context,
            '51fdd02b67508306ad4484ff574a0b62',
            lambda d: d['data']['comment']['edge_threaded_comments'],
            build_answer,
            {'comment_id': node['id']},
            'https://www.instagram.com/p/{0}/'.format(self.shortcode),
        )

    def build_comment(node):
        # A PostComment is the answer tuple plus a lazy iterator over its answers.
        return PostComment(*build_answer(node), answers=iter_answers(node))

    if self.comments == 0:
        # Avoid doing additional requests if there are no comments
        return

    comment_edges = self._field('edge_media_to_comment', 'edges')
    answers_count = sum(
        comment_edge['node'].get('edge_threaded_comments', {}).get('count', 0) for comment_edge in comment_edges
    )

    if self.comments == len(comment_edges) + answers_count:
        # If the Post's metadata already contains all parent comments, don't do GraphQL requests to obtain them
        for comment_edge in comment_edges:
            yield build_comment(comment_edge['node'])
        return
    yield from NodeIterator(
        self._context,
        '97b41c52301f77ce508f55e66d17620e',
        lambda d: d['data']['shortcode_media']['edge_media_to_parent_comment'],
        build_comment,
        {'shortcode': self.shortcode},
        'https://www.instagram.com/p/{0}/'.format(self.shortcode),
    )
|
|
|
|
def get_likes(self) -> Iterator['Profile']:
    """
    Iterate over all likes of the post. A :class:`Profile` instance of each liker is yielded.

    :raises LoginRequiredException: If not logged in.

    .. versionchanged:: 4.5.4
       Require being logged in (as required by Instagram).
    """
    if not self._context.is_logged_in:
        raise LoginRequiredException("--login required to access likes of a post.")
    if self.likes == 0:
        # Avoid doing additional requests if there are no likes
        return
    likes_edges = self._field('edge_media_preview_like', 'edges')
    if self.likes == len(likes_edges):
        # If the Post's metadata already contains all likes, don't do GraphQL requests to obtain them
        for like_edge in likes_edges:
            yield Profile(self._context, like_edge['node'])
        return
    yield from NodeIterator(
        self._context,
        '1cb6ec562846122743b61e492c85999f',
        lambda d: d['data']['shortcode_media']['edge_liked_by'],
        lambda n: Profile(self._context, n),
        {'shortcode': self.shortcode},
        'https://www.instagram.com/p/{0}/'.format(self.shortcode),
    )
|
|
|
|
@property
def is_sponsored(self) -> bool:
    """
    Whether Post is a sponsored post, equivalent to non-empty :meth:`Post.sponsor_users`.

    .. versionadded:: 4.4
    """
    try:
        return bool(self._field('edge_media_to_sponsor_user', 'edges'))
    except KeyError:
        # No sponsor edge in the metadata means the post is not sponsored.
        return False
|
|
|
|
@property
def sponsor_users(self) -> List['Profile']:
    """
    The Post's sponsors.

    .. versionadded:: 4.4
    """
    if not self.is_sponsored:
        return []
    sponsor_edges = self._field('edge_media_to_sponsor_user', 'edges')
    return [Profile(self._context, sponsor_edge['node']['sponsor']) for sponsor_edge in sponsor_edges]
|
|
|
|
@property
def location(self) -> Optional[PostLocation]:
    """
    If the Post has a location, returns PostLocation namedtuple with fields 'id', 'name', 'slug',
    'has_public_page', 'lat' and 'lng'.

    The result is cached. If the post's own location structure lacks any of these
    fields, the location is requested separately.

    .. versionchanged:: 4.2.9
       Require being logged in (as required by Instagram), return None if not logged-in.
    """
    loc = self._field("location")
    if self._location or not loc:
        # Either already resolved, or the post has no location (cache is None then).
        return self._location
    if not self._context.is_logged_in:
        return None
    location_id = int(loc['id'])
    required_keys = ('name', 'slug', 'has_public_page', 'lat', 'lng')
    if not all(key in loc for key in required_keys):
        response = self._context.get_json("explore/locations/{0}/".format(location_id),
                                          params={'__a': 1})
        loc = response['graphql']['location']
    self._location = PostLocation(location_id, loc['name'], loc['slug'], loc['has_public_page'],
                                  loc['lat'], loc['lng'])
    return self._location
|
|
|
|
|
|
class Profile:
|
|
"""
|
|
An Instagram Profile.
|
|
|
|
Provides methods for accessing profile properties, as well as :meth:`Profile.get_posts` and for own profile
|
|
:meth:`Profile.get_saved_posts`.
|
|
|
|
Get instances with :meth:`Post.owner_profile`, :meth:`StoryItem.owner_profile`, :meth:`Profile.get_followees`,
|
|
:meth:`Profile.get_followers` or::
|
|
|
|
L = Instaloader()
|
|
profile = Profile.from_username(L.context, USERNAME)
|
|
|
|
Provides :meth:`Profile.get_posts` and for own profile :meth:`Profile.get_saved_posts` to iterate over associated
|
|
:class:`Post` objects::
|
|
|
|
for post in profile.get_posts():
|
|
L.download_post(post, target=profile.username)
|
|
|
|
:meth:`Profile.get_followees` and :meth:`Profile.get_followers`::
|
|
|
|
print("{} follows these profiles:".format(profile.username))
|
|
for followee in profile.get_followees():
|
|
print(followee.username)
|
|
|
|
Also, this class implements == and is hashable.
|
|
"""
|
|
def __init__(self, context: InstaloaderContext, node: Dict[str, Any]):
    """Store the context and node; no request is made here.

    :param context: Context object used for later queries.
    :param node: Node structure; must contain a 'username' key.
    """
    assert 'username' in node
    self._context = context
    # Filled by the has_public_story property.
    self._has_public_story = None  # type: Optional[bool]
    self._node = node
    # Set to True once _obtain_metadata() has loaded the node.
    self._has_full_metadata = False
    # Already present if loaded from JSON with load_structure_from_file()
    self._iphone_struct_ = node['iphone_struct'] if 'iphone_struct' in node else None
|
|
|
|
@classmethod
def from_username(cls, context: InstaloaderContext, username: str):
    """Create a Profile instance from a given username, raise exception if it does not exist.

    The username is lowercased before use.

    See also :meth:`Instaloader.check_profile_id`.

    :param context: :attr:`Instaloader.context`
    :param username: Username
    :raises: :class:`ProfileNotExistsException`
    """
    # pylint:disable=protected-access
    node = {'username': username.lower()}
    profile = cls(context, node)
    # Fetch right away, to raise ProfileNotExistsException now in case username is invalid
    profile._obtain_metadata()
    return profile
|
|
|
|
@classmethod
def from_id(cls, context: InstaloaderContext, profile_id: int):
    """Create a Profile instance from a given userid. If possible, use :meth:`Profile.from_username`
    or constructor directly rather than this method, since it requires more requests.

    Results are stored in and served from ``context.profile_id_cache``.

    :param context: :attr:`Instaloader.context`
    :param profile_id: userid
    :raises: :class:`ProfileNotExistsException`
    """
    if profile_id in context.profile_id_cache:
        return context.profile_id_cache[profile_id]
    query_variables = {'user_id': str(profile_id),
                       'include_chaining': False,
                       'include_reel': True,
                       'include_suggested_users': False,
                       'include_logged_out_extras': False,
                       'include_highlight_reels': False}
    response = context.graphql_query('7c16654f22c819fb63d1183034a5162f',
                                     query_variables,
                                     rhx_gis=context.root_rhx_gis)
    data = response['data']['user']
    if not data:
        raise ProfileNotExistsException("No profile found, the user may have blocked you (ID: " +
                                        str(profile_id) + ").")
    profile = cls(context, data['reel']['owner'])
    context.profile_id_cache[profile_id] = profile
    return profile
|
|
|
|
@classmethod
def own_profile(cls, context: InstaloaderContext):
    """Return own profile if logged-in.

    :param context: :attr:`Instaloader.context`
    :raises LoginRequiredException: If not logged in.

    .. versionadded:: 4.5.2"""
    if not context.is_logged_in:
        raise LoginRequiredException("--login required to access own profile.")
    response = context.graphql_query("d6f4427fbe92d846298cf93df0b937d3", {})
    return cls(context, response["data"]["user"])
|
|
|
|
def _asdict(self):
    """Return a copy of the profile's node without its post collections.

    The iPhone structure is added under 'iphone_struct' if present.
    """
    json_node = self._node.copy()
    # remove posts to avoid "Circular reference detected" exception
    for post_collection in ('edge_media_collections',
                            'edge_owner_to_timeline_media',
                            'edge_saved_media',
                            'edge_felix_video_timeline'):
        json_node.pop(post_collection, None)
    if self._iphone_struct_:
        json_node['iphone_struct'] = self._iphone_struct_
    return json_node
|
|
|
|
def _obtain_metadata(self):
    """Load the profile's node from its feed page, unless already loaded.

    :raises ProfileNotExistsException: If the query reports "not found" or the
       response lacks the expected structure. The message lists up to five
       similar profile names if the search finds any.
    """
    try:
        if not self._has_full_metadata:
            feed_page = self._context.get_json('{}/feed/'.format(self.username), params={})
            self._node = feed_page['entry_data']['ProfilePage'][0]['graphql']['user']
            self._has_full_metadata = True
    except (QueryReturnedNotFoundException, KeyError) as err:
        search = TopSearchResults(self._context, self.username)
        similar_profiles = [candidate.username for candidate in search.get_profiles()]
        if not similar_profiles:
            raise ProfileNotExistsException('Profile {} does not exist.'.format(self.username)) from err
        plural_suffix = 's are' if len(similar_profiles) > 1 else ' is'
        raise ProfileNotExistsException('Profile {} does not exist.\nThe most similar profile{}: {}.'
                                        .format(self.username,
                                                plural_suffix,
                                                ', '.join(similar_profiles[0:5]))) from err
|
|
|
|
def _metadata(self, *keys) -> Any:
    """Look up the given chain of keys in the node.

    On a KeyError, the node is loaded with _obtain_metadata() and the lookup is
    repeated once; a KeyError from the second attempt propagates.
    """
    def descend():
        # Read self._node afresh, since _obtain_metadata() may replace it.
        current = self._node
        for key in keys:
            current = current[key]
        return current

    try:
        return descend()
    except KeyError:
        self._obtain_metadata()
        return descend()
|
|
|
|
@property
def _iphone_struct(self) -> Dict[str, Any]:
    """User info from the iPhone endpoint, fetched once and then cached.

    :raises LoginRequiredException: If not logged in.
    """
    if not self._context.is_logged_in:
        raise LoginRequiredException("--login required to access iPhone profile info endpoint.")
    if self._iphone_struct_:
        return self._iphone_struct_
    endpoint = 'api/v1/users/{}/info/'.format(self.userid)
    response = self._context.get_iphone_json(path=endpoint, params={})
    self._iphone_struct_ = response['user']
    return self._iphone_struct_
|
|
|
|
@property
def userid(self) -> int:
    """User ID"""
    raw_id = self._metadata('id')
    return int(raw_id)

@property
def username(self) -> str:
    """Profile Name"""
    name = self._metadata('username')
    return name.lower()
|
|
|
|
def __repr__(self):
    """Return ``<Profile USERNAME (USERID)>``."""
    name, uid = self.username, self.userid
    return '<Profile {} ({})>'.format(name, uid)

def __eq__(self, o: object) -> bool:
    """Two profiles are equal if their user IDs are equal."""
    if not isinstance(o, Profile):
        return NotImplemented
    return self.userid == o.userid

def __hash__(self) -> int:
    """Hash of the user ID, consistent with :meth:`__eq__`."""
    uid = self.userid
    return hash(uid)
|
|
|
|
# Each accessor below returns one value from the profile metadata via
# self._metadata(); see that method for when a fetch happens.

@property
def is_private(self) -> bool:
    """The 'is_private' metadata value."""
    return self._metadata('is_private')

@property
def followed_by_viewer(self) -> bool:
    """The 'followed_by_viewer' metadata value."""
    return self._metadata('followed_by_viewer')

@property
def mediacount(self) -> int:
    """The 'count' value of 'edge_owner_to_timeline_media'."""
    return self._metadata('edge_owner_to_timeline_media', 'count')

@property
def igtvcount(self) -> int:
    """The 'count' value of 'edge_felix_video_timeline'."""
    return self._metadata('edge_felix_video_timeline', 'count')

@property
def followers(self) -> int:
    """The 'count' value of 'edge_followed_by'."""
    return self._metadata('edge_followed_by', 'count')

@property
def followees(self) -> int:
    """The 'count' value of 'edge_follow'."""
    return self._metadata('edge_follow', 'count')

@property
def external_url(self) -> Optional[str]:
    """The 'external_url' metadata value."""
    return self._metadata('external_url')

@property
def is_business_account(self) -> bool:
    """The 'is_business_account' metadata value.

    .. versionadded:: 4.4"""
    return self._metadata('is_business_account')

@property
def business_category_name(self) -> str:
    """The 'business_category_name' metadata value.

    .. versionadded:: 4.4"""
    return self._metadata('business_category_name')

@property
def biography(self) -> str:
    """The 'biography' metadata value."""
    return self._metadata('biography')

@property
def blocked_by_viewer(self) -> bool:
    """The 'blocked_by_viewer' metadata value."""
    return self._metadata('blocked_by_viewer')

@property
def follows_viewer(self) -> bool:
    """The 'follows_viewer' metadata value."""
    return self._metadata('follows_viewer')

@property
def full_name(self) -> str:
    """The 'full_name' metadata value."""
    return self._metadata('full_name')

@property
def has_blocked_viewer(self) -> bool:
    """The 'has_blocked_viewer' metadata value."""
    return self._metadata('has_blocked_viewer')
|
|
|
|
@property
def has_highlight_reels(self) -> bool:
    """
    .. deprecated:: 4.0.6
       Always returns `True` since :issue:`153`.

    Before broken, this indicated whether the :class:`Profile` had available stories.
    """
    always = True
    return always
|
|
|
|
@property
def has_public_story(self) -> bool:
    """The 'has_public_story' value of this profile.

    The value is queried once with an anonymous copy of the context and then
    cached on the instance. A cached ``False`` is a valid result and is
    returned without querying again.
    """
    # Compare against None explicitly: a plain truthiness test would treat a
    # cached False as "not yet fetched" and repeat the query on every access.
    if self._has_public_story is None:
        self._obtain_metadata()
        # query not rate limited if invoked anonymously:
        with self._context.anonymous_copy() as anonymous_context:
            data = anonymous_context.graphql_query('9ca88e465c3f866a76f7adee3871bdd8',
                                                   {'user_id': self.userid, 'include_chaining': False,
                                                    'include_reel': False, 'include_suggested_users': False,
                                                    'include_logged_out_extras': True,
                                                    'include_highlight_reels': False},
                                                   'https://www.instagram.com/{}/'.format(self.username))
        self._has_public_story = data['data']['user']['has_public_story']
    assert self._has_public_story is not None
    return self._has_public_story
|
|
|
|
@property
def has_viewable_story(self) -> bool:
    """
    .. deprecated:: 4.0.6

    Some stories are private. This property determines if the :class:`Profile`
    has at least one story which can be viewed using the associated :class:`InstaloaderContext`,
    i.e. the viewer has privileges to view it.
    """
    # Parentheses only spell out the operator precedence of the expression.
    return self.has_public_story or (self.followed_by_viewer and self.has_highlight_reels)
|
|
|
|
# Each accessor below returns one value from the profile metadata via self._metadata().

@property
def has_requested_viewer(self) -> bool:
    """The 'has_requested_viewer' metadata value."""
    return self._metadata('has_requested_viewer')

@property
def is_verified(self) -> bool:
    """The 'is_verified' metadata value."""
    return self._metadata('is_verified')

@property
def requested_by_viewer(self) -> bool:
    """The 'requested_by_viewer' metadata value."""
    return self._metadata('requested_by_viewer')
|
|
|
|
@property
def profile_pic_url(self) -> str:
    """Return URL of profile picture. If logged in, the HD version is returned, otherwise a lower-quality version.

    .. versionadded:: 4.0.3

    .. versionchanged:: 4.2.1
       Require being logged in for HD version (as required by Instagram)."""
    if self._context.is_logged_in:
        try:
            return self._iphone_struct['hd_profile_pic_url_info']['url']
        except (InstaloaderException, KeyError) as err:
            # Report the problem and fall through to the metadata URL.
            self._context.error('{} Unable to fetch high quality profile pic.'.format(err))
    # Used when not logged in, and as the fallback when the lookup above failed.
    return self._metadata("profile_pic_url_hd")
|
|
|
|
def get_profile_pic_url(self) -> str:
    """.. deprecated:: 4.0.3

       Use :attr:`profile_pic_url`."""
    picture_url = self.profile_pic_url
    return picture_url
|
|
|
|
def get_posts(self) -> NodeIterator[Post]:
    """Retrieve all posts from a profile.

    The already-loaded 'edge_owner_to_timeline_media' metadata is handed to the
    iterator as its last argument.

    :rtype: NodeIterator[Post]"""
    self._obtain_metadata()

    def timeline_edge(response):
        return response['data']['user']['edge_owner_to_timeline_media']

    def as_post(node):
        # This profile is passed as the owner of every post.
        return Post(self._context, node, self)

    return NodeIterator(
        self._context,
        '003056d32c2554def87228bc3fd9668a',
        timeline_edge,
        as_post,
        {'id': self.userid},
        'https://www.instagram.com/{0}/'.format(self.username),
        self._metadata('edge_owner_to_timeline_media'),
    )
|
|
|
|
def get_saved_posts(self) -> NodeIterator[Post]:
    """Get Posts that are marked as saved by the user.

    :raises LoginRequiredException: if this profile's username differs from the username of the context.
    :rtype: NodeIterator[Post]"""
    is_own_profile = self.username == self._context.username
    if not is_own_profile:
        raise LoginRequiredException("--login={} required to get that profile's saved posts.".format(self.username))

    def extract_edge(data):
        return data['data']['user']['edge_saved_media']

    def wrap_node(node):
        return Post(self._context, node)

    query_variables = {'id': self.userid}
    referer = 'https://www.instagram.com/{0}/'.format(self.username)
    return NodeIterator(
        self._context,
        'f883d95537fbcd400f466f63d42bd8a1',
        extract_edge,
        wrap_node,
        query_variables,
        referer,
    )
|
|
|
|
def get_tagged_posts(self) -> NodeIterator[Post]:
    """Retrieve all posts where a profile is tagged.

    :rtype: NodeIterator[Post]

    .. versionadded:: 4.0.7"""
    self._obtain_metadata()

    def extract_edge(data):
        return data['data']['user']['edge_user_to_photos_of_you']

    def wrap_node(node):
        # Only hand over this profile as owner when the post's owner id matches it.
        owner = None
        if int(node['owner']['id']) == self.userid:
            owner = self
        return Post(self._context, node, owner)

    query_variables = {'id': self.userid}
    referer = 'https://www.instagram.com/{0}/'.format(self.username)
    return NodeIterator(
        self._context,
        'e31a871f7301132ceaab56507a66bbb7',
        extract_edge,
        wrap_node,
        query_variables,
        referer,
    )
|
|
|
|
def get_igtv_posts(self) -> NodeIterator[Post]:
    """Retrieve all IGTV posts.

    The profile metadata is obtained first; its ``edge_felix_video_timeline`` entry is handed to the
    :class:`NodeIterator` as the final argument.

    :rtype: NodeIterator[Post]

    .. versionadded:: 4.3"""
    self._obtain_metadata()

    def extract_edge(data):
        return data['data']['user']['edge_felix_video_timeline']

    def wrap_node(node):
        return Post(self._context, node, self)

    query_variables = {'id': self.userid}
    referer = 'https://www.instagram.com/{0}/channel/'.format(self.username)
    first_data = self._metadata('edge_felix_video_timeline')
    return NodeIterator(
        self._context,
        'bc78b344a68ed16dd5d7f264681c4c76',
        extract_edge,
        wrap_node,
        query_variables,
        referer,
        first_data,
    )
|
|
|
|
def get_followers(self) -> NodeIterator['Profile']:
    """
    Retrieve list of followers of given profile.
    To use this, one needs to be logged in and private profiles has to be followed.

    :raises LoginRequiredException: if the context is not logged in.
    :rtype: NodeIterator[Profile]
    """
    logged_in = self._context.is_logged_in
    if not logged_in:
        raise LoginRequiredException("--login required to get a profile's followers.")
    self._obtain_metadata()

    def extract_edge(data):
        return data['data']['user']['edge_followed_by']

    def wrap_node(node):
        return Profile(self._context, node)

    query_variables = {'id': str(self.userid)}
    referer = 'https://www.instagram.com/{0}/'.format(self.username)
    return NodeIterator(
        self._context,
        '37479f2b8209594dde7facb0d904896a',
        extract_edge,
        wrap_node,
        query_variables,
        referer,
    )
|
|
|
|
def get_followees(self) -> NodeIterator['Profile']:
    """
    Retrieve list of followees (followings) of given profile.
    To use this, one needs to be logged in and private profiles has to be followed.

    :raises LoginRequiredException: if the context is not logged in.
    :rtype: NodeIterator[Profile]
    """
    logged_in = self._context.is_logged_in
    if not logged_in:
        raise LoginRequiredException("--login required to get a profile's followees.")
    self._obtain_metadata()

    def extract_edge(data):
        return data['data']['user']['edge_follow']

    def wrap_node(node):
        return Profile(self._context, node)

    query_variables = {'id': str(self.userid)}
    referer = 'https://www.instagram.com/{0}/'.format(self.username)
    return NodeIterator(
        self._context,
        '58712303d941c6855d4e888c5f0cd22f',
        extract_edge,
        wrap_node,
        query_variables,
        referer,
    )
|
|
|
|
def get_similar_accounts(self) -> Iterator['Profile']:
    """
    Retrieve list of suggested / similar accounts for this profile.
    To use this, one needs to be logged in.

    This is a generator: the login check and the query run when iteration starts, not when the method
    is called.

    :raises LoginRequiredException: if the context is not logged in.

    .. versionadded:: 4.4
    """
    if not self._context.is_logged_in:
        raise LoginRequiredException("--login required to get a profile's similar accounts.")
    self._obtain_metadata()
    query_variables = {"user_id": str(self.userid), "include_chaining": True}
    referer = "https://www.instagram.com/{0}/".format(self.username)
    response = self._context.graphql_query("ad99dd9d3646cc3c0dda65debcd266a7", query_variables, referer)
    chaining_edges = response["data"]["user"]["edge_chaining"]["edges"]
    for edge in chaining_edges:
        yield Profile(self._context, edge["node"])
|
|
|
|
|
|
class StoryItem:
    """
    Structure containing information about a user story item i.e. image or video.

    Created by method :meth:`Story.get_items`. This class implements == and is hashable.

    :param context: :class:`InstaloaderContext` instance used for additional queries if necessary.
    :param node: Dictionary containing the available information of the story item.
    :param owner_profile: :class:`Profile` instance representing the story owner.
    """

    def __init__(self, context: InstaloaderContext, node: Dict[str, Any], owner_profile: Optional[Profile] = None):
        self._context = context
        self._node = node
        self._owner_profile = owner_profile
        self._iphone_struct_ = None
        if 'iphone_struct' in node:
            # if loaded from JSON with load_structure_from_file()
            self._iphone_struct_ = node['iphone_struct']

    def _asdict(self):
        """Return a dictionary representation of this item, e.g. for JSON export.

        The result is the node plus, when available, the owner profile's dictionary under ``'owner'`` and
        the cached iPhone structure under ``'iphone_struct'``.
        """
        # Work on a shallow copy (as Hashtag._asdict does) so that exporting never writes extra keys
        # back into the node dictionary that was handed to the constructor.
        node = self._node.copy()
        if self._owner_profile:
            node['owner'] = self._owner_profile._asdict()
        if self._iphone_struct_:
            node['iphone_struct'] = self._iphone_struct_
        return node

    @property
    def mediaid(self) -> int:
        """The mediaid is a decimal representation of the media shortcode."""
        return int(self._node['id'])

    @property
    def shortcode(self) -> str:
        """Convert :attr:`~StoryItem.mediaid` to a shortcode-like string, allowing ``{shortcode}`` to be used with
        :option:`--filename-pattern`."""
        return Post.mediaid_to_shortcode(self.mediaid)

    def __repr__(self):
        return '<StoryItem {}>'.format(self.mediaid)

    def __eq__(self, o: object) -> bool:
        # Two story items are equal when their media ids are equal.
        if isinstance(o, StoryItem):
            return self.mediaid == o.mediaid
        return NotImplemented

    def __hash__(self) -> int:
        return hash(self.mediaid)

    @property
    def _iphone_struct(self) -> Dict[str, Any]:
        """Media info from the iPhone endpoint, fetched on first access and cached afterwards.

        :raises LoginRequiredException: if the context is not logged in.
        """
        if not self._context.is_logged_in:
            raise LoginRequiredException("--login required to access iPhone media info endpoint.")
        if not self._iphone_struct_:
            data = self._context.get_iphone_json(path='api/v1/media/{}/info/'.format(self.mediaid), params={})
            self._iphone_struct_ = data['items'][0]
        return self._iphone_struct_

    @property
    def owner_profile(self) -> Profile:
        """:class:`Profile` instance of the story item's owner."""
        if not self._owner_profile:
            # Not supplied at construction: resolve it from the owner id stored in the node.
            self._owner_profile = Profile.from_id(self._context, self._node['owner']['id'])
        assert self._owner_profile is not None
        return self._owner_profile

    @property
    def owner_username(self) -> str:
        """The StoryItem owner's lowercase name."""
        return self.owner_profile.username

    @property
    def owner_id(self) -> int:
        """The ID of the StoryItem owner."""
        return self.owner_profile.userid

    @property
    def date_local(self) -> datetime:
        """Timestamp when the StoryItem was created (local time zone)."""
        return datetime.fromtimestamp(self._node['taken_at_timestamp'])

    @property
    def date_utc(self) -> datetime:
        """Timestamp when the StoryItem was created (UTC)."""
        return datetime.utcfromtimestamp(self._node['taken_at_timestamp'])

    @property
    def date(self) -> datetime:
        """Synonym to :attr:`~StoryItem.date_utc`"""
        return self.date_utc

    @property
    def profile(self) -> str:
        """Synonym to :attr:`~StoryItem.owner_username`"""
        return self.owner_username

    @property
    def expiring_local(self) -> datetime:
        """Timestamp when the StoryItem will get unavailable (local time zone)."""
        return datetime.fromtimestamp(self._node['expiring_at_timestamp'])

    @property
    def expiring_utc(self) -> datetime:
        """Timestamp when the StoryItem will get unavailable (UTC)."""
        return datetime.utcfromtimestamp(self._node['expiring_at_timestamp'])

    @property
    def url(self) -> str:
        """URL of the picture / video thumbnail of the StoryItem"""
        if self.typename == "GraphStoryImage" and self._context.is_logged_in:
            try:
                orig_url = self._iphone_struct['image_versions2']['candidates'][0]['url']
                # Strip the "se" query parameter from the candidate URL.
                url = re.sub(r'&se=\d+(&?)', r'\1', orig_url)
                return url
            except (InstaloaderException, KeyError, IndexError) as err:
                self._context.error('{} Unable to fetch high quality image version of {}.'.format(err, self))
        # Fallback (and the only path for videos / anonymous use): last entry of display_resources.
        return self._node['display_resources'][-1]['src']

    @property
    def typename(self) -> str:
        """Type of post, GraphStoryImage or GraphStoryVideo"""
        return self._node['__typename']

    @property
    def is_video(self) -> bool:
        """True if the StoryItem is a video."""
        return self._node['is_video']

    @property
    def video_url(self) -> Optional[str]:
        """URL of the video, or None."""
        if self.is_video:
            return self._node['video_resources'][-1]['src']
        return None
|
|
|
|
|
|
class Story:
    """
    Structure representing a user story with its associated items.

    Provides methods for accessing story properties, as well as :meth:`Story.get_items` to request associated
    :class:`StoryItem` nodes. Stories are returned by :meth:`Instaloader.get_stories`.

    With a logged-in :class:`Instaloader` instance `L`, you may download all your visible user stories with::

       for story in L.get_stories():
           # story is a Story object
           for item in story.get_items():
               # item is a StoryItem object
               L.download_storyitem(item, ':stories')

    This class implements == and is hashable.

    :param context: :class:`InstaloaderContext` instance used for additional queries if necessary.
    :param node: Dictionary containing the available information of the story as returned by Instagram.
    """

    def __init__(self, context: InstaloaderContext, node: Dict[str, Any]):
        self._context = context
        self._node = node
        # Both are filled in lazily by the corresponding properties.
        self._unique_id = None  # type: Optional[str]
        self._owner_profile = None  # type: Optional[Profile]

    def __repr__(self):
        template = '<Story by {} changed {:%Y-%m-%d_%H-%M-%S_UTC}>'
        return template.format(self.owner_username, self.latest_media_utc)

    def __eq__(self, o: object) -> bool:
        if not isinstance(o, Story):
            return NotImplemented
        return self.unique_id == o.unique_id

    def __hash__(self) -> int:
        return hash(self.unique_id)

    @property
    def unique_id(self) -> Union[str, int]:
        """
        This ID only equals amongst :class:`Story` instances which have the same owner and the same set of
        :class:`StoryItem`. For all other :class:`Story` instances this ID is different.

        It is the owner id followed by the sorted media ids of all items, concatenated without separator,
        and is computed once and then cached.
        """
        if not self._unique_id:
            media_ids = sorted(item.mediaid for item in self.get_items())
            parts = [str(self.owner_id)]
            parts.extend(str(media_id) for media_id in media_ids)
            self._unique_id = ''.join(parts)
        return self._unique_id

    @property
    def last_seen_local(self) -> Optional[datetime]:
        """Timestamp of the most recent StoryItem that has been watched or None (local time zone)."""
        if not self._node['seen']:
            return None
        return datetime.fromtimestamp(self._node['seen'])

    @property
    def last_seen_utc(self) -> Optional[datetime]:
        """Timestamp of the most recent StoryItem that has been watched or None (UTC)."""
        if not self._node['seen']:
            return None
        return datetime.utcfromtimestamp(self._node['seen'])

    @property
    def latest_media_local(self) -> datetime:
        """Timestamp when the last item of the story was created (local time zone)."""
        timestamp = self._node['latest_reel_media']
        return datetime.fromtimestamp(timestamp)

    @property
    def latest_media_utc(self) -> datetime:
        """Timestamp when the last item of the story was created (UTC)."""
        timestamp = self._node['latest_reel_media']
        return datetime.utcfromtimestamp(timestamp)

    @property
    def itemcount(self) -> int:
        """Count of items associated with the :class:`Story` instance."""
        items = self._node['items']
        return len(items)

    @property
    def owner_profile(self) -> Profile:
        """:class:`Profile` instance of the story owner."""
        if not self._owner_profile:
            # Built on first access from the 'user' entry of the node.
            self._owner_profile = Profile(self._context, self._node['user'])
        return self._owner_profile

    @property
    def owner_username(self) -> str:
        """The story owner's lowercase username."""
        owner = self.owner_profile
        return owner.username

    @property
    def owner_id(self) -> int:
        """The story owner's ID."""
        owner = self.owner_profile
        return owner.userid

    def get_items(self) -> Iterator[StoryItem]:
        """Retrieve all items from a story."""
        # Items are yielded in the reverse of the order in which the node lists them.
        for item in reversed(self._node['items']):
            yield StoryItem(self._context, item, self.owner_profile)
|
|
|
|
|
|
class Highlight(Story):
    """
    Structure representing a user's highlight with its associated story items.

    Provides methods for accessing highlight properties, as well as :meth:`Highlight.get_items` to request associated
    :class:`StoryItem` nodes. Highlights are returned by :meth:`Instaloader.get_highlights`.

    With a logged-in :class:`Instaloader` instance `L`, you may download all highlights of a :class:`Profile` instance
    USER with::

       for highlight in L.get_highlights(USER):
           # highlight is a Highlight object
           for item in highlight.get_items():
               # item is a StoryItem object
               L.download_storyitem(item, '{}/{}'.format(highlight.owner_username, highlight.title))

    This class implements == and is hashable.

    :param context: :class:`InstaloaderContext` instance used for additional queries if necessary.
    :param node: Dictionary containing the available information of the highlight as returned by Instagram.
    :param owner: :class:`Profile` instance representing the owner profile of the highlight.
    """

    def __init__(self, context: InstaloaderContext, node: Dict[str, Any], owner: Optional[Profile] = None):
        super().__init__(context, node)
        self._owner_profile = owner
        # Item nodes of the highlight; None until _fetch_items() has queried them.
        self._items = None  # type: Optional[List[Dict[str, Any]]]

    def __repr__(self):
        return '<Highlight by {}: {}>'.format(self.owner_username, self.title)

    @property
    def unique_id(self) -> int:
        """A unique ID identifying this set of highlights."""
        return int(self._node['id'])

    @property
    def owner_profile(self) -> Profile:
        """:class:`Profile` instance of the highlights' owner."""
        if not self._owner_profile:
            # Not supplied at construction: build it from the 'owner' entry of the node.
            self._owner_profile = Profile(self._context, self._node['owner'])
        return self._owner_profile

    @property
    def title(self) -> str:
        """The title of these highlights."""
        return self._node['title']

    @property
    def cover_url(self) -> str:
        """URL of the highlights' cover."""
        return self._node['cover_media']['thumbnail_src']

    @property
    def cover_cropped_url(self) -> str:
        """URL of the cropped version of the cover."""
        return self._node['cover_media_cropped_thumbnail']['url']

    def _fetch_items(self):
        """Query the item nodes of this highlight once and cache them in ``self._items``."""
        # Compare against None rather than testing truthiness: an empty item list is a valid, already
        # fetched result and must not trigger the query again on every access.
        if self._items is None:
            self._items = self._context.graphql_query("45246d3fe16ccc6577e0bd297a5db1ab",
                                                      {"reel_ids": [], "tag_names": [], "location_ids": [],
                                                       "highlight_reel_ids": [str(self.unique_id)],
                                                       "precomposed_overlay": False})['data']['reels_media'][0]['items']

    @property
    def itemcount(self) -> int:
        """Count of items associated with the :class:`Highlight` instance."""
        self._fetch_items()
        assert self._items is not None
        return len(self._items)

    def get_items(self) -> Iterator[StoryItem]:
        """Retrieve all associated highlight items."""
        self._fetch_items()
        assert self._items is not None
        yield from (StoryItem(self._context, item, self.owner_profile) for item in self._items)
|
|
|
|
|
|
class Hashtag:
    """
    An Hashtag.

    Analogous to :class:`Profile`, get an instance with::

       L = Instaloader()
       hashtag = Hashtag.from_name(L.context, HASHTAG)

    To then download the Hashtag's Posts, do::

       for post in hashtag.get_posts():
          L.download_post(post, target="#"+hashtag.name)

    Also, this class implements == and is hashable.
    """
    def __init__(self, context: InstaloaderContext, node: Dict[str, Any]):
        assert "name" in node
        self._context = context
        self._node = node
        # Set once the node has been replaced by the result of a full query.
        self._has_full_metadata = False

    @classmethod
    def from_name(cls, context: InstaloaderContext, name: str):
        """
        Create a Hashtag instance from a given hashtag name, without preceeding '#'. Raises an Exception if there is no
        hashtag with the given name.

        :param context: :attr:`Instaloader.context`
        :param name: Hashtag, without preceeding '#'
        :raises: :class:`QueryReturnedNotFoundException`
        """
        # pylint:disable=protected-access
        instance = cls(context, {'name': name.lower()})
        instance._obtain_metadata()
        return instance

    @property
    def name(self):
        """Hashtag name lowercased, without preceeding '#'"""
        raw_name = self._node["name"]
        return raw_name.lower()

    def _query(self, params):
        """Fetch this hashtag's JSON with the given request parameters and return its hashtag node."""
        response = self._context.get_json("explore/tags/{0}/".format(self.name), params)
        return response["graphql"]["hashtag"]

    def _obtain_metadata(self):
        """Replace the node by the full metadata, unless that has already been done."""
        if self._has_full_metadata:
            return
        self._node = self._query({"__a": 1})
        self._has_full_metadata = True

    def _asdict(self):
        """Return a copy of the node without the post listings."""
        exported = self._node.copy()
        # remove posts
        for posts_key in ("edge_hashtag_to_top_posts", "edge_hashtag_to_media"):
            exported.pop(posts_key, None)
        return exported

    def __repr__(self):
        return "<Hashtag #{}>".format(self.name)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Hashtag):
            return NotImplemented
        return self.name == other.name

    def __hash__(self) -> int:
        return hash(self.name)

    def _metadata(self, *keys) -> Any:
        """Look up the nested value at ``keys`` in the node; on a missing key, obtain the full metadata and
        retry once."""
        def lookup():
            value = self._node
            for key in keys:
                value = value[key]
            return value

        try:
            return lookup()
        except KeyError:
            self._obtain_metadata()
            return lookup()

    @property
    def hashtagid(self) -> int:
        """The ``id`` metadata entry, converted to int."""
        return int(self._metadata("id"))

    @property
    def profile_pic_url(self) -> str:
        """The ``profile_pic_url`` metadata entry."""
        return self._metadata("profile_pic_url")

    @property
    def description(self) -> str:
        """The ``description`` metadata entry."""
        return self._metadata("description")

    @property
    def allow_following(self) -> bool:
        """The ``allow_following`` metadata entry."""
        return self._metadata("allow_following")

    @property
    def is_following(self) -> bool:
        """The ``is_following`` metadata entry."""
        return self._metadata("is_following")

    @property
    def is_top_media_only(self) -> bool:
        """The ``is_top_media_only`` metadata entry."""
        return self._metadata("is_top_media_only")

    def get_related_tags(self) -> Iterator["Hashtag"]:
        """Yields similar hashtags."""
        for edge in self._metadata("edge_hashtag_to_related_tags", "edges"):
            yield Hashtag(self._context, edge["node"])

    def get_top_posts(self) -> Iterator[Post]:
        """Yields the top posts of the hashtag."""
        for edge in self._metadata("edge_hashtag_to_top_posts", "edges"):
            yield Post(self._context, edge["node"])

    @property
    def mediacount(self) -> int:
        """
        The count of all media associated with this hashtag.

        The number of posts with a certain hashtag may differ from the number of posts that can actually be accessed, as
        the hashtag count might include private posts
        """
        return self._metadata("edge_hashtag_to_media", "count")

    def get_posts(self) -> Iterator[Post]:
        """Yields the posts associated with this hashtag."""
        # Make sure both sub-entries are present (this may trigger the full metadata query).
        for required in ("edges", "page_info"):
            self._metadata("edge_hashtag_to_media", required)
        connection = self._metadata("edge_hashtag_to_media")
        while True:
            for edge in connection["edges"]:
                yield Post(self._context, edge["node"])
            page_info = connection["page_info"]
            if not page_info["has_next_page"]:
                break
            # Request the next page, continuing at the end cursor of the current one.
            next_page = self._query({'__a': 1, 'max_id': page_info["end_cursor"]})
            connection = next_page["edge_hashtag_to_media"]

    def get_all_posts(self) -> Iterator[Post]:
        """Yields all posts, i.e. all most recent posts and the top posts, in almost-chronological order."""
        top_iter = iter(sorted(self.get_top_posts(), key=lambda p: p.date_utc, reverse=True))
        recent_iter = self.get_posts()
        top = next(top_iter, None)
        recent = next(recent_iter, None)
        # Merge both streams, newest first, as long as each still has a candidate.
        while top is not None and recent is not None:
            if top == recent:
                # Same post in both streams: yield it once and advance both.
                yield top
                top = next(top_iter, None)
                recent = next(recent_iter, None)
            elif top.date_utc > recent.date_utc:
                yield top
                top = next(top_iter, None)
            else:
                yield recent
                recent = next(recent_iter, None)
        # At most one stream is left; drain it.
        if top is not None:
            yield top
            yield from top_iter
        elif recent is not None:
            yield recent
            yield from recent_iter
|
|
|
|
|
|
class TopSearchResults:
    """
    An invocation of this class triggers a search on Instagram for the provided search string.

    Provides methods to access the search results as profiles (:class:`Profile`), locations (:class:`PostLocation`) and
    hashtags.

    :param context: :attr:`Instaloader.context` used to send the query for the search.
    :param searchstring: String to search for with Instagram's "top search".
    """

    def __init__(self, context: InstaloaderContext, searchstring: str):
        self._context = context
        self._searchstring = searchstring
        # The `__a` param is only needed to prevent `get_json()` from searching for 'window._sharedData'.
        search_params = {'context': 'blended',
                         'query': searchstring,
                         'include_reel': False,
                         '__a': 1}
        self._node = context.get_json('web/search/topsearch/', params=search_params)

    def get_profiles(self) -> Iterator[Profile]:
        """
        Provides the :class:`Profile` instances from the search result.
        """
        for entry in self._node.get('users', []):
            profile_node = entry['user']
            if 'pk' in profile_node:
                # Store the 'pk' value under 'id' as well before building the Profile.
                profile_node['id'] = profile_node['pk']
            yield Profile(self._context, profile_node)

    def get_prefixed_usernames(self) -> Iterator[str]:
        """
        Provides all profile names from the search result that start with the search string.
        """
        prefix = self._searchstring
        for entry in self._node.get('users', []):
            candidate = entry.get('user', {}).get('username', '')
            if not candidate.startswith(prefix):
                continue
            yield candidate

    def get_locations(self) -> Iterator[PostLocation]:
        """
        Provides instances of :class:`PostLocation` from the search result.
        """
        for entry in self._node.get('places', []):
            place = entry.get('place', {})
            slug = place.get('slug')
            details = place.get('location', {})
            yield PostLocation(int(details['pk']), details['name'], slug, None, details['lat'], details['lng'])

    def get_hashtag_strings(self) -> Iterator[str]:
        """
        Provides the hashtags from the search result as strings.
        """
        for entry in self._node.get('hashtags', []):
            tag_name = entry.get('hashtag', {}).get('name')
            if not tag_name:
                continue
            yield tag_name

    def get_hashtags(self) -> Iterator[Hashtag]:
        """
        Provides the hashtags from the search result.

        .. versionadded:: 4.4
        """
        for entry in self._node.get('hashtags', []):
            tag_node = entry.get('hashtag', {})
            if 'name' not in tag_node:
                continue
            yield Hashtag(self._context, tag_node)

    @property
    def searchstring(self) -> str:
        """
        The string that was searched for on Instagram to produce this :class:`TopSearchResults` instance.
        """
        return self._searchstring
|
|
|
|
|
|
# Union of the structure types that save_structure_to_file() accepts and load_structure_from_file() returns.
JsonExportable = Union[Post, Profile, StoryItem, Hashtag, FrozenNodeIterator]
|
|
|
|
|
|
def save_structure_to_file(structure: JsonExportable, filename: str) -> None:
    """Saves a :class:`Post`, :class:`Profile`, :class:`StoryItem` or :class:`Hashtag` to a '.json' or '.json.xz' file
    such that it can later be loaded by :func:`load_structure_from_file`.

    If the specified filename ends in '.xz', the file will be LZMA compressed. Otherwise, a pretty-printed JSON file
    will be created.

    The written JSON object holds the structure's dictionary under ``node`` and, under ``instaloader``, the
    Instaloader version and the structure's class name.

    :param structure: :class:`Post`, :class:`Profile`, :class:`StoryItem` or :class:`Hashtag`
    :param filename: Filename, ends in '.json' or '.json.xz'
    """
    metadata = {'version': __version__, 'node_type': structure.__class__.__name__}
    payload = {'node': structure._asdict(),
               'instaloader': metadata}
    if not filename.endswith('.xz'):
        # Uncompressed: human-readable output.
        with open(filename, 'wt') as fp:
            json.dump(payload, fp=fp, indent=4, sort_keys=True)
        return
    # Compressed: most compact JSON form.
    with lzma.open(filename, 'wt', check=lzma.CHECK_NONE) as fp:
        json.dump(payload, fp=fp, separators=(',', ':'))
|
|
|
|
|
|
def load_structure_from_file(context: InstaloaderContext, filename: str) -> JsonExportable:
    """Loads a :class:`Post`, :class:`Profile`, :class:`StoryItem` or :class:`Hashtag` from a '.json' or '.json.xz' file
    that has been saved by :func:`save_structure_to_file`.

    :param context: :attr:`Instaloader.context` linked to the new object, used for additional queries if neccessary.
    :param filename: Filename, ends in '.json' or '.json.xz'
    :raises InvalidArgumentException: if the file content is not recognized as an Instaloader JSON.
    """
    opener = lzma.open if filename.endswith('.xz') else open
    # The context manager closes the file even when json.load() raises on malformed content.
    with opener(filename, 'rt') as fp:
        json_structure = json.load(fp)
    if 'node' in json_structure and 'instaloader' in json_structure and \
            'node_type' in json_structure['instaloader']:
        node_type = json_structure['instaloader']['node_type']
        node = json_structure['node']
        if node_type == "Post":
            return Post(context, node)
        if node_type == "Profile":
            return Profile(context, node)
        if node_type == "StoryItem":
            return StoryItem(context, node)
        if node_type == "Hashtag":
            return Hashtag(context, node)
        if node_type == "FrozenNodeIterator":
            # A frozen iterator carries no context; its node holds the constructor keyword arguments.
            return FrozenNodeIterator(**node)
    elif 'shortcode' in json_structure:
        # Post JSON created with Instaloader v3
        return Post.from_shortcode(context, json_structure['shortcode'])
    # Unknown node type, or neither the current nor the v3 layout.
    raise InvalidArgumentException("{}: Not an Instaloader JSON.".format(filename))
|