
Fixes #178, where `--profile-pic-only` printed an error message (with no further effect) when downloading profile pictures of private profiles without being logged in.
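For context, this is roughly the scenario the fix addresses, sketched against the library API rather than the CLI entry point (the account name is a placeholder):

```python
from instaloader import Instaloader, Profile

L = Instaloader()  # anonymous session: no --login given
# "some_private_account" is a placeholder for a private profile the viewer does not follow.
profile = Profile.from_username(L.context, "some_private_account")
# Equivalent of `--profile-pic-only`: request only the profile picture, no posts.
# The profile picture itself is fetched anonymously, so the private-profile check
# should not emit an error here.
L.download_profiles({profile}, profile_pic=True, posts=False)
```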
964 lines · 48 KiB · Python
import getpass
import json
import os
import platform
import re
import shutil
import string
import sys
import tempfile
from contextlib import contextmanager, suppress
from datetime import datetime, timezone
from functools import wraps
from io import BytesIO
from typing import Any, Callable, Iterator, List, Optional, Set, Union

from .exceptions import *
from .instaloadercontext import InstaloaderContext
from .structures import Highlight, JsonExportable, Post, PostLocation, Profile, Story, StoryItem, save_structure_to_file


def get_default_session_filename(username: str) -> str:
    """Returns default session filename for given username."""
    dirname = tempfile.gettempdir() + "/" + ".instaloader-" + getpass.getuser()
    filename = dirname + "/" + "session-" + username
    return filename.lower()


def format_string_contains_key(format_string: str, key: str) -> bool:
    # pylint:disable=unused-variable
    for literal_text, field_name, format_spec, conversion in string.Formatter().parse(format_string):
        if field_name and (field_name == key or field_name.startswith(key + '.')):
            return True
    return False

def _requires_login(func: Callable) -> Callable:
    """Decorator to raise an exception if the decorated function is called without being logged in"""
    @wraps(func)
    def call(instaloader, *args, **kwargs):
        if not instaloader.context.is_logged_in:
            raise LoginRequiredException("--login=USERNAME required.")
        return func(instaloader, *args, **kwargs)
    # pylint:disable=no-member
    call.__doc__ += ":raises LoginRequiredException: If called without being logged in.\n"
    return call

class _ArbitraryItemFormatter(string.Formatter):
    def __init__(self, item: Any):
        self._item = item

    def get_value(self, key, args, kwargs):
        """Override to substitute {ATTRIBUTE} by attributes of our _item."""
        if hasattr(self._item, key):
            return getattr(self._item, key)
        return super().get_value(key, args, kwargs)

    def format_field(self, value, format_spec):
        """Override :meth:`string.Formatter.format_field` to have our
        default format_spec for :class:`datetime.datetime` objects, and to
        let None yield an empty string rather than ``None``."""
        if isinstance(value, datetime) and not format_spec:
            return super().format_field(value, '%Y-%m-%d_%H-%M-%S')
        if value is None:
            return ''
        return super().format_field(value, format_spec)


class _PostPathFormatter(_ArbitraryItemFormatter):
    def vformat(self, format_string, args, kwargs):
        """Override :meth:`string.Formatter.vformat` for character substitution in paths for Windows, see issue #84."""
        ret = super().vformat(format_string, args, kwargs)
        if platform.system() == 'Windows':
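            # Replace characters that are not allowed in Windows paths with similar-looking Unicode characters.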
            ret = ret.replace(':', '\ua789').replace('<', '\ufe64').replace('>', '\ufe65').replace('\"', '\uff02')
            ret = ret.replace('\\', '\uff3c').replace('|', '\uff5c').replace('?', '\ufe16').replace('*', '\uff0a')
        return ret


class Instaloader:
    """Instaloader Class.

    :param quiet: :option:`--quiet`
    :param user_agent: :option:`--user-agent`
    :param dirname_pattern: :option:`--dirname-pattern`, default is ``{target}``
    :param filename_pattern: :option:`--filename-pattern`, default is ``{date_utc}_UTC``
    :param download_pictures: not :option:`--no-pictures`
    :param download_videos: not :option:`--no-videos`
    :param download_video_thumbnails: not :option:`--no-video-thumbnails`
    :param download_geotags: :option:`--geotags`
    :param download_comments: :option:`--comments`
    :param save_metadata: not :option:`--no-metadata-json`
    :param compress_json: not :option:`--no-compress-json`
    :param post_metadata_txt_pattern:
       :option:`--post-metadata-txt`, default is ``{caption}``. Set to empty string to avoid creation of post metadata
       txt file.
    :param storyitem_metadata_txt_pattern: :option:`--storyitem-metadata-txt`, default is empty (=none)
    :param max_connection_attempts: :option:`--max-connection-attempts`

    .. attribute:: context

       The associated :class:`InstaloaderContext` with low-level communication functions and logging.
    """

    def __init__(self,
                 sleep: bool = True, quiet: bool = False,
                 user_agent: Optional[str] = None,
                 dirname_pattern: Optional[str] = None,
                 filename_pattern: Optional[str] = None,
                 download_pictures=True,
                 download_videos: bool = True,
                 download_video_thumbnails: bool = True,
                 download_geotags: bool = True,
                 download_comments: bool = True,
                 save_metadata: bool = True,
                 compress_json: bool = True,
                 post_metadata_txt_pattern: Optional[str] = None,
                 storyitem_metadata_txt_pattern: Optional[str] = None,
                 graphql_rate_limit: Optional[int] = None,
                 max_connection_attempts: int = 3):

        self.context = InstaloaderContext(sleep, quiet, user_agent, graphql_rate_limit, max_connection_attempts)

        # configuration parameters
        self.dirname_pattern = dirname_pattern or "{target}"
        self.filename_pattern = filename_pattern or "{date_utc}_UTC"
        self.download_pictures = download_pictures
        self.download_videos = download_videos
        self.download_video_thumbnails = download_video_thumbnails
        self.download_geotags = download_geotags
        self.download_comments = download_comments
        self.save_metadata = save_metadata
        self.compress_json = compress_json
        self.post_metadata_txt_pattern = '{caption}' if post_metadata_txt_pattern is None \
            else post_metadata_txt_pattern
        self.storyitem_metadata_txt_pattern = '' if storyitem_metadata_txt_pattern is None \
            else storyitem_metadata_txt_pattern

    @contextmanager
    def anonymous_copy(self):
        """Yield an anonymous, otherwise equally-configured copy of this Instaloader instance; afterwards, copy back its error log."""
        new_loader = Instaloader(self.context.sleep, self.context.quiet, self.context.user_agent, self.dirname_pattern,
                                 self.filename_pattern, download_pictures=self.download_pictures,
                                 download_videos=self.download_videos,
                                 download_video_thumbnails=self.download_video_thumbnails,
                                 download_geotags=self.download_geotags, download_comments=self.download_comments,
                                 save_metadata=self.save_metadata, compress_json=self.compress_json,
                                 post_metadata_txt_pattern=self.post_metadata_txt_pattern,
                                 storyitem_metadata_txt_pattern=self.storyitem_metadata_txt_pattern,
                                 graphql_rate_limit=self.context.graphql_count_per_slidingwindow,
                                 max_connection_attempts=self.context.max_connection_attempts)
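        # Hand the existing GraphQL query timestamps to the copy (and take them back below), so that
        # rate limiting accounts for requests made by both instances.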
        new_loader.context.query_timestamps = self.context.query_timestamps
        yield new_loader
        self.context.error_log.extend(new_loader.context.error_log)
        new_loader.context.error_log = []  # avoid double-printing of errors
        self.context.query_timestamps = new_loader.context.query_timestamps
        new_loader.close()

    def close(self):
        """Close associated session objects and repeat error log."""
        self.context.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def download_pic(self, filename: str, url: str, mtime: datetime, filename_suffix: Optional[str] = None) -> bool:
        """Downloads and saves picture with given url under given directory with given timestamp.
        Returns true if the file was actually downloaded, i.e. updated."""
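        # Guess the file extension from the URL: the part between the '.' and the '?' if the URL carries a
        # query string, otherwise simply the last three characters.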
        urlmatch = re.search('\\.[a-z0-9]*\\?', url)
        file_extension = url[-3:] if urlmatch is None else urlmatch.group(0)[1:-1]
        if filename_suffix is not None:
            filename += '_' + filename_suffix
        filename += '.' + file_extension
        if os.path.isfile(filename):
            self.context.log(filename + ' exists', end=' ', flush=True)
            return False
        self.context.get_and_write_raw(url, filename)
        os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))
        return True

    def save_metadata_json(self, filename: str, structure: JsonExportable) -> None:
        """Saves metadata JSON file of a structure."""
        if self.compress_json:
            filename += '.json.xz'
        else:
            filename += '.json'
        save_structure_to_file(structure, filename)
        if isinstance(structure, (Post, StoryItem)):
            # log 'json ' message when saving Post or StoryItem
            self.context.log('json', end=' ', flush=True)

    def update_comments(self, filename: str, post: Post) -> None:
        def _postcomment_asdict(comment):
            return {'id': comment.id,
                    'created_at': int(comment.created_at_utc.replace(tzinfo=timezone.utc).timestamp()),
                    'text': comment.text,
                    'owner': comment.owner._asdict()}
        filename += '_comments.json'
        try:
            with open(filename) as fp:
                comments = json.load(fp)
        except (FileNotFoundError, json.decoder.JSONDecodeError):
            comments = list()
        comments.extend(_postcomment_asdict(comment) for comment in post.get_comments())
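        # Sort the merged comments by creation time (newest first) and drop adjacent entries with duplicate ids,
        # so previously saved and freshly fetched comments are stored only once.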
        if comments:
            comments_list = sorted(sorted(list(comments), key=lambda t: int(t['id'])),
                                   key=lambda t: int(t['created_at']), reverse=True)
            unique_comments_list = [comments_list[0]]
            #for comment in comments_list:
            #    if unique_comments_list[-1]['id'] != comment['id']:
            #        unique_comments_list.append(comment)
            #file.write(json.dumps(unique_comments_list, indent=4))
            for x, y in zip(comments_list[:-1], comments_list[1:]):
                if x['id'] != y['id']:
                    unique_comments_list.append(y)
            with open(filename, 'w') as file:
                file.write(json.dumps(unique_comments_list, indent=4))
            self.context.log('comments', end=' ', flush=True)

    def save_caption(self, filename: str, mtime: datetime, caption: str) -> None:
        """Updates picture caption / Post metadata info"""
        def _elliptify(caption):
            pcaption = caption.replace('\n', ' ').strip()
            return '[' + ((pcaption[:29] + u"\u2026") if len(pcaption) > 31 else pcaption) + ']'
        filename += '.txt'
        caption += '\n'
        pcaption = _elliptify(caption)
        caption = caption.encode("UTF-8")
        with suppress(FileNotFoundError):
            with open(filename, 'rb') as file:
                file_caption = file.read()
            if file_caption.replace(b'\r\n', b'\n') == caption.replace(b'\r\n', b'\n'):
                try:
                    self.context.log(pcaption + ' unchanged', end=' ', flush=True)
                except UnicodeEncodeError:
                    self.context.log('txt unchanged', end=' ', flush=True)
                return None
            else:
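                # A caption file already exists but its content differs: rotate the existing versions to
                # *_old_NN.txt before the new caption is written below.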
                def get_filename(index):
                    return filename if index == 0 else (filename[:-4] + '_old_' +
                                                        (str(0) if index < 10 else str()) + str(index) + filename[-4:])

                i = 0
                while os.path.isfile(get_filename(i)):
                    i = i + 1
                for index in range(i, 0, -1):
                    os.rename(get_filename(index - 1), get_filename(index))
                try:
                    self.context.log(_elliptify(file_caption.decode("UTF-8")) + ' updated', end=' ', flush=True)
                except UnicodeEncodeError:
                    self.context.log('txt updated', end=' ', flush=True)
        try:
            self.context.log(pcaption, end=' ', flush=True)
        except UnicodeEncodeError:
            self.context.log('txt', end=' ', flush=True)
        with open(filename, 'wb') as text_file:
            shutil.copyfileobj(BytesIO(caption), text_file)
        os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))

    def save_location(self, filename: str, location: PostLocation, mtime: datetime) -> None:
        """Save post location name and Google Maps link."""
        filename += '_location.txt'
        location_string = (location.name + "\n" +
                           "https://maps.google.com/maps?q={0},{1}&ll={0},{1}\n".format(location.lat,
                                                                                        location.lng))
        with open(filename, 'wb') as text_file:
            shutil.copyfileobj(BytesIO(location_string.encode()), text_file)
        os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))
        self.context.log('geo', end=' ', flush=True)

    def download_profilepic(self, profile: Profile) -> None:
        """Downloads and saves profile pic."""

        def _epoch_to_string(epoch: datetime) -> str:
            return epoch.strftime('%Y-%m-%d_%H-%M-%S')

        profile_pic_url = profile.profile_pic_url
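        # Use the picture's Last-Modified header, fetched via an anonymous HEAD request, as the timestamp
        # that goes into the filename and the file's mtime.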
        with self.context.get_anonymous_session() as anonymous_session:
            date_object = datetime.strptime(anonymous_session.head(profile_pic_url).headers["Last-Modified"],
                                            '%a, %d %b %Y %H:%M:%S GMT')
        profile_pic_extension = 'jpg'
        if ((format_string_contains_key(self.dirname_pattern, 'profile') or
             format_string_contains_key(self.dirname_pattern, 'target'))):
            filename = '{0}/{1}_UTC_profile_pic.{2}'.format(self.dirname_pattern.format(profile=profile.username.lower(),
                                                                                        target=profile.username.lower()),
                                                            _epoch_to_string(date_object), profile_pic_extension)
        else:
            filename = '{0}/{1}_{2}_UTC_profile_pic.{3}'.format(self.dirname_pattern.format(), profile.username.lower(),
                                                                _epoch_to_string(date_object), profile_pic_extension)
        if os.path.isfile(filename):
            self.context.log(filename + ' already exists')
            return None
        self.context.get_and_write_raw(profile_pic_url, filename)
        os.utime(filename, (datetime.now().timestamp(), date_object.timestamp()))
        self.context.log('')  # log output of _get_and_write_raw() does not produce \n

    @_requires_login
    def save_session_to_file(self, filename: Optional[str] = None) -> None:
        """Saves internally stored :class:`requests.Session` object.

        :param filename: Filename, or None to use default filename.
        """
        if filename is None:
            filename = get_default_session_filename(self.context.username)
        dirname = os.path.dirname(filename)
        if dirname != '' and not os.path.exists(dirname):
            os.makedirs(dirname)
            os.chmod(dirname, 0o700)
        with open(filename, 'wb') as sessionfile:
            os.chmod(filename, 0o600)
            self.context.save_session_to_file(sessionfile)
            self.context.log("Saved session to %s." % filename)

    def load_session_from_file(self, username: str, filename: Optional[str] = None) -> None:
        """Internally stores :class:`requests.Session` object loaded from file.

        If filename is None, the file with the default session path is loaded.

        :raises FileNotFoundError: If the file does not exist.
        """
        if filename is None:
            filename = get_default_session_filename(username)
        with open(filename, 'rb') as sessionfile:
            self.context.load_session_from_file(username, sessionfile)
            self.context.log("Loaded session from %s." % filename)

    def test_login(self) -> Optional[str]:
        """Returns the Instagram username to which given :class:`requests.Session` object belongs, or None."""
        return self.context.test_login()

    def login(self, user: str, passwd: str) -> None:
        """Log in to Instagram with given username and password and internally store session object.

        :raises InvalidArgumentException: If the provided username does not exist.
        :raises BadCredentialsException: If the provided password is wrong.
        :raises ConnectionException: If connection to Instagram failed."""
        self.context.login(user, passwd)

    def format_filename(self, item: Union[Post, StoryItem], target: Optional[str] = None):
        """Format filename of a :class:`Post` or :class:`StoryItem` according to ``filename-pattern`` parameter.

        .. versionadded:: 4.1"""
        return _PostPathFormatter(item).format(self.filename_pattern, target=target)

    def download_post(self, post: Post, target: str) -> bool:
        """
        Download everything associated with one instagram post node, i.e. picture, caption and video.

        :param post: Post to download.
        :param target: Target name, i.e. profile name, #hashtag, :feed; for filename.
        :return: True if something was downloaded, False otherwise, i.e. file was already there
        """

        dirname = _PostPathFormatter(post).format(self.dirname_pattern, target=target)
        filename = dirname + '/' + self.format_filename(post, target=target)
        os.makedirs(os.path.dirname(filename), exist_ok=True)

        # Download the image(s) / video thumbnail and videos within sidecars if desired
        downloaded = False
        if self.download_pictures:
            if post.typename == 'GraphSidecar':
                edge_number = 1
                for sidecar_node in post.get_sidecar_nodes():
                    # Download picture or video thumbnail
                    if not sidecar_node.is_video or self.download_video_thumbnails is True:
                        downloaded |= self.download_pic(filename=filename, url=sidecar_node.display_url,
                                                        mtime=post.date_local, filename_suffix=str(edge_number))
                    # Additionally download video if available and desired
                    if sidecar_node.is_video and self.download_videos is True:
                        downloaded |= self.download_pic(filename=filename, url=sidecar_node.video_url,
                                                        mtime=post.date_local, filename_suffix=str(edge_number))
                    edge_number += 1
            elif post.typename == 'GraphImage':
                downloaded = self.download_pic(filename=filename, url=post.url, mtime=post.date_local)
            elif post.typename == 'GraphVideo':
                if self.download_video_thumbnails is True:
                    downloaded = self.download_pic(filename=filename, url=post.url, mtime=post.date_local)
            else:
                self.context.error("Warning: {0} has unknown typename: {1}".format(post, post.typename))

        # Save caption if desired
        metadata_string = _ArbitraryItemFormatter(post).format(self.post_metadata_txt_pattern).strip()
        if metadata_string:
            self.save_caption(filename=filename, mtime=post.date_local, caption=metadata_string)

        # Download video if desired
        if post.is_video and self.download_videos is True:
            downloaded |= self.download_pic(filename=filename, url=post.video_url, mtime=post.date_local)

        # Download geotags if desired
        if self.download_geotags and post.location:
            self.save_location(filename, post.location, post.date_local)

        # Update comments if desired
        if self.download_comments is True:
            self.update_comments(filename=filename, post=post)

        # Save metadata as JSON if desired.
        if self.save_metadata is not False:
            self.save_metadata_json(filename, post)

        self.context.log()
        return downloaded

    @_requires_login
    def get_stories(self, userids: Optional[List[int]] = None) -> Iterator[Story]:
        """Get available stories from followees or all stories of users whose IDs are given.
        Does not mark stories as seen.
        To use this, one needs to be logged in.

        :param userids: List of user IDs to be processed in terms of downloading their stories, or None.
        """

        if not userids:
            data = self.context.graphql_query("d15efd8c0c5b23f0ef71f18bf363c704",
                                              {"only_stories": True})["data"]["user"]
            if data is None:
                raise BadResponseException('Bad stories reel JSON.')
            userids = list(edge["node"]["id"] for edge in data["feed_reels_tray"]["edge_reels_tray_to_reel"]["edges"])
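
        # Query the stories in chunks of 100 user IDs per GraphQL request.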
        def _userid_chunks():
            userids_per_query = 100
            for i in range(0, len(userids), userids_per_query):
                yield userids[i:i + userids_per_query]

        for userid_chunk in _userid_chunks():
            stories = self.context.graphql_query("bf41e22b1c4ba4c9f31b844ebb7d9056",
                                                 {"reel_ids": userid_chunk, "precomposed_overlay": False})["data"]
            yield from (Story(self.context, media) for media in stories['reels_media'])

    @_requires_login
    def download_stories(self,
                         userids: Optional[List[Union[int, Profile]]] = None,
                         fast_update: bool = False,
                         filename_target: Optional[str] = ':stories',
                         storyitem_filter: Optional[Callable[[StoryItem], bool]] = None) -> None:
        """
        Download available stories from user followees or all stories of users whose IDs are given.
        Does not mark stories as seen.
        To use this, one needs to be logged in.

        :param userids: List of user IDs or Profiles to be processed in terms of downloading their stories
        :param fast_update: If true, abort when first already-downloaded picture is encountered
        :param filename_target: Replacement for {target} in dirname_pattern and filename_pattern
               or None if profile name should be used instead
        :param storyitem_filter: function(storyitem), which returns True if given StoryItem should be downloaded
        """

        if not userids:
            self.context.log("Retrieving all visible stories...")
        else:
            userids = [p if isinstance(p, int) else p.userid for p in userids]

        for user_story in self.get_stories(userids):
            name = user_story.owner_username
            self.context.log("Retrieving stories from profile {}.".format(name))
            totalcount = user_story.itemcount
            count = 1
            for item in user_story.get_items():
                if storyitem_filter is not None and not storyitem_filter(item):
                    self.context.log("<{} skipped>".format(item), flush=True)
                    continue
                self.context.log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
                count += 1
                with self.context.error_catcher('Download story from user {}'.format(name)):
                    downloaded = self.download_storyitem(item, filename_target if filename_target else name)
                    if fast_update and not downloaded:
                        break

    def download_storyitem(self, item: StoryItem, target: str) -> bool:
        """Download one user story.

        :param item: Story item, as in story['items'] for story in :meth:`get_stories`
        :param target: Replacement for {target} in dirname_pattern and filename_pattern
        :return: True if something was downloaded, False otherwise, i.e. file was already there
        """

        date_local = item.date_local
        dirname = _PostPathFormatter(item).format(self.dirname_pattern, target=target)
        filename = dirname + '/' + self.format_filename(item, target=target)
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        downloaded = False
        if not item.is_video or self.download_video_thumbnails is True:
            url = item.url
            downloaded = self.download_pic(filename=filename, url=url, mtime=date_local)
        if item.is_video and self.download_videos is True:
            downloaded |= self.download_pic(filename=filename, url=item.video_url, mtime=date_local)
        # Save caption if desired
        metadata_string = _ArbitraryItemFormatter(item).format(self.storyitem_metadata_txt_pattern).strip()
        if metadata_string:
            self.save_caption(filename=filename, mtime=item.date_local, caption=metadata_string)
        # Save metadata as JSON if desired.
        if self.save_metadata is not False:
            self.save_metadata_json(filename, item)
        self.context.log()
        return downloaded

    @_requires_login
    def get_highlights(self, user: Union[int, Profile]) -> Iterator[Highlight]:
        """Get all highlights from a user.
        To use this, one needs to be logged in.

        .. versionadded:: 4.1

        :param user: ID or Profile of the user whose highlights should get fetched.
        """

        userid = user if isinstance(user, int) else user.userid
        data = self.context.graphql_query("7c16654f22c819fb63d1183034a5162f",
                                          {"user_id": userid, "include_chaining": False, "include_reel": False,
                                           "include_suggested_users": False, "include_logged_out_extras": False,
                                           "include_highlight_reels": True})["data"]["user"]['edge_highlight_reels']
        if data is None:
            raise BadResponseException('Bad highlights reel JSON.')
        yield from (Highlight(self.context, edge['node'], user if isinstance(user, Profile) else None)
                    for edge in data['edges'])

    @_requires_login
    def download_highlights(self,
                            user: Union[int, Profile],
                            fast_update: bool = False,
                            filename_target: Optional[str] = None,
                            storyitem_filter: Optional[Callable[[StoryItem], bool]] = None) -> None:
        """
        Download available highlights from a user whose ID is given.
        To use this, one needs to be logged in.

        .. versionadded:: 4.1

        :param user: ID or Profile of the user whose highlights should get downloaded.
        :param fast_update: If true, abort when first already-downloaded picture is encountered
        :param filename_target: Replacement for {target} in dirname_pattern and filename_pattern
               or None if profile name and the highlights' titles should be used instead
        :param storyitem_filter: function(storyitem), which returns True if given StoryItem should be downloaded
        """
        for user_highlight in self.get_highlights(user):
            name = user_highlight.owner_username
            self.context.log("Retrieving highlights \"{}\" from profile {}".format(user_highlight.title, name))
            totalcount = user_highlight.itemcount
            count = 1
            for item in user_highlight.get_items():
                if storyitem_filter is not None and not storyitem_filter(item):
                    self.context.log("<{} skipped>".format(item), flush=True)
                    continue
                self.context.log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
                count += 1
                with self.context.error_catcher('Download highlights \"{}\" from user {}'.format(user_highlight.title, name)):
                    downloaded = self.download_storyitem(item, filename_target
                                                         if filename_target
                                                         else '{}/{}'.format(name, user_highlight.title))
                    if fast_update and not downloaded:
                        break

    @_requires_login
    def get_feed_posts(self) -> Iterator[Post]:
        """Get Posts of the user's feed.

        :return: Iterator over Posts of the user's feed.
        """

        data = self.context.graphql_query("d6f4427fbe92d846298cf93df0b937d3", {})["data"]

        while True:
            feed = data["user"]["edge_web_feed_timeline"]
            yield from (Post(self.context, edge["node"]) for edge in feed["edges"]
                        if not edge["node"]["__typename"] == "GraphSuggestedUserFeedUnit")
            if not feed["page_info"]["has_next_page"]:
                break
            data = self.context.graphql_query("d6f4427fbe92d846298cf93df0b937d3",
                                              {'fetch_media_item_count': 12,
                                               'fetch_media_item_cursor': feed["page_info"]["end_cursor"],
                                               'fetch_comment_count': 4,
                                               'fetch_like': 10,
                                               'has_stories': False})["data"]

    @_requires_login
    def download_feed_posts(self, max_count: Optional[int] = None, fast_update: bool = False,
                            post_filter: Optional[Callable[[Post], bool]] = None) -> None:
        """
        Download pictures from the user's feed.

        Example to download up to the 20 pics the user last liked::

            loader = Instaloader()
            loader.load_session_from_file('USER')
            loader.download_feed_posts(max_count=20, fast_update=True,
                                       post_filter=lambda post: post.viewer_has_liked)

        :param max_count: Maximum count of pictures to download
        :param fast_update: If true, abort when first already-downloaded picture is encountered
        :param post_filter: function(post), which returns True if given picture should be downloaded
        """
        self.context.log("Retrieving pictures from your feed...")
        count = 1
        for post in self.get_feed_posts():
            if max_count is not None and count > max_count:
                break
            name = post.owner_username
            if post_filter is not None and not post_filter(post):
                self.context.log("<pic by %s skipped>" % name, flush=True)
                continue
            self.context.log("[%3i] %s " % (count, name), end="", flush=True)
            count += 1
            with self.context.error_catcher('Download feed'):
                downloaded = self.download_post(post, target=':feed')
                if fast_update and not downloaded:
                    break

    @_requires_login
    def download_saved_posts(self, max_count: Optional[int] = None, fast_update: bool = False,
                             post_filter: Optional[Callable[[Post], bool]] = None) -> None:
        """Download user's saved pictures.

        :param max_count: Maximum count of pictures to download
        :param fast_update: If true, abort when first already-downloaded picture is encountered
        :param post_filter: function(post), which returns True if given picture should be downloaded
        """
        self.context.log("Retrieving saved posts...")
        count = 1
        for post in Profile.from_username(self.context, self.context.username).get_saved_posts():
            if max_count is not None and count > max_count:
                break
            if post_filter is not None and not post_filter(post):
                self.context.log("<{} skipped>".format(post), flush=True)
                continue
            self.context.log("[{:>3}] ".format(count), end=str(), flush=True)
            count += 1
            with self.context.error_catcher('Download saved posts'):
                downloaded = self.download_post(post, target=':saved')
                if fast_update and not downloaded:
                    break

    @_requires_login
    def get_explore_posts(self) -> Iterator[Post]:
        """Get Posts which Instagram suggests as worth exploring.

        :return: Iterator over Posts of the user's suggested posts.
        """
        data = self.context.get_json('explore/', {})
        yield from (Post(self.context, node)
                    for node in self.context.graphql_node_list("df0dcc250c2b18d9fd27c5581ef33c7c",
                                                               {}, 'https://www.instagram.com/explore/',
                                                               lambda d: d['data']['user']['edge_web_discover_media'],
                                                               data['rhx_gis']))

    def get_hashtag_posts(self, hashtag: str) -> Iterator[Post]:
        """Get Posts associated with a #hashtag."""
        has_next_page = True
        end_cursor = None
        while has_next_page:
            if end_cursor:
                params = {'__a': 1, 'max_id': end_cursor}
            else:
                params = {'__a': 1}
            hashtag_data = self.context.get_json('explore/tags/{0}/'.format(hashtag),
                                                 params)['graphql']['hashtag']['edge_hashtag_to_media']
            yield from (Post(self.context, edge['node']) for edge in hashtag_data['edges'])
            has_next_page = hashtag_data['page_info']['has_next_page']
            end_cursor = hashtag_data['page_info']['end_cursor']

    def download_hashtag(self, hashtag: str,
                         max_count: Optional[int] = None,
                         post_filter: Optional[Callable[[Post], bool]] = None,
                         fast_update: bool = False) -> None:
        """Download pictures of one hashtag.

        To download the last 30 pictures with hashtag #cat, do::

            loader = Instaloader()
            loader.download_hashtag('cat', max_count=30)

        :param hashtag: Hashtag to download, without leading '#'
        :param max_count: Maximum count of pictures to download
        :param post_filter: function(post), which returns True if given picture should be downloaded
        :param fast_update: If true, abort when first already-downloaded picture is encountered
        """
        hashtag = hashtag.lower()
        self.context.log("Retrieving pictures with hashtag {}...".format(hashtag))
        count = 1
        for post in self.get_hashtag_posts(hashtag):
            if max_count is not None and count > max_count:
                break
            self.context.log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True)
            if post_filter is not None and not post_filter(post):
                self.context.log('<skipped>')
                continue
            count += 1
            with self.context.error_catcher('Download hashtag #{}'.format(hashtag)):
                downloaded = self.download_post(post, target='#' + hashtag)
                if fast_update and not downloaded:
                    break

    def download_tagged(self, profile: Profile, fast_update: bool = False,
                        target: Optional[str] = None,
                        post_filter: Optional[Callable[[Post], bool]] = None) -> None:
        """Download all posts where a profile is tagged.

        .. versionadded:: 4.1"""
        if target is None:
            target = profile.username + '/:tagged'
        self.context.log("Retrieving tagged posts for profile {}.".format(profile.username))
        count = 1
        for post in profile.get_tagged_posts():
            self.context.log("[%3i/???] " % (count), end="", flush=True)
            count += 1
            if post_filter is not None and not post_filter(post):
                self.context.log('<{} skipped>'.format(post))
                continue
            with self.context.error_catcher('Download tagged {}'.format(profile.username)):
                downloaded = self.download_post(post, target)
                if fast_update and not downloaded:
                    break

    def _get_id_filename(self, profile_name: str) -> str:
        if ((format_string_contains_key(self.dirname_pattern, 'profile') or
             format_string_contains_key(self.dirname_pattern, 'target'))):
            return '{0}/id'.format(self.dirname_pattern.format(profile=profile_name.lower(),
                                                               target=profile_name.lower()))
        else:
            return '{0}/{1}_id'.format(self.dirname_pattern.format(), profile_name.lower())

    def save_profile_id(self, profile: Profile):
        """
        Store ID of profile locally.

        .. versionadded:: 4.0.6
        """
        os.makedirs(self.dirname_pattern.format(profile=profile.username,
                                                target=profile.username), exist_ok=True)
        with open(self._get_id_filename(profile.username), 'w') as text_file:
            text_file.write(str(profile.userid) + "\n")
        self.context.log("Stored ID {0} for profile {1}.".format(profile.userid, profile.username))

    def check_profile_id(self, profile_name: str) -> Profile:
        """
        Consult the locally stored ID of the profile with the given name, check whether the ID matches and whether
        the name has changed, and return an instance of the current profile; also store the profile's ID locally.

        :param profile_name: Profile name
        :return: Instance of current profile
        """
        profile = None
        with suppress(ProfileNotExistsException):
            profile = Profile.from_username(self.context, profile_name)
        profile_exists = profile is not None
        id_filename = self._get_id_filename(profile_name)
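        # Compare the stored ID with the profile the name currently resolves to; if they differ, or the name no
        # longer resolves, look the profile up by its ID to follow a rename and move the local files accordingly.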
        try:
            with open(id_filename, 'rb') as id_file:
                profile_id = int(id_file.read())
            if (not profile_exists) or \
                    (profile_id != profile.userid):
                if profile_exists:
                    self.context.log("Profile {0} does not match the stored unique ID {1}.".format(profile_name,
                                                                                                   profile_id))
                else:
                    self.context.log("Trying to find profile {0} using its unique ID {1}.".format(profile_name,
                                                                                                  profile_id))
                profile_from_id = Profile.from_id(self.context, profile_id)
                newname = profile_from_id.username
                self.context.log("Profile {0} has changed its name to {1}.".format(profile_name, newname))
                if ((format_string_contains_key(self.dirname_pattern, 'profile') or
                     format_string_contains_key(self.dirname_pattern, 'target'))):
                    os.rename(self.dirname_pattern.format(profile=profile_name.lower(),
                                                          target=profile_name.lower()),
                              self.dirname_pattern.format(profile=newname.lower(),
                                                          target=newname.lower()))
                else:
                    os.rename('{0}/{1}_id'.format(self.dirname_pattern.format(), profile_name.lower()),
                              '{0}/{1}_id'.format(self.dirname_pattern.format(), newname.lower()))
                return profile_from_id
            return profile
        except (FileNotFoundError, ValueError):
            pass
        if profile_exists:
            self.save_profile_id(profile)
            return profile
        raise ProfileNotExistsException("Profile {0} does not exist.".format(profile_name))

    def download_profiles(self, profiles: Set[Profile],
                          profile_pic: bool = True, posts: bool = True,
                          tagged: bool = False, highlights: bool = False, stories: bool = False,
                          fast_update: bool = False,
                          post_filter: Optional[Callable[[Post], bool]] = None,
                          storyitem_filter: Optional[Callable[[Post], bool]] = None,
                          raise_errors: bool = False):
        """High-level method to download set of profiles.

        :param profiles: Set of profiles to download.
        :param profile_pic: not :option:`--no-profile-pic`.
        :param posts: not :option:`--no-posts`.
        :param tagged: :option:`--tagged`.
        :param highlights: :option:`--highlights`.
        :param stories: :option:`--stories`.
        :param fast_update: :option:`--fast-update`.
        :param post_filter: :option:`--post-filter`.
        :param storyitem_filter: :option:`--storyitem-filter`.
        :param raise_errors:
           Whether :exc:`LoginRequiredException` and :exc:`PrivateProfileNotFollowedException` should be raised or
           caught and printed with :meth:`InstaloaderContext.error_catcher`.

        .. versionadded:: 4.1"""

        @contextmanager
        def _error_raiser(_str):
            yield

        error_handler = _error_raiser if raise_errors else self.context.error_catcher

        for profile in profiles:
            with error_handler(profile.username):
                profile_name = profile.username

                # Save metadata as JSON if desired.
                if self.save_metadata:
                    json_filename = '{0}/{1}_{2}'.format(self.dirname_pattern.format(profile=profile_name,
                                                                                     target=profile_name),
                                                         profile_name, profile.userid)
                    self.save_metadata_json(json_filename, profile)

                # Download profile picture
                if profile_pic:
                    with self.context.error_catcher('Download profile picture of {}'.format(profile_name)):
                        self.download_profilepic(profile)

                # Catch some errors
                if profile.is_private and (tagged or highlights or posts):
                    if not self.context.is_logged_in:
                        raise LoginRequiredException("--login=USERNAME required.")
                    if not profile.followed_by_viewer and self.context.username != profile.username:
                        raise PrivateProfileNotFollowedException("Private but not followed.")

                # Download tagged, if requested
                if tagged:
                    with self.context.error_catcher('Download tagged of {}'.format(profile_name)):
                        self.download_tagged(profile, fast_update=fast_update, post_filter=post_filter)

                # Download highlights, if requested
                if highlights:
                    with self.context.error_catcher('Download highlights of {}'.format(profile_name)):
                        self.download_highlights(profile, fast_update=fast_update, storyitem_filter=storyitem_filter)

                # Iterate over pictures and download them
                if posts:
                    self.context.log("Retrieving posts from profile {}.".format(profile_name))
                    totalcount = profile.mediacount
                    count = 1
                    for post in profile.get_posts():
                        self.context.log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
                        count += 1
                        if post_filter is not None and not post_filter(post):
                            self.context.log('<skipped>')
                            continue
                        with self.context.error_catcher("Download {} of {}".format(post, profile_name)):
                            downloaded = self.download_post(post, target=profile_name)
                            if fast_update and not downloaded:
                                break

        if stories and profiles:
            with self.context.error_catcher("Download stories"):
                self.context.log("Downloading stories")
                self.download_stories(userids=list(profiles), fast_update=fast_update, filename_target=None,
                                      storyitem_filter=storyitem_filter)

    def download_profile(self, profile_name: Union[str, Profile],
                         profile_pic: bool = True, profile_pic_only: bool = False,
                         fast_update: bool = False,
                         download_stories: bool = False, download_stories_only: bool = False,
                         download_tagged: bool = False, download_tagged_only: bool = False,
                         post_filter: Optional[Callable[[Post], bool]] = None,
                         storyitem_filter: Optional[Callable[[StoryItem], bool]] = None) -> None:
        """Download one profile

        .. deprecated:: 4.1
           Use :meth:`Instaloader.download_profiles`.
        """

        # Get profile main page json
        # check if profile does exist or name has changed since last download
        # and update name and json data if necessary
        if isinstance(profile_name, str):
            profile = self.check_profile_id(profile_name.lower())
        else:
            profile = profile_name

        profile_name = profile.username

        # Save metadata as JSON if desired.
        if self.save_metadata is not False:
            json_filename = '{0}/{1}_{2}'.format(self.dirname_pattern.format(profile=profile_name, target=profile_name),
                                                 profile_name, profile.userid)
            self.save_metadata_json(json_filename, profile)

        if self.context.is_logged_in and profile.has_blocked_viewer and not profile.is_private:
            # raising ProfileNotExistsException invokes "trying again anonymously" logic
            raise ProfileNotExistsException("Profile {} has blocked you".format(profile_name))

        # Download profile picture
        if profile_pic or profile_pic_only:
            with self.context.error_catcher('Download profile picture of {}'.format(profile_name)):
                self.download_profilepic(profile)
        if profile_pic_only:
            return

        # Catch some errors
        if profile.is_private:
            if not self.context.is_logged_in:
                raise LoginRequiredException("profile %s requires login" % profile_name)
            if not profile.followed_by_viewer and \
                    self.context.username != profile.username:
                raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % profile_name)
        else:
            if self.context.is_logged_in and not (download_stories or download_stories_only):
                self.context.log("profile %s could also be downloaded anonymously." % profile_name)

        # Download stories, if requested
        if download_stories or download_stories_only:
            if profile.has_viewable_story:
                with self.context.error_catcher("Download stories of {}".format(profile_name)):
                    self.download_stories(userids=[profile.userid], filename_target=profile_name,
                                          fast_update=fast_update, storyitem_filter=storyitem_filter)
            else:
                self.context.log("{} does not have any stories.".format(profile_name))
        if download_stories_only:
            return

        # Download tagged, if requested
        if download_tagged or download_tagged_only:
            with self.context.error_catcher('Download tagged of {}'.format(profile_name)):
                self.download_tagged(profile, fast_update=fast_update, post_filter=post_filter)
        if download_tagged_only:
            return

        # Iterate over pictures and download them
        self.context.log("Retrieving posts from profile {}.".format(profile_name))
        totalcount = profile.mediacount
        count = 1
        for post in profile.get_posts():
            self.context.log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
            count += 1
            if post_filter is not None and not post_filter(post):
                self.context.log('<skipped>')
                continue
            with self.context.error_catcher('Download profile {}'.format(profile_name)):
                downloaded = self.download_post(post, target=profile_name)
                if fast_update and not downloaded:
                    break

    def interactive_login(self, username: str) -> None:
        """Logs in and internally stores session, asking user for password interactively.

        :raises LoginRequiredException: when in quiet mode.
        :raises InvalidArgumentException: If the provided username does not exist.
        :raises ConnectionException: If connection to Instagram failed."""
        if self.context.quiet:
            raise LoginRequiredException("Quiet mode requires given password or valid session file.")
        password = None
        while password is None:
            password = getpass.getpass(prompt="Enter Instagram password for %s: " % username)
            try:
                self.login(username, password)
            except BadCredentialsException as err:
                print(err, file=sys.stderr)
                password = None