From 80bcf6d0acdf35f082a6765db989ef80100f20fb Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sun, 13 Sep 2020 03:56:28 +0300 Subject: Reorganize Python stuff and add command to create packs Fixes #11 --- .gitignore | 1 + README.md | 47 ++++++--- import.py | 263 ---------------------------------------------- requirements.txt | 1 + scalar-convert.py | 42 -------- setup.py | 42 ++++++++ sticker/__init__.py | 0 sticker/import.py | 158 ++++++++++++++++++++++++++++ sticker/lib/__init__.py | 0 sticker/lib/matrix.py | 77 ++++++++++++++ sticker/lib/util.py | 67 ++++++++++++ sticker/pack.py | 99 +++++++++++++++++ sticker/scalar_convert.py | 41 ++++++++ 13 files changed, 520 insertions(+), 318 deletions(-) delete mode 100644 import.py delete mode 100644 scalar-convert.py create mode 100644 setup.py create mode 100644 sticker/__init__.py create mode 100644 sticker/import.py create mode 100644 sticker/lib/__init__.py create mode 100644 sticker/lib/matrix.py create mode 100644 sticker/lib/util.py create mode 100644 sticker/pack.py create mode 100644 sticker/scalar_convert.py diff --git a/.gitignore b/.gitignore index 0d96178..27909fb 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ /env *.pyc __pycache__ +*.egg-info node_modules diff --git a/README.md b/README.md index c46f787..0d4c421 100644 --- a/README.md +++ b/README.md @@ -4,22 +4,43 @@ A fast and simple Matrix sticker picker widget. Tested on Element Web, Android & ## Discussion Matrix room: [`#maunium:maunium.net`](https://matrix.to/#/#maunium:maunium.net) -## Importing packs from Telegram +## Utility commands +In addition to the sticker picker widget itself, this project includes some +utility scripts you can use to import and create sticker packs. + +To get started, install the dependencies for using the commands: + +0. Make sure you have Python 3.6 or higher. 1. (Optional) Set up a virtual environment. - 1. Create with `virtualenv -p python3 .` - 2. Activate with `source ./bin/activate` -2. Install dependencies with `pip install -r requirements.txt` -3. Run `python3 import.py ` - * On the first run, it'll prompt you to log in to Matrix and Telegram. - * The Matrix URL and access token are stored in `config.json` by default. - * The Telethon session data is stored in `sticker-import.session` by default. - * By default, the pack data will be written to `web/packs/`. - * You can pass as many pack URLs as you want. - * You can re-run the command with the same URLs to update packs. - -If you want to list the URLs of all your saved packs, use `python3 import.py --list`. + 1. Create with `virtualenv -p python3 .venv` + 2. Activate with `source .venv/bin/activate` +2. Install the utility commands and their dependencies with `pip install .` + +### Importing packs from Telegram +To import packs from Telegram, simply run `sticker-import ` with +one or more t.me/addstickers/... URLs. + +If you want to list the URLs of all your saved packs, use `sticker-import --list`. This requires logging in with your account instead of a bot token. +Notes: + +* On the first run, it'll prompt you to log in to Matrix and Telegram. + * The Matrix URL and access token are stored in `config.json` by default. + * The Telethon session data is stored in `sticker-import.session` by default. +* By default, the pack data will be written to `web/packs/`. +* You can pass as many pack URLs as you want. +* You can re-run the command with the same URLs to update packs. + +### Creating your own packs +1. Create a directory with your sticker images. + * The file name (excluding extension) will be used as the caption. + * The directory name will be used as the pack name/ID. +2. Run `sticker-pack `. + * If you want to override the pack displayname, pass `--title `. +3. Copy `/pack.json` to `web/packs/your-pack-name.json`. +4. Add `your-pack-name.json` to the list in `web/packs/index.json`. + ## Enabling the sticker widget 1. Serve everything under `web/` using your webserver of choice. Make sure not to serve the top-level data, as `config.json` and the Telethon session file contain sensitive data. diff --git a/import.py b/import.py deleted file mode 100644 index cd917ed..0000000 --- a/import.py +++ /dev/null @@ -1,263 +0,0 @@ -# Copyright (c) 2020 Tulir Asokan -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. -from typing import Dict, Optional, TYPE_CHECKING -from io import BytesIO -import argparse -import os.path -import asyncio -import json -import re - -from aiohttp import ClientSession -from yarl import URL -from PIL import Image - -from telethon import TelegramClient -from telethon.tl.functions.messages import GetAllStickersRequest, GetStickerSetRequest -from telethon.tl.types.messages import AllStickers -from telethon.tl.types import InputStickerSetShortName, Document, DocumentAttributeSticker -from telethon.tl.types.messages import StickerSet as StickerSetFull - -parser = argparse.ArgumentParser() -parser.add_argument("--list", help="List your saved sticker packs", action="store_true") -parser.add_argument("--session", help="Telethon session file name", default="sticker-import") -parser.add_argument("--config", help="Path to JSON file with Matrix homeserver and access_token", - type=str, default="config.json") -parser.add_argument("--output-dir", help="Directory to write packs to", default="web/packs/", - type=str) -parser.add_argument("pack", help="Sticker pack URLs to import", action="append", nargs="*") -args = parser.parse_args() -loop = asyncio.get_event_loop() - - -async def whoami(url: URL, access_token: str) -> str: - headers = {"Authorization": f"Bearer {access_token}"} - async with ClientSession() as sess, sess.get(url, headers=headers) as resp: - resp.raise_for_status() - user_id = (await resp.json())["user_id"] - print(f"Access token validated (user ID: {user_id})") - return user_id - - -try: - with open(args.config) as config_file: - config = json.load(config_file) - homeserver_url = config["homeserver"] - access_token = config["access_token"] -except FileNotFoundError: - print("Matrix config file not found. Please enter your homeserver and access token.") - homeserver_url = input("Homeserver URL: ") - access_token = input("Access token: ") - whoami_url = URL(homeserver_url) / "_matrix" / "client" / "r0" / "account" / "whoami" - user_id = loop.run_until_complete(whoami(whoami_url, access_token)) - with open(args.config, "w") as config_file: - json.dump({ - "homeserver": homeserver_url, - "user_id": user_id, - "access_token": access_token - }, config_file) - print(f"Wrote config to {args.config}") - -upload_url = URL(homeserver_url) / "_matrix" / "media" / "r0" / "upload" - - -async def upload(data: bytes, mimetype: str, filename: str) -> str: - url = upload_url.with_query({"filename": filename}) - headers = {"Content-Type": mimetype, "Authorization": f"Bearer {access_token}"} - async with ClientSession() as sess, sess.post(url, data=data, headers=headers) as resp: - return (await resp.json())["content_uri"] - - -if TYPE_CHECKING: - from typing import TypedDict - - - class MatrixMediaInfo(TypedDict): - w: int - h: int - size: int - mimetype: str - thumbnail_url: Optional[str] - thumbnail_info: Optional['MatrixMediaInfo'] - - - class MatrixStickerInfo(TypedDict, total=False): - body: str - url: str - info: MatrixMediaInfo - id: str - - -def convert_image(data: bytes) -> (bytes, int, int): - image: Image.Image = Image.open(BytesIO(data)).convert("RGBA") - new_file = BytesIO() - image.save(new_file, "png") - w, h = image.size - return new_file.getvalue(), w, h - - -async def reupload_document(client: TelegramClient, document: Document) -> 'MatrixStickerInfo': - print(f"Reuploading {document.id}", end="", flush=True) - data = await client.download_media(document, file=bytes) - print(".", end="", flush=True) - data, width, height = convert_image(data) - print(".", end="", flush=True) - mxc = await upload(data, "image/png", f"{document.id}.png") - print(".", flush=True) - if width > 256 or height > 256: - # Set the width and height to lower values so clients wouldn't show them as huge images - if width > height: - height = int(height / (width / 256)) - width = 256 - else: - width = int(width / (height / 256)) - height = 256 - return { - "body": "", - "url": mxc, - "info": { - "w": width, - "h": height, - "size": len(data), - "mimetype": "image/png", - - # Element iOS compatibility hack - "thumbnail_url": mxc, - "thumbnail_info": { - "w": width, - "h": height, - "size": len(data), - "mimetype": "image/png", - }, - }, - } - - -def add_to_index(name: str) -> None: - index_path = os.path.join(args.output_dir, "index.json") - try: - with open(index_path) as index_file: - index_data = json.load(index_file) - except (FileNotFoundError, json.JSONDecodeError): - index_data = {"packs": []} - if "homeserver_url" not in index_data: - index_data["homeserver_url"] = homeserver_url - if name not in index_data["packs"]: - index_data["packs"].append(name) - with open(index_path, "w") as index_file: - json.dump(index_data, index_file, indent=" ") - print(f"Added {name} to {index_path}") - - -def add_meta(document: Document, info: 'MatrixStickerInfo', pack: StickerSetFull) -> None: - for attr in document.attributes: - if isinstance(attr, DocumentAttributeSticker): - info["body"] = attr.alt - info["id"] = f"tg-{document.id}" - info["net.maunium.telegram.sticker"] = { - "pack": { - "id": str(pack.set.id), - "short_name": pack.set.short_name, - }, - "id": str(document.id), - "emoticons": [], - } - - -async def reupload_pack(client: TelegramClient, pack: StickerSetFull) -> None: - if pack.set.animated: - print("Animated stickerpacks are currently not supported") - return - - pack_path = os.path.join(args.output_dir, f"{pack.set.short_name}.json") - try: - os.mkdir(os.path.dirname(pack_path)) - except FileExistsError: - pass - - print(f"Reuploading {pack.set.title} with {pack.set.count} stickers " - f"and writing output to {pack_path}") - - already_uploaded = {} - try: - with open(pack_path) as pack_file: - existing_pack = json.load(pack_file) - already_uploaded = {int(sticker["net.maunium.telegram.sticker"]["id"]): sticker - for sticker in existing_pack["stickers"]} - print(f"Found {len(already_uploaded)} already reuploaded stickers") - except FileNotFoundError: - pass - - reuploaded_documents: Dict[int, 'MatrixStickerInfo'] = {} - for document in pack.documents: - try: - reuploaded_documents[document.id] = already_uploaded[document.id] - print(f"Skipped reuploading {document.id}") - except KeyError: - reuploaded_documents[document.id] = await reupload_document(client, document) - # Always ensure the body and telegram metadata is correct - add_meta(document, reuploaded_documents[document.id], pack) - - for sticker in pack.packs: - if not sticker.emoticon: - continue - for document_id in sticker.documents: - doc = reuploaded_documents[document_id] - # If there was no sticker metadata, use the first emoji we find - if doc["body"] == "": - doc["body"] = sticker.emoticon - doc["net.maunium.telegram.sticker"]["emoticons"].append(sticker.emoticon) - - with open(pack_path, "w") as pack_file: - json.dump({ - "title": pack.set.title, - "id": f"tg-{pack.set.id}", - "net.maunium.telegram.pack": { - "short_name": pack.set.short_name, - "hash": str(pack.set.hash), - }, - "stickers": list(reuploaded_documents.values()), - }, pack_file, ensure_ascii=False) - print(f"Saved {pack.set.title} as {pack.set.short_name}.json") - - add_to_index(os.path.basename(pack_path)) - - -pack_url_regex = re.compile(r"^(?:(?:https?://)?(?:t|telegram)\.(?:me|dog)/addstickers/)?" - r"([A-Za-z0-9-_]+)" - r"(?:\.json)?$") - - -async def main(): - client = TelegramClient(args.session, 298751, "cb676d6bae20553c9996996a8f52b4d7") - await client.start() - - if args.list: - stickers: AllStickers = await client(GetAllStickersRequest(hash=0)) - index = 1 - width = len(str(stickers.sets)) - print("Your saved sticker packs:") - for saved_pack in stickers.sets: - print(f"{index:>{width}}. {saved_pack.title} " - f"(t.me/addstickers/{saved_pack.short_name})") - elif args.pack[0]: - input_packs = [] - for pack_url in args.pack[0]: - match = pack_url_regex.match(pack_url) - if not match: - print(f"'{pack_url}' doesn't look like a sticker pack URL") - return - input_packs.append(InputStickerSetShortName(short_name=match.group(1))) - for input_pack in input_packs: - pack: StickerSetFull = await client(GetStickerSetRequest(input_pack)) - await reupload_pack(client, pack) - else: - parser.print_help() - - await client.disconnect() - - -loop.run_until_complete(main()) diff --git a/requirements.txt b/requirements.txt index ca16357..6d89dbf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ yarl pillow telethon cryptg +python-magic diff --git a/scalar-convert.py b/scalar-convert.py deleted file mode 100644 index da424b2..0000000 --- a/scalar-convert.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python3 -import sys -import json - -index_path = "web/packs/index.json" - -try: - with open(index_path) as index_file: - index_data = json.load(index_file) -except (FileNotFoundError, json.JSONDecodeError): - index_data = {"packs": []} - -with open(sys.argv[-1]) as file: - data = json.load(file) - -for pack in data["assets"]: - title = pack["name"].title() - if "images" not in pack["data"]: - print(f"Skipping {title}") - continue - id = f"scalar-{pack['asset_id']}" - stickers = [] - for sticker in pack["data"]["images"]: - sticker_data = sticker["content"] - sticker_data["id"] = sticker_data["url"].split("/")[-1] - stickers.append(sticker_data) - pack_data = { - "title": title, - "id": id, - "stickers": stickers, - } - filename = f"scalar-{pack['name'].replace(' ', '_')}.json" - pack_path = f"web/packs/{filename}" - with open(pack_path, "w") as pack_file: - json.dump(pack_data, pack_file) - print(f"Wrote {title} to {pack_path}") - if filename not in index_data["packs"]: - index_data["packs"].append(filename) - -with open(index_path, "w") as index_file: - json.dump(index_data, index_file, indent=" ") -print(f"Updated {index_path}") diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..b639d36 --- /dev/null +++ b/setup.py @@ -0,0 +1,42 @@ +import setuptools + +with open("requirements.txt") as reqs: + install_requires = reqs.read().splitlines() + +try: + long_desc = open("README.md").read() +except IOError: + long_desc = "Failed to read README.md" + +setuptools.setup( + name="maunium-stickerpicker", + version="0.1.0", + url="https://github.com/maunium/stickerpicker", + + author="Tulir Asokan", + author_email="tulir@maunium.net", + + description="A fast and simple Matrix sticker picker widget", + long_description=long_desc, + long_description_content_type="text/markdown", + + packages=setuptools.find_packages(), + + install_requires=install_requires, + python_requires="~=3.6", + + classifiers=[ + "Development Status :: 4 - Beta", + "License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)", + "Framework :: AsyncIO", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + ], + entry_points={"console_scripts": [ + "sticker-import=sticker.import:cmd", + "sticker-pack=sticker.pack:cmd", + ]}, +) diff --git a/sticker/__init__.py b/sticker/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sticker/import.py b/sticker/import.py new file mode 100644 index 0000000..f350dc5 --- /dev/null +++ b/sticker/import.py @@ -0,0 +1,158 @@ +# Copyright (c) 2020 Tulir Asokan +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +from typing import Dict +import argparse +import asyncio +import os.path +import json +import re + +from telethon import TelegramClient +from telethon.tl.functions.messages import GetAllStickersRequest, GetStickerSetRequest +from telethon.tl.types.messages import AllStickers +from telethon.tl.types import InputStickerSetShortName, Document, DocumentAttributeSticker +from telethon.tl.types.messages import StickerSet as StickerSetFull + +from .lib import matrix, util + + +async def reupload_document(client: TelegramClient, document: Document) -> matrix.StickerInfo: + print(f"Reuploading {document.id}", end="", flush=True) + data = await client.download_media(document, file=bytes) + print(".", end="", flush=True) + data, width, height = util.convert_image(data) + print(".", end="", flush=True) + mxc = await matrix.upload(data, "image/png", f"{document.id}.png") + print(".", flush=True) + return util.make_sticker(mxc, width, height, len(data)) + + +def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None: + for attr in document.attributes: + if isinstance(attr, DocumentAttributeSticker): + info["body"] = attr.alt + info["id"] = f"tg-{document.id}" + info["net.maunium.telegram.sticker"] = { + "pack": { + "id": str(pack.set.id), + "short_name": pack.set.short_name, + }, + "id": str(document.id), + "emoticons": [], + } + + +async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir: str) -> None: + if pack.set.animated: + print("Animated stickerpacks are currently not supported") + return + + pack_path = os.path.join(output_dir, f"{pack.set.short_name}.json") + try: + os.mkdir(os.path.dirname(pack_path)) + except FileExistsError: + pass + + print(f"Reuploading {pack.set.title} with {pack.set.count} stickers " + f"and writing output to {pack_path}") + + already_uploaded = {} + try: + with open(pack_path) as pack_file: + existing_pack = json.load(pack_file) + already_uploaded = {int(sticker["net.maunium.telegram.sticker"]["id"]): sticker + for sticker in existing_pack["stickers"]} + print(f"Found {len(already_uploaded)} already reuploaded stickers") + except FileNotFoundError: + pass + + reuploaded_documents: Dict[int, matrix.StickerInfo] = {} + for document in pack.documents: + try: + reuploaded_documents[document.id] = already_uploaded[document.id] + print(f"Skipped reuploading {document.id}") + except KeyError: + reuploaded_documents[document.id] = await reupload_document(client, document) + # Always ensure the body and telegram metadata is correct + add_meta(document, reuploaded_documents[document.id], pack) + + for sticker in pack.packs: + if not sticker.emoticon: + continue + for document_id in sticker.documents: + doc = reuploaded_documents[document_id] + # If there was no sticker metadata, use the first emoji we find + if doc["body"] == "": + doc["body"] = sticker.emoticon + doc["net.maunium.telegram.sticker"]["emoticons"].append(sticker.emoticon) + + with open(pack_path, "w") as pack_file: + json.dump({ + "title": pack.set.title, + "id": f"tg-{pack.set.id}", + "net.maunium.telegram.pack": { + "short_name": pack.set.short_name, + "hash": str(pack.set.hash), + }, + "stickers": list(reuploaded_documents.values()), + }, pack_file, ensure_ascii=False) + print(f"Saved {pack.set.title} as {pack.set.short_name}.json") + + util.add_to_index(os.path.basename(pack_path), output_dir) + + +pack_url_regex = re.compile(r"^(?:(?:https?://)?(?:t|telegram)\.(?:me|dog)/addstickers/)?" + r"([A-Za-z0-9-_]+)" + r"(?:\.json)?$") + +parser = argparse.ArgumentParser() + +parser.add_argument("--list", help="List your saved sticker packs", action="store_true") +parser.add_argument("--session", help="Telethon session file name", default="sticker-import") +parser.add_argument("--config", + help="Path to JSON file with Matrix homeserver and access_token", + type=str, default="config.json") +parser.add_argument("--output-dir", help="Directory to write packs to", default="web/packs/", + type=str) +parser.add_argument("pack", help="Sticker pack URLs to import", action="append", nargs="*") + + +async def main(args: argparse.Namespace) -> None: + await matrix.load_config(args.config) + client = TelegramClient(args.session, 298751, "cb676d6bae20553c9996996a8f52b4d7") + await client.start() + + if args.list: + stickers: AllStickers = await client(GetAllStickersRequest(hash=0)) + index = 1 + width = len(str(stickers.sets)) + print("Your saved sticker packs:") + for saved_pack in stickers.sets: + print(f"{index:>{width}}. {saved_pack.title} " + f"(t.me/addstickers/{saved_pack.short_name})") + elif args.pack[0]: + input_packs = [] + for pack_url in args.pack[0]: + match = pack_url_regex.match(pack_url) + if not match: + print(f"'{pack_url}' doesn't look like a sticker pack URL") + return + input_packs.append(InputStickerSetShortName(short_name=match.group(1))) + for input_pack in input_packs: + pack: StickerSetFull = await client(GetStickerSetRequest(input_pack)) + await reupload_pack(client, pack, args.output_dir) + else: + parser.print_help() + + await client.disconnect() + + +def cmd() -> None: + asyncio.get_event_loop().run_until_complete(main(parser.parse_args())) + + +if __name__ == "__main__": + cmd() diff --git a/sticker/lib/__init__.py b/sticker/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sticker/lib/matrix.py b/sticker/lib/matrix.py new file mode 100644 index 0000000..426bdd0 --- /dev/null +++ b/sticker/lib/matrix.py @@ -0,0 +1,77 @@ +# Copyright (c) 2020 Tulir Asokan +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +from typing import Optional, TYPE_CHECKING +import json + +from aiohttp import ClientSession +from yarl import URL + +access_token: Optional[str] = None +homeserver_url: Optional[str] = None + +upload_url: Optional[URL] = None + +if TYPE_CHECKING: + from typing import TypedDict + + + class MediaInfo(TypedDict): + w: int + h: int + size: int + mimetype: str + thumbnail_url: Optional[str] + thumbnail_info: Optional['MediaInfo'] + + + class StickerInfo(TypedDict, total=False): + body: str + url: str + info: MediaInfo + id: str +else: + MediaInfo = None + StickerInfo = None + + +async def load_config(path: str) -> None: + global access_token, homeserver_url, upload_url + try: + with open(path) as config_file: + config = json.load(config_file) + homeserver_url = config["homeserver"] + access_token = config["access_token"] + except FileNotFoundError: + print("Matrix config file not found. Please enter your homeserver and access token.") + homeserver_url = input("Homeserver URL: ") + access_token = input("Access token: ") + whoami_url = URL(homeserver_url) / "_matrix" / "client" / "r0" / "account" / "whoami" + user_id = await whoami(whoami_url, access_token) + with open(path, "w") as config_file: + json.dump({ + "homeserver": homeserver_url, + "user_id": user_id, + "access_token": access_token + }, config_file) + print(f"Wrote config to {path}") + + upload_url = URL(homeserver_url) / "_matrix" / "media" / "r0" / "upload" + + +async def whoami(url: URL, access_token: str) -> str: + headers = {"Authorization": f"Bearer {access_token}"} + async with ClientSession() as sess, sess.get(url, headers=headers) as resp: + resp.raise_for_status() + user_id = (await resp.json())["user_id"] + print(f"Access token validated (user ID: {user_id})") + return user_id + + +async def upload(data: bytes, mimetype: str, filename: str) -> str: + url = upload_url.with_query({"filename": filename}) + headers = {"Content-Type": mimetype, "Authorization": f"Bearer {access_token}"} + async with ClientSession() as sess, sess.post(url, data=data, headers=headers) as resp: + return (await resp.json())["content_uri"] diff --git a/sticker/lib/util.py b/sticker/lib/util.py new file mode 100644 index 0000000..feefea2 --- /dev/null +++ b/sticker/lib/util.py @@ -0,0 +1,67 @@ +# Copyright (c) 2020 Tulir Asokan +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +from io import BytesIO +import os.path +import json + +from PIL import Image + +from . import matrix + + +def convert_image(data: bytes) -> (bytes, int, int): + image: Image.Image = Image.open(BytesIO(data)).convert("RGBA") + new_file = BytesIO() + image.save(new_file, "png") + w, h = image.size + if w > 256 or h > 256: + # Set the width and height to lower values so clients wouldn't show them as huge images + if w > h: + h = int(h / (w / 256)) + w = 256 + else: + w = int(w / (h / 256)) + h = 256 + return new_file.getvalue(), w, h + + +def add_to_index(name: str, output_dir: str) -> None: + index_path = os.path.join(output_dir, "index.json") + try: + with open(index_path) as index_file: + index_data = json.load(index_file) + except (FileNotFoundError, json.JSONDecodeError): + index_data = {"packs": []} + if "homeserver_url" not in index_data and matrix.homeserver_url: + index_data["homeserver_url"] = matrix.homeserver_url + if name not in index_data["packs"]: + index_data["packs"].append(name) + with open(index_path, "w") as index_file: + json.dump(index_data, index_file, indent=" ") + print(f"Added {name} to {index_path}") + + +def make_sticker(mxc: str, width: int, height: int, size: int, + body: str = "") -> matrix.StickerInfo: + return { + "body": body, + "url": mxc, + "info": { + "w": width, + "h": height, + "size": size, + "mimetype": "image/png", + + # Element iOS compatibility hack + "thumbnail_url": mxc, + "thumbnail_info": { + "w": width, + "h": height, + "size": size, + "mimetype": "image/png", + }, + }, + } diff --git a/sticker/pack.py b/sticker/pack.py new file mode 100644 index 0000000..9959d8e --- /dev/null +++ b/sticker/pack.py @@ -0,0 +1,99 @@ +# Copyright (c) 2020 Tulir Asokan +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +from hashlib import sha256 +import argparse +import os.path +import asyncio +import string +import json + +import magic + +from .lib import matrix, util + + +def convert_name(name: str) -> str: + name_translate = { + ord(" "): ord("_"), + } + allowed_chars = string.ascii_letters + string.digits + "_-/.#" + return "".join(filter(lambda char: char in allowed_chars, name.translate(name_translate))) + + +async def main(args: argparse.Namespace) -> None: + await matrix.load_config(args.config) + + dirname = os.path.basename(os.path.abspath(args.path)) + meta_path = os.path.join(args.path, "pack.json") + try: + with open(meta_path) as pack_file: + pack = json.load(pack_file) + print(f"Loaded existing pack meta from {meta_path}") + except FileNotFoundError: + pack = { + "title": args.title or dirname, + "id": args.id or convert_name(dirname), + "stickers": [], + } + old_stickers = {} + else: + old_stickers = {sticker["id"]: sticker for sticker in pack["stickers"]} + pack["stickers"] = [] + for file in os.listdir(args.path): + if file.startswith("."): + continue + path = os.path.join(args.path, file) + if not os.path.isfile(path): + continue + mime = magic.from_file(path, mime=True) + if not mime.startswith("image/"): + continue + + try: + with open(path, "rb") as image_file: + image_data = image_file.read() + except Exception as e: + print(f"Failed to read {file}: {e}") + continue + print(f"Processing {file}", end="", flush=True) + name = os.path.splitext(file)[0] + sticker_id = f"sha256:{sha256(image_data).hexdigest()}" + print(".", end="", flush=True) + if sticker_id in old_stickers: + pack["stickers"].append({ + **old_stickers[sticker_id], + "body": name, + }) + print(f".. using existing upload") + else: + image_data, width, height = util.convert_image(image_data) + print(".", end="", flush=True) + mxc = await matrix.upload(image_data, "image/png", file) + print(".", end="", flush=True) + sticker = util.make_sticker(mxc, width, height, len(image_data), name) + sticker["id"] = sticker_id + pack["stickers"].append(sticker) + print(" uploaded", flush=True) + with open(meta_path, "w") as pack_file: + json.dump(pack, pack_file) + print(f"Wrote pack to {meta_path}") + + +parser = argparse.ArgumentParser() +parser.add_argument("--config", + help="Path to JSON file with Matrix homeserver and access_token", + type=str, default="config.json") +parser.add_argument("--title", help="Override the sticker pack displayname", type=str) +parser.add_argument("--id", help="Override the sticker pack ID", type=str) +parser.add_argument("path", help="Path to the sticker pack directory", type=str) + + +def cmd(): + asyncio.get_event_loop().run_until_complete(main(parser.parse_args())) + + +if __name__ == "__main__": + cmd() diff --git a/sticker/scalar_convert.py b/sticker/scalar_convert.py new file mode 100644 index 0000000..50e8a5c --- /dev/null +++ b/sticker/scalar_convert.py @@ -0,0 +1,41 @@ +import sys +import json + +index_path = "../web/packs/index.json" + +try: + with open(index_path) as index_file: + index_data = json.load(index_file) +except (FileNotFoundError, json.JSONDecodeError): + index_data = {"packs": []} + +with open(sys.argv[-1]) as file: + data = json.load(file) + +for pack in data["assets"]: + title = pack["name"].title() + if "images" not in pack["data"]: + print(f"Skipping {title}") + continue + pack_id = f"scalar-{pack['asset_id']}" + stickers = [] + for sticker in pack["data"]["images"]: + sticker_data = sticker["content"] + sticker_data["id"] = sticker_data["url"].split("/")[-1] + stickers.append(sticker_data) + pack_data = { + "title": title, + "id": pack_id, + "stickers": stickers, + } + filename = f"scalar-{pack['name'].replace(' ', '_')}.json" + pack_path = f"web/packs/{filename}" + with open(pack_path, "w") as pack_file: + json.dump(pack_data, pack_file) + print(f"Wrote {title} to {pack_path}") + if filename not in index_data["packs"]: + index_data["packs"].append(filename) + +with open(index_path, "w") as index_file: + json.dump(index_data, index_file, indent=" ") +print(f"Updated {index_path}") -- cgit v1.2.3