about summary refs log tree commit diff stats
path: root/import.py
blob: 424c47a9788b59c075ab52820baca19812727742 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# Copyright (c) 2020 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Dict, Optional, Tuple, TypedDict
from io import BytesIO
import argparse
import os.path
import asyncio
import json
import re

from aiohttp import ClientSession
from yarl import URL
from PIL import Image

from telethon import TelegramClient
from telethon.tl.functions.messages import GetAllStickersRequest, GetStickerSetRequest
from telethon.tl.types.messages import AllStickers
from telethon.tl.types import InputStickerSetShortName, Document
from telethon.tl.types.messages import StickerSet as StickerSetFull

# Command-line interface. Arguments are parsed at module load time, so `args` is a
# module-level global that the functions below read directly.
parser = argparse.ArgumentParser()
parser.add_argument("--list", help="List your saved sticker packs", action="store_true")
parser.add_argument("--session", help="Telethon session file name", default="sticker-import")
parser.add_argument("--config", help="Path to JSON file with Matrix homeserver and access_token",
                    type=str, default="config.json")
parser.add_argument("--output-dir", help="Directory to write packs to", default="web/packs/",
                    type=str)
# action="append" together with nargs="*" wraps the positional values in an outer
# list, which is why main() reads the pack URLs from args.pack[0].
parser.add_argument("pack", help="Sticker pack URLs to import", action="append", nargs="*")
args = parser.parse_args()


async def whoami(url: URL, access_token: str) -> str:
    """Check an access token against a whoami endpoint and return its user ID.

    A GET request is sent to ``url`` with ``access_token`` as a bearer token.
    The ``user_id`` field of the JSON response is printed and returned. An HTTP
    error status makes ``raise_for_status()`` raise.
    """
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    async with ClientSession() as session:
        async with session.get(url, headers=auth_headers) as response:
            response.raise_for_status()
            payload = await response.json()
            user_id = payload["user_id"]
            print(f"Access token validated (user ID: {user_id})")
            return user_id


# Load the Matrix homeserver URL and access token from the config file. If the file
# does not exist yet, ask for both interactively, validate the token with whoami()
# and write a fresh config file.
try:
    with open(args.config) as config_file:
        config = json.load(config_file)
except FileNotFoundError:
    print("Matrix config file not found. Please enter your homeserver and access token.")
    homeserver_url = input("Homeserver URL: ")
    access_token = input("Access token: ")
    whoami_url = URL(homeserver_url) / "_matrix" / "client" / "r0" / "account" / "whoami"
    user_id = asyncio.run(whoami(whoami_url, access_token))
    new_config = {
        "homeserver": homeserver_url,
        "user_id": user_id,
        "access_token": access_token
    }
    with open(args.config, "w") as config_file:
        json.dump(new_config, config_file)
    print(f"Wrote config to {args.config}")
else:
    homeserver_url = config["homeserver"]
    access_token = config["access_token"]

# Media upload endpoint used by upload().
upload_url = URL(homeserver_url) / "_matrix" / "media" / "r0" / "upload"


async def upload(data: bytes, mimetype: str, filename: str) -> str:
    """Upload a file to the Matrix media repository and return its content URI.

    Args:
        data: Raw bytes to upload.
        mimetype: Value sent as the ``Content-Type`` header.
        filename: Value sent as the ``filename`` query parameter.

    Returns:
        The ``content_uri`` field of the homeserver's JSON response.

    Raises:
        aiohttp.ClientResponseError: If the homeserver answers with an HTTP
            error status.
    """
    url = upload_url.with_query({"filename": filename})
    headers = {"Content-Type": mimetype, "Authorization": f"Bearer {access_token}"}
    async with ClientSession() as sess, sess.post(url, data=data, headers=headers) as resp:
        # Fail with the real HTTP error (as whoami() does) instead of an opaque
        # KeyError on the missing "content_uri" field of an error response.
        resp.raise_for_status()
        return (await resp.json())["content_uri"]


class MatrixMediaInfo(TypedDict):
    """Shape of the ``info`` dict that describes an uploaded sticker image."""
    # Width the image should be displayed at.
    w: int
    # Height the image should be displayed at.
    h: int
    # Size of the uploaded file in bytes.
    size: int
    # MIME type of the uploaded file.
    mimetype: str
    # Content URI of the thumbnail, if any.
    thumbnail_url: Optional[str]
    # Same structure again, describing the thumbnail.
    thumbnail_info: Optional['MatrixMediaInfo']


class MatrixStickerInfo(TypedDict, total=False):
    """Shape of one sticker entry as stored in a pack file (all keys optional)."""
    # Text body of the sticker; reupload_pack() sets it to the sticker's emoticon.
    body: str
    # Content URI of the uploaded image.
    url: str
    # Dimensions, size and thumbnail details of the image.
    info: MatrixMediaInfo


def convert_image(data: bytes) -> Tuple[bytes, int, int]:
    """Re-encode image data as an RGBA PNG.

    Args:
        data: Encoded image bytes in a format Pillow can open.

    Returns:
        A ``(png_bytes, width, height)`` tuple, where the dimensions are those
        of the decoded image.
    """
    image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
    new_file = BytesIO()
    image.save(new_file, "png")
    w, h = image.size
    return new_file.getvalue(), w, h


async def reupload_document(client: TelegramClient, document: Document) -> MatrixStickerInfo:
    """Download one Telegram sticker, convert it to PNG and upload it to Matrix.

    Progress is printed on a single line: the document ID followed by one dot
    per completed step (download, conversion, upload).

    Returns a sticker info dict with an empty ``body``, the uploaded content
    URI, and display dimensions capped at 256 on the longer side.
    """
    print(f"Reuploading {document.id}", end="", flush=True)
    raw = await client.download_media(document, file=bytes)
    print(".", end="", flush=True)
    png, width, height = convert_image(raw)
    print(".", end="", flush=True)
    mxc = await upload(png, "image/png", f"{document.id}.png")
    print(".", flush=True)

    if max(width, height) > 256:
        # Set the width and height to lower values so clients wouldn't show them as huge images
        if width > height:
            width, height = 256, int(height / (width / 256))
        else:
            width, height = int(width / (height / 256)), 256

    thumbnail_info = {
        "w": width,
        "h": height,
        "size": len(png),
        "mimetype": "image/png",
    }
    media_info = {
        **thumbnail_info,

        # Element iOS compatibility hack
        "thumbnail_url": mxc,
        "thumbnail_info": thumbnail_info,
    }
    return {
        "body": "",
        "url": mxc,
        "info": media_info,
    }


def add_to_index(name: str) -> None:
    """Make sure ``name`` is listed in ``index.json`` in the output directory.

    A missing or unparseable index is replaced by a fresh one holding an empty
    pack list and the homeserver URL. The file is only rewritten when ``name``
    was not already listed.
    """
    index_path = os.path.join(args.output_dir, "index.json")
    try:
        with open(index_path) as index_file:
            index_data = json.load(index_file)
    except (FileNotFoundError, json.JSONDecodeError):
        index_data = {"packs": [], "homeserver_url": homeserver_url}

    known_packs = index_data["packs"]
    if name in known_packs:
        return

    known_packs.append(name)
    with open(index_path, "w") as index_file:
        json.dump(index_data, index_file, indent="  ")
    print(f"Added {name} to {index_path}")


async def reupload_pack(client: TelegramClient, pack: StickerSetFull) -> None:
    """Reupload every sticker of a Telegram pack to Matrix and save it as JSON.

    Animated packs are skipped with a message. Stickers already present in an
    existing pack file are reused instead of being uploaded again. The result is
    written to ``<output-dir>/<short_name>.json`` and registered in the index.

    Args:
        client: Telegram client used to download the sticker files.
        pack: The full sticker set, including its documents and emoticon mapping.
    """
    if pack.set.animated:
        print("Animated stickerpacks are currently not supported")
        return

    pack_path = os.path.join(args.output_dir, f"{pack.set.short_name}.json")
    pack_dir = os.path.dirname(pack_path)
    if pack_dir:
        # makedirs also creates missing parent directories (the default output
        # directory is nested); an empty dirname means the current directory.
        os.makedirs(pack_dir, exist_ok=True)

    print(f"Reuploading {pack.set.title} with {pack.set.count} stickers "
          f"and writing output to {pack_path}")

    already_uploaded = {}
    try:
        # Pack files are written as UTF-8 below, so read them back the same way.
        with open(pack_path, encoding="utf-8") as pack_file:
            existing_pack = json.load(pack_file)
            already_uploaded = {sticker["net.maunium.telegram.sticker"]["id"]: sticker
                                for sticker in existing_pack["stickers"]}
            print(f"Found {len(already_uploaded)} already reuploaded stickers")
    except FileNotFoundError:
        pass

    reuploaded_documents: Dict[int, MatrixStickerInfo] = {}
    for document in pack.documents:
        try:
            reuploaded_documents[document.id] = already_uploaded[document.id]
            print(f"Skipped reuploading {document.id}")
        except KeyError:
            reuploaded_documents[document.id] = await reupload_document(client, document)

    # Attach the emoticon and Telegram metadata to each sticker.
    for sticker in pack.packs:
        for document_id in sticker.documents:
            doc = reuploaded_documents[document_id]
            doc["body"] = sticker.emoticon
            doc["net.maunium.telegram.sticker"] = {
                "pack": {
                    "id": pack.set.id,
                    "short_name": pack.set.short_name,
                },
                "id": document_id,
                "emoticon": sticker.emoticon,
            }

    # ensure_ascii=False emits emoji and other non-ASCII text verbatim, so the file
    # must be opened as UTF-8 explicitly; the platform default encoding may not be
    # able to represent those characters.
    with open(pack_path, "w", encoding="utf-8") as pack_file:
        json.dump({
            "title": pack.set.title,
            "short_name": pack.set.short_name,
            "id": pack.set.id,
            "hash": pack.set.hash,
            "stickers": list(reuploaded_documents.values()),
        }, pack_file, ensure_ascii=False)
    print(f"Saved {pack.set.title} as {pack.set.short_name}.json")

    add_to_index(os.path.basename(pack_path))


# Matches a sticker pack reference and captures its short name in group 1.
# The "t.me/addstickers/"-style URL prefix (with optional scheme) and a trailing
# ".json" are both optional, so a bare short name is accepted as well.
pack_url_regex = re.compile(r"^(?:(?:https?://)?(?:t|telegram)\.(?:me|dog)/addstickers/)?"
                            r"([A-Za-z0-9-_]+)"
                            r"(?:\.json)?$")


async def main():
    """Start the Telegram client, then list saved packs or import the given ones.

    With ``--list``, prints a numbered list of the account's saved sticker packs.
    Otherwise each positional argument is parsed as a pack URL or short name and
    reuploaded in order; if any argument fails to parse, nothing is imported.
    With no arguments at all, the help text is printed.
    """
    # Positional arguments after the session name are presumably the Telegram
    # API ID and API hash.
    client = TelegramClient(args.session, 298751, "cb676d6bae20553c9996996a8f52b4d7")
    await client.start()

    if args.list:
        stickers: AllStickers = await client(GetAllStickersRequest(hash=0))
        # Right-align the numbering to the digit count of the largest index.
        width = len(str(len(stickers.sets)))
        print("Your saved sticker packs:")
        for index, saved_pack in enumerate(stickers.sets, start=1):
            print(f"{index:>{width}}. {saved_pack.title} "
                  f"(t.me/addstickers/{saved_pack.short_name})")
    elif args.pack[0]:
        # Validate every argument before uploading anything.
        input_packs = []
        for pack_url in args.pack[0]:
            match = pack_url_regex.match(pack_url)
            if not match:
                print(f"'{pack_url}' doesn't look like a sticker pack URL")
                return
            input_packs.append(InputStickerSetShortName(short_name=match.group(1)))
        for input_pack in input_packs:
            pack: StickerSetFull = await client(GetStickerSetRequest(input_pack))
            await reupload_pack(client, pack)
    else:
        parser.print_help()


# Script entry point: run the importer on a fresh event loop.
asyncio.run(main())
Send patches to the email address below:
yukais@pinapelz.com
Include the subject prefix [PATCH repo_name].
pinapelz.com
homepage