diff options
| author | Tulir Asokan <tulir@maunium.net> | 2020-09-13 03:56:28 +0300 |
|---|---|---|
| committer | Tulir Asokan <tulir@maunium.net> | 2020-09-13 03:56:28 +0300 |
| commit | 80bcf6d0acdf35f082a6765db989ef80100f20fb (patch) | |
| tree | 89a02e12c0a4094863a80e2ff00591cb3551cb94 /import.py | |
| parent | de79aea53568cf00f07610ceb233cf4f69837e81 (diff) | |
Reorganize Python stuff and add command to create packs
Fixes #11
Diffstat (limited to 'import.py')
| -rw-r--r-- | import.py | 263 |
1 files changed, 0 insertions, 263 deletions
diff --git a/import.py b/import.py deleted file mode 100644 index cd917ed..0000000 --- a/import.py +++ /dev/null @@ -1,263 +0,0 @@ -# Copyright (c) 2020 Tulir Asokan -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. -from typing import Dict, Optional, TYPE_CHECKING -from io import BytesIO -import argparse -import os.path -import asyncio -import json -import re - -from aiohttp import ClientSession -from yarl import URL -from PIL import Image - -from telethon import TelegramClient -from telethon.tl.functions.messages import GetAllStickersRequest, GetStickerSetRequest -from telethon.tl.types.messages import AllStickers -from telethon.tl.types import InputStickerSetShortName, Document, DocumentAttributeSticker -from telethon.tl.types.messages import StickerSet as StickerSetFull - -parser = argparse.ArgumentParser() -parser.add_argument("--list", help="List your saved sticker packs", action="store_true") -parser.add_argument("--session", help="Telethon session file name", default="sticker-import") -parser.add_argument("--config", help="Path to JSON file with Matrix homeserver and access_token", - type=str, default="config.json") -parser.add_argument("--output-dir", help="Directory to write packs to", default="web/packs/", - type=str) -parser.add_argument("pack", help="Sticker pack URLs to import", action="append", nargs="*") -args = parser.parse_args() -loop = asyncio.get_event_loop() - - -async def whoami(url: URL, access_token: str) -> str: - headers = {"Authorization": f"Bearer {access_token}"} - async with ClientSession() as sess, sess.get(url, headers=headers) as resp: - resp.raise_for_status() - user_id = (await resp.json())["user_id"] - print(f"Access token validated (user ID: {user_id})") - return user_id - - -try: - with open(args.config) as config_file: - config = json.load(config_file) - homeserver_url = config["homeserver"] - access_token = config["access_token"] -except FileNotFoundError: - print("Matrix config file not found. Please enter your homeserver and access token.") - homeserver_url = input("Homeserver URL: ") - access_token = input("Access token: ") - whoami_url = URL(homeserver_url) / "_matrix" / "client" / "r0" / "account" / "whoami" - user_id = loop.run_until_complete(whoami(whoami_url, access_token)) - with open(args.config, "w") as config_file: - json.dump({ - "homeserver": homeserver_url, - "user_id": user_id, - "access_token": access_token - }, config_file) - print(f"Wrote config to {args.config}") - -upload_url = URL(homeserver_url) / "_matrix" / "media" / "r0" / "upload" - - -async def upload(data: bytes, mimetype: str, filename: str) -> str: - url = upload_url.with_query({"filename": filename}) - headers = {"Content-Type": mimetype, "Authorization": f"Bearer {access_token}"} - async with ClientSession() as sess, sess.post(url, data=data, headers=headers) as resp: - return (await resp.json())["content_uri"] - - -if TYPE_CHECKING: - from typing import TypedDict - - - class MatrixMediaInfo(TypedDict): - w: int - h: int - size: int - mimetype: str - thumbnail_url: Optional[str] - thumbnail_info: Optional['MatrixMediaInfo'] - - - class MatrixStickerInfo(TypedDict, total=False): - body: str - url: str - info: MatrixMediaInfo - id: str - - -def convert_image(data: bytes) -> (bytes, int, int): - image: Image.Image = Image.open(BytesIO(data)).convert("RGBA") - new_file = BytesIO() - image.save(new_file, "png") - w, h = image.size - return new_file.getvalue(), w, h - - -async def reupload_document(client: TelegramClient, document: Document) -> 'MatrixStickerInfo': - print(f"Reuploading {document.id}", end="", flush=True) - data = await client.download_media(document, file=bytes) - print(".", end="", flush=True) - data, width, height = convert_image(data) - print(".", end="", flush=True) - mxc = await upload(data, "image/png", f"{document.id}.png") - print(".", flush=True) - if width > 256 or height > 256: - # Set the width and height to lower values so clients wouldn't show them as huge images - if width > height: - height = int(height / (width / 256)) - width = 256 - else: - width = int(width / (height / 256)) - height = 256 - return { - "body": "", - "url": mxc, - "info": { - "w": width, - "h": height, - "size": len(data), - "mimetype": "image/png", - - # Element iOS compatibility hack - "thumbnail_url": mxc, - "thumbnail_info": { - "w": width, - "h": height, - "size": len(data), - "mimetype": "image/png", - }, - }, - } - - -def add_to_index(name: str) -> None: - index_path = os.path.join(args.output_dir, "index.json") - try: - with open(index_path) as index_file: - index_data = json.load(index_file) - except (FileNotFoundError, json.JSONDecodeError): - index_data = {"packs": []} - if "homeserver_url" not in index_data: - index_data["homeserver_url"] = homeserver_url - if name not in index_data["packs"]: - index_data["packs"].append(name) - with open(index_path, "w") as index_file: - json.dump(index_data, index_file, indent=" ") - print(f"Added {name} to {index_path}") - - -def add_meta(document: Document, info: 'MatrixStickerInfo', pack: StickerSetFull) -> None: - for attr in document.attributes: - if isinstance(attr, DocumentAttributeSticker): - info["body"] = attr.alt - info["id"] = f"tg-{document.id}" - info["net.maunium.telegram.sticker"] = { - "pack": { - "id": str(pack.set.id), - "short_name": pack.set.short_name, - }, - "id": str(document.id), - "emoticons": [], - } - - -async def reupload_pack(client: TelegramClient, pack: StickerSetFull) -> None: - if pack.set.animated: - print("Animated stickerpacks are currently not supported") - return - - pack_path = os.path.join(args.output_dir, f"{pack.set.short_name}.json") - try: - os.mkdir(os.path.dirname(pack_path)) - except FileExistsError: - pass - - print(f"Reuploading {pack.set.title} with {pack.set.count} stickers " - f"and writing output to {pack_path}") - - already_uploaded = {} - try: - with open(pack_path) as pack_file: - existing_pack = json.load(pack_file) - already_uploaded = {int(sticker["net.maunium.telegram.sticker"]["id"]): sticker - for sticker in existing_pack["stickers"]} - print(f"Found {len(already_uploaded)} already reuploaded stickers") - except FileNotFoundError: - pass - - reuploaded_documents: Dict[int, 'MatrixStickerInfo'] = {} - for document in pack.documents: - try: - reuploaded_documents[document.id] = already_uploaded[document.id] - print(f"Skipped reuploading {document.id}") - except KeyError: - reuploaded_documents[document.id] = await reupload_document(client, document) - # Always ensure the body and telegram metadata is correct - add_meta(document, reuploaded_documents[document.id], pack) - - for sticker in pack.packs: - if not sticker.emoticon: - continue - for document_id in sticker.documents: - doc = reuploaded_documents[document_id] - # If there was no sticker metadata, use the first emoji we find - if doc["body"] == "": - doc["body"] = sticker.emoticon - doc["net.maunium.telegram.sticker"]["emoticons"].append(sticker.emoticon) - - with open(pack_path, "w") as pack_file: - json.dump({ - "title": pack.set.title, - "id": f"tg-{pack.set.id}", - "net.maunium.telegram.pack": { - "short_name": pack.set.short_name, - "hash": str(pack.set.hash), - }, - "stickers": list(reuploaded_documents.values()), - }, pack_file, ensure_ascii=False) - print(f"Saved {pack.set.title} as {pack.set.short_name}.json") - - add_to_index(os.path.basename(pack_path)) - - -pack_url_regex = re.compile(r"^(?:(?:https?://)?(?:t|telegram)\.(?:me|dog)/addstickers/)?" - r"([A-Za-z0-9-_]+)" - r"(?:\.json)?$") - - -async def main(): - client = TelegramClient(args.session, 298751, "cb676d6bae20553c9996996a8f52b4d7") - await client.start() - - if args.list: - stickers: AllStickers = await client(GetAllStickersRequest(hash=0)) - index = 1 - width = len(str(stickers.sets)) - print("Your saved sticker packs:") - for saved_pack in stickers.sets: - print(f"{index:>{width}}. {saved_pack.title} " - f"(t.me/addstickers/{saved_pack.short_name})") - elif args.pack[0]: - input_packs = [] - for pack_url in args.pack[0]: - match = pack_url_regex.match(pack_url) - if not match: - print(f"'{pack_url}' doesn't look like a sticker pack URL") - return - input_packs.append(InputStickerSetShortName(short_name=match.group(1))) - for input_pack in input_packs: - pack: StickerSetFull = await client(GetStickerSetRequest(input_pack)) - await reupload_pack(client, pack) - else: - parser.print_help() - - await client.disconnect() - - -loop.run_until_complete(main()) |
