aboutsummaryrefslogtreecommitdiffstats
path: root/sticker
diff options
context:
space:
mode:
Diffstat (limited to 'sticker')
-rw-r--r--sticker/__init__.py0
-rw-r--r--sticker/import.py158
-rw-r--r--sticker/lib/__init__.py0
-rw-r--r--sticker/lib/matrix.py77
-rw-r--r--sticker/lib/util.py67
-rw-r--r--sticker/pack.py99
-rw-r--r--sticker/scalar_convert.py41
7 files changed, 442 insertions, 0 deletions
diff --git a/sticker/__init__.py b/sticker/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/sticker/__init__.py
diff --git a/sticker/import.py b/sticker/import.py
new file mode 100644
index 0000000..f350dc5
--- /dev/null
+++ b/sticker/import.py
@@ -0,0 +1,158 @@
+# Copyright (c) 2020 Tulir Asokan
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+from typing import Dict
+import argparse
+import asyncio
+import os.path
+import json
+import re
+
+from telethon import TelegramClient
+from telethon.tl.functions.messages import GetAllStickersRequest, GetStickerSetRequest
+from telethon.tl.types.messages import AllStickers
+from telethon.tl.types import InputStickerSetShortName, Document, DocumentAttributeSticker
+from telethon.tl.types.messages import StickerSet as StickerSetFull
+
+from .lib import matrix, util
+
+
+async def reupload_document(client: TelegramClient, document: Document) -> matrix.StickerInfo:
+ print(f"Reuploading {document.id}", end="", flush=True)
+ data = await client.download_media(document, file=bytes)
+ print(".", end="", flush=True)
+ data, width, height = util.convert_image(data)
+ print(".", end="", flush=True)
+ mxc = await matrix.upload(data, "image/png", f"{document.id}.png")
+ print(".", flush=True)
+ return util.make_sticker(mxc, width, height, len(data))
+
+
+def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
+ for attr in document.attributes:
+ if isinstance(attr, DocumentAttributeSticker):
+ info["body"] = attr.alt
+ info["id"] = f"tg-{document.id}"
+ info["net.maunium.telegram.sticker"] = {
+ "pack": {
+ "id": str(pack.set.id),
+ "short_name": pack.set.short_name,
+ },
+ "id": str(document.id),
+ "emoticons": [],
+ }
+
+
+async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir: str) -> None:
+ if pack.set.animated:
+ print("Animated stickerpacks are currently not supported")
+ return
+
+ pack_path = os.path.join(output_dir, f"{pack.set.short_name}.json")
+ try:
+ os.mkdir(os.path.dirname(pack_path))
+ except FileExistsError:
+ pass
+
+ print(f"Reuploading {pack.set.title} with {pack.set.count} stickers "
+ f"and writing output to {pack_path}")
+
+ already_uploaded = {}
+ try:
+ with open(pack_path) as pack_file:
+ existing_pack = json.load(pack_file)
+ already_uploaded = {int(sticker["net.maunium.telegram.sticker"]["id"]): sticker
+ for sticker in existing_pack["stickers"]}
+ print(f"Found {len(already_uploaded)} already reuploaded stickers")
+ except FileNotFoundError:
+ pass
+
+ reuploaded_documents: Dict[int, matrix.StickerInfo] = {}
+ for document in pack.documents:
+ try:
+ reuploaded_documents[document.id] = already_uploaded[document.id]
+ print(f"Skipped reuploading {document.id}")
+ except KeyError:
+ reuploaded_documents[document.id] = await reupload_document(client, document)
+ # Always ensure the body and telegram metadata is correct
+ add_meta(document, reuploaded_documents[document.id], pack)
+
+ for sticker in pack.packs:
+ if not sticker.emoticon:
+ continue
+ for document_id in sticker.documents:
+ doc = reuploaded_documents[document_id]
+ # If there was no sticker metadata, use the first emoji we find
+ if doc["body"] == "":
+ doc["body"] = sticker.emoticon
+ doc["net.maunium.telegram.sticker"]["emoticons"].append(sticker.emoticon)
+
+ with open(pack_path, "w") as pack_file:
+ json.dump({
+ "title": pack.set.title,
+ "id": f"tg-{pack.set.id}",
+ "net.maunium.telegram.pack": {
+ "short_name": pack.set.short_name,
+ "hash": str(pack.set.hash),
+ },
+ "stickers": list(reuploaded_documents.values()),
+ }, pack_file, ensure_ascii=False)
+ print(f"Saved {pack.set.title} as {pack.set.short_name}.json")
+
+ util.add_to_index(os.path.basename(pack_path), output_dir)
+
+
+pack_url_regex = re.compile(r"^(?:(?:https?://)?(?:t|telegram)\.(?:me|dog)/addstickers/)?"
+ r"([A-Za-z0-9-_]+)"
+ r"(?:\.json)?$")
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument("--list", help="List your saved sticker packs", action="store_true")
+parser.add_argument("--session", help="Telethon session file name", default="sticker-import")
+parser.add_argument("--config",
+ help="Path to JSON file with Matrix homeserver and access_token",
+ type=str, default="config.json")
+parser.add_argument("--output-dir", help="Directory to write packs to", default="web/packs/",
+ type=str)
+parser.add_argument("pack", help="Sticker pack URLs to import", action="append", nargs="*")
+
+
+async def main(args: argparse.Namespace) -> None:
+ await matrix.load_config(args.config)
+ client = TelegramClient(args.session, 298751, "cb676d6bae20553c9996996a8f52b4d7")
+ await client.start()
+
+ if args.list:
+ stickers: AllStickers = await client(GetAllStickersRequest(hash=0))
+ index = 1
+ width = len(str(stickers.sets))
+ print("Your saved sticker packs:")
+ for saved_pack in stickers.sets:
+ print(f"{index:>{width}}. {saved_pack.title} "
+ f"(t.me/addstickers/{saved_pack.short_name})")
+ elif args.pack[0]:
+ input_packs = []
+ for pack_url in args.pack[0]:
+ match = pack_url_regex.match(pack_url)
+ if not match:
+ print(f"'{pack_url}' doesn't look like a sticker pack URL")
+ return
+ input_packs.append(InputStickerSetShortName(short_name=match.group(1)))
+ for input_pack in input_packs:
+ pack: StickerSetFull = await client(GetStickerSetRequest(input_pack))
+ await reupload_pack(client, pack, args.output_dir)
+ else:
+ parser.print_help()
+
+ await client.disconnect()
+
+
+def cmd() -> None:
+ asyncio.get_event_loop().run_until_complete(main(parser.parse_args()))
+
+
+if __name__ == "__main__":
+ cmd()
diff --git a/sticker/lib/__init__.py b/sticker/lib/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/sticker/lib/__init__.py
diff --git a/sticker/lib/matrix.py b/sticker/lib/matrix.py
new file mode 100644
index 0000000..426bdd0
--- /dev/null
+++ b/sticker/lib/matrix.py
@@ -0,0 +1,77 @@
+# Copyright (c) 2020 Tulir Asokan
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+from typing import Optional, TYPE_CHECKING
+import json
+
+from aiohttp import ClientSession
+from yarl import URL
+
+access_token: Optional[str] = None
+homeserver_url: Optional[str] = None
+
+upload_url: Optional[URL] = None
+
+if TYPE_CHECKING:
+ from typing import TypedDict
+
+
+ class MediaInfo(TypedDict):
+ w: int
+ h: int
+ size: int
+ mimetype: str
+ thumbnail_url: Optional[str]
+ thumbnail_info: Optional['MediaInfo']
+
+
+ class StickerInfo(TypedDict, total=False):
+ body: str
+ url: str
+ info: MediaInfo
+ id: str
+else:
+ MediaInfo = None
+ StickerInfo = None
+
+
+async def load_config(path: str) -> None:
+ global access_token, homeserver_url, upload_url
+ try:
+ with open(path) as config_file:
+ config = json.load(config_file)
+ homeserver_url = config["homeserver"]
+ access_token = config["access_token"]
+ except FileNotFoundError:
+ print("Matrix config file not found. Please enter your homeserver and access token.")
+ homeserver_url = input("Homeserver URL: ")
+ access_token = input("Access token: ")
+ whoami_url = URL(homeserver_url) / "_matrix" / "client" / "r0" / "account" / "whoami"
+ user_id = await whoami(whoami_url, access_token)
+ with open(path, "w") as config_file:
+ json.dump({
+ "homeserver": homeserver_url,
+ "user_id": user_id,
+ "access_token": access_token
+ }, config_file)
+ print(f"Wrote config to {path}")
+
+ upload_url = URL(homeserver_url) / "_matrix" / "media" / "r0" / "upload"
+
+
+async def whoami(url: URL, access_token: str) -> str:
+ headers = {"Authorization": f"Bearer {access_token}"}
+ async with ClientSession() as sess, sess.get(url, headers=headers) as resp:
+ resp.raise_for_status()
+ user_id = (await resp.json())["user_id"]
+ print(f"Access token validated (user ID: {user_id})")
+ return user_id
+
+
+async def upload(data: bytes, mimetype: str, filename: str) -> str:
+ url = upload_url.with_query({"filename": filename})
+ headers = {"Content-Type": mimetype, "Authorization": f"Bearer {access_token}"}
+ async with ClientSession() as sess, sess.post(url, data=data, headers=headers) as resp:
+ return (await resp.json())["content_uri"]
diff --git a/sticker/lib/util.py b/sticker/lib/util.py
new file mode 100644
index 0000000..feefea2
--- /dev/null
+++ b/sticker/lib/util.py
@@ -0,0 +1,67 @@
+# Copyright (c) 2020 Tulir Asokan
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+from io import BytesIO
+import os.path
+import json
+
+from PIL import Image
+
+from . import matrix
+
+
+def convert_image(data: bytes) -> (bytes, int, int):
+ image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
+ new_file = BytesIO()
+ image.save(new_file, "png")
+ w, h = image.size
+ if w > 256 or h > 256:
+ # Set the width and height to lower values so clients wouldn't show them as huge images
+ if w > h:
+ h = int(h / (w / 256))
+ w = 256
+ else:
+ w = int(w / (h / 256))
+ h = 256
+ return new_file.getvalue(), w, h
+
+
+def add_to_index(name: str, output_dir: str) -> None:
+ index_path = os.path.join(output_dir, "index.json")
+ try:
+ with open(index_path) as index_file:
+ index_data = json.load(index_file)
+ except (FileNotFoundError, json.JSONDecodeError):
+ index_data = {"packs": []}
+ if "homeserver_url" not in index_data and matrix.homeserver_url:
+ index_data["homeserver_url"] = matrix.homeserver_url
+ if name not in index_data["packs"]:
+ index_data["packs"].append(name)
+ with open(index_path, "w") as index_file:
+ json.dump(index_data, index_file, indent=" ")
+ print(f"Added {name} to {index_path}")
+
+
+def make_sticker(mxc: str, width: int, height: int, size: int,
+ body: str = "") -> matrix.StickerInfo:
+ return {
+ "body": body,
+ "url": mxc,
+ "info": {
+ "w": width,
+ "h": height,
+ "size": size,
+ "mimetype": "image/png",
+
+ # Element iOS compatibility hack
+ "thumbnail_url": mxc,
+ "thumbnail_info": {
+ "w": width,
+ "h": height,
+ "size": size,
+ "mimetype": "image/png",
+ },
+ },
+ }
diff --git a/sticker/pack.py b/sticker/pack.py
new file mode 100644
index 0000000..9959d8e
--- /dev/null
+++ b/sticker/pack.py
@@ -0,0 +1,99 @@
+# Copyright (c) 2020 Tulir Asokan
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+from hashlib import sha256
+import argparse
+import os.path
+import asyncio
+import string
+import json
+
+import magic
+
+from .lib import matrix, util
+
+
+def convert_name(name: str) -> str:
+ name_translate = {
+ ord(" "): ord("_"),
+ }
+ allowed_chars = string.ascii_letters + string.digits + "_-/.#"
+ return "".join(filter(lambda char: char in allowed_chars, name.translate(name_translate)))
+
+
+async def main(args: argparse.Namespace) -> None:
+ await matrix.load_config(args.config)
+
+ dirname = os.path.basename(os.path.abspath(args.path))
+ meta_path = os.path.join(args.path, "pack.json")
+ try:
+ with open(meta_path) as pack_file:
+ pack = json.load(pack_file)
+ print(f"Loaded existing pack meta from {meta_path}")
+ except FileNotFoundError:
+ pack = {
+ "title": args.title or dirname,
+ "id": args.id or convert_name(dirname),
+ "stickers": [],
+ }
+ old_stickers = {}
+ else:
+ old_stickers = {sticker["id"]: sticker for sticker in pack["stickers"]}
+ pack["stickers"] = []
+ for file in os.listdir(args.path):
+ if file.startswith("."):
+ continue
+ path = os.path.join(args.path, file)
+ if not os.path.isfile(path):
+ continue
+ mime = magic.from_file(path, mime=True)
+ if not mime.startswith("image/"):
+ continue
+
+ try:
+ with open(path, "rb") as image_file:
+ image_data = image_file.read()
+ except Exception as e:
+ print(f"Failed to read {file}: {e}")
+ continue
+ print(f"Processing {file}", end="", flush=True)
+ name = os.path.splitext(file)[0]
+ sticker_id = f"sha256:{sha256(image_data).hexdigest()}"
+ print(".", end="", flush=True)
+ if sticker_id in old_stickers:
+ pack["stickers"].append({
+ **old_stickers[sticker_id],
+ "body": name,
+ })
+ print(f".. using existing upload")
+ else:
+ image_data, width, height = util.convert_image(image_data)
+ print(".", end="", flush=True)
+ mxc = await matrix.upload(image_data, "image/png", file)
+ print(".", end="", flush=True)
+ sticker = util.make_sticker(mxc, width, height, len(image_data), name)
+ sticker["id"] = sticker_id
+ pack["stickers"].append(sticker)
+ print(" uploaded", flush=True)
+ with open(meta_path, "w") as pack_file:
+ json.dump(pack, pack_file)
+ print(f"Wrote pack to {meta_path}")
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument("--config",
+ help="Path to JSON file with Matrix homeserver and access_token",
+ type=str, default="config.json")
+parser.add_argument("--title", help="Override the sticker pack displayname", type=str)
+parser.add_argument("--id", help="Override the sticker pack ID", type=str)
+parser.add_argument("path", help="Path to the sticker pack directory", type=str)
+
+
+def cmd():
+ asyncio.get_event_loop().run_until_complete(main(parser.parse_args()))
+
+
+if __name__ == "__main__":
+ cmd()
diff --git a/sticker/scalar_convert.py b/sticker/scalar_convert.py
new file mode 100644
index 0000000..50e8a5c
--- /dev/null
+++ b/sticker/scalar_convert.py
@@ -0,0 +1,41 @@
+import sys
+import json
+
+index_path = "../web/packs/index.json"
+
+try:
+ with open(index_path) as index_file:
+ index_data = json.load(index_file)
+except (FileNotFoundError, json.JSONDecodeError):
+ index_data = {"packs": []}
+
+with open(sys.argv[-1]) as file:
+ data = json.load(file)
+
+for pack in data["assets"]:
+ title = pack["name"].title()
+ if "images" not in pack["data"]:
+ print(f"Skipping {title}")
+ continue
+ pack_id = f"scalar-{pack['asset_id']}"
+ stickers = []
+ for sticker in pack["data"]["images"]:
+ sticker_data = sticker["content"]
+ sticker_data["id"] = sticker_data["url"].split("/")[-1]
+ stickers.append(sticker_data)
+ pack_data = {
+ "title": title,
+ "id": pack_id,
+ "stickers": stickers,
+ }
+ filename = f"scalar-{pack['name'].replace(' ', '_')}.json"
+ pack_path = f"web/packs/{filename}"
+ with open(pack_path, "w") as pack_file:
+ json.dump(pack_data, pack_file)
+ print(f"Wrote {title} to {pack_path}")
+ if filename not in index_data["packs"]:
+ index_data["packs"].append(filename)
+
+with open(index_path, "w") as index_file:
+ json.dump(index_data, index_file, indent=" ")
+print(f"Updated {index_path}")
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage