1498 lines
59 KiB
Python
1498 lines
59 KiB
Python
import json
|
|
import lzma
|
|
import re
|
|
from base64 import b64decode, b64encode
|
|
from collections import namedtuple
|
|
from datetime import datetime
|
|
from typing import Any, Dict, Iterator, List, Optional, Union
|
|
|
|
from . import __version__
|
|
from .exceptions import *
|
|
from .instaloadercontext import InstaloaderContext
|
|
from .nodeiterator import FrozenNodeIterator, NodeIterator
|
|
|
|
PostSidecarNode = namedtuple('PostSidecarNode', ['is_video', 'display_url', 'video_url'])
|
|
PostSidecarNode.__doc__ = "Item of a Sidecar Post."
|
|
PostSidecarNode.is_video.__doc__ = "Whether this node is a video."
|
|
PostSidecarNode.display_url.__doc__ = "URL of image or video thumbnail."
|
|
PostSidecarNode.video_url.__doc__ = "URL of video or None."
|
|
|
|
PostCommentAnswer = namedtuple('PostCommentAnswer', ['id', 'created_at_utc', 'text', 'owner', 'likes_count'])
|
|
PostCommentAnswer.id.__doc__ = "ID number of comment."
|
|
PostCommentAnswer.created_at_utc.__doc__ = ":class:`~datetime.datetime` when comment was created (UTC)."
|
|
PostCommentAnswer.text.__doc__ = "Comment text."
|
|
PostCommentAnswer.owner.__doc__ = "Owner :class:`Profile` of the comment."
|
|
PostCommentAnswer.likes_count.__doc__ = "Number of likes on comment."
|
|
|
|
PostComment = namedtuple('PostComment', (*PostCommentAnswer._fields, 'answers')) # type: ignore
|
|
for field in PostCommentAnswer._fields:
|
|
getattr(PostComment, field).__doc__ = getattr(PostCommentAnswer, field).__doc__ # pylint: disable=no-member
|
|
PostComment.answers.__doc__ = r"Iterator which yields all :class:`PostCommentAnswer`\ s for the comment." # type: ignore
|
|
|
|
PostLocation = namedtuple('PostLocation', ['id', 'name', 'slug', 'has_public_page', 'lat', 'lng'])
|
|
PostLocation.id.__doc__ = "ID number of location."
|
|
PostLocation.name.__doc__ = "Location name."
|
|
PostLocation.slug.__doc__ = "URL friendly variant of location name."
|
|
PostLocation.has_public_page.__doc__ = "Whether location has a public page."
|
|
PostLocation.lat.__doc__ = "Latitude (:class:`float`)."
|
|
PostLocation.lng.__doc__ = "Longitude (:class:`float`)."
|
|
|
|
|
|
class Post:
|
|
"""
|
|
Structure containing information about an Instagram post.
|
|
|
|
Created by methods :meth:`Profile.get_posts`, :meth:`Instaloader.get_hashtag_posts`,
|
|
:meth:`Instaloader.get_feed_posts` and :meth:`Profile.get_saved_posts`, which return iterators of Posts::
|
|
|
|
L = Instaloader()
|
|
for post in L.get_hashtag_posts(HASHTAG):
|
|
L.download_post(post, target='#'+HASHTAG)
|
|
|
|
Might also be created with::
|
|
|
|
post = Post.from_shortcode(L.context, SHORTCODE)
|
|
|
|
This class unifies access to the properties associated with a post. It implements == and is
|
|
hashable.
|
|
|
|
:param context: :attr:`Instaloader.context` used for additional queries if neccessary..
|
|
:param node: Node structure, as returned by Instagram.
|
|
:param owner_profile: The Profile of the owner, if already known at creation.
|
|
"""
|
|
|
|
def __init__(self, context: InstaloaderContext, node: Dict[str, Any],
|
|
owner_profile: Optional['Profile'] = None):
|
|
assert 'shortcode' in node or 'code' in node
|
|
|
|
self._context = context
|
|
self._node = node
|
|
self._owner_profile = owner_profile
|
|
self._full_metadata_dict = None # type: Optional[Dict[str, Any]]
|
|
self._location = None # type: Optional[PostLocation]
|
|
self._iphone_struct_ = None
|
|
if 'iphone_struct' in node:
|
|
# if loaded from JSON with load_structure_from_file()
|
|
self._iphone_struct_ = node['iphone_struct']
|
|
|
|
@classmethod
|
|
def from_shortcode(cls, context: InstaloaderContext, shortcode: str):
|
|
"""Create a post object from a given shortcode"""
|
|
# pylint:disable=protected-access
|
|
post = cls(context, {'shortcode': shortcode})
|
|
post._node = post._full_metadata
|
|
return post
|
|
|
|
@classmethod
|
|
def from_mediaid(cls, context: InstaloaderContext, mediaid: int):
|
|
"""Create a post object from a given mediaid"""
|
|
return cls.from_shortcode(context, Post.mediaid_to_shortcode(mediaid))
|
|
|
|
@staticmethod
|
|
def shortcode_to_mediaid(code: str) -> int:
|
|
if len(code) > 11:
|
|
raise InvalidArgumentException("Wrong shortcode \"{0}\", unable to convert to mediaid.".format(code))
|
|
code = 'A' * (12 - len(code)) + code
|
|
return int.from_bytes(b64decode(code.encode(), b'-_'), 'big')
|
|
|
|
@staticmethod
|
|
def mediaid_to_shortcode(mediaid: int) -> str:
|
|
if mediaid.bit_length() > 64:
|
|
raise InvalidArgumentException("Wrong mediaid {0}, unable to convert to shortcode".format(str(mediaid)))
|
|
return b64encode(mediaid.to_bytes(9, 'big'), b'-_').decode().replace('A', ' ').lstrip().replace(' ', 'A')
|
|
|
|
@staticmethod
|
|
def supported_graphql_types() -> List[str]:
|
|
"""The values of __typename fields that the :class:`Post` class can handle."""
|
|
return ["GraphImage", "GraphVideo", "GraphSidecar"]
|
|
|
|
def _asdict(self):
|
|
node = self._node
|
|
if self._full_metadata_dict:
|
|
node.update(self._full_metadata_dict)
|
|
if self._owner_profile:
|
|
node['owner'] = self.owner_profile._asdict()
|
|
if self._location:
|
|
node['location'] = self._location._asdict()
|
|
if self._iphone_struct_:
|
|
node['iphone_struct'] = self._iphone_struct_
|
|
return node
|
|
|
|
@property
|
|
def shortcode(self) -> str:
|
|
"""Media shortcode. URL of the post is instagram.com/p/<shortcode>/."""
|
|
return self._node['shortcode'] if 'shortcode' in self._node else self._node['code']
|
|
|
|
@property
|
|
def mediaid(self) -> int:
|
|
"""The mediaid is a decimal representation of the media shortcode."""
|
|
return int(self._node['id'])
|
|
|
|
def __repr__(self):
|
|
return '<Post {}>'.format(self.shortcode)
|
|
|
|
def __eq__(self, o: object) -> bool:
|
|
if isinstance(o, Post):
|
|
return self.shortcode == o.shortcode
|
|
return NotImplemented
|
|
|
|
def __hash__(self) -> int:
|
|
return hash(self.shortcode)
|
|
|
|
def _obtain_metadata(self):
|
|
if not self._full_metadata_dict:
|
|
pic_json = self._context.graphql_query(
|
|
'2b0673e0dc4580674a88d426fe00ea90',
|
|
{'shortcode': self.shortcode}
|
|
)
|
|
self._full_metadata_dict = pic_json['data']['shortcode_media']
|
|
if self._full_metadata_dict is None:
|
|
# issue #449
|
|
self._context.error("Fetching Post metadata failed (issue #449). "
|
|
"The following data has been returned:\n"
|
|
+ json.dumps(pic_json['entry_data'], indent=2))
|
|
raise BadResponseException("Fetching Post metadata failed.")
|
|
if self.shortcode != self._full_metadata_dict['shortcode']:
|
|
self._node.update(self._full_metadata_dict)
|
|
raise PostChangedException
|
|
|
|
@property
|
|
def _full_metadata(self) -> Dict[str, Any]:
|
|
self._obtain_metadata()
|
|
assert self._full_metadata_dict is not None
|
|
return self._full_metadata_dict
|
|
|
|
@property
|
|
def _iphone_struct(self) -> Dict[str, Any]:
|
|
if not self._context.is_logged_in:
|
|
raise LoginRequiredException("--login required to access iPhone media info endpoint.")
|
|
if not self._iphone_struct_:
|
|
data = self._context.get_iphone_json(path='api/v1/media/{}/info/'.format(self.mediaid), params={})
|
|
self._iphone_struct_ = data['items'][0]
|
|
return self._iphone_struct_
|
|
|
|
def _field(self, *keys) -> Any:
|
|
"""Lookups given fields in _node, and if not found in _full_metadata. Raises KeyError if not found anywhere."""
|
|
try:
|
|
d = self._node
|
|
for key in keys:
|
|
d = d[key]
|
|
return d
|
|
except KeyError:
|
|
d = self._full_metadata
|
|
for key in keys:
|
|
d = d[key]
|
|
return d
|
|
|
|
@property
|
|
def owner_profile(self) -> 'Profile':
|
|
""":class:`Profile` instance of the Post's owner."""
|
|
if not self._owner_profile:
|
|
if 'username' in self._node['owner']:
|
|
owner_struct = self._node['owner']
|
|
else:
|
|
# Sometimes, the 'owner' structure does not contain the username, only the user's ID. In that case,
|
|
# this call triggers downloading of the complete Post metadata struct, where the owner username
|
|
# is contained.
|
|
# Note that we cannot use Profile.from_id() here since that would lead us into a recursion.
|
|
owner_struct = self._full_metadata['owner']
|
|
self._owner_profile = Profile(self._context, owner_struct)
|
|
return self._owner_profile
|
|
|
|
@property
|
|
def owner_username(self) -> str:
|
|
"""The Post's lowercase owner name."""
|
|
return self.owner_profile.username
|
|
|
|
@property
|
|
def owner_id(self) -> int:
|
|
"""The ID of the Post's owner."""
|
|
return self.owner_profile.userid
|
|
|
|
@property
|
|
def date_local(self) -> datetime:
|
|
"""Timestamp when the post was created (local time zone)."""
|
|
return datetime.fromtimestamp(self._node["date"]
|
|
if "date" in self._node
|
|
else self._node["taken_at_timestamp"])
|
|
|
|
@property
|
|
def date_utc(self) -> datetime:
|
|
"""Timestamp when the post was created (UTC)."""
|
|
return datetime.utcfromtimestamp(self._node["date"]
|
|
if "date" in self._node
|
|
else self._node["taken_at_timestamp"])
|
|
|
|
@property
|
|
def date(self) -> datetime:
|
|
"""Synonym to :attr:`~Post.date_utc`"""
|
|
return self.date_utc
|
|
|
|
@property
|
|
def profile(self) -> str:
|
|
"""Synonym to :attr:`~Post.owner_username`"""
|
|
return self.owner_username
|
|
|
|
@property
|
|
def url(self) -> str:
|
|
"""URL of the picture / video thumbnail of the post"""
|
|
if self.typename == "GraphImage" and self._context.is_logged_in:
|
|
try:
|
|
orig_url = self._iphone_struct['image_versions2']['candidates'][0]['url']
|
|
url = re.sub(r'&se=\d+(&?)', r'\1', orig_url)
|
|
return url
|
|
except (InstaloaderException, KeyError, IndexError) as err:
|
|
self._context.error('{} Unable to fetch high quality image version of {}.'.format(err, self))
|
|
return self._node["display_url"] if "display_url" in self._node else self._node["display_src"]
|
|
|
|
@property
|
|
def typename(self) -> str:
|
|
"""Type of post, GraphImage, GraphVideo or GraphSidecar"""
|
|
return self._field('__typename')
|
|
|
|
def get_sidecar_nodes(self) -> Iterator[PostSidecarNode]:
|
|
"""Sidecar nodes of a Post with typename==GraphSidecar."""
|
|
if self.typename == 'GraphSidecar':
|
|
edges = self._field('edge_sidecar_to_children', 'edges')
|
|
if any(edge['node']['is_video'] for edge in edges):
|
|
# video_url is only present in full metadata, issue #558.
|
|
edges = self._full_metadata['edge_sidecar_to_children']['edges']
|
|
for idx, edge in enumerate(edges):
|
|
node = edge['node']
|
|
is_video = node['is_video']
|
|
display_url = node['display_url']
|
|
if not is_video and self._context.is_logged_in:
|
|
try:
|
|
carousel_media = self._iphone_struct['carousel_media']
|
|
orig_url = carousel_media[idx]['image_versions2']['candidates'][0]['url']
|
|
display_url = re.sub(r'&se=\d+(&?)', r'\1', orig_url)
|
|
except (InstaloaderException, KeyError, IndexError) as err:
|
|
self._context.error('{} Unable to fetch high quality image version of {}.'.format(err, self))
|
|
yield PostSidecarNode(is_video=is_video, display_url=display_url,
|
|
video_url=node['video_url'] if is_video else None)
|
|
|
|
@property
|
|
def caption(self) -> Optional[str]:
|
|
"""Caption."""
|
|
if "edge_media_to_caption" in self._node and self._node["edge_media_to_caption"]["edges"]:
|
|
return self._node["edge_media_to_caption"]["edges"][0]["node"]["text"]
|
|
elif "caption" in self._node:
|
|
return self._node["caption"]
|
|
return None
|
|
|
|
@property
|
|
def caption_hashtags(self) -> List[str]:
|
|
"""List of all lowercased hashtags (without preceeding #) that occur in the Post's caption."""
|
|
if not self.caption:
|
|
return []
|
|
# This regular expression is from jStassen, adjusted to use Python's \w to support Unicode
|
|
# http://blog.jstassen.com/2016/03/code-regex-for-instagram-username-and-hashtags/
|
|
hashtag_regex = re.compile(r"(?:#)(\w(?:(?:\w|(?:\.(?!\.))){0,28}(?:\w))?)")
|
|
return re.findall(hashtag_regex, self.caption.lower())
|
|
|
|
@property
|
|
def caption_mentions(self) -> List[str]:
|
|
"""List of all lowercased profiles that are mentioned in the Post's caption, without preceeding @."""
|
|
if not self.caption:
|
|
return []
|
|
# This regular expression is from jStassen, adjusted to use Python's \w to support Unicode
|
|
# http://blog.jstassen.com/2016/03/code-regex-for-instagram-username-and-hashtags/
|
|
mention_regex = re.compile(r"(?:@)(\w(?:(?:\w|(?:\.(?!\.))){0,28}(?:\w))?)")
|
|
return re.findall(mention_regex, self.caption.lower())
|
|
|
|
@property
|
|
def pcaption(self) -> str:
|
|
"""Printable caption, useful as a format specifier for --filename-pattern.
|
|
|
|
.. versionadded:: 4.2.6"""
|
|
def _elliptify(caption):
|
|
pcaption = ' '.join([s.replace('/', '\u2215') for s in caption.splitlines() if s]).strip()
|
|
return (pcaption[:30] + u"\u2026") if len(pcaption) > 31 else pcaption
|
|
return _elliptify(self.caption) if self.caption else ''
|
|
|
|
@property
|
|
def tagged_users(self) -> List[str]:
|
|
"""List of all lowercased users that are tagged in the Post."""
|
|
try:
|
|
return [edge['node']['user']['username'].lower() for edge in self._field('edge_media_to_tagged_user',
|
|
'edges')]
|
|
except KeyError:
|
|
return []
|
|
|
|
@property
|
|
def is_video(self) -> bool:
|
|
"""True if the Post is a video."""
|
|
return self._node['is_video']
|
|
|
|
@property
|
|
def video_url(self) -> Optional[str]:
|
|
"""URL of the video, or None."""
|
|
if self.is_video:
|
|
return self._field('video_url')
|
|
return None
|
|
|
|
@property
|
|
def video_view_count(self) -> Optional[int]:
|
|
"""View count of the video, or None.
|
|
|
|
.. versionadded:: 4.2.6"""
|
|
if self.is_video:
|
|
return self._field('video_view_count')
|
|
return None
|
|
|
|
@property
|
|
def video_duration(self) -> Optional[float]:
|
|
"""Duration of the video in seconds, or None.
|
|
|
|
.. versionadded:: 4.2.6"""
|
|
if self.is_video:
|
|
return self._field('video_duration')
|
|
return None
|
|
|
|
@property
|
|
def viewer_has_liked(self) -> Optional[bool]:
|
|
"""Whether the viewer has liked the post, or None if not logged in."""
|
|
if not self._context.is_logged_in:
|
|
return None
|
|
if 'likes' in self._node and 'viewer_has_liked' in self._node['likes']:
|
|
return self._node['likes']['viewer_has_liked']
|
|
return self._field('viewer_has_liked')
|
|
|
|
@property
|
|
def likes(self) -> int:
|
|
"""Likes count"""
|
|
return self._field('edge_media_preview_like', 'count')
|
|
|
|
@property
|
|
def comments(self) -> int:
|
|
"""Comment count including answers"""
|
|
# If the count is already present in `self._node`, do not use `self._field` which could trigger fetching the
|
|
# full metadata dict.
|
|
comments = self._node.get('edge_media_to_comment')
|
|
if comments and 'count' in comments:
|
|
return comments['count']
|
|
try:
|
|
return self._field('edge_media_to_parent_comment', 'count')
|
|
except KeyError:
|
|
return self._field('edge_media_to_comment', 'count')
|
|
|
|
def get_comments(self) -> Iterator[PostComment]:
|
|
r"""Iterate over all comments of the post.
|
|
|
|
Each comment is represented by a PostComment namedtuple with fields text (string), created_at (datetime),
|
|
id (int), owner (:class:`Profile`) and answers (:class:`~typing.Iterator`\ [:class:`PostCommentAnswer`])
|
|
if available.
|
|
"""
|
|
def _postcommentanswer(node):
|
|
return PostCommentAnswer(id=int(node['id']),
|
|
created_at_utc=datetime.utcfromtimestamp(node['created_at']),
|
|
text=node['text'],
|
|
owner=Profile(self._context, node['owner']),
|
|
likes_count=node.get('edge_liked_by', {}).get('count', 0))
|
|
|
|
def _postcommentanswers(node):
|
|
if 'edge_threaded_comments' not in node:
|
|
return
|
|
answer_count = node['edge_threaded_comments']['count']
|
|
if answer_count == 0:
|
|
# Avoid doing additional requests if there are no comment answers
|
|
return
|
|
answer_edges = node['edge_threaded_comments']['edges']
|
|
if answer_count == len(answer_edges):
|
|
# If the answer's metadata already contains all comments, don't do GraphQL requests to obtain them
|
|
yield from (_postcommentanswer(comment['node']) for comment in answer_edges)
|
|
return
|
|
yield from NodeIterator(
|
|
self._context,
|
|
'51fdd02b67508306ad4484ff574a0b62',
|
|
lambda d: d['data']['comment']['edge_threaded_comments'],
|
|
_postcommentanswer,
|
|
{'comment_id': node['id']},
|
|
'https://www.instagram.com/p/{0}/'.format(self.shortcode),
|
|
)
|
|
|
|
def _postcomment(node):
|
|
return PostComment(*_postcommentanswer(node),
|
|
answers=_postcommentanswers(node))
|
|
if self.comments == 0:
|
|
# Avoid doing additional requests if there are no comments
|
|
return
|
|
|
|
comment_edges = self._field('edge_media_to_comment', 'edges')
|
|
answers_count = sum([edge['node'].get('edge_threaded_comments', {}).get('count', 0) for edge in comment_edges])
|
|
|
|
if self.comments == len(comment_edges) + answers_count:
|
|
# If the Post's metadata already contains all parent comments, don't do GraphQL requests to obtain them
|
|
yield from (_postcomment(comment['node']) for comment in comment_edges)
|
|
return
|
|
yield from NodeIterator(
|
|
self._context,
|
|
'97b41c52301f77ce508f55e66d17620e',
|
|
lambda d: d['data']['shortcode_media']['edge_media_to_parent_comment'],
|
|
_postcomment,
|
|
{'shortcode': self.shortcode},
|
|
'https://www.instagram.com/p/{0}/'.format(self.shortcode),
|
|
)
|
|
|
|
def get_likes(self) -> Iterator['Profile']:
|
|
"""Iterate over all likes of the post. A :class:`Profile` instance of each likee is yielded."""
|
|
if self.likes == 0:
|
|
# Avoid doing additional requests if there are no comments
|
|
return
|
|
likes_edges = self._field('edge_media_preview_like', 'edges')
|
|
if self.likes == len(likes_edges):
|
|
# If the Post's metadata already contains all likes, don't do GraphQL requests to obtain them
|
|
yield from (Profile(self._context, like['node']) for like in likes_edges)
|
|
return
|
|
yield from NodeIterator(
|
|
self._context,
|
|
'1cb6ec562846122743b61e492c85999f',
|
|
lambda d: d['data']['shortcode_media']['edge_liked_by'],
|
|
lambda n: Profile(self._context, n),
|
|
{'shortcode': self.shortcode},
|
|
'https://www.instagram.com/p/{0}/'.format(self.shortcode),
|
|
)
|
|
|
|
@property
|
|
def is_sponsored(self) -> bool:
|
|
"""
|
|
Whether Post is a sponsored post, equivalent to non-empty :meth:`Post.sponsor_users`.
|
|
|
|
.. versionadded:: 4.4
|
|
"""
|
|
try:
|
|
sponsor_edges = self._field('edge_media_to_sponsor_user', 'edges')
|
|
except KeyError:
|
|
return False
|
|
return bool(sponsor_edges)
|
|
|
|
@property
|
|
def sponsor_users(self) -> List['Profile']:
|
|
"""
|
|
The Post's sponsors.
|
|
|
|
.. versionadded:: 4.4
|
|
"""
|
|
return ([] if not self.is_sponsored else
|
|
[Profile(self._context, edge['node']['sponsor']) for edge in
|
|
self._field('edge_media_to_sponsor_user', 'edges')])
|
|
|
|
@property
|
|
def location(self) -> Optional[PostLocation]:
|
|
"""
|
|
If the Post has a location, returns PostLocation namedtuple with fields 'id', 'lat' and 'lng' and 'name'.
|
|
|
|
.. versionchanged:: 4.2.9
|
|
Require being logged in (as required by Instagram), return None if not logged-in.
|
|
"""
|
|
loc = self._field("location")
|
|
if self._location or not loc:
|
|
return self._location
|
|
if not self._context.is_logged_in:
|
|
return None
|
|
location_id = int(loc['id'])
|
|
if any(k not in loc for k in ('name', 'slug', 'has_public_page', 'lat', 'lng')):
|
|
loc = self._context.get_json("explore/locations/{0}/".format(location_id),
|
|
params={'__a': 1})['graphql']['location']
|
|
self._location = PostLocation(location_id, loc['name'], loc['slug'], loc['has_public_page'],
|
|
loc['lat'], loc['lng'])
|
|
return self._location
|
|
|
|
|
|
class Profile:
|
|
"""
|
|
An Instagram Profile.
|
|
|
|
Provides methods for accessing profile properties, as well as :meth:`Profile.get_posts` and for own profile
|
|
:meth:`Profile.get_saved_posts`.
|
|
|
|
Get instances with :meth:`Post.owner_profile`, :meth:`StoryItem.owner_profile`, :meth:`Profile.get_followees`,
|
|
:meth:`Profile.get_followers` or::
|
|
|
|
L = Instaloader()
|
|
profile = Profile.from_username(L.context, USERNAME)
|
|
|
|
Provides :meth:`Profile.get_posts` and for own profile :meth:`Profile.get_saved_posts` to iterate over associated
|
|
:class:`Post` objects::
|
|
|
|
for post in profile.get_posts():
|
|
L.download_post(post, target=profile.username)
|
|
|
|
:meth:`Profile.get_followees` and :meth:`Profile.get_followers`::
|
|
|
|
print("{} follows these profiles:".format(profile.username))
|
|
for followee in profile.get_followees():
|
|
print(followee.username)
|
|
|
|
Also, this class implements == and is hashable.
|
|
"""
|
|
def __init__(self, context: InstaloaderContext, node: Dict[str, Any]):
|
|
assert 'username' in node
|
|
self._context = context
|
|
self._has_public_story = None # type: Optional[bool]
|
|
self._node = node
|
|
self._has_full_metadata = False
|
|
self._iphone_struct_ = None
|
|
if 'iphone_struct' in node:
|
|
# if loaded from JSON with load_structure_from_file()
|
|
self._iphone_struct_ = node['iphone_struct']
|
|
|
|
@classmethod
|
|
def from_username(cls, context: InstaloaderContext, username: str):
|
|
"""Create a Profile instance from a given username, raise exception if it does not exist.
|
|
|
|
See also :meth:`Instaloader.check_profile_id`.
|
|
|
|
:param context: :attr:`Instaloader.context`
|
|
:param username: Username
|
|
:raises: :class:`ProfileNotExistsException`
|
|
"""
|
|
# pylint:disable=protected-access
|
|
profile = cls(context, {'username': username.lower()})
|
|
profile._obtain_metadata() # to raise ProfileNotExistException now in case username is invalid
|
|
return profile
|
|
|
|
@classmethod
|
|
def from_id(cls, context: InstaloaderContext, profile_id: int):
|
|
"""Create a Profile instance from a given userid. If possible, use :meth:`Profile.from_username`
|
|
or constructor directly rather than this method, since it requires more requests.
|
|
|
|
:param context: :attr:`Instaloader.context`
|
|
:param profile_id: userid
|
|
:raises: :class:`ProfileNotExistsException`
|
|
"""
|
|
if profile_id in context.profile_id_cache:
|
|
return context.profile_id_cache[profile_id]
|
|
data = context.graphql_query('7c16654f22c819fb63d1183034a5162f',
|
|
{'user_id': str(profile_id),
|
|
'include_chaining': False,
|
|
'include_reel': True,
|
|
'include_suggested_users': False,
|
|
'include_logged_out_extras': False,
|
|
'include_highlight_reels': False},
|
|
rhx_gis=context.root_rhx_gis)['data']['user']
|
|
if data:
|
|
profile = cls(context, data['reel']['owner'])
|
|
else:
|
|
raise ProfileNotExistsException("No profile found, the user may have blocked you (ID: " +
|
|
str(profile_id) + ").")
|
|
context.profile_id_cache[profile_id] = profile
|
|
return profile
|
|
|
|
def _asdict(self):
|
|
json_node = self._node.copy()
|
|
# remove posts to avoid "Circular reference detected" exception
|
|
json_node.pop('edge_media_collections', None)
|
|
json_node.pop('edge_owner_to_timeline_media', None)
|
|
json_node.pop('edge_saved_media', None)
|
|
json_node.pop('edge_felix_video_timeline', None)
|
|
if self._iphone_struct_:
|
|
json_node['iphone_struct'] = self._iphone_struct_
|
|
return json_node
|
|
|
|
def _obtain_metadata(self):
|
|
try:
|
|
if not self._has_full_metadata:
|
|
metadata = self._context.get_json('{}/feed/'.format(self.username), params={})
|
|
self._node = metadata['entry_data']['ProfilePage'][0]['graphql']['user']
|
|
self._has_full_metadata = True
|
|
except (QueryReturnedNotFoundException, KeyError) as err:
|
|
top_search_results = TopSearchResults(self._context, self.username)
|
|
similar_profiles = [profile.username for profile in top_search_results.get_profiles()]
|
|
if similar_profiles:
|
|
raise ProfileNotExistsException('Profile {} does not exist.\nThe most similar profile{}: {}.'
|
|
.format(self.username,
|
|
's are' if len(similar_profiles) > 1 else ' is',
|
|
', '.join(similar_profiles[0:5]))) from err
|
|
raise ProfileNotExistsException('Profile {} does not exist.'.format(self.username)) from err
|
|
|
|
def _metadata(self, *keys) -> Any:
|
|
try:
|
|
d = self._node
|
|
for key in keys:
|
|
d = d[key]
|
|
return d
|
|
except KeyError:
|
|
self._obtain_metadata()
|
|
d = self._node
|
|
for key in keys:
|
|
d = d[key]
|
|
return d
|
|
|
|
@property
|
|
def _iphone_struct(self) -> Dict[str, Any]:
|
|
if not self._context.is_logged_in:
|
|
raise LoginRequiredException("--login required to access iPhone profile info endpoint.")
|
|
if not self._iphone_struct_:
|
|
data = self._context.get_iphone_json(path='api/v1/users/{}/info/'.format(self.userid), params={})
|
|
self._iphone_struct_ = data['user']
|
|
return self._iphone_struct_
|
|
|
|
@property
|
|
def userid(self) -> int:
|
|
"""User ID"""
|
|
return int(self._metadata('id'))
|
|
|
|
@property
|
|
def username(self) -> str:
|
|
"""Profile Name"""
|
|
return self._metadata('username').lower()
|
|
|
|
def __repr__(self):
|
|
return '<Profile {} ({})>'.format(self.username, self.userid)
|
|
|
|
def __eq__(self, o: object) -> bool:
|
|
if isinstance(o, Profile):
|
|
return self.userid == o.userid
|
|
return NotImplemented
|
|
|
|
def __hash__(self) -> int:
|
|
return hash(self.userid)
|
|
|
|
@property
|
|
def is_private(self) -> bool:
|
|
return self._metadata('is_private')
|
|
|
|
@property
|
|
def followed_by_viewer(self) -> bool:
|
|
return self._metadata('followed_by_viewer')
|
|
|
|
@property
|
|
def mediacount(self) -> int:
|
|
return self._metadata('edge_owner_to_timeline_media', 'count')
|
|
|
|
@property
|
|
def igtvcount(self) -> int:
|
|
return self._metadata('edge_felix_video_timeline', 'count')
|
|
|
|
@property
|
|
def followers(self) -> int:
|
|
return self._metadata('edge_followed_by', 'count')
|
|
|
|
@property
|
|
def followees(self) -> int:
|
|
return self._metadata('edge_follow', 'count')
|
|
|
|
@property
|
|
def external_url(self) -> Optional[str]:
|
|
return self._metadata('external_url')
|
|
|
|
@property
|
|
def is_business_account(self) -> bool:
|
|
""".. versionadded:: 4.4"""
|
|
return self._metadata('is_business_account')
|
|
|
|
@property
|
|
def business_category_name(self) -> str:
|
|
""".. versionadded:: 4.4"""
|
|
return self._metadata('business_category_name')
|
|
|
|
@property
|
|
def biography(self) -> str:
|
|
return self._metadata('biography')
|
|
|
|
@property
|
|
def blocked_by_viewer(self) -> bool:
|
|
return self._metadata('blocked_by_viewer')
|
|
|
|
@property
|
|
def follows_viewer(self) -> bool:
|
|
return self._metadata('follows_viewer')
|
|
|
|
@property
|
|
def full_name(self) -> str:
|
|
return self._metadata('full_name')
|
|
|
|
@property
|
|
def has_blocked_viewer(self) -> bool:
|
|
return self._metadata('has_blocked_viewer')
|
|
|
|
@property
|
|
def has_highlight_reels(self) -> bool:
|
|
"""
|
|
.. deprecated:: 4.0.6
|
|
Always returns `True` since :issue:`153`.
|
|
|
|
Before broken, this indicated whether the :class:`Profile` had available stories.
|
|
"""
|
|
return True
|
|
|
|
@property
|
|
def has_public_story(self) -> bool:
|
|
if not self._has_public_story:
|
|
self._obtain_metadata()
|
|
# query not rate limited if invoked anonymously:
|
|
with self._context.anonymous_copy() as anonymous_context:
|
|
data = anonymous_context.graphql_query('9ca88e465c3f866a76f7adee3871bdd8',
|
|
{'user_id': self.userid, 'include_chaining': False,
|
|
'include_reel': False, 'include_suggested_users': False,
|
|
'include_logged_out_extras': True,
|
|
'include_highlight_reels': False},
|
|
'https://www.instagram.com/{}/'.format(self.username))
|
|
self._has_public_story = data['data']['user']['has_public_story']
|
|
assert self._has_public_story is not None
|
|
return self._has_public_story
|
|
|
|
@property
|
|
def has_viewable_story(self) -> bool:
|
|
"""
|
|
.. deprecated:: 4.0.6
|
|
|
|
Some stories are private. This property determines if the :class:`Profile`
|
|
has at least one story which can be viewed using the associated :class:`InstaloaderContext`,
|
|
i.e. the viewer has privileges to view it.
|
|
"""
|
|
return self.has_public_story or self.followed_by_viewer and self.has_highlight_reels
|
|
|
|
@property
|
|
def has_requested_viewer(self) -> bool:
|
|
return self._metadata('has_requested_viewer')
|
|
|
|
@property
|
|
def is_verified(self) -> bool:
|
|
return self._metadata('is_verified')
|
|
|
|
@property
|
|
def requested_by_viewer(self) -> bool:
|
|
return self._metadata('requested_by_viewer')
|
|
|
|
@property
|
|
def profile_pic_url(self) -> str:
|
|
"""Return URL of profile picture. If logged in, the HD version is returned, otherwise a lower-quality version.
|
|
|
|
.. versionadded:: 4.0.3
|
|
|
|
.. versionchanged:: 4.2.1
|
|
Require being logged in for HD version (as required by Instagram)."""
|
|
if self._context.is_logged_in:
|
|
try:
|
|
return self._iphone_struct['hd_profile_pic_url_info']['url']
|
|
except (InstaloaderException, KeyError) as err:
|
|
self._context.error('{} Unable to fetch high quality profile pic.'.format(err))
|
|
return self._metadata("profile_pic_url_hd")
|
|
else:
|
|
return self._metadata("profile_pic_url_hd")
|
|
|
|
def get_profile_pic_url(self) -> str:
|
|
""".. deprecated:: 4.0.3
|
|
|
|
Use :attr:`profile_pic_url`."""
|
|
return self.profile_pic_url
|
|
|
|
def get_posts(self) -> NodeIterator[Post]:
|
|
"""Retrieve all posts from a profile.
|
|
|
|
:rtype: NodeIterator[Post]"""
|
|
self._obtain_metadata()
|
|
return NodeIterator(
|
|
self._context,
|
|
'472f257a40c653c64c666ce877d59d2b',
|
|
lambda d: d['data']['user']['edge_owner_to_timeline_media'],
|
|
lambda n: Post(self._context, n, self),
|
|
{'id': self.userid},
|
|
'https://www.instagram.com/{0}/'.format(self.username),
|
|
self._metadata('edge_owner_to_timeline_media'),
|
|
)
|
|
|
|
def get_saved_posts(self) -> NodeIterator[Post]:
|
|
"""Get Posts that are marked as saved by the user.
|
|
|
|
:rtype: NodeIterator[Post]"""
|
|
|
|
if self.username != self._context.username:
|
|
raise LoginRequiredException("--login={} required to get that profile's saved posts.".format(self.username))
|
|
|
|
self._obtain_metadata()
|
|
return NodeIterator(
|
|
self._context,
|
|
'f883d95537fbcd400f466f63d42bd8a1',
|
|
lambda d: d['data']['user']['edge_saved_media'],
|
|
lambda n: Post(self._context, n),
|
|
{'id': self.userid},
|
|
'https://www.instagram.com/{0}/'.format(self.username),
|
|
self._metadata('edge_saved_media'),
|
|
)
|
|
|
|
def get_tagged_posts(self) -> NodeIterator[Post]:
|
|
"""Retrieve all posts where a profile is tagged.
|
|
|
|
:rtype: NodeIterator[Post]
|
|
|
|
.. versionadded:: 4.0.7"""
|
|
self._obtain_metadata()
|
|
return NodeIterator(
|
|
self._context,
|
|
'e31a871f7301132ceaab56507a66bbb7',
|
|
lambda d: d['data']['user']['edge_user_to_photos_of_you'],
|
|
lambda n: Post(self._context, n, self if int(n['owner']['id']) == self.userid else None),
|
|
{'id': self.userid},
|
|
'https://www.instagram.com/{0}/'.format(self.username),
|
|
)
|
|
|
|
def get_igtv_posts(self) -> NodeIterator[Post]:
|
|
"""Retrieve all IGTV posts.
|
|
|
|
:rtype: NodeIterator[Post]
|
|
|
|
.. versionadded:: 4.3"""
|
|
self._obtain_metadata()
|
|
return NodeIterator(
|
|
self._context,
|
|
'bc78b344a68ed16dd5d7f264681c4c76',
|
|
lambda d: d['data']['user']['edge_felix_video_timeline'],
|
|
lambda n: Post(self._context, n, self),
|
|
{'id': self.userid},
|
|
'https://www.instagram.com/{0}/channel/'.format(self.username),
|
|
self._metadata('edge_felix_video_timeline'),
|
|
)
|
|
|
|
def get_followers(self) -> NodeIterator['Profile']:
|
|
"""
|
|
Retrieve list of followers of given profile.
|
|
To use this, one needs to be logged in and private profiles has to be followed.
|
|
|
|
:rtype: NodeIterator[Profile]
|
|
"""
|
|
if not self._context.is_logged_in:
|
|
raise LoginRequiredException("--login required to get a profile's followers.")
|
|
self._obtain_metadata()
|
|
return NodeIterator(
|
|
self._context,
|
|
'37479f2b8209594dde7facb0d904896a',
|
|
lambda d: d['data']['user']['edge_followed_by'],
|
|
lambda n: Profile(self._context, n),
|
|
{'id': str(self.userid)},
|
|
'https://www.instagram.com/{0}/'.format(self.username),
|
|
)
|
|
|
|
def get_followees(self) -> NodeIterator['Profile']:
|
|
"""
|
|
Retrieve list of followees (followings) of given profile.
|
|
To use this, one needs to be logged in and private profiles has to be followed.
|
|
|
|
:rtype: NodeIterator[Profile]
|
|
"""
|
|
if not self._context.is_logged_in:
|
|
raise LoginRequiredException("--login required to get a profile's followees.")
|
|
self._obtain_metadata()
|
|
return NodeIterator(
|
|
self._context,
|
|
'58712303d941c6855d4e888c5f0cd22f',
|
|
lambda d: d['data']['user']['edge_follow'],
|
|
lambda n: Profile(self._context, n),
|
|
{'id': str(self.userid)},
|
|
'https://www.instagram.com/{0}/'.format(self.username),
|
|
)
|
|
|
|
def get_similar_accounts(self) -> Iterator['Profile']:
|
|
"""
|
|
Retrieve list of suggested / similar accounts for this profile.
|
|
To use this, one needs to be logged in.
|
|
|
|
.. versionadded:: 4.4
|
|
"""
|
|
if not self._context.is_logged_in:
|
|
raise LoginRequiredException("--login required to get a profile's similar accounts.")
|
|
self._obtain_metadata()
|
|
yield from (Profile(self._context, edge["node"]) for edge in
|
|
self._context.graphql_query("ad99dd9d3646cc3c0dda65debcd266a7",
|
|
{"user_id": str(self.userid), "include_chaining": True},
|
|
"https://www.instagram.com/{0}/"
|
|
.format(self.username))["data"]["user"]["edge_chaining"]["edges"])
|
|
|
|
|
|
class StoryItem:
|
|
"""
|
|
Structure containing information about a user story item i.e. image or video.
|
|
|
|
Created by method :meth:`Story.get_items`. This class implements == and is hashable.
|
|
|
|
:param context: :class:`InstaloaderContext` instance used for additional queries if necessary.
|
|
:param node: Dictionary containing the available information of the story item.
|
|
:param owner_profile: :class:`Profile` instance representing the story owner.
|
|
"""
|
|
|
|
def __init__(self, context: InstaloaderContext, node: Dict[str, Any], owner_profile: Optional[Profile] = None):
|
|
self._context = context
|
|
self._node = node
|
|
self._owner_profile = owner_profile
|
|
|
|
def _asdict(self):
|
|
node = self._node
|
|
if self._owner_profile:
|
|
node['owner'] = self._owner_profile._asdict()
|
|
return node
|
|
|
|
@property
|
|
def mediaid(self) -> int:
|
|
"""The mediaid is a decimal representation of the media shortcode."""
|
|
return int(self._node['id'])
|
|
|
|
@property
|
|
def shortcode(self) -> str:
|
|
"""Convert :attr:`~StoryItem.mediaid` to a shortcode-like string, allowing ``{shortcode}`` to be used with
|
|
:option:`--filename-pattern`."""
|
|
return Post.mediaid_to_shortcode(self.mediaid)
|
|
|
|
def __repr__(self):
|
|
return '<StoryItem {}>'.format(self.mediaid)
|
|
|
|
def __eq__(self, o: object) -> bool:
|
|
if isinstance(o, StoryItem):
|
|
return self.mediaid == o.mediaid
|
|
return NotImplemented
|
|
|
|
def __hash__(self) -> int:
|
|
return hash(self.mediaid)
|
|
|
|
@property
|
|
def owner_profile(self) -> Profile:
|
|
""":class:`Profile` instance of the story item's owner."""
|
|
if not self._owner_profile:
|
|
self._owner_profile = Profile.from_id(self._context, self._node['owner']['id'])
|
|
assert self._owner_profile is not None
|
|
return self._owner_profile
|
|
|
|
@property
|
|
def owner_username(self) -> str:
|
|
"""The StoryItem owner's lowercase name."""
|
|
return self.owner_profile.username
|
|
|
|
@property
|
|
def owner_id(self) -> int:
|
|
"""The ID of the StoryItem owner."""
|
|
return self.owner_profile.userid
|
|
|
|
@property
|
|
def date_local(self) -> datetime:
|
|
"""Timestamp when the StoryItem was created (local time zone)."""
|
|
return datetime.fromtimestamp(self._node['taken_at_timestamp'])
|
|
|
|
@property
|
|
def date_utc(self) -> datetime:
|
|
"""Timestamp when the StoryItem was created (UTC)."""
|
|
return datetime.utcfromtimestamp(self._node['taken_at_timestamp'])
|
|
|
|
@property
|
|
def date(self) -> datetime:
|
|
"""Synonym to :attr:`~StoryItem.date_utc`"""
|
|
return self.date_utc
|
|
|
|
@property
|
|
def profile(self) -> str:
|
|
"""Synonym to :attr:`~StoryItem.owner_username`"""
|
|
return self.owner_username
|
|
|
|
@property
|
|
def expiring_local(self) -> datetime:
|
|
"""Timestamp when the StoryItem will get unavailable (local time zone)."""
|
|
return datetime.fromtimestamp(self._node['expiring_at_timestamp'])
|
|
|
|
@property
|
|
def expiring_utc(self) -> datetime:
|
|
"""Timestamp when the StoryItem will get unavailable (UTC)."""
|
|
return datetime.utcfromtimestamp(self._node['expiring_at_timestamp'])
|
|
|
|
@property
|
|
def url(self) -> str:
|
|
"""URL of the picture / video thumbnail of the StoryItem"""
|
|
return self._node['display_resources'][-1]['src']
|
|
|
|
@property
|
|
def typename(self) -> str:
|
|
"""Type of post, GraphStoryImage or GraphStoryVideo"""
|
|
return self._node['__typename']
|
|
|
|
@property
|
|
def is_video(self) -> bool:
|
|
"""True if the StoryItem is a video."""
|
|
return self._node['is_video']
|
|
|
|
@property
|
|
def video_url(self) -> Optional[str]:
|
|
"""URL of the video, or None."""
|
|
if self.is_video:
|
|
return self._node['video_resources'][-1]['src']
|
|
return None
|
|
|
|
|
|
class Story:
|
|
"""
|
|
Structure representing a user story with its associated items.
|
|
|
|
Provides methods for accessing story properties, as well as :meth:`Story.get_items` to request associated
|
|
:class:`StoryItem` nodes. Stories are returned by :meth:`Instaloader.get_stories`.
|
|
|
|
With a logged-in :class:`Instaloader` instance `L`, you may download all your visible user stories with::
|
|
|
|
for story in L.get_stories():
|
|
# story is a Story object
|
|
for item in story.get_items():
|
|
# item is a StoryItem object
|
|
L.download_storyitem(item, ':stories')
|
|
|
|
This class implements == and is hashable.
|
|
|
|
:param context: :class:`InstaloaderContext` instance used for additional queries if necessary.
|
|
:param node: Dictionary containing the available information of the story as returned by Instagram.
|
|
"""
|
|
|
|
def __init__(self, context: InstaloaderContext, node: Dict[str, Any]):
|
|
self._context = context
|
|
self._node = node
|
|
self._unique_id = None # type: Optional[str]
|
|
self._owner_profile = None # type: Optional[Profile]
|
|
|
|
def __repr__(self):
|
|
return '<Story by {} changed {:%Y-%m-%d_%H-%M-%S_UTC}>'.format(self.owner_username, self.latest_media_utc)
|
|
|
|
def __eq__(self, o: object) -> bool:
|
|
if isinstance(o, Story):
|
|
return self.unique_id == o.unique_id
|
|
return NotImplemented
|
|
|
|
def __hash__(self) -> int:
|
|
return hash(self.unique_id)
|
|
|
|
@property
|
|
def unique_id(self) -> Union[str, int]:
|
|
"""
|
|
This ID only equals amongst :class:`Story` instances which have the same owner and the same set of
|
|
:class:`StoryItem`. For all other :class:`Story` instances this ID is different.
|
|
"""
|
|
if not self._unique_id:
|
|
id_list = [item.mediaid for item in self.get_items()]
|
|
id_list.sort()
|
|
self._unique_id = str().join([str(self.owner_id)] + list(map(str, id_list)))
|
|
return self._unique_id
|
|
|
|
@property
|
|
def last_seen_local(self) -> Optional[datetime]:
|
|
"""Timestamp when the story has last been watched or None (local time zone)."""
|
|
if self._node['seen']:
|
|
return datetime.fromtimestamp(self._node['seen'])
|
|
return None
|
|
|
|
@property
|
|
def last_seen_utc(self) -> Optional[datetime]:
|
|
"""Timestamp when the story has last been watched or None (UTC)."""
|
|
if self._node['seen']:
|
|
return datetime.utcfromtimestamp(self._node['seen'])
|
|
return None
|
|
|
|
@property
|
|
def latest_media_local(self) -> datetime:
|
|
"""Timestamp when the last item of the story was created (local time zone)."""
|
|
return datetime.fromtimestamp(self._node['latest_reel_media'])
|
|
|
|
@property
|
|
def latest_media_utc(self) -> datetime:
|
|
"""Timestamp when the last item of the story was created (UTC)."""
|
|
return datetime.utcfromtimestamp(self._node['latest_reel_media'])
|
|
|
|
@property
|
|
def itemcount(self) -> int:
|
|
"""Count of items associated with the :class:`Story` instance."""
|
|
return len(self._node['items'])
|
|
|
|
@property
|
|
def owner_profile(self) -> Profile:
|
|
""":class:`Profile` instance of the story owner."""
|
|
if not self._owner_profile:
|
|
self._owner_profile = Profile(self._context, self._node['user'])
|
|
return self._owner_profile
|
|
|
|
@property
|
|
def owner_username(self) -> str:
|
|
"""The story owner's lowercase username."""
|
|
return self.owner_profile.username
|
|
|
|
@property
|
|
def owner_id(self) -> int:
|
|
"""The story owner's ID."""
|
|
return self.owner_profile.userid
|
|
|
|
def get_items(self) -> Iterator[StoryItem]:
|
|
"""Retrieve all items from a story."""
|
|
yield from (StoryItem(self._context, item, self.owner_profile) for item in reversed(self._node['items']))
|
|
|
|
|
|
class Highlight(Story):
|
|
"""
|
|
Structure representing a user's highlight with its associated story items.
|
|
|
|
Provides methods for accessing highlight properties, as well as :meth:`Highlight.get_items` to request associated
|
|
:class:`StoryItem` nodes. Highlights are returned by :meth:`Instaloader.get_highlights`.
|
|
|
|
With a logged-in :class:`Instaloader` instance `L`, you may download all highlights of a :class:`Profile` instance
|
|
USER with::
|
|
|
|
for highlight in L.get_highlights(USER):
|
|
# highlight is a Highlight object
|
|
for item in highlight.get_items():
|
|
# item is a StoryItem object
|
|
L.download_storyitem(item, '{}/{}'.format(highlight.owner_username, highlight.title))
|
|
|
|
This class implements == and is hashable.
|
|
|
|
:param context: :class:`InstaloaderContext` instance used for additional queries if necessary.
|
|
:param node: Dictionary containing the available information of the highlight as returned by Instagram.
|
|
:param owner: :class:`Profile` instance representing the owner profile of the highlight.
|
|
"""
|
|
|
|
def __init__(self, context: InstaloaderContext, node: Dict[str, Any], owner: Optional[Profile] = None):
|
|
super().__init__(context, node)
|
|
self._owner_profile = owner
|
|
self._items = None # type: Optional[List[Dict[str, Any]]]
|
|
|
|
def __repr__(self):
|
|
return '<Highlight by {}: {}>'.format(self.owner_username, self.title)
|
|
|
|
@property
|
|
def unique_id(self) -> int:
|
|
"""A unique ID identifying this set of highlights."""
|
|
return int(self._node['id'])
|
|
|
|
@property
|
|
def owner_profile(self) -> Profile:
|
|
""":class:`Profile` instance of the highlights' owner."""
|
|
if not self._owner_profile:
|
|
self._owner_profile = Profile(self._context, self._node['owner'])
|
|
return self._owner_profile
|
|
|
|
@property
|
|
def title(self) -> str:
|
|
"""The title of these highlights."""
|
|
return self._node['title']
|
|
|
|
@property
|
|
def cover_url(self) -> str:
|
|
"""URL of the highlights' cover."""
|
|
return self._node['cover_media']['thumbnail_src']
|
|
|
|
@property
|
|
def cover_cropped_url(self) -> str:
|
|
"""URL of the cropped version of the cover."""
|
|
return self._node['cover_media_cropped_thumbnail']['url']
|
|
|
|
def _fetch_items(self):
|
|
if not self._items:
|
|
self._items = self._context.graphql_query("45246d3fe16ccc6577e0bd297a5db1ab",
|
|
{"reel_ids": [], "tag_names": [], "location_ids": [],
|
|
"highlight_reel_ids": [str(self.unique_id)],
|
|
"precomposed_overlay": False})['data']['reels_media'][0]['items']
|
|
|
|
@property
|
|
def itemcount(self) -> int:
|
|
"""Count of items associated with the :class:`Highlight` instance."""
|
|
self._fetch_items()
|
|
assert self._items is not None
|
|
return len(self._items)
|
|
|
|
def get_items(self) -> Iterator[StoryItem]:
|
|
"""Retrieve all associated highlight items."""
|
|
self._fetch_items()
|
|
assert self._items is not None
|
|
yield from (StoryItem(self._context, item, self.owner_profile) for item in self._items)
|
|
|
|
|
|
class Hashtag:
|
|
"""
|
|
An Hashtag.
|
|
|
|
Analogous to :class:`Profile`, get an instance with::
|
|
|
|
L = Instaloader()
|
|
hashtag = Hashtag.from_name(L.context, HASHTAG)
|
|
|
|
To then download the Hashtag's Posts, do::
|
|
|
|
for post in hashtag.get_posts():
|
|
L.download_post(post, target="#"+hashtag.name)
|
|
|
|
Also, this class implements == and is hashable.
|
|
"""
|
|
def __init__(self, context: InstaloaderContext, node: Dict[str, Any]):
|
|
assert "name" in node
|
|
self._context = context
|
|
self._node = node
|
|
self._has_full_metadata = False
|
|
|
|
@classmethod
|
|
def from_name(cls, context: InstaloaderContext, name: str):
|
|
"""
|
|
Create a Hashtag instance from a given hashtag name, without preceeding '#'. Raises an Exception if there is no
|
|
hashtag with the given name.
|
|
|
|
:param context: :attr:`Instaloader.context`
|
|
:param name: Hashtag, without preceeding '#'
|
|
:raises: :class:`QueryReturnedNotFoundException`
|
|
"""
|
|
# pylint:disable=protected-access
|
|
hashtag = cls(context, {'name': name.lower()})
|
|
hashtag._obtain_metadata()
|
|
return hashtag
|
|
|
|
@property
|
|
def name(self):
|
|
"""Hashtag name lowercased, without preceeding '#'"""
|
|
return self._node["name"].lower()
|
|
|
|
def _query(self, params):
|
|
return self._context.get_json("explore/tags/{0}/".format(self.name),
|
|
params)["graphql"]["hashtag"]
|
|
|
|
def _obtain_metadata(self):
|
|
if not self._has_full_metadata:
|
|
self._node = self._query({"__a": 1})
|
|
self._has_full_metadata = True
|
|
|
|
def _asdict(self):
|
|
json_node = self._node.copy()
|
|
# remove posts
|
|
json_node.pop("edge_hashtag_to_top_posts", None)
|
|
json_node.pop("edge_hashtag_to_media", None)
|
|
return json_node
|
|
|
|
def __repr__(self):
|
|
return "<Hashtag #{}>".format(self.name)
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
if isinstance(other, Hashtag):
|
|
return self.name == other.name
|
|
return NotImplemented
|
|
|
|
def __hash__(self) -> int:
|
|
return hash(self.name)
|
|
|
|
def _metadata(self, *keys) -> Any:
|
|
try:
|
|
d = self._node
|
|
for key in keys:
|
|
d = d[key]
|
|
return d
|
|
except KeyError:
|
|
self._obtain_metadata()
|
|
d = self._node
|
|
for key in keys:
|
|
d = d[key]
|
|
return d
|
|
|
|
@property
|
|
def hashtagid(self) -> int:
|
|
return int(self._metadata("id"))
|
|
|
|
@property
|
|
def profile_pic_url(self) -> str:
|
|
return self._metadata("profile_pic_url")
|
|
|
|
@property
|
|
def description(self) -> str:
|
|
return self._metadata("description")
|
|
|
|
@property
|
|
def allow_following(self) -> bool:
|
|
return self._metadata("allow_following")
|
|
|
|
@property
|
|
def is_following(self) -> bool:
|
|
return self._metadata("is_following")
|
|
|
|
@property
|
|
def is_top_media_only(self) -> bool:
|
|
return self._metadata("is_top_media_only")
|
|
|
|
def get_related_tags(self) -> Iterator["Hashtag"]:
|
|
"""Yields similar hashtags."""
|
|
yield from (Hashtag(self._context, edge["node"])
|
|
for edge in self._metadata("edge_hashtag_to_related_tags", "edges"))
|
|
|
|
def get_top_posts(self) -> Iterator[Post]:
|
|
"""Yields the top posts of the hashtag."""
|
|
yield from (Post(self._context, edge["node"])
|
|
for edge in self._metadata("edge_hashtag_to_top_posts", "edges"))
|
|
|
|
@property
|
|
def mediacount(self) -> int:
|
|
"""
|
|
The count of all media associated with this hashtag.
|
|
|
|
The number of posts with a certain hashtag may differ from the number of posts that can actually be accessed, as
|
|
the hashtag count might include private posts
|
|
"""
|
|
return self._metadata("edge_hashtag_to_media", "count")
|
|
|
|
def get_posts(self) -> Iterator[Post]:
|
|
"""Yields the posts associated with this hashtag."""
|
|
self._metadata("edge_hashtag_to_media", "edges")
|
|
self._metadata("edge_hashtag_to_media", "page_info")
|
|
conn = self._metadata("edge_hashtag_to_media")
|
|
yield from (Post(self._context, edge["node"]) for edge in conn["edges"])
|
|
while conn["page_info"]["has_next_page"]:
|
|
data = self._query({'__a': 1, 'max_id': conn["page_info"]["end_cursor"]})
|
|
conn = data["edge_hashtag_to_media"]
|
|
yield from (Post(self._context, edge["node"]) for edge in conn["edges"])
|
|
|
|
def get_all_posts(self) -> Iterator[Post]:
|
|
"""Yields all posts, i.e. all most recent posts and the top posts, in chronological order."""
|
|
sorted_top_posts = iter(sorted(self.get_top_posts(), key=lambda p: p.date_utc, reverse=True))
|
|
other_posts = self.get_posts()
|
|
next_top = next(sorted_top_posts, None)
|
|
next_other = next(other_posts, None)
|
|
while next_top is not None or next_other is not None:
|
|
if next_other is None:
|
|
yield from sorted_top_posts
|
|
break
|
|
if next_top is None:
|
|
yield from other_posts
|
|
break
|
|
if next_top == next_other:
|
|
yield next_top
|
|
next_top = next(sorted_top_posts, None)
|
|
next_other = next(other_posts, None)
|
|
continue
|
|
if next_top.date_utc > next_other.date_utc:
|
|
yield next_top
|
|
next_top = next(sorted_top_posts, None)
|
|
else:
|
|
yield next_other
|
|
next_other = next(other_posts, None)
|
|
|
|
|
|
class TopSearchResults:
|
|
"""
|
|
An invocation of this class triggers a search on Instagram for the provided search string.
|
|
|
|
Provides methods to access the search results as profiles (:class:`Profile`), locations (:class:`PostLocation`) and
|
|
hashtags.
|
|
|
|
:param context: :attr:`Instaloader.context` used to send the query for the search.
|
|
:param searchstring: String to search for with Instagram's "top search".
|
|
"""
|
|
|
|
def __init__(self, context: InstaloaderContext, searchstring: str):
|
|
self._context = context
|
|
self._searchstring = searchstring
|
|
# The `__a` param is only needed to prevent `get_json()` from searching for 'window._sharedData'.
|
|
self._node = context.get_json('web/search/topsearch/',
|
|
params={'context': 'blended',
|
|
'query': searchstring,
|
|
'include_reel': False,
|
|
'__a': 1})
|
|
|
|
def get_profiles(self) -> Iterator[Profile]:
|
|
"""
|
|
Provides the :class:`Profile` instances from the search result.
|
|
"""
|
|
for user in self._node.get('users', []):
|
|
user_node = user['user']
|
|
if 'pk' in user_node:
|
|
user_node['id'] = user_node['pk']
|
|
yield Profile(self._context, user_node)
|
|
|
|
def get_prefixed_usernames(self) -> Iterator[str]:
|
|
"""
|
|
Provides all profile names from the search result that start with the search string.
|
|
"""
|
|
for user in self._node.get('users', []):
|
|
username = user.get('user', {}).get('username', '')
|
|
if username.startswith(self._searchstring):
|
|
yield username
|
|
|
|
def get_locations(self) -> Iterator[PostLocation]:
|
|
"""
|
|
Provides instances of :class:`PostLocation` from the search result.
|
|
"""
|
|
for location in self._node.get('places', []):
|
|
place = location.get('place', {})
|
|
slug = place.get('slug')
|
|
loc = place.get('location', {})
|
|
yield PostLocation(int(loc['pk']), loc['name'], slug, None, loc['lat'], loc['lng'])
|
|
|
|
def get_hashtag_strings(self) -> Iterator[str]:
|
|
"""
|
|
Provides the hashtags from the search result as strings.
|
|
"""
|
|
for hashtag in self._node.get('hashtags', []):
|
|
name = hashtag.get('hashtag', {}).get('name')
|
|
if name:
|
|
yield name
|
|
|
|
def get_hashtags(self) -> Iterator[Hashtag]:
|
|
"""
|
|
Provides the hashtags from the search result.
|
|
|
|
.. versionadded:: 4.4
|
|
"""
|
|
for hashtag in self._node.get('hashtags', []):
|
|
node = hashtag.get('hashtag', {})
|
|
if 'name' in node:
|
|
yield Hashtag(self._context, node)
|
|
|
|
@property
|
|
def searchstring(self) -> str:
|
|
"""
|
|
The string that was searched for on Instagram to produce this :class:`TopSearchResults` instance.
|
|
"""
|
|
return self._searchstring
|
|
|
|
|
|
JsonExportable = Union[Post, Profile, StoryItem, Hashtag, FrozenNodeIterator]
|
|
|
|
|
|
def save_structure_to_file(structure: JsonExportable, filename: str) -> None:
|
|
"""Saves a :class:`Post`, :class:`Profile`, :class:`StoryItem` or :class:`Hashtag` to a '.json' or '.json.xz' file
|
|
such that it can later be loaded by :func:`load_structure_from_file`.
|
|
|
|
If the specified filename ends in '.xz', the file will be LZMA compressed. Otherwise, a pretty-printed JSON file
|
|
will be created.
|
|
|
|
:param structure: :class:`Post`, :class:`Profile`, :class:`StoryItem` or :class:`Hashtag`
|
|
:param filename: Filename, ends in '.json' or '.json.xz'
|
|
"""
|
|
json_structure = {'node': structure._asdict(),
|
|
'instaloader': {'version': __version__, 'node_type': structure.__class__.__name__}}
|
|
compress = filename.endswith('.xz')
|
|
if compress:
|
|
with lzma.open(filename, 'wt', check=lzma.CHECK_NONE) as fp:
|
|
json.dump(json_structure, fp=fp, separators=(',', ':'))
|
|
else:
|
|
with open(filename, 'wt') as fp:
|
|
json.dump(json_structure, fp=fp, indent=4, sort_keys=True)
|
|
|
|
|
|
def load_structure_from_file(context: InstaloaderContext, filename: str) -> JsonExportable:
|
|
"""Loads a :class:`Post`, :class:`Profile`, :class:`StoryItem` or :class:`Hashtag` from a '.json' or '.json.xz' file
|
|
that has been saved by :func:`save_structure_to_file`.
|
|
|
|
:param context: :attr:`Instaloader.context` linked to the new object, used for additional queries if neccessary.
|
|
:param filename: Filename, ends in '.json' or '.json.xz'
|
|
"""
|
|
compressed = filename.endswith('.xz')
|
|
if compressed:
|
|
fp = lzma.open(filename, 'rt')
|
|
else:
|
|
fp = open(filename, 'rt')
|
|
json_structure = json.load(fp)
|
|
fp.close()
|
|
if 'node' in json_structure and 'instaloader' in json_structure and \
|
|
'node_type' in json_structure['instaloader']:
|
|
node_type = json_structure['instaloader']['node_type']
|
|
if node_type == "Post":
|
|
return Post(context, json_structure['node'])
|
|
elif node_type == "Profile":
|
|
return Profile(context, json_structure['node'])
|
|
elif node_type == "StoryItem":
|
|
return StoryItem(context, json_structure['node'])
|
|
elif node_type == "Hashtag":
|
|
return Hashtag(context, json_structure['node'])
|
|
elif node_type == "FrozenNodeIterator":
|
|
return FrozenNodeIterator(**json_structure['node'])
|
|
else:
|
|
raise InvalidArgumentException("{}: Not an Instaloader JSON.".format(filename))
|
|
elif 'shortcode' in json_structure:
|
|
# Post JSON created with Instaloader v3
|
|
return Post.from_shortcode(context, json_structure['shortcode'])
|
|
else:
|
|
raise InvalidArgumentException("{}: Not an Instaloader JSON.".format(filename))
|