Hashtag class

This commit is contained in:
Alexander Graf 2020-04-17 18:47:45 +02:00
parent 5c5c633bee
commit 83a0f52c60
7 changed files with 183 additions and 33 deletions

View File

@ -37,7 +37,7 @@ picture, video or sidecar (set of multiple pictures/videos) posted in a user's
profile. :class:`Instaloader` provides methods to iterate over Posts from a profile. :class:`Instaloader` provides methods to iterate over Posts from a
certain source:: certain source::
for post in L.get_hashtag_posts('cat'): for post in instaloader.Hashtag.from_name(L.context, 'cat').get_posts():
# post is an instance of instaloader.Post # post is an instance of instaloader.Post
L.download_post(post, target='#cat') L.download_post(post, target='#cat')
@ -63,7 +63,7 @@ certain source::
- :meth:`Instaloader.get_explore_posts` - :meth:`Instaloader.get_explore_posts`
Media that is suggested by Instagram to explore. Media that is suggested by Instagram to explore.
- :meth:`Instaloader.get_hashtag_posts` - :meth:`Hashtag.get_posts`
Media associated with given hashtag. Media associated with given hashtag.
With the :class:`Profile` class, Instaloader also makes it easy to access With the :class:`Profile` class, Instaloader also makes it easy to access
@ -155,6 +155,14 @@ Profiles
.. autoclass:: Profile .. autoclass:: Profile
:no-show-inheritance: :no-show-inheritance:
Hashtags
""""""""
.. autoclass:: Hashtag
:no-show-inheritance:
.. versionadded:: 4.4
TopSearchResults TopSearchResults
"""""""""""""""" """"""""""""""""
@ -166,7 +174,7 @@ TopSearchResults
Loading and Saving Loading and Saving
"""""""""""""""""" """"""""""""""""""
:class:`Post`, :class:`StoryItem` and :class:`Profile` can be saved and loaded :class:`Post`, :class:`StoryItem`, :class:`Profile` and :class:`Hashtag` can be saved and loaded
to/from JSON files. to/from JSON files.
.. autofunction:: load_structure_from_file .. autofunction:: load_structure_from_file

View File

@ -2,7 +2,7 @@ import instaloader
L = instaloader.Instaloader() L = instaloader.Instaloader()
posts = L.get_hashtag_posts('urbanphotography') posts = instaloader.Hashtag.from_name(L.context, 'urbanphotography').get_posts()
users = set() users = set()

View File

@ -5,7 +5,7 @@ import instaloader
L = instaloader.Instaloader() L = instaloader.Instaloader()
posts = L.get_hashtag_posts('urbanphotography') posts = instaloader.Hashtag.from_name(L.context, 'urbanphotography').get_posts()
# or # or
# posts = instaloader.Profile.from_username(L.context, PROFILE).get_posts() # posts = instaloader.Profile.from_username(L.context, PROFILE).get_posts()

View File

@ -15,5 +15,5 @@ else:
from .exceptions import * from .exceptions import *
from .instaloader import Instaloader from .instaloader import Instaloader
from .instaloadercontext import InstaloaderContext from .instaloadercontext import InstaloaderContext
from .structures import (Highlight, Post, PostSidecarNode, PostComment, PostCommentAnswer, PostLocation, Profile, Story, from .structures import (Hashtag, Highlight, Post, PostSidecarNode, PostComment, PostCommentAnswer, PostLocation,
StoryItem, TopSearchResults, load_structure_from_file, save_structure_to_file) Profile, Story, StoryItem, TopSearchResults, load_structure_from_file, save_structure_to_file)

View File

@ -21,7 +21,7 @@ import urllib3 # type: ignore
from .exceptions import * from .exceptions import *
from .instaloadercontext import InstaloaderContext from .instaloadercontext import InstaloaderContext
from .structures import (Highlight, JsonExportable, Post, PostLocation, Profile, Story, StoryItem, from .structures import (Hashtag, Highlight, JsonExportable, Post, PostLocation, Profile, Story, StoryItem,
save_structure_to_file, load_structure_from_file) save_structure_to_file, load_structure_from_file)
@ -879,21 +879,13 @@ class Instaloader:
data.get('rhx_gis'))) data.get('rhx_gis')))
def get_hashtag_posts(self, hashtag: str) -> Iterator[Post]: def get_hashtag_posts(self, hashtag: str) -> Iterator[Post]:
"""Get Posts associated with a #hashtag.""" """Get Posts associated with a #hashtag.
has_next_page = True
end_cursor = None
while has_next_page:
if end_cursor:
params = {'__a': 1, 'max_id': end_cursor}
else:
params = {'__a': 1}
hashtag_data = self.context.get_json('explore/tags/{0}/'.format(hashtag),
params)['graphql']['hashtag']['edge_hashtag_to_media']
yield from (Post(self.context, edge['node']) for edge in hashtag_data['edges'])
has_next_page = hashtag_data['page_info']['has_next_page']
end_cursor = hashtag_data['page_info']['end_cursor']
def download_hashtag(self, hashtag: str, .. deprecated:: 4.4
Use :meth:`Hashtag.get_posts`."""
return Hashtag.from_name(self.context, hashtag).get_posts()
def download_hashtag(self, hashtag: Union[Hashtag, str],
max_count: Optional[int] = None, max_count: Optional[int] = None,
post_filter: Optional[Callable[[Post], bool]] = None, post_filter: Optional[Callable[[Post], bool]] = None,
fast_update: bool = False) -> None: fast_update: bool = False) -> None:
@ -904,14 +896,17 @@ class Instaloader:
loader = Instaloader() loader = Instaloader()
loader.download_hashtag('cat', max_count=30) loader.download_hashtag('cat', max_count=30)
:param hashtag: Hashtag to download, without leading '#' :param hashtag: Hashtag to download, as instance of :class:`Hashtag`, or string without leading '#'
:param max_count: Maximum count of pictures to download :param max_count: Maximum count of pictures to download
:param post_filter: function(post), which returns True if given picture should be downloaded :param post_filter: function(post), which returns True if given picture should be downloaded
:param fast_update: If true, abort when first already-downloaded picture is encountered :param fast_update: If true, abort when first already-downloaded picture is encountered
""" """
hashtag = hashtag.lower() if isinstance(hashtag, str):
self.context.log("Retrieving pictures with hashtag {}...".format(hashtag)) with self.context.error_catcher("Get hashtag #{}".format(hashtag)):
self.posts_download_loop(self.get_hashtag_posts(hashtag), "#" + hashtag, fast_update, post_filter, hashtag = Hashtag.from_name(self.context, hashtag)
assert isinstance(hashtag, Hashtag)
self.context.log("Retrieving pictures with hashtag #{}...".format(hashtag.name))
self.posts_download_loop(hashtag.get_posts(), "#" + hashtag.name, fast_update, post_filter,
max_count=max_count) max_count=max_count)
def download_tagged(self, profile: Profile, fast_update: bool = False, def download_tagged(self, profile: Profile, fast_update: bool = False,

View File

@ -1130,6 +1130,140 @@ class Highlight(Story):
yield from (StoryItem(self._context, item, self.owner_profile) for item in self._items) yield from (StoryItem(self._context, item, self.owner_profile) for item in self._items)
class Hashtag:
    """
    A Hashtag.

    Analogous to :class:`Profile`, get an instance with::

       L = Instaloader()
       hashtag = Hashtag.from_name(L.context, HASHTAG)

    To then download the Hashtag's Posts, do::

       for post in hashtag.get_posts():
          L.download_post(post, target="#"+hashtag.name)

    Also, this class implements == and is hashable. Both operate on the lower-cased hashtag name.
    """

    def __init__(self, context: "InstaloaderContext", node: Dict[str, Any]):
        """
        :param context: context used for any further queries this object has to issue
        :param node: metadata dictionary of the hashtag; must at least contain the key ``name``
        """
        assert "name" in node
        self._context = context
        self._node = node
        # Becomes True once the node has been replaced by the result of a full metadata query.
        self._has_full_metadata = False

    @classmethod
    def from_name(cls, context: "InstaloaderContext", name: str) -> "Hashtag":
        """
        Create a Hashtag instance from a given hashtag name, without preceding '#'. Raises an Exception if there is no
        hashtag with the given name.

        :param context: :attr:`Instaloader.context`
        :param name: Hashtag, without preceding '#'
        :raises: :class:`QueryReturnedNotFoundException`
        """
        # pylint:disable=protected-access
        hashtag = cls(context, {'name': name.lower()})
        # Query right away, so that a failing lookup surfaces here rather than on first attribute access.
        hashtag._obtain_metadata()
        return hashtag

    @property
    def name(self) -> str:
        """Hashtag name, without preceding '#'"""
        return self._node["name"]

    def _query(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Request the hashtag's explore page with the given query parameters and return its ``hashtag`` node."""
        return self._context.get_json("explore/tags/{0}/".format(self.name),
                                      params)["graphql"]["hashtag"]

    def _obtain_metadata(self) -> None:
        """Replace the node by the result of a metadata query, unless that has already been done."""
        if not self._has_full_metadata:
            self._node = self._query({"__a": 1})
            self._has_full_metadata = True

    def _asdict(self) -> Dict[str, Any]:
        """Return the underlying node dictionary."""
        return self._node

    def __repr__(self) -> str:
        return "<Hashtag #{}>".format(self.name)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, Hashtag):
            return self.name.lower() == other.name.lower()
        return NotImplemented

    def __hash__(self) -> int:
        # Must hash the same normalised value that __eq__ compares, otherwise two equal
        # Hashtags could end up as distinct members of a set or distinct keys of a dict.
        return hash(self.name.lower())

    @staticmethod
    def _lookup(node: Dict[str, Any], keys) -> Any:
        """Descend into ``node`` following ``keys`` one after another; raises KeyError if a key is missing."""
        value = node
        for key in keys:
            value = value[key]
        return value

    def _metadata(self, *keys) -> Any:
        """
        Return the value found in the node under the given path of keys.

        If the path is missing, the full metadata is obtained (at most once) and the lookup is repeated; a KeyError
        from that second lookup is propagated to the caller.
        """
        try:
            return self._lookup(self._node, keys)
        except KeyError:
            self._obtain_metadata()
            return self._lookup(self._node, keys)

    @property
    def hashtagid(self) -> int:
        """The ``id`` metadata field, converted to int."""
        return int(self._metadata("id"))

    @property
    def profile_pic_url(self) -> str:
        """The ``profile_pic_url`` metadata field."""
        return self._metadata("profile_pic_url")

    @property
    def description(self) -> str:
        """The ``description`` metadata field."""
        return self._metadata("description")

    @property
    def allow_following(self) -> bool:
        """The ``allow_following`` metadata field."""
        return self._metadata("allow_following")

    @property
    def is_following(self) -> bool:
        """The ``is_following`` metadata field."""
        return self._metadata("is_following")

    @property
    def is_top_media_only(self) -> bool:
        """The ``is_top_media_only`` metadata field."""
        return self._metadata("is_top_media_only")

    def get_related_tags(self) -> Iterator["Hashtag"]:
        """Yields similar hashtags."""
        for edge in self._metadata("edge_hashtag_to_related_tags", "edges"):
            yield Hashtag(self._context, edge["node"])

    def get_top_posts(self) -> Iterator["Post"]:
        """Yields the top posts of the hashtag."""
        for edge in self._metadata("edge_hashtag_to_top_posts", "edges"):
            yield Post(self._context, edge["node"])

    @property
    def mediacount(self) -> int:
        """
        The count of all media associated with this hashtag.

        The number of posts with a certain hashtag may differ from the number of posts that can actually be accessed, as
        the hashtag count might include private posts
        """
        return self._metadata("edge_hashtag_to_media", "count")

    def get_posts(self) -> Iterator["Post"]:
        """Yields the posts associated with this hashtag."""
        # Probe both sub-keys first: if the node lacks either of them, _metadata() obtains the
        # full metadata, so the connection read below has edges as well as paging information.
        self._metadata("edge_hashtag_to_media", "edges")
        self._metadata("edge_hashtag_to_media", "page_info")
        conn = self._metadata("edge_hashtag_to_media")
        yield from (Post(self._context, edge["node"]) for edge in conn["edges"])
        # Follow the cursor for as long as another page is announced.
        while conn["page_info"]["has_next_page"]:
            data = self._query({'__a': 1, 'max_id': conn["page_info"]["end_cursor"]})
            conn = data["edge_hashtag_to_media"]
            yield from (Post(self._context, edge["node"]) for edge in conn["edges"])
class TopSearchResults: class TopSearchResults:
""" """
An invocation of this class triggers a search on Instagram for the provided search string. An invocation of this class triggers a search on Instagram for the provided search string.
@ -1189,6 +1323,17 @@ class TopSearchResults:
if name: if name:
yield name yield name
def get_hashtags(self) -> Iterator[Hashtag]:
    """
    Yields a :class:`Hashtag` for each hashtag entry of the search result that carries a name.

    .. versionadded:: 4.4
    """
    for entry in self._node.get('hashtags', []):
        hashtag_node = entry.get('hashtag', {})
        # Entries without a name cannot be turned into a Hashtag; skip them.
        if 'name' not in hashtag_node:
            continue
        yield Hashtag(self._context, hashtag_node)
@property @property
def searchstring(self) -> str: def searchstring(self) -> str:
""" """
@ -1197,17 +1342,17 @@ class TopSearchResults:
return self._searchstring return self._searchstring
JsonExportable = Union[Post, Profile, StoryItem] JsonExportable = Union[Post, Profile, StoryItem, Hashtag]
def save_structure_to_file(structure: JsonExportable, filename: str) -> None: def save_structure_to_file(structure: JsonExportable, filename: str) -> None:
"""Saves a :class:`Post`, :class:`Profile` or :class:`StoryItem` to a '.json' or '.json.xz' file such that it can """Saves a :class:`Post`, :class:`Profile`, :class:`StoryItem` or :class:`Hashtag` to a '.json' or '.json.xz' file
later be loaded by :func:`load_structure_from_file`. such that it can later be loaded by :func:`load_structure_from_file`.
If the specified filename ends in '.xz', the file will be LZMA compressed. Otherwise, a pretty-printed JSON file If the specified filename ends in '.xz', the file will be LZMA compressed. Otherwise, a pretty-printed JSON file
will be created. will be created.
:param structure: :class:`Post`, :class:`Profile` or :class:`StoryItem` :param structure: :class:`Post`, :class:`Profile`, :class:`StoryItem` or :class:`Hashtag`
:param filename: Filename, ends in '.json' or '.json.xz' :param filename: Filename, ends in '.json' or '.json.xz'
""" """
json_structure = {'node': structure._asdict(), json_structure = {'node': structure._asdict(),
@ -1222,8 +1367,8 @@ def save_structure_to_file(structure: JsonExportable, filename: str) -> None:
def load_structure_from_file(context: InstaloaderContext, filename: str) -> JsonExportable: def load_structure_from_file(context: InstaloaderContext, filename: str) -> JsonExportable:
"""Loads a :class:`Post`, :class:`Profile` or :class:`StoryItem` from a '.json' or '.json.xz' file that """Loads a :class:`Post`, :class:`Profile`, :class:`StoryItem` or :class:`Hashtag` from a '.json' or '.json.xz' file
has been saved by :func:`save_structure_to_file`. that has been saved by :func:`save_structure_to_file`.
:param context: :attr:`Instaloader.context` linked to the new object, used for additional queries if necessary. :param context: :attr:`Instaloader.context` linked to the new object, used for additional queries if necessary.
:param filename: Filename, ends in '.json' or '.json.xz' :param filename: Filename, ends in '.json' or '.json.xz'
@ -1244,6 +1389,8 @@ def load_structure_from_file(context: InstaloaderContext, filename: str) -> Json
return Profile(context, json_structure['node']) return Profile(context, json_structure['node'])
elif node_type == "StoryItem": elif node_type == "StoryItem":
return StoryItem(context, json_structure['node']) return StoryItem(context, json_structure['node'])
elif node_type == "Hashtag":
return Hashtag(context, json_structure['node'])
else: else:
raise InvalidArgumentException("{}: Not an Instaloader JSON.".format(filename)) raise InvalidArgumentException("{}: Not an Instaloader JSON.".format(filename))
elif 'shortcode' in json_structure: elif 'shortcode' in json_structure:

View File

@ -73,7 +73,7 @@ class TestInstaloaderAnonymously(unittest.TestCase):
self.L.download_hashtag(HASHTAG, NORMAL_MAX_COUNT) self.L.download_hashtag(HASHTAG, NORMAL_MAX_COUNT)
def test_hashtag_paging(self):
    """Iterate over the posts of HASHTAG and stop once PAGING_MAX_COUNT has been reached."""
    # Use the test case's own Instaloader instance (``self.L``), as the neighbouring tests do;
    # a bare ``L`` is not bound inside this method.
    hashtag = instaloader.Hashtag.from_name(self.L.context, HASHTAG)
    for count, post in enumerate(hashtag.get_posts()):
        print(post)
        if count == PAGING_MAX_COUNT:
            break