Annotate all types
This commit is contained in:
parent
13ebcb782e
commit
86f8b2f018
@ -6,6 +6,9 @@ of profiles (even if private), from your feed or from all followees of a given p
|
|||||||
import re, json, datetime, shutil, os, time, random, sys, pickle, getpass, tempfile
|
import re, json, datetime, shutil, os, time, random, sys, pickle, getpass, tempfile
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
from numbers import Real
|
||||||
|
from typing import List, Optional, Any, Dict, Callable
|
||||||
|
|
||||||
import requests, requests.utils
|
import requests, requests.utils
|
||||||
|
|
||||||
# To get version from setup.py for instaloader --version
|
# To get version from setup.py for instaloader --version
|
||||||
@ -58,7 +61,7 @@ def _log(*msg, sep='', end='\n', flush=False, quiet=False):
|
|||||||
print(*msg, sep=sep, end=end, flush=flush)
|
print(*msg, sep=sep, end=end, flush=flush)
|
||||||
|
|
||||||
|
|
||||||
def get_json(name, session, max_id=0, sleep=True):
|
def get_json(name: str, session: requests.Session, max_id: int = 0, sleep: bool = True) -> Optional[Dict[str, Any]]:
|
||||||
"""Return JSON of a profile"""
|
"""Return JSON of a profile"""
|
||||||
resp = session.get('http://www.instagram.com/'+name,
|
resp = session.get('http://www.instagram.com/'+name,
|
||||||
params={'max_id': max_id})
|
params={'max_id': max_id})
|
||||||
@ -71,7 +74,7 @@ def get_json(name, session, max_id=0, sleep=True):
|
|||||||
return json.loads(match.group(0)[21:-2])
|
return json.loads(match.group(0)[21:-2])
|
||||||
|
|
||||||
|
|
||||||
def get_username_by_id(session, profile_id):
|
def get_username_by_id(session: requests.Session, profile_id: int) -> str:
|
||||||
"""To get the current username of a profile, given its unique ID, this function can be used.
|
"""To get the current username of a profile, given its unique ID, this function can be used.
|
||||||
session is required to be a logged-in (i.e. non-anonymous) session."""
|
session is required to be a logged-in (i.e. non-anonymous) session."""
|
||||||
tempsession = copy_session(session)
|
tempsession = copy_session(session)
|
||||||
@ -92,7 +95,7 @@ def get_username_by_id(session, profile_id):
|
|||||||
str(profile_id) + ").")
|
str(profile_id) + ").")
|
||||||
|
|
||||||
|
|
||||||
def get_id_by_username(profile):
|
def get_id_by_username(profile: str) -> int:
|
||||||
"""Each Instagram profile has its own unique ID which stays unmodified even if a user changes
|
"""Each Instagram profile has its own unique ID which stays unmodified even if a user changes
|
||||||
his/her username. To get said ID, given the profile's name, you may call this function."""
|
his/her username. To get said ID, given the profile's name, you may call this function."""
|
||||||
data = get_json(profile, get_anonymous_session())
|
data = get_json(profile, get_anonymous_session())
|
||||||
@ -101,11 +104,11 @@ def get_id_by_username(profile):
|
|||||||
return int(data['entry_data']['ProfilePage'][0]['user']['id'])
|
return int(data['entry_data']['ProfilePage'][0]['user']['id'])
|
||||||
|
|
||||||
|
|
||||||
def _epoch_to_string(epoch):
|
def _epoch_to_string(epoch: Real) -> str:
|
||||||
return datetime.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d_%H-%M-%S')
|
return datetime.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d_%H-%M-%S')
|
||||||
|
|
||||||
|
|
||||||
def get_followees(profile, session):
|
def get_followees(profile: str, session: requests.Session) -> List[Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
Retrieve list of followees of given profile
|
Retrieve list of followees of given profile
|
||||||
|
|
||||||
@ -162,7 +165,7 @@ def get_followees(profile, session):
|
|||||||
raise LoginRequiredException("Login required to gather followees.")
|
raise LoginRequiredException("Login required to gather followees.")
|
||||||
|
|
||||||
|
|
||||||
def download_pic(name, url, date_epoch, outputlabel=None, quiet=False):
|
def download_pic(name: str, url: str, date_epoch: Real, outputlabel: Optional[str] = None, quiet: bool = False) -> bool:
|
||||||
"""Downloads and saves picture with given url under given directory with given timestamp.
|
"""Downloads and saves picture with given url under given directory with given timestamp.
|
||||||
Returns true, if file was actually downloaded, i.e. updated."""
|
Returns true, if file was actually downloaded, i.e. updated."""
|
||||||
if outputlabel is None:
|
if outputlabel is None:
|
||||||
@ -186,7 +189,7 @@ def download_pic(name, url, date_epoch, outputlabel=None, quiet=False):
|
|||||||
raise ConnectionException("File \'" + url + "\' could not be downloaded.")
|
raise ConnectionException("File \'" + url + "\' could not be downloaded.")
|
||||||
|
|
||||||
|
|
||||||
def save_caption(name, date_epoch, caption, shorter_output=False, quiet=False):
|
def save_caption(name: str, date_epoch: Real, caption: str, shorter_output: bool = False, quiet: bool = False) -> None:
|
||||||
"""Updates picture caption"""
|
"""Updates picture caption"""
|
||||||
filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '.txt'
|
filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '.txt'
|
||||||
pcaption = caption.replace('\n', ' ').strip()
|
pcaption = caption.replace('\n', ' ').strip()
|
||||||
@ -229,7 +232,7 @@ def save_caption(name, date_epoch, caption, shorter_output=False, quiet=False):
|
|||||||
os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch))
|
os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch))
|
||||||
|
|
||||||
|
|
||||||
def save_location(name, location_json, date_epoch):
|
def save_location(name: str, location_json: Dict[str, str], date_epoch: Real) -> None:
|
||||||
filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '_location.txt'
|
filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '_location.txt'
|
||||||
location_string = location_json["name"]+"\n" + \
|
location_string = location_json["name"]+"\n" + \
|
||||||
"https://maps.google.com/maps?q={0},{1}&ll={0},{1}\n" \
|
"https://maps.google.com/maps?q={0},{1}&ll={0},{1}\n" \
|
||||||
@ -240,7 +243,7 @@ def save_location(name, location_json, date_epoch):
|
|||||||
os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch))
|
os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch))
|
||||||
|
|
||||||
|
|
||||||
def download_profilepic(name, url, quiet=False):
|
def download_profilepic(name: str, url: str, quiet: bool = False) -> None:
|
||||||
"""Downloads and saves profile pic with given url."""
|
"""Downloads and saves profile pic with given url."""
|
||||||
date_object = datetime.datetime.strptime(requests.head(url).headers["Last-Modified"], \
|
date_object = datetime.datetime.strptime(requests.head(url).headers["Last-Modified"], \
|
||||||
'%a, %d %b %Y %H:%M:%S GMT')
|
'%a, %d %b %Y %H:%M:%S GMT')
|
||||||
@ -267,14 +270,14 @@ def download_profilepic(name, url, quiet=False):
|
|||||||
raise ConnectionException("File \'" + url + "\' could not be downloaded.")
|
raise ConnectionException("File \'" + url + "\' could not be downloaded.")
|
||||||
|
|
||||||
|
|
||||||
def get_default_session_filename(username):
|
def get_default_session_filename(username: str) -> str:
|
||||||
"""Returns default session filename for given username."""
|
"""Returns default session filename for given username."""
|
||||||
dirname = tempfile.gettempdir() + "/" + ".instaloader-" + getpass.getuser()
|
dirname = tempfile.gettempdir() + "/" + ".instaloader-" + getpass.getuser()
|
||||||
filename = dirname + "/" + "session-" + username
|
filename = dirname + "/" + "session-" + username
|
||||||
return filename
|
return filename
|
||||||
|
|
||||||
|
|
||||||
def save_session(session, username, filename=None, quiet=False):
|
def save_session(session: requests.Session, username: str, filename: Optional[str] = None, quiet: bool = False) -> None:
|
||||||
"""Saves requests.Session object."""
|
"""Saves requests.Session object."""
|
||||||
if filename is None:
|
if filename is None:
|
||||||
filename = get_default_session_filename(username)
|
filename = get_default_session_filename(username)
|
||||||
@ -288,7 +291,7 @@ def save_session(session, username, filename=None, quiet=False):
|
|||||||
_log("Saved session to %s." % filename, quiet=quiet)
|
_log("Saved session to %s." % filename, quiet=quiet)
|
||||||
|
|
||||||
|
|
||||||
def load_session(username, filename=None, quiet=False):
|
def load_session(username: str, filename: Optional[str] = None, quiet: bool = False) -> Optional[requests.Session]:
|
||||||
"""Returns loaded requests.Session object, or None if not found."""
|
"""Returns loaded requests.Session object, or None if not found."""
|
||||||
if filename is None:
|
if filename is None:
|
||||||
filename = get_default_session_filename(username)
|
filename = get_default_session_filename(username)
|
||||||
@ -304,7 +307,7 @@ def load_session(username, filename=None, quiet=False):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def copy_session(session):
|
def copy_session(session: requests.Session) -> requests.Session:
|
||||||
"""Duplicates a requests.Session."""
|
"""Duplicates a requests.Session."""
|
||||||
new = requests.Session()
|
new = requests.Session()
|
||||||
new.cookies = \
|
new.cookies = \
|
||||||
@ -313,7 +316,7 @@ def copy_session(session):
|
|||||||
return new
|
return new
|
||||||
|
|
||||||
|
|
||||||
def test_login(session):
|
def test_login(session: requests.Session) -> Optional[str]:
|
||||||
"""Returns the Instagram username to which given requests.Session object belongs, or None."""
|
"""Returns the Instagram username to which given requests.Session object belongs, or None."""
|
||||||
if session is None:
|
if session is None:
|
||||||
return
|
return
|
||||||
@ -324,7 +327,7 @@ def test_login(session):
|
|||||||
return data['config']['viewer']['username']
|
return data['config']['viewer']['username']
|
||||||
|
|
||||||
|
|
||||||
def default_http_header(empty_session_only=False):
|
def default_http_header(empty_session_only: bool = False) -> Dict[str, str]:
|
||||||
"""Returns default HTTP header we use for requests."""
|
"""Returns default HTTP header we use for requests."""
|
||||||
user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' \
|
user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' \
|
||||||
'(KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36'
|
'(KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36'
|
||||||
@ -347,7 +350,7 @@ def default_http_header(empty_session_only=False):
|
|||||||
return header
|
return header
|
||||||
|
|
||||||
|
|
||||||
def get_anonymous_session():
|
def get_anonymous_session() -> requests.Session:
|
||||||
"""Returns our default anonymous requests.Session object."""
|
"""Returns our default anonymous requests.Session object."""
|
||||||
session = requests.Session()
|
session = requests.Session()
|
||||||
session.cookies.update({'sessionid' : '', 'mid' : '', 'ig_pr' : '1', \
|
session.cookies.update({'sessionid' : '', 'mid' : '', 'ig_pr' : '1', \
|
||||||
@ -357,7 +360,7 @@ def get_anonymous_session():
|
|||||||
return session
|
return session
|
||||||
|
|
||||||
|
|
||||||
def get_session(user, passwd):
|
def get_session(user: str, passwd: str) -> requests.Session:
|
||||||
"""Log in to instagram with given username and password and return session object"""
|
"""Log in to instagram with given username and password and return session object"""
|
||||||
session = requests.Session()
|
session = requests.Session()
|
||||||
session.cookies.update({'sessionid' : '', 'mid' : '', 'ig_pr' : '1', \
|
session.cookies.update({'sessionid' : '', 'mid' : '', 'ig_pr' : '1', \
|
||||||
@ -380,7 +383,7 @@ def get_session(user, passwd):
|
|||||||
raise ConnectionException('Login error! Connection error!')
|
raise ConnectionException('Login error! Connection error!')
|
||||||
|
|
||||||
|
|
||||||
def get_feed_json(session, end_cursor=None, sleep=True):
|
def get_feed_json(session: requests.Session, end_cursor: Optional[str] = None, sleep: bool = True) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Get JSON of the user's feed.
|
Get JSON of the user's feed.
|
||||||
|
|
||||||
@ -425,7 +428,7 @@ def get_feed_json(session, end_cursor=None, sleep=True):
|
|||||||
return json.loads(resp.text)
|
return json.loads(resp.text)
|
||||||
|
|
||||||
|
|
||||||
def get_location(node, session, sleep=True):
|
def get_location(node: Dict[str, str], session: requests.Session, sleep: bool = True) -> Dict[str, str]:
|
||||||
pic_json = get_json("p/" + node["code"], session, sleep=sleep)
|
pic_json = get_json("p/" + node["code"], session, sleep=sleep)
|
||||||
if pic_json["entry_data"]["PostPage"][0]["media"]["location"] is not None:
|
if pic_json["entry_data"]["PostPage"][0]["media"]["location"] is not None:
|
||||||
location_json = get_json("explore/locations/" +
|
location_json = get_json("explore/locations/" +
|
||||||
@ -434,9 +437,9 @@ def get_location(node, session, sleep=True):
|
|||||||
return location_json["entry_data"]["LocationsPage"][0]["location"]
|
return location_json["entry_data"]["LocationsPage"][0]["location"]
|
||||||
|
|
||||||
|
|
||||||
def download_node(node, session, name,
|
def download_node(node: Dict[str, Any], session: requests.Session, name: str,
|
||||||
download_videos=True, geotags=False,
|
download_videos: bool = True, geotags: bool = False,
|
||||||
sleep=True, shorter_output=False, quiet=False):
|
sleep: bool = True, shorter_output: bool = False, quiet: bool = False) -> bool:
|
||||||
"""
|
"""
|
||||||
Download everything associated with one instagram node, i.e. picture, caption and video.
|
Download everything associated with one instagram node, i.e. picture, caption and video.
|
||||||
|
|
||||||
@ -444,6 +447,7 @@ def download_node(node, session, name,
|
|||||||
:param session: Session
|
:param session: Session
|
||||||
:param name: Name of profile to which this node belongs
|
:param name: Name of profile to which this node belongs
|
||||||
:param download_videos: True, if videos should be downloaded
|
:param download_videos: True, if videos should be downloaded
|
||||||
|
:param geotags: Download geotags
|
||||||
:param sleep: Sleep between requests to instagram server
|
:param sleep: Sleep between requests to instagram server
|
||||||
:param shorter_output: Shorten log output by not printing captions
|
:param shorter_output: Shorten log output by not printing captions
|
||||||
:param quiet: Suppress output
|
:param quiet: Suppress output
|
||||||
@ -469,9 +473,10 @@ def download_node(node, session, name,
|
|||||||
return downloaded
|
return downloaded
|
||||||
|
|
||||||
|
|
||||||
def download_feed_pics(session, max_count=None, fast_update=False, filter_func=None,
|
def download_feed_pics(session: requests.Session, max_count: Optional[int] = None, fast_update: bool = False,
|
||||||
download_videos=True, geotags=False,
|
filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None,
|
||||||
shorter_output=False, sleep=True, quiet=False):
|
download_videos: bool = True, geotags: bool = False,
|
||||||
|
shorter_output: bool = False, sleep: bool = True, quiet: bool = False) -> None:
|
||||||
"""
|
"""
|
||||||
Download pictures from the user's feed.
|
Download pictures from the user's feed.
|
||||||
|
|
||||||
@ -484,6 +489,7 @@ def download_feed_pics(session, max_count=None, fast_update=False, filter_func=N
|
|||||||
:param fast_update: If true, abort when first already-downloaded picture is encountered
|
:param fast_update: If true, abort when first already-downloaded picture is encountered
|
||||||
:param filter_func: function(node), which returns True if given picture should not be downloaded
|
:param filter_func: function(node), which returns True if given picture should not be downloaded
|
||||||
:param download_videos: True, if videos should be downloaded
|
:param download_videos: True, if videos should be downloaded
|
||||||
|
:param geotags: Download geotags
|
||||||
:param shorter_output: Shorten log output by not printing captions
|
:param shorter_output: Shorten log output by not printing captions
|
||||||
:param sleep: Sleep between requests to instagram server
|
:param sleep: Sleep between requests to instagram server
|
||||||
:param quiet: Suppress output
|
:param quiet: Suppress output
|
||||||
@ -509,7 +515,7 @@ def download_feed_pics(session, max_count=None, fast_update=False, filter_func=N
|
|||||||
sleep=sleep)
|
sleep=sleep)
|
||||||
|
|
||||||
|
|
||||||
def check_id(profile, session, json_data, quiet=False):
|
def check_id(profile: str, session: requests.Session, json_data: Dict[str, Any], quiet: bool = False) -> str:
|
||||||
"""
|
"""
|
||||||
Consult locally stored ID of profile with given name, check whether ID matches and whether name
|
Consult locally stored ID of profile with given name, check whether ID matches and whether name
|
||||||
has changed and return current name of the profile, and store ID of profile.
|
has changed and return current name of the profile, and store ID of profile.
|
||||||
@ -546,8 +552,9 @@ def check_id(profile, session, json_data, quiet=False):
|
|||||||
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile))
|
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile))
|
||||||
|
|
||||||
|
|
||||||
def download(name, session, profile_pic_only=False, download_videos=True, geotags=False,
|
def download(name: str, session: requests.Session, profile_pic_only: bool = False, download_videos: bool = True,
|
||||||
fast_update=False, shorter_output=False, sleep=True, quiet=False):
|
geotags: bool = False,
|
||||||
|
fast_update: bool = False, shorter_output: bool = False, sleep: bool = True, quiet: bool = False) -> None:
|
||||||
"""Download one profile"""
|
"""Download one profile"""
|
||||||
# pylint:disable=too-many-branches,too-many-locals
|
# pylint:disable=too-many-branches,too-many-locals
|
||||||
# Get profile main page json
|
# Get profile main page json
|
||||||
@ -599,7 +606,8 @@ def download(name, session, profile_pic_only=False, download_videos=True, geotag
|
|||||||
return
|
return
|
||||||
data = get_json(name, session, max_id=get_last_id(data), sleep=sleep)
|
data = get_json(name, session, max_id=get_last_id(data), sleep=sleep)
|
||||||
|
|
||||||
def get_logged_in_session(username, password=None, quiet=False):
|
|
||||||
|
def get_logged_in_session(username: str, password: Optional[str] = None, quiet: bool = False) -> requests.Session:
|
||||||
"""Logs in and returns session, asking user for password if needed"""
|
"""Logs in and returns session, asking user for password if needed"""
|
||||||
if password is not None:
|
if password is not None:
|
||||||
return get_session(username, password)
|
return get_session(username, password)
|
||||||
@ -614,9 +622,12 @@ def get_logged_in_session(username, password=None, quiet=False):
|
|||||||
print(err, file=sys.stderr)
|
print(err, file=sys.stderr)
|
||||||
password = None
|
password = None
|
||||||
|
|
||||||
def download_profiles(profilelist, username=None, password=None, sessionfile=None,
|
|
||||||
profile_pic_only=False, download_videos=True, geotags=False, fast_update=False,
|
def download_profiles(profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None,
|
||||||
sleep=True, shorter_output=False, quiet=False):
|
sessionfile: Optional[str] = None,
|
||||||
|
profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False,
|
||||||
|
fast_update: bool = False,
|
||||||
|
sleep: bool = True, shorter_output: bool = False, quiet: bool = False) -> None:
|
||||||
"""Download set of profiles and handle sessions"""
|
"""Download set of profiles and handle sessions"""
|
||||||
# pylint:disable=too-many-branches,too-many-locals
|
# pylint:disable=too-many-branches,too-many-locals
|
||||||
# Login, if desired
|
# Login, if desired
|
||||||
|
Loading…
x
Reference in New Issue
Block a user