"@_requires_login" for logged-in-only methods

Methods in Instaloader class can now be prefixed with @_requires_login
decorator to ensure they are only called if logged in, otherwise
LoginRequiredException is raised.
This commit is contained in:
Alexander Graf 2018-03-18 20:01:59 +01:00
parent b0a8bfbcf4
commit a9fb254d76

View File

@ -550,7 +550,7 @@ class Profile:
"""Get Posts that are marked as saved by the user.""" """Get Posts that are marked as saved by the user."""
if self.username != self._instaloader.username: if self.username != self._instaloader.username:
return raise LoginRequiredException("--login={} required to get that profile's saved posts.".format(self.username))
data = self._metadata data = self._metadata
@ -591,6 +591,15 @@ class Tristate(Enum):
always = 2 always = 2
def _requires_login(func: Callable) -> Callable:
"""Decorator to raise an exception if herewith-decorated function is called without being logged in"""
def call(instaloader, *args, **kwargs):
if not instaloader.is_logged_in:
raise LoginRequiredException("--login=USERNAME required.")
return func(instaloader, *args, **kwargs)
return call
class Instaloader: class Instaloader:
GRAPHQL_PAGE_LENGTH = 200 GRAPHQL_PAGE_LENGTH = 200
@ -890,6 +899,7 @@ class Instaloader:
else: else:
break break
@_requires_login
def get_followers(self, profile: str) -> Iterator[Dict[str, Any]]: def get_followers(self, profile: str) -> Iterator[Dict[str, Any]]:
""" """
Retrieve list of followers of given profile. Retrieve list of followers of given profile.
@ -902,6 +912,7 @@ class Instaloader:
'https://www.instagram.com/' + profile + '/', 'https://www.instagram.com/' + profile + '/',
lambda d: d['data']['user']['edge_followed_by']) lambda d: d['data']['user']['edge_followed_by'])
@_requires_login
def get_followees(self, profile: str) -> Iterator[Dict[str, Any]]: def get_followees(self, profile: str) -> Iterator[Dict[str, Any]]:
""" """
Retrieve list of followees (followings) of given profile. Retrieve list of followees (followings) of given profile.
@ -1062,6 +1073,7 @@ class Instaloader:
os.utime(filename, (datetime.now().timestamp(), date_object.timestamp())) os.utime(filename, (datetime.now().timestamp(), date_object.timestamp()))
self._log('') # log output of _get_and_write_raw() does not produce \n self._log('') # log output of _get_and_write_raw() does not produce \n
@_requires_login
def save_session_to_file(self, filename: Optional[str] = None) -> None: def save_session_to_file(self, filename: Optional[str] = None) -> None:
"""Saves internally stored :class:`requests.Session` object.""" """Saves internally stored :class:`requests.Session` object."""
if filename is None: if filename is None:
@ -1210,6 +1222,7 @@ class Instaloader:
self._log() self._log()
return downloaded return downloaded
@_requires_login
def get_stories(self, userids: Optional[List[int]] = None) -> Iterator[Dict[str, Any]]: def get_stories(self, userids: Optional[List[int]] = None) -> Iterator[Dict[str, Any]]:
"""Get available stories from followees or all stories of users whose ID are given. """Get available stories from followees or all stories of users whose ID are given.
Does not mark stories as seen. Does not mark stories as seen.
@ -1245,6 +1258,7 @@ class Instaloader:
for user in data["tray"]: for user in data["tray"]:
yield user if "items" in user else _get(url_reel_media.format(user['user']['pk'])) yield user if "items" in user else _get(url_reel_media.format(user['user']['pk']))
@_requires_login
def download_stories(self, def download_stories(self,
userids: Optional[List[int]] = None, userids: Optional[List[int]] = None,
fast_update: bool = False, fast_update: bool = False,
@ -1259,13 +1273,13 @@ class Instaloader:
:param filename_target: Replacement for {target} in dirname_pattern and filename_pattern :param filename_target: Replacement for {target} in dirname_pattern and filename_pattern
""" """
if not userids:
self._log("Retrieving all visible stories...")
if format_string_contains_key(self.filename_pattern, 'post'): if format_string_contains_key(self.filename_pattern, 'post'):
raise InvalidArgumentException("The \"post\" keyword is not supported in the filename pattern when " raise InvalidArgumentException("The \"post\" keyword is not supported in the filename pattern when "
"downloading stories.") "downloading stories.")
if not self.is_logged_in:
raise LoginRequiredException('Login required to download stories')
for user_stories in self.get_stories(userids): for user_stories in self.get_stories(userids):
if "items" not in user_stories: if "items" not in user_stories:
raise BadResponseException('Bad reel media JSON.') raise BadResponseException('Bad reel media JSON.')
@ -1331,6 +1345,7 @@ class Instaloader:
self._log() self._log()
return downloaded return downloaded
@_requires_login
def get_feed_posts(self) -> Iterator[Post]: def get_feed_posts(self) -> Iterator[Post]:
"""Get Posts of the user's feed.""" """Get Posts of the user's feed."""
@ -1351,6 +1366,7 @@ class Instaloader:
'fetch_like': 10, 'fetch_like': 10,
'has_stories': False})["data"] 'has_stories': False})["data"]
@_requires_login
def download_feed_posts(self, max_count: int = None, fast_update: bool = False, def download_feed_posts(self, max_count: int = None, fast_update: bool = False,
filter_func: Optional[Callable[[Post], bool]] = None) -> None: filter_func: Optional[Callable[[Post], bool]] = None) -> None:
""" """
@ -1367,6 +1383,7 @@ class Instaloader:
:param fast_update: If true, abort when first already-downloaded picture is encountered :param fast_update: If true, abort when first already-downloaded picture is encountered
:param filter_func: function(post), which returns True if given picture should be downloaded :param filter_func: function(post), which returns True if given picture should be downloaded
""" """
self._log("Retrieving pictures from your feed...")
count = 1 count = 1
for post in self.get_feed_posts(): for post in self.get_feed_posts():
if max_count is not None and count > max_count: if max_count is not None and count > max_count:
@ -1382,6 +1399,7 @@ class Instaloader:
if fast_update and not downloaded: if fast_update and not downloaded:
break break
@_requires_login
def download_saved_posts(self, max_count: int = None, fast_update: bool = False, def download_saved_posts(self, max_count: int = None, fast_update: bool = False,
filter_func: Optional[Callable[[Post], bool]] = None) -> None: filter_func: Optional[Callable[[Post], bool]] = None) -> None:
"""Download user's saved pictures. """Download user's saved pictures.
@ -1390,6 +1408,7 @@ class Instaloader:
:param fast_update: If true, abort when first already-downloaded picture is encountered :param fast_update: If true, abort when first already-downloaded picture is encountered
:param filter_func: function(post), which returns True if given picture should be downloaded :param filter_func: function(post), which returns True if given picture should be downloaded
""" """
self._log("Retrieving saved posts...")
count = 1 count = 1
for post in Profile(self, self.username).get_saved_posts(): for post in Profile(self, self.username).get_saved_posts():
if max_count is not None and count > max_count: if max_count is not None and count > max_count:
@ -1429,6 +1448,7 @@ class Instaloader:
:param fast_update: If true, abort when first already-downloaded picture is encountered :param fast_update: If true, abort when first already-downloaded picture is encountered
""" """
hashtag = hashtag.lower() hashtag = hashtag.lower()
self._log("Retrieving pictures with hashtag {}...".format(hashtag))
count = 1 count = 1
for post in self.get_hashtag_posts(hashtag): for post in self.get_hashtag_posts(hashtag):
if max_count is not None and count > max_count: if max_count is not None and count > max_count:
@ -1573,7 +1593,7 @@ class Instaloader:
print(err, file=sys.stderr) print(err, file=sys.stderr)
password = None password = None
def main(self, profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None, def main(self, targetlist: List[str], username: Optional[str] = None, password: Optional[str] = None,
sessionfile: Optional[str] = None, max_count: Optional[int] = None, sessionfile: Optional[str] = None, max_count: Optional[int] = None,
profile_pic: bool = True, profile_pic_only: bool = False, profile_pic: bool = True, profile_pic_only: bool = False,
fast_update: bool = False, fast_update: bool = False,
@ -1601,57 +1621,34 @@ class Instaloader:
self.interactive_login(username) self.interactive_login(username)
self._log("Logged in as %s." % username) self._log("Logged in as %s." % username)
# Try block for KeyboardInterrupt (save session on ^C) # Try block for KeyboardInterrupt (save session on ^C)
targets = set() profiles = set()
try: try:
# Generate set of targets # Generate set of profiles, already downloading non-profile targets
for pentry in profilelist: for target in targetlist:
if pentry[0] == '#': with self._error_catcher(target):
self._log("Retrieving pictures with hashtag {0}".format(pentry)) if target[0] == '#':
with self._error_catcher(): self.download_hashtag(hashtag=target[1:], max_count=max_count, fast_update=fast_update,
self.download_hashtag(hashtag=pentry[1:], max_count=max_count, fast_update=fast_update,
filter_func=filter_func) filter_func=filter_func)
elif pentry[0] == '@': elif target == ":feed":
if username is not None: self.download_feed_posts(fast_update=fast_update, max_count=max_count,
self._log("Retrieving followees of %s..." % pentry[1:]) filter_func=filter_func)
with self._error_catcher(): elif target == ":stories":
followees = self.get_followees(pentry[1:]) self.download_stories(fast_update=fast_update)
targets.update([followee['username'] for followee in followees]) elif target == ":saved":
self.download_saved_posts(fast_update=fast_update, max_count=max_count,
filter_func=filter_func)
else: else:
self.error("--login=USERNAME required to download {}.".format(pentry)) profiles.add(target)
elif pentry == ":feed": if len(profiles) > 1:
if username is not None: self._log("Downloading {} profiles: {}".format(len(profiles), ','.join(profiles)))
self._log("Retrieving pictures from your feed...") # Iterate through profiles list and download them
with self._error_catcher(): for target in profiles:
self.download_feed_posts(fast_update=fast_update, max_count=max_count, with self._error_catcher(target):
filter_func=filter_func)
else:
self.error("--login=USERNAME required to download {}.".format(pentry))
elif pentry == ":stories":
if username is not None:
with self._error_catcher():
self.download_stories(fast_update=fast_update)
else:
self.error("--login=USERNAME required to download {}.".format(pentry))
elif pentry == ":saved":
if username is not None:
self._log("Retrieving saved posts...")
with self._error_catcher():
self.download_saved_posts(fast_update=fast_update, max_count=max_count,
filter_func=filter_func)
else:
self.error("--login=USERNAME required to download {}.".format(pentry))
else:
targets.add(pentry)
if len(targets) > 1:
self._log("Downloading {} profiles: {}".format(len(targets), ','.join(targets)))
# Iterate through targets list and download them
for target in targets:
with self._error_catcher():
try: try:
self.download_profile(target, profile_pic, profile_pic_only, fast_update, stories, stories_only, self.download_profile(target, profile_pic, profile_pic_only, fast_update, stories, stories_only,
filter_func=filter_func) filter_func=filter_func)
except ProfileNotExistsException as err: except ProfileNotExistsException as err:
if username is not None: if not self.is_logged_in:
self._log(err) self._log(err)
self._log("Trying again anonymously, helps in case you are just blocked.") self._log("Trying again anonymously, helps in case you are just blocked.")
with self.anonymous_copy() as anonymous_loader: with self.anonymous_copy() as anonymous_loader:
@ -1663,7 +1660,7 @@ class Instaloader:
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nInterrupted by user.", file=sys.stderr) print("\nInterrupted by user.", file=sys.stderr)
# Save session if it is useful # Save session if it is useful
if username is not None: if self.is_logged_in:
self.save_session_to_file(sessionfile) self.save_session_to_file(sessionfile)
if self.error_log: if self.error_log:
print("\nErrors occured:", file=sys.stderr) print("\nErrors occured:", file=sys.stderr)