Filter posts with --only-if=FILTER
where FILTER is a boolean expression in python syntax where all names are evaluated to instaloader.Post properties. Examples: instaloader --login=USER --only-if='viewer_has_liked' :feed instaloader --only-if='likes>1000 and comments>5' profile
This commit is contained in:
		
							
								
								
									
										121
									
								
								instaloader.py
									
									
									
									
									
								
							
							
						
						
									
										121
									
								
								instaloader.py
									
									
									
									
									
								
							@@ -1,7 +1,8 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
"""Download pictures (or videos) along with their captions and other metadata from Instagram."""
 | 
			
		||||
 | 
			
		||||
import ast
 | 
			
		||||
import copy
 | 
			
		||||
import getpass
 | 
			
		||||
import json
 | 
			
		||||
import os
 | 
			
		||||
@@ -123,6 +124,42 @@ def format_string_contains_key(format_string: str, key: str) -> bool:
 | 
			
		||||
    return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def filterstr_to_filterfunc(filter_str: str, logged_in: bool) -> Callable[['Post'], bool]:
 | 
			
		||||
    """Takes an --only-if=... filter specification and makes a filter_func Callable out of it."""
 | 
			
		||||
 | 
			
		||||
    class VerifyFilter(ast.NodeVisitor):
 | 
			
		||||
        def visit_Name(self, node: ast.Name):
 | 
			
		||||
            # pylint:disable=invalid-name
 | 
			
		||||
            if not isinstance(node.ctx, ast.Load):
 | 
			
		||||
                raise InvalidArgumentException("Invalid filter: Modifying variables ({}) not allowed.".format(node.id))
 | 
			
		||||
            if not hasattr(Post, node.id):
 | 
			
		||||
                raise InvalidArgumentException("Invalid filter: Name {} is not defined.".format(node.id))
 | 
			
		||||
            if node.id in Post.LOGIN_REQUIRING_PROPERTIES and not logged_in:
 | 
			
		||||
                raise InvalidArgumentException("Invalid filter: Name {} requires being logged in.".format(node.id))
 | 
			
		||||
            return self.generic_visit(node)
 | 
			
		||||
 | 
			
		||||
    filter_ast = ast.parse(filter_str, filename='<--only-if parameter>', mode='eval')
 | 
			
		||||
    VerifyFilter().visit(filter_ast)
 | 
			
		||||
 | 
			
		||||
    def filterfunc(post: 'Post') -> bool:
 | 
			
		||||
        class EvaluatePostAttributes(ast.NodeTransformer):
 | 
			
		||||
            def visit_Name(self, node: ast.Name):
 | 
			
		||||
                # pylint:disable=invalid-name,no-self-use
 | 
			
		||||
                obj = post.__getattribute__(node.id)
 | 
			
		||||
                if isinstance(obj, str):
 | 
			
		||||
                    new_node = ast.Str(obj)
 | 
			
		||||
                elif isinstance(obj, int) and not isinstance(obj, bool):
 | 
			
		||||
                    new_node = ast.Num(obj)
 | 
			
		||||
                else:  # True, False or None
 | 
			
		||||
                    new_node = ast.NameConstant(obj)
 | 
			
		||||
                return ast.copy_location(new_node, node)
 | 
			
		||||
        ast_obj = EvaluatePostAttributes().visit(copy.deepcopy(filter_ast))
 | 
			
		||||
        # pylint:disable=eval-used
 | 
			
		||||
        return bool(eval(compile(ast_obj, '', 'eval'), {}))
 | 
			
		||||
 | 
			
		||||
    return filterfunc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Post:
 | 
			
		||||
    """
 | 
			
		||||
    Structure containing information about an Instagram post.
 | 
			
		||||
@@ -132,6 +169,8 @@ class Post:
 | 
			
		||||
    This class unifies access to the properties associated with a post. It implements == and is hashable.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    LOGIN_REQUIRING_PROPERTIES = ["viewer_has_liked"]
 | 
			
		||||
 | 
			
		||||
    def __init__(self, instaloader: 'Instaloader', node: Dict[str, Any], profile: Optional[str] = None):
 | 
			
		||||
        """Create a Post instance from a node structure as returned by Instagram.
 | 
			
		||||
 | 
			
		||||
@@ -182,15 +221,19 @@ class Post:
 | 
			
		||||
                self._full_metadata_dict = pic_json["media"]
 | 
			
		||||
        return self._full_metadata_dict
 | 
			
		||||
 | 
			
		||||
    def __getitem__(self, item):
 | 
			
		||||
        """Implements self[item]. This must not be used from outside of Post. Use the properties instead."""
 | 
			
		||||
        if item in self._node:
 | 
			
		||||
            return self._node[item]
 | 
			
		||||
        return self._full_metadata[item]
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def owner_username(self) -> str:
 | 
			
		||||
        """The Post's lowercase owner name, or 'UNKNOWN'."""
 | 
			
		||||
        try:
 | 
			
		||||
            if self._profile:
 | 
			
		||||
                return self._profile.lower()
 | 
			
		||||
            if 'owner' in self._node and 'username' in self._node['owner']:
 | 
			
		||||
                return self._node['owner']['username'].lower()
 | 
			
		||||
            return self._full_metadata['owner']['username'].lower()
 | 
			
		||||
            return self['owner']['username'].lower()
 | 
			
		||||
        except (InstaloaderException, KeyError, TypeError) as err:
 | 
			
		||||
            self._instaloader.error("Get owner name of {}: {} -- using \'UNKNOWN\'.".format(self, err))
 | 
			
		||||
            return 'UNKNOWN'
 | 
			
		||||
@@ -213,7 +256,7 @@ class Post:
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def sidecar_edges(self) -> List[Dict[str, Any]]:
 | 
			
		||||
        return self._full_metadata['edge_sidecar_to_children']['edges']
 | 
			
		||||
        return self['edge_sidecar_to_children']['edges']
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def caption(self) -> Optional[str]:
 | 
			
		||||
@@ -228,7 +271,7 @@ class Post:
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def video_url(self) -> str:
 | 
			
		||||
        return self._full_metadata['video_url']
 | 
			
		||||
        return self['video_url']
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def viewer_has_liked(self) -> bool:
 | 
			
		||||
@@ -237,15 +280,24 @@ class Post:
 | 
			
		||||
        :raises LoginRequiredException: if not logged in."""
 | 
			
		||||
        if not self._instaloader.is_logged_in:
 | 
			
		||||
            raise LoginRequiredException("Login required to obtain whether viewer has liked {}.".format(self))
 | 
			
		||||
        if 'likes' in self._node:
 | 
			
		||||
        if 'likes' in self._node and 'viewer_has_liked' in self._node['likes']:
 | 
			
		||||
            return self._node['likes']['viewer_has_liked']
 | 
			
		||||
        if 'viewer_has_liked' in self._node:
 | 
			
		||||
            return self._node['viewer_has_liked']
 | 
			
		||||
        return self._full_metadata['viewer_has_liked']
 | 
			
		||||
        return self['viewer_has_liked']
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def likes(self) -> int:
 | 
			
		||||
        """Likes count"""
 | 
			
		||||
        return self['edge_media_preview_like']['count']
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def comments(self) -> int:
 | 
			
		||||
        """Comment count"""
 | 
			
		||||
        return self['edge_media_to_comment']['count']
 | 
			
		||||
 | 
			
		||||
    def get_comments(self) -> Iterator[Dict[str, Any]]:
 | 
			
		||||
        comments_in_metadata = self._full_metadata['edge_media_to_comment']
 | 
			
		||||
        if comments_in_metadata['count'] == len(comments_in_metadata['edges']):
 | 
			
		||||
        """Iterate over all comments of the post."""
 | 
			
		||||
        comments_in_metadata = self['edge_media_to_comment']
 | 
			
		||||
        if self.comments == len(comments_in_metadata['edges']):
 | 
			
		||||
            # If the Post's metadata already contains all comments, don't do GraphQL requests to obtain them
 | 
			
		||||
            yield from (comment['node'] for comment in comments_in_metadata['edges'])
 | 
			
		||||
        yield from self._instaloader.graphql_node_list(17852405266163336, {'shortcode': self.shortcode},
 | 
			
		||||
@@ -254,7 +306,7 @@ class Post:
 | 
			
		||||
 | 
			
		||||
    def get_location(self) -> Optional[Dict[str, str]]:
 | 
			
		||||
        """If the Post has a location, returns a dictionary with fields 'lat' and 'lng'."""
 | 
			
		||||
        loc_dict = self._full_metadata["location"]
 | 
			
		||||
        loc_dict = self["location"]
 | 
			
		||||
        if loc_dict is not None:
 | 
			
		||||
            location_json = self._instaloader.get_json("explore/locations/{0}/".format(loc_dict["id"]),
 | 
			
		||||
                                                       params={'__a': 1})
 | 
			
		||||
@@ -1110,8 +1162,15 @@ class Instaloader:
 | 
			
		||||
    def main(self, profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None,
 | 
			
		||||
             sessionfile: Optional[str] = None, max_count: Optional[int] = None,
 | 
			
		||||
             profile_pic_only: bool = False, fast_update: bool = False,
 | 
			
		||||
             stories: bool = False, stories_only: bool = False) -> None:
 | 
			
		||||
             stories: bool = False, stories_only: bool = False,
 | 
			
		||||
             filter_str: Optional[str] = None) -> None:
 | 
			
		||||
        """Download set of profiles, hashtags etc. and handle logging in and session files if desired."""
 | 
			
		||||
        # Parse and generate filter function
 | 
			
		||||
        if filter_str is not None:
 | 
			
		||||
            filter_func = filterstr_to_filterfunc(filter_str, username is not None)
 | 
			
		||||
            self._log('Only download posts with property "{}".'.format(filter_str))
 | 
			
		||||
        else:
 | 
			
		||||
            filter_func = None
 | 
			
		||||
        # Login, if desired
 | 
			
		||||
        if username is not None:
 | 
			
		||||
            try:
 | 
			
		||||
@@ -1134,7 +1193,8 @@ class Instaloader:
 | 
			
		||||
                if pentry[0] == '#':
 | 
			
		||||
                    self._log("Retrieving pictures with hashtag {0}".format(pentry))
 | 
			
		||||
                    with self._error_catcher():
 | 
			
		||||
                        self.download_hashtag(hashtag=pentry[1:], max_count=max_count, fast_update=fast_update)
 | 
			
		||||
                        self.download_hashtag(hashtag=pentry[1:], max_count=max_count, fast_update=fast_update,
 | 
			
		||||
                                              filter_func=filter_func)
 | 
			
		||||
                elif pentry[0] == '@':
 | 
			
		||||
                    if username is not None:
 | 
			
		||||
                        self._log("Retrieving followees of %s..." % pentry[1:])
 | 
			
		||||
@@ -1143,19 +1203,12 @@ class Instaloader:
 | 
			
		||||
                            targets.update([followee['username'] for followee in followees])
 | 
			
		||||
                    else:
 | 
			
		||||
                        print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr)
 | 
			
		||||
                elif pentry == ":feed-all":
 | 
			
		||||
                elif pentry == ":feed":
 | 
			
		||||
                    if username is not None:
 | 
			
		||||
                        self._log("Retrieving pictures from your feed...")
 | 
			
		||||
                        with self._error_catcher():
 | 
			
		||||
                            self.download_feed_posts(fast_update=fast_update, max_count=max_count)
 | 
			
		||||
                    else:
 | 
			
		||||
                        print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr)
 | 
			
		||||
                elif pentry == ":feed-liked":
 | 
			
		||||
                    if username is not None:
 | 
			
		||||
                        self._log("Retrieving pictures you liked from your feed...")
 | 
			
		||||
                        with self._error_catcher():
 | 
			
		||||
                            self.download_feed_posts(fast_update=fast_update, max_count=max_count,
 | 
			
		||||
                                                     filter_func=lambda post: post.viewer_has_liked)
 | 
			
		||||
                                                     filter_func=filter_func)
 | 
			
		||||
                    else:
 | 
			
		||||
                        print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr)
 | 
			
		||||
                elif pentry == ":stories":
 | 
			
		||||
@@ -1167,19 +1220,21 @@ class Instaloader:
 | 
			
		||||
                else:
 | 
			
		||||
                    targets.add(pentry)
 | 
			
		||||
            if len(targets) > 1:
 | 
			
		||||
                self._log("Downloading %i profiles..." % len(targets))
 | 
			
		||||
                self._log("Downloading {} profiles: {}".format(len(targets), ','.join(targets)))
 | 
			
		||||
            # Iterate through targets list and download them
 | 
			
		||||
            for target in targets:
 | 
			
		||||
                with self._error_catcher():
 | 
			
		||||
                    try:
 | 
			
		||||
                        self.download_profile(target, profile_pic_only, fast_update, stories, stories_only)
 | 
			
		||||
                        self.download_profile(target, profile_pic_only, fast_update, stories, stories_only,
 | 
			
		||||
                                              filter_func=filter_func)
 | 
			
		||||
                    except ProfileNotExistsException as err:
 | 
			
		||||
                        if username is not None:
 | 
			
		||||
                            self._log(err)
 | 
			
		||||
                            self._log("Trying again anonymously, helps in case you are just blocked.")
 | 
			
		||||
                            with self.anonymous_copy() as anonymous_loader:
 | 
			
		||||
                                with self._error_catcher():
 | 
			
		||||
                                    anonymous_loader.download_profile(target, profile_pic_only, fast_update)
 | 
			
		||||
                                    anonymous_loader.download_profile(target, profile_pic_only, fast_update,
 | 
			
		||||
                                                                      filter_func=filter_func)
 | 
			
		||||
                        else:
 | 
			
		||||
                            raise err
 | 
			
		||||
        except KeyboardInterrupt:
 | 
			
		||||
@@ -1207,7 +1262,7 @@ def main():
 | 
			
		||||
    g_what.add_argument('profile', nargs='*', metavar='profile|#hashtag',
 | 
			
		||||
                        help='Name of profile or #hashtag to download. '
 | 
			
		||||
                             'Alternatively, if --login is given: @<profile> to download all followees of '
 | 
			
		||||
                             '<profile>; the special targets :feed-all or :feed-liked to '
 | 
			
		||||
                             '<profile>; the special targets :feed to '
 | 
			
		||||
                             'download pictures from your feed; or :stories to download the stories of your '
 | 
			
		||||
                             'followees.')
 | 
			
		||||
    g_what.add_argument('-P', '--profile-pic-only', action='store_true',
 | 
			
		||||
@@ -1232,6 +1287,10 @@ def main():
 | 
			
		||||
    g_what.add_argument('--stories-only', action='store_true',
 | 
			
		||||
                        help='Rather than downloading regular posts of each specified profile, only download '
 | 
			
		||||
                             'stories. Requires --login.')
 | 
			
		||||
    g_what.add_argument('--only-if', metavar='filter',
 | 
			
		||||
                        help='Expression that, if given, must evaluate to True for each post to be downloaded. Must be '
 | 
			
		||||
                             'a syntactically valid python expression. Variables are evaluated to '
 | 
			
		||||
                             'instaloader.Post attributes. Example: --only-if=viewer_has_liked.')
 | 
			
		||||
 | 
			
		||||
    g_stop = parser.add_argument_group('When to Stop Downloading',
 | 
			
		||||
                                       'If none of these options are given, Instaloader goes through all pictures '
 | 
			
		||||
@@ -1294,6 +1353,10 @@ def main():
 | 
			
		||||
            if args.stories_only:
 | 
			
		||||
                raise SystemExit(1)
 | 
			
		||||
 | 
			
		||||
        if ':feed-all' in args.profile or ':feed-liked' in args.profile:
 | 
			
		||||
            raise SystemExit(":feed-all and :feed-liked were removed. Use :feed as target and "
 | 
			
		||||
                             "eventually --only-if=viewer_has_liked.")
 | 
			
		||||
 | 
			
		||||
        download_videos = Tristate.always if not args.skip_videos else Tristate.no_extra_query
 | 
			
		||||
        download_comments = Tristate.always if args.comments else Tristate.no_extra_query
 | 
			
		||||
        download_captions = Tristate.no_extra_query if not args.no_captions else Tristate.never
 | 
			
		||||
@@ -1315,7 +1378,7 @@ def main():
 | 
			
		||||
        loader.main(args.profile, args.login.lower() if args.login is not None else None, args.password,
 | 
			
		||||
                    args.sessionfile,
 | 
			
		||||
                    int(args.count) if args.count is not None else None,
 | 
			
		||||
                    args.profile_pic_only, args.fast_update, args.stories, args.stories_only)
 | 
			
		||||
                    args.profile_pic_only, args.fast_update, args.stories, args.stories_only, args.only_if)
 | 
			
		||||
    except InstaloaderException as err:
 | 
			
		||||
        raise SystemExit("Fatal error: %s" % err)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user