Merge branch 'upcoming/v4.9'
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
"""Download pictures (or videos) along with their captions and other metadata from Instagram."""
|
||||
|
||||
|
||||
__version__ = '4.8.5'
|
||||
__version__ = '4.9b3'
|
||||
|
||||
|
||||
try:
|
||||
|
@@ -383,6 +383,9 @@ def main():
|
||||
g_how.add_argument('--resume-prefix', metavar='PREFIX',
|
||||
help='Prefix for filenames that are used to save the information to resume an interrupted '
|
||||
'download.')
|
||||
g_how.add_argument('--sanitize-paths', action='store_true',
|
||||
help='Sanitize paths so that the resulting file and directory names are valid on both '
|
||||
'Windows and Unix.')
|
||||
g_how.add_argument('--no-resume', action='store_true',
|
||||
help='Do not resume a previously-aborted download iteration, and do not save such information '
|
||||
'when interrupted.')
|
||||
@@ -463,7 +466,8 @@ def main():
|
||||
slide=args.slide,
|
||||
fatal_status_codes=args.abort_on,
|
||||
iphone_support=not args.no_iphone,
|
||||
title_pattern=args.title_pattern)
|
||||
title_pattern=args.title_pattern,
|
||||
sanitize_paths=args.sanitize_paths)
|
||||
_main(loader,
|
||||
args.profile,
|
||||
username=args.login.lower() if args.login is not None else None,
|
||||
|
@@ -22,6 +22,7 @@ from .exceptions import *
|
||||
from .instaloadercontext import InstaloaderContext, RateController
|
||||
from .lateststamps import LatestStamps
|
||||
from .nodeiterator import NodeIterator, resumable_iteration
|
||||
from .sectioniterator import SectionIterator
|
||||
from .structures import (Hashtag, Highlight, JsonExportable, Post, PostLocation, Profile, Story, StoryItem,
|
||||
load_structure_from_file, save_structure_to_file, PostSidecarNode, TitlePic)
|
||||
|
||||
@@ -136,20 +137,38 @@ class _ArbitraryItemFormatter(string.Formatter):
|
||||
|
||||
|
||||
class _PostPathFormatter(_ArbitraryItemFormatter):
|
||||
RESERVED: set = {'CON', 'PRN', 'AUX', 'NUL',
|
||||
'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
|
||||
'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'}
|
||||
|
||||
def __init__(self, item: Any, force_windows_path: bool = False):
|
||||
super().__init__(item)
|
||||
self.force_windows_path = force_windows_path
|
||||
|
||||
def get_value(self, key, args, kwargs):
|
||||
ret = super().get_value(key, args, kwargs)
|
||||
if not isinstance(ret, str):
|
||||
return ret
|
||||
return self.sanitize_path(ret)
|
||||
return self.sanitize_path(ret, self.force_windows_path)
|
||||
|
||||
@staticmethod
|
||||
def sanitize_path(ret: str) -> str:
|
||||
def sanitize_path(ret: str, force_windows_path: bool = False) -> str:
|
||||
"""Replaces '/' with similar looking Division Slash and some other illegal filename characters on Windows."""
|
||||
ret = ret.replace('/', '\u2215')
|
||||
if platform.system() == 'Windows':
|
||||
|
||||
if ret.startswith('.'):
|
||||
ret = ret.replace('.', '\u2024', 1)
|
||||
|
||||
if force_windows_path or platform.system() == 'Windows':
|
||||
ret = ret.replace(':', '\uff1a').replace('<', '\ufe64').replace('>', '\ufe65').replace('\"', '\uff02')
|
||||
ret = ret.replace('\\', '\ufe68').replace('|', '\uff5c').replace('?', '\ufe16').replace('*', '\uff0a')
|
||||
ret = ret.replace('\n', ' ').replace('\r', ' ')
|
||||
root, ext = os.path.splitext(ret)
|
||||
if root.upper() in _PostPathFormatter.RESERVED:
|
||||
root += '_'
|
||||
if ext == '.':
|
||||
ext = '\u2024'
|
||||
ret = root + ext
|
||||
return ret
|
||||
|
||||
|
||||
@@ -182,6 +201,7 @@ class Instaloader:
|
||||
:param slide: :option:`--slide`
|
||||
:param fatal_status_codes: :option:`--abort-on`
|
||||
:param iphone_support: not :option:`--no-iphone`
|
||||
:param sanitize_paths: :option:`--sanitize-paths`
|
||||
|
||||
.. attribute:: context
|
||||
|
||||
@@ -211,7 +231,8 @@ class Instaloader:
|
||||
slide: Optional[str] = None,
|
||||
fatal_status_codes: Optional[List[int]] = None,
|
||||
iphone_support: bool = True,
|
||||
title_pattern: Optional[str] = None):
|
||||
title_pattern: Optional[str] = None,
|
||||
sanitize_paths: bool = False):
|
||||
|
||||
self.context = InstaloaderContext(sleep, quiet, user_agent, max_connection_attempts,
|
||||
request_timeout, rate_controller, fatal_status_codes,
|
||||
@@ -228,6 +249,7 @@ class Instaloader:
|
||||
self.title_pattern = '{date_utc}_UTC_{typename}'
|
||||
else:
|
||||
self.title_pattern = '{target}_{date_utc}_UTC_{typename}'
|
||||
self.sanitize_paths = sanitize_paths
|
||||
self.download_pictures = download_pictures
|
||||
self.download_videos = download_videos
|
||||
self.download_video_thumbnails = download_video_thumbnails
|
||||
@@ -291,7 +313,8 @@ class Instaloader:
|
||||
check_resume_bbd=self.check_resume_bbd,
|
||||
slide=self.slide,
|
||||
fatal_status_codes=self.context.fatal_status_codes,
|
||||
iphone_support=self.context.iphone_support)
|
||||
iphone_support=self.context.iphone_support,
|
||||
sanitize_paths=self.sanitize_paths)
|
||||
yield new_loader
|
||||
self.context.error_log.extend(new_loader.context.error_log)
|
||||
new_loader.context.error_log = [] # avoid double-printing of errors
|
||||
@@ -511,9 +534,10 @@ class Instaloader:
|
||||
pic_bytes = http_response.content
|
||||
ig_filename = url.split('/')[-1].split('?')[0]
|
||||
pic_data = TitlePic(owner_profile, target, name_suffix, ig_filename, date_object)
|
||||
dirname = _PostPathFormatter(pic_data).format(self.dirname_pattern, target=target)
|
||||
filename_template = os.path.join(dirname,
|
||||
_PostPathFormatter(pic_data).format(self.title_pattern, target=target))
|
||||
dirname = _PostPathFormatter(pic_data, self.sanitize_paths).format(self.dirname_pattern, target=target)
|
||||
filename_template = os.path.join(
|
||||
dirname,
|
||||
_PostPathFormatter(pic_data, self.sanitize_paths).format(self.title_pattern, target=target))
|
||||
filename = self.__prepare_filename(filename_template, lambda: url) + ".jpg"
|
||||
content_length = http_response.headers.get('Content-Length', None)
|
||||
if os.path.isfile(filename) and (not self.context.is_logged_in or
|
||||
@@ -638,7 +662,7 @@ class Instaloader:
|
||||
"""Format filename of a :class:`Post` or :class:`StoryItem` according to ``filename-pattern`` parameter.
|
||||
|
||||
.. versionadded:: 4.1"""
|
||||
return _PostPathFormatter(item).format(self.filename_pattern, target=target)
|
||||
return _PostPathFormatter(item, self.sanitize_paths).format(self.filename_pattern, target=target)
|
||||
|
||||
def download_post(self, post: Post, target: Union[str, Path]) -> bool:
|
||||
"""
|
||||
@@ -670,7 +694,7 @@ class Instaloader:
|
||||
return False
|
||||
return True
|
||||
|
||||
dirname = _PostPathFormatter(post).format(self.dirname_pattern, target=target)
|
||||
dirname = _PostPathFormatter(post, self.sanitize_paths).format(self.dirname_pattern, target=target)
|
||||
filename_template = os.path.join(dirname, self.format_filename(post, target=target))
|
||||
filename = self.__prepare_filename(filename_template, lambda: post.url)
|
||||
|
||||
@@ -821,7 +845,7 @@ class Instaloader:
|
||||
last_scraped = latest_stamps.get_last_story_timestamp(name)
|
||||
scraped_timestamp = datetime.now().astimezone()
|
||||
for item in user_story.get_items():
|
||||
if latest_stamps is not None and item.date_utc.replace(tzinfo=timezone.utc) <= last_scraped:
|
||||
if latest_stamps is not None and item.date_local <= last_scraped:
|
||||
break
|
||||
if storyitem_filter is not None and not storyitem_filter(item):
|
||||
self.context.log("<{} skipped>".format(item), flush=True)
|
||||
@@ -851,7 +875,7 @@ class Instaloader:
|
||||
return True
|
||||
|
||||
date_local = item.date_local
|
||||
dirname = _PostPathFormatter(item).format(self.dirname_pattern, target=target)
|
||||
dirname = _PostPathFormatter(item, self.sanitize_paths).format(self.dirname_pattern, target=target)
|
||||
filename_template = os.path.join(dirname, self.format_filename(item, target=target))
|
||||
filename = self.__prepare_filename(filename_template, lambda: item.url)
|
||||
downloaded = False
|
||||
@@ -919,8 +943,9 @@ class Instaloader:
|
||||
name = user_highlight.owner_username
|
||||
highlight_target = (filename_target
|
||||
if filename_target
|
||||
else (Path(_PostPathFormatter.sanitize_path(name)) /
|
||||
_PostPathFormatter.sanitize_path(user_highlight.title))) # type: Union[str, Path]
|
||||
else (Path(_PostPathFormatter.sanitize_path(name, self.sanitize_paths)) /
|
||||
_PostPathFormatter.sanitize_path(user_highlight.title,
|
||||
self.sanitize_paths))) # type: Union[str, Path]
|
||||
self.context.log("Retrieving highlights \"{}\" from profile {}".format(user_highlight.title, name))
|
||||
self.download_highlight_cover(user_highlight, highlight_target)
|
||||
totalcount = user_highlight.itemcount
|
||||
@@ -970,7 +995,7 @@ class Instaloader:
|
||||
else total_count)
|
||||
sanitized_target = target
|
||||
if isinstance(target, str):
|
||||
sanitized_target = _PostPathFormatter.sanitize_path(target)
|
||||
sanitized_target = _PostPathFormatter.sanitize_path(target, self.sanitize_paths)
|
||||
if takewhile is None:
|
||||
takewhile = lambda _: True
|
||||
with resumable_iteration(
|
||||
@@ -1097,18 +1122,12 @@ class Instaloader:
|
||||
.. versionchanged:: 4.2.9
|
||||
Require being logged in (as required by Instagram)
|
||||
"""
|
||||
has_next_page = True
|
||||
end_cursor = None
|
||||
while has_next_page:
|
||||
if end_cursor:
|
||||
params = {'__a': 1, 'max_id': end_cursor}
|
||||
else:
|
||||
params = {'__a': 1}
|
||||
location_data = self.context.get_json('explore/locations/{0}/'.format(location),
|
||||
params)['graphql']['location']['edge_location_to_media']
|
||||
yield from (Post(self.context, edge['node']) for edge in location_data['edges'])
|
||||
has_next_page = location_data['page_info']['has_next_page']
|
||||
end_cursor = location_data['page_info']['end_cursor']
|
||||
yield from SectionIterator(
|
||||
self.context,
|
||||
lambda d: d["native_location_data"]["recent"],
|
||||
lambda m: Post.from_iphone_struct(self.context, m),
|
||||
f"explore/locations/{location}/",
|
||||
)
|
||||
|
||||
@_requires_login
|
||||
def download_location(self, location: str,
|
||||
@@ -1157,8 +1176,8 @@ class Instaloader:
|
||||
"""Get Posts associated with a #hashtag.
|
||||
|
||||
.. deprecated:: 4.4
|
||||
Use :meth:`Hashtag.get_posts`."""
|
||||
return Hashtag.from_name(self.context, hashtag).get_posts()
|
||||
Use :meth:`Hashtag.get_posts_resumable`."""
|
||||
return Hashtag.from_name(self.context, hashtag).get_posts_resumable()
|
||||
|
||||
def download_hashtag(self, hashtag: Union[Hashtag, str],
|
||||
max_count: Optional[int] = None,
|
||||
@@ -1194,7 +1213,7 @@ class Instaloader:
|
||||
self.download_hashtag_profilepic(hashtag)
|
||||
if posts:
|
||||
self.context.log("Retrieving pictures with hashtag #{}...".format(hashtag.name))
|
||||
self.posts_download_loop(hashtag.get_all_posts(), target, fast_update, post_filter,
|
||||
self.posts_download_loop(hashtag.get_posts_resumable(), target, fast_update, post_filter,
|
||||
max_count=max_count)
|
||||
if self.save_metadata:
|
||||
json_filename = '{0}/{1}'.format(self.dirname_pattern.format(profile=target,
|
||||
@@ -1216,15 +1235,15 @@ class Instaloader:
|
||||
posts_takewhile: Optional[Callable[[Post], bool]] = None
|
||||
if latest_stamps is not None:
|
||||
last_scraped = latest_stamps.get_last_tagged_timestamp(profile.username)
|
||||
posts_takewhile = lambda p: p.date_utc.replace(tzinfo=timezone.utc) > last_scraped
|
||||
posts_takewhile = lambda p: p.date_local > last_scraped
|
||||
tagged_posts = profile.get_tagged_posts()
|
||||
self.posts_download_loop(tagged_posts,
|
||||
target if target
|
||||
else (Path(_PostPathFormatter.sanitize_path(profile.username)) /
|
||||
_PostPathFormatter.sanitize_path(':tagged')),
|
||||
else (Path(_PostPathFormatter.sanitize_path(profile.username, self.sanitize_paths)) /
|
||||
_PostPathFormatter.sanitize_path(':tagged', self.sanitize_paths)),
|
||||
fast_update, post_filter, takewhile=posts_takewhile)
|
||||
if latest_stamps is not None and tagged_posts.first_item is not None:
|
||||
latest_stamps.set_last_tagged_timestamp(profile.username, tagged_posts.first_item.date_local.astimezone())
|
||||
latest_stamps.set_last_tagged_timestamp(profile.username, tagged_posts.first_item.date_local)
|
||||
|
||||
def download_igtv(self, profile: Profile, fast_update: bool = False,
|
||||
post_filter: Optional[Callable[[Post], bool]] = None,
|
||||
@@ -1239,12 +1258,12 @@ class Instaloader:
|
||||
posts_takewhile: Optional[Callable[[Post], bool]] = None
|
||||
if latest_stamps is not None:
|
||||
last_scraped = latest_stamps.get_last_igtv_timestamp(profile.username)
|
||||
posts_takewhile = lambda p: p.date_utc.replace(tzinfo=timezone.utc) > last_scraped
|
||||
posts_takewhile = lambda p: p.date_local > last_scraped
|
||||
igtv_posts = profile.get_igtv_posts()
|
||||
self.posts_download_loop(igtv_posts, profile.username, fast_update, post_filter,
|
||||
total_count=profile.igtvcount, owner_profile=profile, takewhile=posts_takewhile)
|
||||
if latest_stamps is not None and igtv_posts.first_item is not None:
|
||||
latest_stamps.set_last_igtv_timestamp(profile.username, igtv_posts.first_item.date_local.astimezone())
|
||||
latest_stamps.set_last_igtv_timestamp(profile.username, igtv_posts.first_item.date_local)
|
||||
|
||||
def _get_id_filename(self, profile_name: str) -> str:
|
||||
if ((format_string_contains_key(self.dirname_pattern, 'profile') or
|
||||
@@ -1437,14 +1456,14 @@ class Instaloader:
|
||||
if latest_stamps is not None:
|
||||
# pylint:disable=cell-var-from-loop
|
||||
last_scraped = latest_stamps.get_last_post_timestamp(profile_name)
|
||||
posts_takewhile = lambda p: p.date_utc.replace(tzinfo=timezone.utc) > last_scraped
|
||||
posts_takewhile = lambda p: p.date_local > last_scraped
|
||||
posts_to_download = profile.get_posts()
|
||||
self.posts_download_loop(posts_to_download, profile_name, fast_update, post_filter,
|
||||
total_count=profile.mediacount, owner_profile=profile,
|
||||
takewhile=posts_takewhile)
|
||||
if latest_stamps is not None and posts_to_download.first_item is not None:
|
||||
latest_stamps.set_last_post_timestamp(profile_name,
|
||||
posts_to_download.first_item.date_local.astimezone())
|
||||
posts_to_download.first_item.date_local)
|
||||
|
||||
if stories and profiles:
|
||||
with self.context.error_catcher("Download stories"):
|
||||
|
@@ -33,7 +33,7 @@ def copy_session(session: requests.Session, request_timeout: Optional[float] = N
|
||||
|
||||
def default_user_agent() -> str:
|
||||
return 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' \
|
||||
'(KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36'
|
||||
'(KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36'
|
||||
|
||||
|
||||
class InstaloaderContext:
|
||||
|
46
instaloader/sectioniterator.py
Normal file
46
instaloader/sectioniterator.py
Normal file
@@ -0,0 +1,46 @@
|
||||
from typing import Any, Callable, Dict, Iterator, Optional, TypeVar
|
||||
|
||||
from .instaloadercontext import InstaloaderContext
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
|
||||
class SectionIterator(Iterator[T]):
|
||||
"""Iterator for the new 'sections'-style responses.
|
||||
|
||||
.. versionadded:: 4.9"""
|
||||
def __init__(self,
|
||||
context: InstaloaderContext,
|
||||
sections_extractor: Callable[[Dict[str, Any]], Dict[str, Any]],
|
||||
media_wrapper: Callable[[Dict], T],
|
||||
query_path: str,
|
||||
first_data: Optional[Dict[str, Any]] = None):
|
||||
self._context = context
|
||||
self._sections_extractor = sections_extractor
|
||||
self._media_wrapper = media_wrapper
|
||||
self._query_path = query_path
|
||||
self._data = first_data or self._query()
|
||||
self._page_index = 0
|
||||
self._section_index = 0
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def _query(self, max_id: Optional[str] = None) -> Dict[str, Any]:
|
||||
pagination_variables = {"max_id": max_id} if max_id is not None else {}
|
||||
return self._sections_extractor(
|
||||
self._context.get_json(self._query_path, params={"__a": 1, **pagination_variables})
|
||||
)
|
||||
|
||||
def __next__(self) -> T:
|
||||
if self._page_index < len(self._data['sections']):
|
||||
media = self._data['sections'][self._page_index]['layout_content']['medias'][self._section_index]['media']
|
||||
self._section_index += 1
|
||||
if self._section_index >= len(self._data['sections'][self._page_index]['layout_content']['medias']):
|
||||
self._section_index = 0
|
||||
self._page_index += 1
|
||||
return self._media_wrapper(media)
|
||||
if self._data['more_available']:
|
||||
self._page_index, self._section_index, self._data = 0, 0, self._query(self._data["next_max_id"])
|
||||
return self.__next__()
|
||||
raise StopIteration()
|
@@ -3,7 +3,9 @@ import lzma
|
||||
import re
|
||||
from base64 import b64decode, b64encode
|
||||
from collections import namedtuple
|
||||
from contextlib import suppress
|
||||
from datetime import datetime
|
||||
from itertools import islice
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union
|
||||
from unicodedata import normalize
|
||||
@@ -12,6 +14,7 @@ from . import __version__
|
||||
from .exceptions import *
|
||||
from .instaloadercontext import InstaloaderContext
|
||||
from .nodeiterator import FrozenNodeIterator, NodeIterator
|
||||
from .sectioniterator import SectionIterator
|
||||
|
||||
PostSidecarNode = namedtuple('PostSidecarNode', ['is_video', 'display_url', 'video_url'])
|
||||
PostSidecarNode.__doc__ = "Item of a Sidecar Post."
|
||||
@@ -90,6 +93,41 @@ class Post:
|
||||
"""Create a post object from a given mediaid"""
|
||||
return cls.from_shortcode(context, Post.mediaid_to_shortcode(mediaid))
|
||||
|
||||
@classmethod
|
||||
def from_iphone_struct(cls, context: InstaloaderContext, media: Dict[str, Any]):
|
||||
"""Create a post from a given iphone_struct.
|
||||
|
||||
.. versionadded:: 4.9"""
|
||||
media_types = {
|
||||
1: "GraphImage",
|
||||
2: "GraphVideo",
|
||||
8: "GraphSidecar",
|
||||
}
|
||||
fake_node = {
|
||||
"shortcode": media["code"],
|
||||
"id": media["pk"],
|
||||
"__typename": media_types[media["media_type"]],
|
||||
"is_video": media_types[media["media_type"]] == "GraphVideo",
|
||||
"date": media["taken_at"],
|
||||
"caption": media["caption"].get("text") if media.get("caption") is not None else None,
|
||||
"title": media.get("title"),
|
||||
"viewer_has_liked": media["has_liked"],
|
||||
"edge_media_preview_like": {"count": media["like_count"]},
|
||||
"iphone_struct": media,
|
||||
}
|
||||
with suppress(KeyError):
|
||||
fake_node["display_url"] = media['image_versions2']['candidates'][0]['url']
|
||||
with suppress(KeyError):
|
||||
fake_node["video_url"] = media['video_versions'][-1]['url']
|
||||
fake_node["video_duration"] = media["video_duration"]
|
||||
fake_node["video_view_count"] = media["view_count"]
|
||||
with suppress(KeyError):
|
||||
fake_node["edge_sidecar_to_children"] = {"edges": [{"node": {
|
||||
"display_url": node['image_versions2']['candidates'][0]['url'],
|
||||
"is_video": media_types[node["media_type"]] == "GraphVideo",
|
||||
}} for node in media["carousel_media"]]}
|
||||
return cls(context, fake_node, Profile.from_iphone_struct(context, media["user"]) if "user" in media else None)
|
||||
|
||||
@staticmethod
|
||||
def shortcode_to_mediaid(code: str) -> int:
|
||||
if len(code) > 11:
|
||||
@@ -225,17 +263,16 @@ class Post:
|
||||
|
||||
@property
|
||||
def date_local(self) -> datetime:
|
||||
"""Timestamp when the post was created (local time zone)."""
|
||||
return datetime.fromtimestamp(self._node["date"]
|
||||
if "date" in self._node
|
||||
else self._node["taken_at_timestamp"])
|
||||
"""Timestamp when the post was created (local time zone).
|
||||
|
||||
.. versionchanged:: 4.9
|
||||
Return timezone aware datetime object."""
|
||||
return datetime.fromtimestamp(self._get_timestamp_date_created()).astimezone()
|
||||
|
||||
@property
|
||||
def date_utc(self) -> datetime:
|
||||
"""Timestamp when the post was created (UTC)."""
|
||||
return datetime.utcfromtimestamp(self._node["date"]
|
||||
if "date" in self._node
|
||||
else self._node["taken_at_timestamp"])
|
||||
return datetime.utcfromtimestamp(self._get_timestamp_date_created())
|
||||
|
||||
@property
|
||||
def date(self) -> datetime:
|
||||
@@ -276,6 +313,12 @@ class Post:
|
||||
return len(edges)
|
||||
return 1
|
||||
|
||||
def _get_timestamp_date_created(self) -> float:
|
||||
"""Timestamp when the post was created"""
|
||||
return (self._node["date"]
|
||||
if "date" in self._node
|
||||
else self._node["taken_at_timestamp"])
|
||||
|
||||
def get_is_videos(self) -> List[bool]:
|
||||
"""
|
||||
Return a list containing the ``is_video`` property for each media in the post.
|
||||
@@ -360,6 +403,16 @@ class Post:
|
||||
return (pcaption[:30] + u"\u2026") if len(pcaption) > 31 else pcaption
|
||||
return _elliptify(self.caption) if self.caption else ''
|
||||
|
||||
@property
|
||||
def accessibility_caption(self) -> Optional[str]:
|
||||
"""Accessibility caption of the post, if available.
|
||||
|
||||
.. versionadded:: 4.9"""
|
||||
try:
|
||||
return self._field("accessibility_caption")
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
@property
|
||||
def tagged_users(self) -> List[str]:
|
||||
"""List of all lowercased users that are tagged in the Post."""
|
||||
@@ -666,6 +719,20 @@ class Profile:
|
||||
context.profile_id_cache[profile_id] = profile
|
||||
return profile
|
||||
|
||||
@classmethod
|
||||
def from_iphone_struct(cls, context: InstaloaderContext, media: Dict[str, Any]):
|
||||
"""Create a profile from a given iphone_struct.
|
||||
|
||||
.. versionadded:: 4.9"""
|
||||
return cls(context, {
|
||||
"id": media["pk"],
|
||||
"username": media["username"],
|
||||
"is_private": media["is_private"],
|
||||
"full_name": media["full_name"],
|
||||
"profile_pic_url_hd": media["profile_pic_url"],
|
||||
"iphone_struct": media,
|
||||
})
|
||||
|
||||
@classmethod
|
||||
def own_profile(cls, context: InstaloaderContext):
|
||||
"""Return own profile if logged-in.
|
||||
@@ -1048,6 +1115,21 @@ class StoryItem:
|
||||
def __hash__(self) -> int:
|
||||
return hash(self.mediaid)
|
||||
|
||||
@classmethod
|
||||
def from_mediaid(cls, context: InstaloaderContext, mediaid: int):
|
||||
"""Create a StoryItem object from a given mediaid.
|
||||
|
||||
.. versionadded:: 4.9
|
||||
"""
|
||||
pic_json = context.graphql_query(
|
||||
'2b0673e0dc4580674a88d426fe00ea90',
|
||||
{'shortcode': Post.mediaid_to_shortcode(mediaid)}
|
||||
)
|
||||
shortcode_media = pic_json['data']['shortcode_media']
|
||||
if shortcode_media is None:
|
||||
raise BadResponseException("Fetching StoryItem metadata failed.")
|
||||
return cls(context, shortcode_media)
|
||||
|
||||
@property
|
||||
def _iphone_struct(self) -> Dict[str, Any]:
|
||||
if not self._context.iphone_support:
|
||||
@@ -1079,8 +1161,11 @@ class StoryItem:
|
||||
|
||||
@property
|
||||
def date_local(self) -> datetime:
|
||||
"""Timestamp when the StoryItem was created (local time zone)."""
|
||||
return datetime.fromtimestamp(self._node['taken_at_timestamp'])
|
||||
"""Timestamp when the StoryItem was created (local time zone).
|
||||
|
||||
.. versionchanged:: 4.9
|
||||
Return timezone aware datetime object."""
|
||||
return datetime.fromtimestamp(self._node['taken_at_timestamp']).astimezone()
|
||||
|
||||
@property
|
||||
def date_utc(self) -> datetime:
|
||||
@@ -1360,6 +1445,9 @@ class Hashtag:
|
||||
L.download_post(post, target="#"+hashtag.name)
|
||||
|
||||
Also, this class implements == and is hashable.
|
||||
|
||||
.. versionchanged:: 4.9
|
||||
Removed ``get_related_tags()`` and ``is_top_media_only`` as these features were removed from Instagram.
|
||||
"""
|
||||
def __init__(self, context: InstaloaderContext, node: Dict[str, Any]):
|
||||
assert "name" in node
|
||||
@@ -1388,8 +1476,8 @@ class Hashtag:
|
||||
return self._node["name"].lower()
|
||||
|
||||
def _query(self, params):
|
||||
return self._context.get_json("explore/tags/{0}/".format(self.name),
|
||||
params)["graphql"]["hashtag"]
|
||||
json_response = self._context.get_json("explore/tags/{0}/".format(self.name), params)
|
||||
return json_response["graphql"]["hashtag"] if "graphql" in json_response else json_response["data"]
|
||||
|
||||
def _obtain_metadata(self):
|
||||
if not self._has_full_metadata:
|
||||
@@ -1400,7 +1488,9 @@ class Hashtag:
|
||||
json_node = self._node.copy()
|
||||
# remove posts
|
||||
json_node.pop("edge_hashtag_to_top_posts", None)
|
||||
json_node.pop("top", None)
|
||||
json_node.pop("edge_hashtag_to_media", None)
|
||||
json_node.pop("recent", None)
|
||||
return json_node
|
||||
|
||||
def __repr__(self):
|
||||
@@ -1436,30 +1526,33 @@ class Hashtag:
|
||||
return self._metadata("profile_pic_url")
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
def description(self) -> Optional[str]:
|
||||
return self._metadata("description")
|
||||
|
||||
@property
|
||||
def allow_following(self) -> bool:
|
||||
return self._metadata("allow_following")
|
||||
return bool(self._metadata("allow_following"))
|
||||
|
||||
@property
|
||||
def is_following(self) -> bool:
|
||||
return self._metadata("is_following")
|
||||
|
||||
@property
|
||||
def is_top_media_only(self) -> bool:
|
||||
return self._metadata("is_top_media_only")
|
||||
|
||||
def get_related_tags(self) -> Iterator["Hashtag"]:
|
||||
"""Yields similar hashtags."""
|
||||
yield from (Hashtag(self._context, edge["node"])
|
||||
for edge in self._metadata("edge_hashtag_to_related_tags", "edges"))
|
||||
try:
|
||||
return self._metadata("is_following")
|
||||
except KeyError:
|
||||
return bool(self._metadata("following"))
|
||||
|
||||
def get_top_posts(self) -> Iterator[Post]:
|
||||
"""Yields the top posts of the hashtag."""
|
||||
yield from (Post(self._context, edge["node"])
|
||||
for edge in self._metadata("edge_hashtag_to_top_posts", "edges"))
|
||||
try:
|
||||
yield from (Post(self._context, edge["node"])
|
||||
for edge in self._metadata("edge_hashtag_to_top_posts", "edges"))
|
||||
except KeyError:
|
||||
yield from SectionIterator(
|
||||
self._context,
|
||||
lambda d: d["data"]["top"],
|
||||
lambda m: Post.from_iphone_struct(self._context, m),
|
||||
f"explore/tags/{self.name}/",
|
||||
self._metadata("top"),
|
||||
)
|
||||
|
||||
@property
|
||||
def mediacount(self) -> int:
|
||||
@@ -1469,23 +1562,38 @@ class Hashtag:
|
||||
The number of posts with a certain hashtag may differ from the number of posts that can actually be accessed, as
|
||||
the hashtag count might include private posts
|
||||
"""
|
||||
return self._metadata("edge_hashtag_to_media", "count")
|
||||
try:
|
||||
return self._metadata("edge_hashtag_to_media", "count")
|
||||
except KeyError:
|
||||
return self._metadata("media_count")
|
||||
|
||||
def get_posts(self) -> Iterator[Post]:
|
||||
"""Yields the posts associated with this hashtag."""
|
||||
self._metadata("edge_hashtag_to_media", "edges")
|
||||
self._metadata("edge_hashtag_to_media", "page_info")
|
||||
conn = self._metadata("edge_hashtag_to_media")
|
||||
yield from (Post(self._context, edge["node"]) for edge in conn["edges"])
|
||||
while conn["page_info"]["has_next_page"]:
|
||||
data = self._query({'__a': 1, 'max_id': conn["page_info"]["end_cursor"]})
|
||||
conn = data["edge_hashtag_to_media"]
|
||||
"""Yields the recent posts associated with this hashtag.
|
||||
|
||||
.. deprecated:: 4.9
|
||||
Use :meth:`Hashtag.get_posts_resumable` as this method may return incorrect results (:issue:`1457`)"""
|
||||
try:
|
||||
self._metadata("edge_hashtag_to_media", "edges")
|
||||
self._metadata("edge_hashtag_to_media", "page_info")
|
||||
conn = self._metadata("edge_hashtag_to_media")
|
||||
yield from (Post(self._context, edge["node"]) for edge in conn["edges"])
|
||||
while conn["page_info"]["has_next_page"]:
|
||||
data = self._query({'__a': 1, 'max_id': conn["page_info"]["end_cursor"]})
|
||||
conn = data["edge_hashtag_to_media"]
|
||||
yield from (Post(self._context, edge["node"]) for edge in conn["edges"])
|
||||
except KeyError:
|
||||
yield from SectionIterator(
|
||||
self._context,
|
||||
lambda d: d["data"]["recent"],
|
||||
lambda m: Post.from_iphone_struct(self._context, m),
|
||||
f"explore/tags/{self.name}/",
|
||||
self._metadata("recent"),
|
||||
)
|
||||
|
||||
def get_all_posts(self) -> Iterator[Post]:
|
||||
"""Yields all posts, i.e. all most recent posts and the top posts, in almost-chronological order."""
|
||||
sorted_top_posts = iter(sorted(self.get_top_posts(), key=lambda p: p.date_utc, reverse=True))
|
||||
other_posts = self.get_posts()
|
||||
sorted_top_posts = iter(sorted(islice(self.get_top_posts(), 9), key=lambda p: p.date_utc, reverse=True))
|
||||
other_posts = self.get_posts_resumable()
|
||||
next_top = next(sorted_top_posts, None)
|
||||
next_other = next(other_posts, None)
|
||||
while next_top is not None or next_other is not None:
|
||||
@@ -1511,6 +1619,20 @@ class Hashtag:
|
||||
yield next_other
|
||||
next_other = next(other_posts, None)
|
||||
|
||||
def get_posts_resumable(self) -> NodeIterator[Post]:
|
||||
"""Get the recent posts of the hashtag in a resumable fashion.
|
||||
|
||||
:rtype: NodeIterator[Post]
|
||||
|
||||
.. versionadded:: 4.9"""
|
||||
return NodeIterator(
|
||||
self._context, "9b498c08113f1e09617a1703c22b2f32",
|
||||
lambda d: d['data']['hashtag']['edge_hashtag_to_media'],
|
||||
lambda n: Post(self._context, n),
|
||||
{'tag_name': self.name},
|
||||
f"https://www.instagram.com/explore/tags/{self.name}/"
|
||||
)
|
||||
|
||||
|
||||
class TopSearchResults:
|
||||
"""
|
||||
@@ -1714,6 +1836,7 @@ def load_structure_from_file(context: InstaloaderContext, filename: str) -> Json
|
||||
if compressed:
|
||||
fp = lzma.open(filename, 'rt')
|
||||
else:
|
||||
# pylint:disable=consider-using-with
|
||||
fp = open(filename, 'rt')
|
||||
json_structure = json.load(fp)
|
||||
fp.close()
|
||||
|
Reference in New Issue
Block a user