aboutsummaryrefslogtreecommitdiffstats
path: root/app.py
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2026-03-06 01:52:03 -0800
committerPinapelz <yukais@pinapelz.com>2026-03-06 01:52:03 -0800
commitc83f3aea31cd96479dfb1f5dc28eb54001c3aa21 (patch)
tree1919adc61608feb36f63fea078a7878f82188fb8 /app.py
init commit
Diffstat (limited to 'app.py')
-rw-r--r--app.py354
1 files changed, 354 insertions, 0 deletions
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..c283511
--- /dev/null
+++ b/app.py
@@ -0,0 +1,354 @@
+import sqlite3
+import json
+import tempfile
+import os
+
+import blackboxprotobuf
+import streamlit as st
+
+st.set_page_config(
+ page_title="Teamspeak 6 Extractor",
+ page_icon="🔍",
+ layout="wide",
+)
+
+TS_UUID = "11111111-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
+
+
+def get_connection(db_path: str) -> sqlite3.Connection:
+ uri = f"file:{db_path}?mode=ro&immutable=1"
+ conn = sqlite3.connect(uri, uri=True, check_same_thread=False)
+ conn.text_factory = bytes
+ return conn
+
+
+def with_conn(fn, *args, **kwargs):
+ conn = get_connection(st.session_state["db_path_open"])
+ try:
+ return fn(conn, *args, **kwargs)
+ finally:
+ conn.close()
+
+
+def fetch_all_rows(conn: sqlite3.Connection) -> list[tuple]:
+ cursor = conn.cursor()
+ cursor.execute("SELECT rowid, key FROM ProtobufItems ORDER BY rowid")
+ return cursor.fetchall()
+
+
+def fetch_row(conn: sqlite3.Connection, rowid: int) -> bytes | None:
+ cursor = conn.cursor()
+ cursor.execute("SELECT value FROM ProtobufItems WHERE rowid=?", (rowid,))
+ row = cursor.fetchone()
+ if row is None:
+ return None
+ blob = row[0]
+ if isinstance(blob, str):
+ blob = blob.encode("latin1")
+ return blob
+
+
+def decode_blob(blob: bytes) -> tuple[dict, dict]:
+ return blackboxprotobuf.decode_message(blob)
+
+
+def make_json_serialisable(obj):
+ if isinstance(obj, bytes):
+ return obj.decode("latin1")
+ if isinstance(obj, dict):
+ return {k: make_json_serialisable(v) for k, v in obj.items()}
+ if isinstance(obj, list):
+ return [make_json_serialisable(i) for i in obj]
+ return obj
+
+
+def to_str(val) -> str:
+ if isinstance(val, bytes):
+ return val.decode("utf-8", errors="replace")
+ return str(val) if val is not None else ""
+
+
+def message_contains_uuid(obj) -> bool:
+ if isinstance(obj, (bytes, str)):
+ return TS_UUID in to_str(obj)
+ if isinstance(obj, dict):
+ return any(message_contains_uuid(v) for v in obj.values())
+ if isinstance(obj, list):
+ return any(message_contains_uuid(v) for v in obj)
+ return False
+
+
+def extract_credentials(message: dict) -> dict | None:
+ block21 = message.get("21") or message.get(21)
+ if not isinstance(block21, dict):
+ return None
+ block5 = block21.get("5") or block21.get(5)
+ if not isinstance(block5, dict):
+ return None
+ homeserver = to_str(block5.get("1") or block5.get(1) or "")
+ username = to_str(block5.get("2") or block5.get(2) or "")
+ password = to_str(block5.get("3") or block5.get(3) or "")
+ if not any([homeserver, username, password]):
+ return None
+ return {"homeserver": homeserver, "username": username, "password": password}
+
+
+def sort_rows(rows: list[tuple], messages: dict) -> list[tuple]:
+ """
+ 3-tier sort priority:
+ 0 — UUID present AND credentials found at ["21"]["5"] (best match)
+ 1 — UUID present but no credentials
+ 2 — everything else
+ """
+ def key(item):
+ rowid, _ = item
+ msg = messages.get(rowid, {})
+ has_uuid = message_contains_uuid(msg)
+ has_creds = extract_credentials(msg) is not None
+ if has_uuid and has_creds:
+ return 0
+ if has_uuid:
+ return 1
+ return 2
+ return sorted(rows, key=key)
+
+
+def _init_state():
+ defaults = {
+ "db_path": "",
+ "db_temp_path": None,
+ "db_path_open": None,
+ "rows": [],
+ "row_messages": {},
+ "selected_rowid": None,
+ "blob": None,
+ "message": None,
+ "typedef": None,
+ "edit_json": "",
+ }
+ for k, v in defaults.items():
+ if k not in st.session_state:
+ st.session_state[k] = v
+
+
+_init_state()
+
+
+def load_all_messages(db_path: str, rows: list[tuple]) -> dict:
+ messages = {}
+ conn = get_connection(db_path)
+ try:
+ for rowid, _ in rows:
+ blob = fetch_row(conn, rowid)
+ if blob:
+ try:
+ msg, _ = decode_blob(blob)
+ messages[rowid] = msg
+ except Exception:
+ messages[rowid] = {}
+ finally:
+ conn.close()
+ return messages
+
+
+def open_db(path: str):
+ conn = get_connection(path)
+ rows = fetch_all_rows(conn)
+ conn.close()
+
+ row_messages = load_all_messages(path, rows)
+ sorted_rows = sort_rows(rows, row_messages)
+
+ st.session_state["db_path_open"] = path
+ st.session_state["rows"] = sorted_rows
+ st.session_state["row_messages"] = row_messages
+ st.session_state["selected_rowid"] = None
+ st.session_state["blob"] = None
+ st.session_state["message"] = None
+ st.session_state["typedef"] = None
+ st.session_state["edit_json"] = ""
+
+
+def load_row(rowid: int):
+ blob = with_conn(fetch_row, rowid)
+ if blob:
+ message, typedef = decode_blob(blob)
+ st.session_state["selected_rowid"] = rowid
+ st.session_state["blob"] = blob
+ st.session_state["message"] = message
+ st.session_state["typedef"] = typedef
+ st.session_state["edit_json"] = json.dumps(
+ make_json_serialisable(message), indent=2
+ )
+
+
+with st.sidebar:
+ st.markdown(
+ "Upload your local `settings.db` file, drag & drop or click to choose. "
+ )
+
+ uploaded = st.file_uploader(
+ "Upload settings.db",
+ type=["db", "sqlite", "sqlite3"],
+ accept_multiple_files=False,
+ )
+
+ if uploaded is not None:
+ prev_tmp = st.session_state.get("db_temp_path")
+ if prev_tmp and os.path.exists(prev_tmp):
+ try:
+ os.remove(prev_tmp)
+ except Exception:
+ pass
+
+ tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".db")
+ tmp.write(uploaded.getvalue())
+ tmp.flush()
+ tmp.close()
+ st.session_state["db_temp_path"] = tmp.name
+ st.session_state["db_path"] = tmp.name
+
+ try:
+ open_db(tmp.name)
+ st.success("Database opened read-only")
+ except Exception as exc:
+ st.error(f"Could not open uploaded database: {exc}")
+
+ st.divider()
+
+ if st.session_state["db_path_open"]:
+ rows = st.session_state["rows"]
+ row_messages = st.session_state["row_messages"]
+
+ def row_label(rowid, key) -> str:
+ label = key.decode("utf-8", errors="replace") if isinstance(key, bytes) else str(key)
+ msg = row_messages.get(rowid, {})
+ has_uuid = message_contains_uuid(msg)
+ has_creds = extract_credentials(msg) is not None
+ if has_uuid and has_creds:
+ prefix = "🔑 "
+ elif has_uuid:
+ prefix = "⭐ "
+ else:
+ prefix = ""
+ return f"{prefix}rowid {rowid} — {label}"
+
+ labels = [row_label(rowid, key) for rowid, key in rows]
+ choice = st.selectbox("Select row", options=labels, index=0)
+ idx = labels.index(choice)
+ selected_rowid = rows[idx][0]
+
+ if selected_rowid != st.session_state.get("selected_rowid"):
+ try:
+ load_row(selected_rowid)
+ except Exception as exc:
+ st.error(f"Failed to decode row: {exc}")
+
+ st.divider()
+ if st.button("Close database", use_container_width=True):
+ tmp = st.session_state.get("db_temp_path")
+ if tmp and os.path.exists(tmp):
+ try:
+ os.remove(tmp)
+ except Exception:
+ pass
+ for k in ["db_path", "db_temp_path", "db_path_open", "rows",
+ "row_messages", "selected_rowid", "blob", "message",
+ "typedef", "edit_json"]:
+ st.session_state[k] = [] if k == "rows" else ({} if k == "row_messages" else None if k != "db_path" else "")
+ st.rerun()
+
+if not st.session_state["db_path_open"]:
+ st.markdown(
+ """
+ ## Teamspeak 6 Matrix Credential Extractor
+
+ This is a tool built for extracting your Matrix credentials from Teamspeak 6 to access the chat functionalities from Matrix compatible clients.
+
+ ### Usage
+ Using the sidebar, load your `settings.db. You can find it in the locations listed below:
+
+ - Windows: `%APPDATA%\TeamSpeak\Default`
+ - Linux: `~/.config/TeamSpeak/Default`
+ - Mac: `~/Library/Preferences/TeamSpeak/Default`
+
+ ### Limitations
+ Only unencrypted group chats will work properly, encrypted chat is extremely flakey.
+
+ Teamspeak has various custom tools built-on top of the Matrix protocol
+ so not everything is supported/compatible. **Use at your own risk**
+ """
+ )
+ st.stop()
+
+if not st.session_state["selected_rowid"]:
+ st.info("Select a row from the sidebar to inspect it.")
+ st.stop()
+
+rowid = st.session_state["selected_rowid"]
+message = st.session_state["message"]
+
+st.header(f"Row `{rowid}`")
+
+creds = extract_credentials(message) if message else None
+if creds:
+ st.success("✅ Matrix credentials found in thsi row!")
+ c1, c2, c3 = st.columns(3)
+ with c1:
+ st.markdown("**Home server**")
+ st.code(creds["homeserver"])
+ with c2:
+ st.markdown("**Username (yes it's that long)**")
+ st.code(creds["username"])
+ with c3:
+ st.markdown("**Password**")
+ st.code(creds["password"])
+ st.caption("Use the information here to login to your favorite Matrix client!")
+ st.caption(f"Your full Matrix username is {creds["username"]}@{creds["homeserver"]}")
+
+ st.divider()
+elif message:
+ st.info('No credentials found in this row, try choosing a different one')
+
+tab_decoded, tab_typedef, tab_view, tab_raw = st.tabs([
+ "📋 Decoded",
+ "🗂 Type definition",
+ "🔍 Read-only view",
+ "🔢 Raw bytes",
+])
+
+with tab_decoded:
+ if message is not None:
+ st.json(make_json_serialisable(message), expanded=True)
+ else:
+ st.warning("No decoded message available.")
+
+with tab_typedef:
+ if st.session_state["typedef"] is not None:
+ st.json(st.session_state["typedef"], expanded=True)
+ else:
+ st.warning("No type definition available.")
+
+with tab_view:
+ if st.session_state["edit_json"]:
+ st.code(st.session_state["edit_json"], language="json")
+ else:
+ st.warning("No message to display.")
+
+with tab_raw:
+ blob = st.session_state["blob"]
+ if blob:
+ col_info, col_dl = st.columns([3, 1])
+ with col_info:
+ st.metric("Blob size", f"{len(blob):,} bytes")
+ if st.checkbox("Show raw hex"):
+ hex_str = blob.hex()
+ lines = []
+ for i in range(0, len(hex_str), 32):
+ offset = i // 2
+ chunk = hex_str[i : i + 32]
+ spaced = " ".join(chunk[j : j + 2] for j in range(0, len(chunk), 2))
+ lines.append(f"{offset:06x} {spaced}")
+ st.code("\n".join(lines), language="none")
+ else:
+ st.warning("No blob data available.")
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage