From 691021cf8fe8b0ad4e9858cee4e8a4a74915c479 Mon Sep 17 00:00:00 2001 From: king-millez Date: Fri, 23 Dec 2022 23:53:01 +1100 Subject: [PATCH] Rewrite entire codebase. Professionalism and that. Innit --- CODE_OF_CONDUCT.md | 76 ---------- README.md | 28 ++-- setup.py | 43 +++--- snapmap_archiver.py | 5 + snapmap_archiver/Coordinates.py | 10 +- snapmap_archiver/SnapmapArchiver.py | 208 ++++++++++++++++++++++------ snapmap_archiver/__init__.py | 143 +++---------------- snapmap_archiver/__main__.py | 3 +- snapmap_archiver/get_data.py | 70 ---------- snapmap_archiver/snap.py | 9 ++ snapmap_archiver/utils/__init__.py | 113 --------------- 11 files changed, 242 insertions(+), 466 deletions(-) delete mode 100644 CODE_OF_CONDUCT.md create mode 100644 snapmap_archiver.py delete mode 100644 snapmap_archiver/get_data.py create mode 100644 snapmap_archiver/snap.py delete mode 100644 snapmap_archiver/utils/__init__.py diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md deleted file mode 100644 index 960413e..0000000 --- a/CODE_OF_CONDUCT.md +++ /dev/null @@ -1,76 +0,0 @@ -# Contributor Covenant Code of Conduct - -## Our Pledge - -In the interest of fostering an open and welcoming environment, we as -contributors and maintainers pledge to making participation in our project and -our community a harassment-free experience for everyone, regardless of age, body -size, disability, ethnicity, sex characteristics, gender identity and expression, -level of experience, education, socio-economic status, nationality, personal -appearance, race, religion, or sexual identity and orientation. - -## Our Standards - -Examples of behavior that contributes to creating a positive environment -include: - -* Using welcoming and inclusive language -* Being respectful of differing viewpoints and experiences -* Gracefully accepting constructive criticism -* Focusing on what is best for the community -* Showing empathy towards other community members - -Examples of unacceptable behavior by participants include: - -* The use of sexualized language or imagery and unwelcome sexual attention or - advances -* Trolling, insulting/derogatory comments, and personal or political attacks -* Public or private harassment -* Publishing others' private information, such as a physical or electronic - address, without explicit permission -* Other conduct which could reasonably be considered inappropriate in a - professional setting - -## Our Responsibilities - -Project maintainers are responsible for clarifying the standards of acceptable -behavior and are expected to take appropriate and fair corrective action in -response to any instances of unacceptable behavior. - -Project maintainers have the right and responsibility to remove, edit, or -reject comments, commits, code, wiki edits, issues, and other contributions -that are not aligned to this Code of Conduct, or to ban temporarily or -permanently any contributor for other behaviors that they deem inappropriate, -threatening, offensive, or harmful. - -## Scope - -This Code of Conduct applies both within project spaces and in public spaces -when an individual is representing the project or its community. Examples of -representing a project or community include using an official project e-mail -address, posting via an official social media account, or acting as an appointed -representative at an online or offline event. Representation of a project may be -further defined and clarified by project maintainers. 
-
-## Enforcement
-
-Instances of abusive, harassing, or otherwise unacceptable behavior may be
-reported by contacting the project team at Millez.Dev@gmail.com. All
-complaints will be reviewed and investigated and will result in a response that
-is deemed necessary and appropriate to the circumstances. The project team is
-obligated to maintain confidentiality with regard to the reporter of an incident.
-Further details of specific enforcement policies may be posted separately.
-
-Project maintainers who do not follow or enforce the Code of Conduct in good
-faith may face temporary or permanent repercussions as determined by other
-members of the project's leadership.
-
-## Attribution
-
-This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
-available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
-
-[homepage]: https://www.contributor-covenant.org
-
-For answers to common questions about this code of conduct, see
-https://www.contributor-covenant.org/faq
diff --git a/README.md b/README.md
index 7b12a0f..32e99ff 100644
--- a/README.md
+++ b/README.md
@@ -6,24 +6,16 @@ A tool written in Python to download all Snapmaps content from a specific locati
 
 ## Setup
 
-`pip3 install snapmap-archiver`
+`pip install snapmap-archiver`
 
 [View on PyPI](https://pypi.org/project/snapmap-archiver/)
 
-Install dependencies with `pip3`.
+Install dependencies with `pip`.
 
 ```sh
-pip3 install -r requirements.txt
+pip install -r requirements.txt
 ```
 
-### Install [aria2c](http://aria2.github.io/)
-
-Download `aria2c` from here:
-
-[https://aria2.github.io/](https://aria2.github.io/)
-
-This is the downloader used for the fastest Snap download speeds.
-
 ## Usage
 
 ```sh
@@ -45,12 +37,12 @@ python3 -m snapmap_archiver -o ~/Desktop/snap -l='123.123,123.123' -l '445.445,4
 
 #### Input File
 
-With `-t`, you can specify a file containing a list of line-separated Snap URLs or IDs.
+With `-f` or `--file`, you can specify a file containing a list of line-separated Snap URLs or IDs.
 
 E.g
 
 ```sh
-python3 -m snapmap_archiver -o ~/Desktop/snap -t ~/snaps.txt
+python3 -m snapmap_archiver -o ~/Desktop/snaps -f ~/Desktop/snaps.txt
 ```
 
 Inside `snaps.txt`:
@@ -64,20 +56,18 @@ https://map.snapchat.com/ttp/snap/Example/
 
 #### Snap URL
 
-You can also just pass 1 or more normal Snap URLs to the package to download it individually like this:
+You can also just pass 1 or more normal Snap URLs or IDs to the package to download them individually like this:
 
 ```sh
-python3 -m snapmap_archiver -o ~/Desktop/snap 'https://map.snapchat.com/ttp/snap/Example/@-33.643495,115.741281,11.86z'
+python3 -m snapmap_archiver -o ~/Desktop/snap 'https://map.snapchat.com/ttp/snap/Example/@-33.643495,115.741281,11.86z' 'Example'
 ```
 
 #### Export JSON
 
 You can export a JSON file with info about downloaded snaps with the `--write-json` argument, which will contain information like the time the Snap was posted, and the Snap location.
 
+It will write `archive.json` to the specified output directory.
+
 #### Snap Radius
 
 The radius from the coordinates you provide that will be included for downloads. `-r 20000` will download all Snaps within a 20km radius of your coordinates.
-
-#### No Overlay
-
-By default the script merges the video and the overlay file into one file. With the `--no-overlay` argument you can disable this and only download the raw video.
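The `--write-json` option described above writes `archive.json` to the output directory. A minimal sketch of reading that file (not part of this patch; the field names follow the keys assembled in `SnapmapArchiver._parse_snap()` later in the patch, and the path is whatever directory was passed with `-o`):

```python
import json
import os

output_dir = os.path.expanduser("~/Desktop/snap")  # hypothetical -o value
with open(os.path.join(output_dir, "archive.json")) as f:
    for snap in json.load(f):
        # Each entry carries snap_id, create_time, file_type and url.
        print(f"{snap['snap_id']}.{snap['file_type']} posted at {snap['create_time']}: {snap['url']}")
```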
diff --git a/setup.py b/setup.py index 62479e4..23ceffc 100644 --- a/setup.py +++ b/setup.py @@ -1,32 +1,31 @@ -from setuptools import setup, find_packages -import pathlib +# -*- coding: utf-8 -*- +"""setup.py: setuptools control.""" -here = pathlib.Path(__file__).parent.resolve() +import re +from setuptools import setup -long_description = (here / "README.md").read_text(encoding="utf-8") +version = '2.0' +with open("README.md", "r") as f: + long_descr = f.read() setup( - name="snapmap-archiver", - version="1.3.1", - description="Download all Snapmaps content from a specific location.", - long_description=long_description, - long_description_content_type="text/markdown", - url="https://github.com/king-millez/snapmap-archiver", - author="king-millez", - author_email="millez.dev@gmail.com", - classifiers=[ - "Programming Language :: Python :: 3", - "Operating System :: OS Independent", - ], - packages=find_packages(), - include_package_data=True, - python_requires=">=3.6", + name = "snapmap-archiver", + packages = ["snapmap_archiver"], + entry_points = { + "console_scripts": ['snapmap-archiver = snapmap_archiver:main'] + }, + version = version, + description = "Download all Snapmaps content from a specific location.", + long_description = long_descr, + author = "Miles Greenwark", + author_email = "millez.dev@gmail.com", + url = "https://github.com/king-millez/snapmap-archiver", + python_requires=">=3.10", install_requires=[ "certifi", "chardet", "idna", "requests", "urllib3", - ], - entry_points={"console_scripts": ["snapmap-archiver=snapmap_archiver:main"]}, -) + ] +) \ No newline at end of file diff --git a/snapmap_archiver.py b/snapmap_archiver.py new file mode 100644 index 0000000..56e9744 --- /dev/null +++ b/snapmap_archiver.py @@ -0,0 +1,5 @@ +import snapmap_archiver + + +if __name__ == '__main__': + snapmap_archiver.main() diff --git a/snapmap_archiver/Coordinates.py b/snapmap_archiver/Coordinates.py index c1f5fd6..7916f83 100644 --- a/snapmap_archiver/Coordinates.py +++ b/snapmap_archiver/Coordinates.py @@ -4,6 +4,14 @@ def __init__(self, coord_str: str): if ',' not in coord_str: raise ValueError(f'No comma is present in the provided coordinates.{self.geo_msg}') try: - self.lat, self.long = coord_str.split(',', 1) + lat, long = coord_str.split(',', 1) + self.lat = float(lat) + self.long = float(long) except Exception: raise ValueError(f'Provided coordinates could not be split to lat/long points.{self.geo_msg}') + + def __str__(self) -> str: + return f'Lat: {self.lat}, Lon: {self.long}' + + def __repr__(self) -> str: + return f'({self.lat},{self.long})' diff --git a/snapmap_archiver/SnapmapArchiver.py b/snapmap_archiver/SnapmapArchiver.py index 6896b1a..300f2a3 100644 --- a/snapmap_archiver/SnapmapArchiver.py +++ b/snapmap_archiver/SnapmapArchiver.py @@ -1,52 +1,180 @@ import os -from snapmap_archiver.Coordinates import Coordinates +import re +import json +import requests +from time import sleep +from typing import Iterable + +from snapmap_archiver.coordinates import Coordinates +from snapmap_archiver.snap import Snap + + +MAX_RADIUS = 85_000 +ISSUES_URL = 'https://github.com/king-millez/snapmap-archiver/issues/new/choose' class SnapmapArchiver: - def __init__(self) -> None: + def __init__(self, *args, **kwargs) -> None: + self.write_json = kwargs.get('write_json') + self.all_snaps = {} + self.arg_snaps = args + self.coords_list = [] self.radius = 10_000 - self.max_radius = 85_000 + self.zoom_depth = kwargs.get('zoom_depth') or 5 # TODO change this 5 to a default const somewhere? 
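+        # Positional args (stored above as self.arg_snaps) are treated as raw Snap URLs/IDs;
+        # the checks below require at least one of a location (-l), an input file (--file),
+        # or those IDs, plus an output directory (-o).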
+        self.input_file = ''
 
-    def main(self, **kwargs):
-        if kwargs['ffmpeg_path']:
-            if not os.path.isfile(kwargs['ffmpeg_path']):
-                raise FileNotFoundError('Please provide a valid file for --ffmpeg-path')
-            self.ffmpeg_path = kwargs['ffmpeg_path']
+        if not kwargs['locations'] and not args and not kwargs['input_file']:
+            raise ValueError('Some sort of input is required; location (-l), input file (--file), and raw Snap IDs are all valid options.')
 
         if not kwargs['output_dir']:
             raise ValueError('Output directory (-o) is required.')
-        if not os.path.isdir(kwargs['output_dir']):
-            os.makedirs(kwargs['output_dir'], exist_ok=True) # Python's exception handling has us covered here
-        self.output_dir = kwargs['output_dir']
-        if not kwargs['location']:
-            raise ValueError('location (-l) is required.')
-
-        self.coords = Coordinates(kwargs['location'])
-
-        if kwargs['radius'] > self.max_radius:
-            print('Supplied radius value is too large (above 85,000). Defaulting to 85000.')
-            self.radius = self.max_radius
-
-        # def api_query(coords: Coordinates, zl=5, max_radius=10000):
-        #     available_snaps = []
-        #     current_iteration = max_radius
-        #     _epoch = get_epoch()
-        #     try:
-        #         print('Querying Snaps...')
-        #         while current_iteration != 1:
-        #             payload = {"requestGeoPoint":{"lat":lat,"lon":lon},"zoomLevel":zl,"tileSetId":{"flavor":"default","epoch":_epoch,"type":1},"radiusMeters":current_iteration,"maximumFuzzRadius":0}
-        #             req_headers['Content-Length'] = str(len(str(payload)))
-        #             api_data = json.loads(requests.post('https://ms.sc-jpl.com/web/getPlaylist', headers=req_headers, json=payload).text)
-        #             available_snaps = available_snaps + api_data['manifest']['elements']
-        #             if(current_iteration > 2000):
-        #                 current_iteration = current_iteration - 2000
-        #             elif(current_iteration > 1000):
-        #                 current_iteration = current_iteration - 100
-        #             else:
-        #                 current_iteration = 1
-        #         return [i for n, i in enumerate(available_snaps) if i not in available_snaps[n + 1:]]
-        #     except:
-        #         sys.exit("You seem to have been rate limited, please wait and try again.")
\ No newline at end of file
+        self.output_dir = kwargs['output_dir']
+        if not os.path.isdir(self.output_dir):
+            os.makedirs(self.output_dir, exist_ok=True) # Python's exception handling has us covered here
+
+        if kwargs.get('radius'):
+            self.radius = MAX_RADIUS if kwargs['radius'] > MAX_RADIUS else kwargs['radius']
+
+        # Query provided coordinates for Snaps
+        if kwargs.get('locations'):
+            self.coords_list = [Coordinates(latlon[0]) for latlon in kwargs['locations']]
+
+        # Check input file for Snap IDs
+        if kwargs.get('input_file'):
+            self.input_file = kwargs['input_file']
+
+    def download_snaps(self, group: Iterable[Snap] | Snap):
+        if isinstance(group, Snap):
+            group = [group]
+        for snap in group:
+            fpath = os.path.join(self.output_dir, f'{snap.snap_id}.{snap.file_type}')
+            if os.path.isfile(fpath):
+                print(f' - {fpath} already exists.')
+                continue
+            with open(fpath, 'wb') as f:
+                f.write(requests.get(snap.url).content)
+            print(f' - Downloaded {fpath}.')
+
+    def query_snaps(self, snaps: str | Iterable[str]) -> list[Snap | None]:
+        if isinstance(snaps, str):
+            snaps = [snaps] # The Snap query endpoint can take multiple IDs, so here we can query 1 or more snaps with ease.
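+        # Normalise each entry to a bare Snap ID: the regex below accepts either a full
+        # map.snapchat.com Snap URL or a raw ID and extracts the ID portion for the
+        # getStoryElements request.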
+ to_query = [] + for snap_id in snaps: + rgx_match = re.search(r"(?:https?:\/\/map\.snapchat\.com\/ttp\/snap\/)?(W7_(?:[aA-zZ0-9\-_\+]{22})(?:[aA-zZ0-9-_\+]{28})AAAAA[AQ])(?:\/?@-?[0-9]{1,3}\.?[0-9]{0,},-?[0-9]{1,3}\.?[0-9]{0,}(?:,[0-9]{1,3}\.?[0-9]{0,}z))?", snap_id) + if not rgx_match: + print(f'{snap_id} is not a valid Snap URL or ID.') + continue + to_query.append(rgx_match.group(1)) + return [self._parse_snap(snap) for snap in requests.post( + "https://ms.sc-jpl.com/web/getStoryElements", json={"snapIds": to_query} + ).json()['elements']] + + def query_coords(self, coords: Coordinates): + to_download = {} + current_iteration = self.radius + epoch = self._get_epoch() + while current_iteration != 1: + print(f"Querying with radius {current_iteration}...") + while True: + api_data = requests.post( + "https://ms.sc-jpl.com/web/getPlaylist", + headers={ + "Content-Type": "application/json", + }, + json={ + "requestGeoPoint": {"lat": coords.lat, "lon": coords.long}, + "zoomLevel": self.zoom_depth, + "tileSetId": {"flavor": "default", "epoch": epoch, "type": 1}, + "radiusMeters": current_iteration, + "maximumFuzzRadius": 0, + } + ).text + if api_data: + if api_data.strip() == 'Too many requests': + print('You have been rate limited. Sleeping for 1 minute.') + sleep(60) + else: + try: + json_data = json.loads(api_data)['manifest']['elements'] + break + except requests.exceptions.JSONDecodeError: + print('You have been rate limited. Sleeping for 1 minute.') + sleep(60) + + for snap in json_data: + if to_download.get(snap['id']): # Avoids downloading duplicates. Faster than a list because the Snap ID is indexed + continue + parsed = self._parse_snap(snap) + if not parsed: + continue + to_download[snap['id']] = parsed + + if current_iteration > 2000: + current_iteration -= 2000 + elif current_iteration > 1000: + current_iteration -= 100 + else: + current_iteration = 1 + + print(f'Found {len(list(to_download.keys()))} Snaps') + return self._transform_index(to_download.values()) + + def main(self): + # Query provided coordinates + if self.coords_list: + for coords in self.coords_list: + self.download_snaps(self.query_coords(coords)) + + # Download Snaps from input file + if self.input_file: + if os.path.isfile(self.input_file): + with open(self.input_file, "r") as f: + to_format = f.read().split("\n") + self.download_snaps(self.query_snaps(to_format)) + else: + raise FileNotFoundError('Input file does not exist.') + + # Download Snaps provided from the command line + self.download_snaps(self.query_snaps(self.arg_snaps)) + + if self.write_json: + with open(os.path.join(self.output_dir, 'archive.json'), 'w') as f: + f.write(json.dumps(self._transform_index(self.all_snaps), indent=2)) + + def _transform_index(index: dict[str, dict[str, str]]): + return [v for v in index.values()] + + def _parse_snap(self, snap: dict): + data_dict = {'create_time': snap['timestamp'], 'snap_id': snap['id']} + if snap['snapInfo'].get('snapMediaType'): + data_dict['file_type'] = 'mp4' + elif snap['snapInfo'].get('streamingMediaInfo'): + data_dict['file_type'] = 'jpg' + else: + print(f'**Unknown Snap type detected!**\n\tID: {snap["id"]}\n\tSnap data: {json.dumps(snap)}\nPlease report this at {ISSUES_URL}\n') + return + url = snap['snapInfo']['streamingMediaInfo'].get('mediaUrl') + if not url: + return + data_dict['url'] = url + if not self.all_snaps.get(snap['id']): + self.all_snaps[snap['id']] = data_dict + return Snap(**data_dict) + + def _get_epoch(self): + epoch_endpoint = requests.post( + 
"https://ms.sc-jpl.com/web/getLatestTileSet", + headers={"Content-Type": "application/json"}, + json={}, + ).json() + if epoch_endpoint: + for entry in epoch_endpoint["tileSetInfos"]: + if entry["id"]["type"] == "HEAT": + return entry["id"]["epoch"] + else: + raise self.MissingEpochError(f'The API epoch could not be obtained.\n\nPlease report this at {ISSUES_URL}') + + class MissingEpochError(Exception): + pass diff --git a/snapmap_archiver/__init__.py b/snapmap_archiver/__init__.py index 2743c6f..bcec74f 100644 --- a/snapmap_archiver/__init__.py +++ b/snapmap_archiver/__init__.py @@ -1,25 +1,16 @@ -import os -import re -import sys import argparse -from . import get_data -from .utils import organise_media, download_media, match_snap_id +from snapmap_archiver.SnapmapArchiver import SnapmapArchiver + + +USAGE_MSG = 'snapmap_archiver -o [OUTPUT DIR] -l="[LATITUDE],[LONGITUDE]"\n\nUse -h to display more options.' def main(): - USAGE_MSG = 'snapmap_archiver -o [OUTPUT DIR] -l="[LATITUDE],[LONGITUDE]" [SNAP URL (optional)]' - geo_msg = ( - "Use comma seperated values for " 'latitude/longitude, e.g: -l="35.0,67.0"' - ) - parser = argparse.ArgumentParser( - description="Download content from Snapmaps", usage=USAGE_MSG - ) - parser.add_argument( - "-o", - dest="output_dir", - type=str, - help="Output directory for downloaded content.", - ) + parser = argparse.ArgumentParser(description='Download content from Snapmaps', usage=USAGE_MSG) + parser.add_argument('-o', dest='output_dir', type=str, help='Output directory for downloaded content.') + parser.add_argument('-z', dest='zoom_depth', type=float, help='Snapmaps zoom depth, default is 5.', default=5) + parser.add_argument('-r', dest='radius', type=int, help='Maximum Snap radius in meters, default is 30,000.', default=30_000) + parser.add_argument('--write-json', dest='write_json', action='store_true', default=False, help='Write Snap metadata JSON.') parser.add_argument( "-l", "--location", @@ -29,111 +20,15 @@ def main(): action="append", nargs="*", ) - parser.add_argument( - "-z", dest="zoom_depth", type=float, help="Snapmaps zoom depth, default is 5." - ) - parser.add_argument( - "-r", - dest="radius", - type=int, - help="Maximum Snap radius in meters, default is 30000.", - ) - parser.add_argument( - "--file", - dest="input_file", - type=str, - help="File containing line-separated Snap URLs or IDs", - ) - parser.add_argument( - "--write-json", - dest="write_json", - action="store_true", - default=False, - help="Write Snap metadata JSON.", - ) - parser.add_argument( - "--no-overlay", - dest="no_overlay", - action="store_true", - default=False, - help="Do not use ffmpeg to merge graphical " - "elements to video Snaps. 
Default is False", - ) + parser.add_argument('-f', "--file", dest="input_file", type=str, help="File containing line-separated Snap URLs or IDs") args, unknown = parser.parse_known_args() - snap_ids = [] - if unknown: - snap_ids = [ - match_snap_id(i) - for i in unknown - if re.match( - r"https?:\/\/map\.snapchat\.com\/ttp\/snap\/W7_(?:[aA-zZ0-9\-_\+]{22})(?:[aA-zZ0-9-_\+]{28})AAAAAA\/?(?:@-?[0-9]{1,3}\.?[0-9]{0,},-?[0-9]{1,3}\.?[0-9]{0,}(?:,[0-9]{1,3}\.?[0-9]{0,}z))?", - i, - ) - ] - - if args.input_file: - if os.path.isfile(args.input_file): - with open(args.input_file, "r") as f: - to_format = f.read().split("\n") - for candidate in to_format: - if len(candidate) > 0: - try: - snap_ids.append(match_snap_id(candidate)) - except Exception: - print(f"{candidate} is not a valid Snap URL or ID.") - else: - sys.exit("Input file does not exist.") - - if not args.output_dir: - print("Output directory (-o) is required.") - sys.exit(USAGE_MSG) - - if not args.location and not snap_ids and not args.input_file: - print("No form of input was supplied.") - sys.exit(USAGE_MSG) - - if args.location: - for coords in args.location: - if "," not in coords[0]: - sys.exit(f"{geo_msg}\n{coords} is not valid.") - - if not os.path.isdir(args.output_dir): - try: - os.mkdir(args.output_dir) - except PermissionError: - sys.exit(f'Could not create directory "{args.output_dir}"') - - if not args.radius: - args.radius = 30000 - elif args.radius > 85000: - print( - "Supplied radius value is too large " "(above 85,000). Defaulting to 85000." - ) - args.radius = 85000 - - if args.location: - for coords in args.location: - try: - geo_data = coords[0].split(",", 1) - except Exception: - sys.exit(geo_msg) - api_response = get_data.api_query( - float(geo_data[0]), float(geo_data[1]), max_radius=args.radius - ) - download_media( - args.output_dir, - organise_media(api_response), - args.write_json, - args.no_overlay, - ) - - if snap_ids: - download_media( - args.output_dir, - organise_media( - get_data.api_query(snap_ids=snap_ids, mode="snap")["elements"] - ), - args.write_json, - args.no_overlay, - ) + sm_archiver = SnapmapArchiver( + *unknown, + radius=args.radius, + output_dir=args.output_dir, + locations=args.location, + zoom_depth=args.zoom_depth, + write_json=args.write_json, + input_file=args.input_file) + sm_archiver.main() diff --git a/snapmap_archiver/__main__.py b/snapmap_archiver/__main__.py index e0abfb8..56e9744 100644 --- a/snapmap_archiver/__main__.py +++ b/snapmap_archiver/__main__.py @@ -1,4 +1,5 @@ import snapmap_archiver -if __name__ == "__main__": + +if __name__ == '__main__': snapmap_archiver.main() diff --git a/snapmap_archiver/get_data.py b/snapmap_archiver/get_data.py deleted file mode 100644 index 13f647c..0000000 --- a/snapmap_archiver/get_data.py +++ /dev/null @@ -1,70 +0,0 @@ -import sys -import json -import requests -from typing import Dict, List - - -def get_epoch(): - for entry in json.loads( - requests.post( - "https://ms.sc-jpl.com/web/getLatestTileSet", - headers={"Content-Type": "application/json"}, - json={}, - ).text - )["tileSetInfos"]: - if entry["id"]["type"] == "HEAT": - return entry["id"]["epoch"] - - -def api_query( - lat: float = 0, - lon: float = 0.0, - zl: float = 5.72, - max_radius: int = 30000, - mode: str = "loc", - snap_ids: list = [], -) -> Dict[str, List]: - if mode == "loc": - available_snaps = [] - current_iteration = max_radius - _epoch = get_epoch() - try: - print("Querying Snaps...") - while current_iteration != 1: - print(f"Querying with radius 
{current_iteration}...") - payload = { - "requestGeoPoint": {"lat": lat, "lon": lon}, - "zoomLevel": zl, - "tileSetId": {"flavor": "default", "epoch": _epoch, "type": 1}, - "radiusMeters": current_iteration, - "maximumFuzzRadius": 0, - } - api_data = requests.post( - "https://ms.sc-jpl.com/web/" "getPlaylist", - headers={ - "Content-Type": "application/json", - }, - json=payload, - ).json() - available_snaps = available_snaps + api_data["manifest"]["elements"] - if current_iteration > 2000: - current_iteration = current_iteration - 2000 - elif current_iteration > 1000: - current_iteration = current_iteration - 100 - else: - current_iteration = 1 - print("Sorting list of Snaps...") - return [ - i - for n, i in enumerate(available_snaps) - if i not in available_snaps[n + 1 :] - ] - except Exception: - sys.exit( - "You seem to have been rate limited, " "please wait and try again." - ) - - if mode == "snap": - return requests.post( - "https://ms.sc-jpl.com/web/getStoryElements", json={"snapIds": snap_ids} - ).json() diff --git a/snapmap_archiver/snap.py b/snapmap_archiver/snap.py new file mode 100644 index 0000000..472d94d --- /dev/null +++ b/snapmap_archiver/snap.py @@ -0,0 +1,9 @@ +from dataclasses import dataclass + + +@dataclass +class Snap: + snap_id: str + url: str + create_time: int + file_type: str diff --git a/snapmap_archiver/utils/__init__.py b/snapmap_archiver/utils/__init__.py deleted file mode 100644 index 91db9a2..0000000 --- a/snapmap_archiver/utils/__init__.py +++ /dev/null @@ -1,113 +0,0 @@ -import sys -import json -import subprocess -import os -import re - - -def match_snap_id(url: str) -> str: - return re.search( - r"(W7_(?:[aA-zZ0-9\-_\+]{22})(?:[aA-zZ0-9-_\+]{28})AAAAAA)", url - ).group(1) - - -def organise_media(api_data): - to_download = [] - print(f"Found {len(api_data)} Snaps.") - for entry in api_data: - data_dict = {"id": entry["id"], "create_time": entry["timestamp"], "media": {}} - try: - for locale in entry["snapInfo"]["title"]["strings"]: - if locale["locale"] == "en": - data_dict["location"] = locale["text"] - except KeyError: - data_dict["location"] = entry["snapInfo"]["localitySubtitle"]["fallback"] - try: - data_dict["media"]["overlayText"] = entry["snapInfo"]["overlayText"] - except KeyError: - data_dict["media"]["overlayText"] = None - try: - data_dict["media"]["raw_url"] = ( - entry["snapInfo"]["streamingMediaInfo"]["prefixUrl"] + "media.mp4" - ) - data_dict["media"]["filetype"] = "mp4" - try: - data_dict["media"]["video_overlay"] = ( - entry["snapInfo"]["streamingMediaInfo"]["prefixUrl"] - + entry["snapInfo"]["streamingMediaInfo"]["overlayUrl"] - ) - except KeyError: - data_dict["media"]["video_overlay"] = None - except KeyError: - try: - data_dict["media"]["raw_url"] = entry["snapInfo"]["publicMediaInfo"][ - "publicImage" "MediaInfo" - ]["mediaUrl"] - data_dict["media"]["filetype"] = "jpg" - except KeyError: - for i in entry["snapInfo"].items(): - if ( - i[0] == "streamingThumbnailInfo" - ): # For some reason JSON throws an error if you just query this key directly, so you have to do it this way. 
- data_dict["media"]["raw_url"] = i[1]["infos"][-1][ - "thumbnailUrl" - ] - if len(data_dict["media"]) == 0: - continue # If there's no video file and no video/image thumbnail, just skip the snap since there's nothing to download - to_download.append(data_dict) - return to_download - - -def download_media(output_dir, organised_data, dl_json=False, no_overlay=False): - for index, snap in enumerate(organised_data): - DL_MSG = f"Snap {index + 1}/{len(organised_data)} downloading..." - - filename = snap["location"] + " - " + snap["create_time"] + " - " + snap["id"] - if dl_json: - with open(f"{output_dir}/" + filename + ".info.json", "w") as json_file: - json_file.write(json.dumps(snap, indent=2)) - if sys.platform == "win32": - cmd = ["aria2c.exe", snap["media"]["raw_url"], "-d", output_dir, "-o"] - else: - cmd = ["aria2c", snap["media"]["raw_url"], "-d", output_dir, "-o"] - if snap["media"]["raw_url"][-3:] == "mp4": - if os.path.exists(f"{cmd[-2]}/" + filename + ".mp4"): - print(f"Snap {index + 1}/{len(organised_data)} " "already downloaded.") - else: - print(DL_MSG + f" - {filename}.mp4") - # Download snap without overlay - if no_overlay: - subprocess.run(cmd + [filename + ".mp4"], capture_output=True) - else: - if snap["media"]["video_overlay"] is not None: - merge_overlay = [ - "ffmpeg", - "-y", - "-i", - snap["media"]["raw_url"], - "-i", - snap["media"]["video_overlay"], - "-filter_complex", - "[1][0]scale2ref[i][m];[m][i]overlay[v]", - "-map", - "[v]", - "-map", - "0:a?", - "-ac", - "2", - ] - # Merge video and overlay to one file using ffmpeg - subprocess.run( - merge_overlay + [f"{cmd[-2]}/{filename}.mp4"], - capture_output=True, - ) - # Delete temp file - else: - subprocess.run(cmd + [filename + ".mp4"], capture_output=True) - - else: - if os.path.exists(f"{cmd[-2]}/" + filename + ".jpg"): - print(f"Snap {index + 1}/{len(organised_data)} " "already downloaded.") - else: - print(DL_MSG + f" - {filename}.jpg") - subprocess.run(cmd + [filename + ".jpg"], capture_output=True)
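One detail of the new `SnapmapArchiver.query_coords()` loop that is easy to miss is how the search radius shrinks between requests. A standalone sketch of that schedule (illustrative only; `radius_schedule` is a hypothetical helper, not part of the patch):

```python
def radius_schedule(start_radius: int) -> list[int]:
    """Radii queried by query_coords(): 2 km steps while above 2 km, then 100 m steps while above 1 km."""
    radii, current = [], start_radius
    while current != 1:
        radii.append(current)
        if current > 2000:
            current -= 2000
        elif current > 1000:
            current -= 100
        else:
            current = 1
    return radii


print(radius_schedule(10_000))
# -> [10000, 8000, 6000, 4000, 2000, 1900, 1800, ..., 1100, 1000]
```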