{ "nbformat": 4, "nbformat_minor": 0, "metadata": { "colab": { "provenance": [], "authorship_tag": "ABX9TyOwX80jyPXCpuCbpRs2azko", "include_colab_link": true }, "kernelspec": { "name": "python3", "display_name": "Python 3" }, "language_info": { "name": "python" } }, "cells": [ { "cell_type": "markdown", "metadata": { "id": "view-in-github", "colab_type": "text" }, "source": [ "\"Open" ] }, { "cell_type": "markdown", "source": [ "# Khinsider to WebDAV\n", "Download album from Khinsider and upload to WebDAV" ], "metadata": { "id": "gKrjW5sGQcQT" } }, { "cell_type": "markdown", "source": [ "## Khinsider\n", "Uses obskry's khinsider.py to download video game music. Module below has been modified to run in a notebook environment.\n", "\n", "https://github.com/obskyr/khinsider/blob/master/khinsider.py" ], "metadata": { "id": "jI0Dpv6-Q6fT" } }, { "cell_type": "code", "source": [ "from __future__ import print_function, unicode_literals\n", "\n", "import os\n", "import re\n", "import sys\n", "from functools import wraps\n", "from itertools import chain\n", "\n", "# Install required libraries\n", "!pip install requests beautifulsoup4\n", "\n", "try:\n", " from urllib.parse import unquote, urljoin, urlsplit\n", "except ImportError: # Python 2\n", " from urlparse import unquote, urljoin, urlsplit\n", "\n", "from os import getcwd\n", "\n", "import requests\n", "from bs4 import BeautifulSoup\n", "\n", "BASE_URL = 'https://downloads.khinsider.com/'\n", "\n", "# Suppress printing\n", "class Silence(object):\n", " def __enter__(self):\n", " self._stdout = sys.stdout\n", " self._stderr = sys.stderr\n", " sys.stdout = open(os.devnull, 'w')\n", " sys.stderr = open(os.devnull, 'w')\n", "\n", " def __exit__(self, *_):\n", " sys.stdout = self._stdout\n", " sys.stderr = self._stderr\n", "\n", "\n", "# --- Helper functions ---\n", "\n", "FILENAME_INVALID_RE = re.compile(r'[<>:\"/\\\\|?*]')\n", "def to_valid_filename(s):\n", " s = s.rstrip(' .')\n", "\n", " if s in {'', '.', '..', '~', 'CON', 'PRN', 'AUX', 'NUL', 'COM1', 'COM2',\n", " 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9', 'LPT1',\n", " 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'}:\n", " return s + '_'\n", "\n", " return FILENAME_INVALID_RE.sub('-', s)\n", "\n", "def unicodePrint(*args, **kwargs):\n", " unicodeType = str if sys.version_info[0] > 2 else unicode\n", " encoding = sys.stdout.encoding or 'utf-8'\n", " args = [\n", " arg.encode(encoding, 'replace').decode(encoding)\n", " if isinstance(arg, unicodeType) else arg\n", " for arg in args\n", " ]\n", " print(*args, **kwargs)\n", "\n", "def lazyProperty(func):\n", " attrName = '_lazy_' + func.__name__\n", " @property\n", " @wraps(func)\n", " def lazyVersion(self):\n", " if not hasattr(self, attrName):\n", " setattr(self, attrName, func(self))\n", " return getattr(self, attrName)\n", " return lazyVersion\n", "\n", "def getSoup(*args, **kwargs):\n", " r = requests.get(*args, **kwargs)\n", " return toSoup(r)\n", "\n", "REMOVE_RE = re.compile(br\"^\\s*$\", re.MULTILINE)\n", "BAD_AMPERSAND_RE = re.compile(br\"&#([^0-9x]|x[^0-9A-Fa-f])\")\n", "def toSoup(r):\n", " content = r.content\n", " # Fix errors in khinsider's HTML.\n", " content = REMOVE_RE.sub(b'', content)\n", " content = BAD_AMPERSAND_RE.sub(b'&#\\1', content)\n", "\n", " with Silence():\n", " return BeautifulSoup(content, 'html.parser')\n", "\n", "def getAppropriateFile(song, formatOrder):\n", " if formatOrder is None:\n", " return song.files[0]\n", "\n", " for extension in formatOrder:\n", " for file in song.files:\n", " if os.path.splitext(file.filename)[1][1:].lower() == extension:\n", " return file\n", "\n", " return song.files[0]\n", "\n", "def friendlyDownloadFile(file, path, index, total, verbose=False):\n", " numberStr = \"{}/{}\".format(\n", " str(index).zfill(len(str(total))),\n", " str(total)\n", " )\n", "\n", " if file is None and verbose:\n", " print(\"Song {} is nonexistent (404: Not Found). Skipping over.\".format(numberStr), file=sys.stderr)\n", " return False\n", "\n", " encoding = 'utf-8'\n", " filename = file.filename.encode(encoding, 'replace').decode(encoding)\n", "\n", " byTheWay = \"\"\n", " if filename != file.filename:\n", " byTheWay = \" (replaced characters not in the filesystem's \\\"{}\\\" encoding)\".format(encoding)\n", "\n", " filename = to_valid_filename(filename)\n", " path = os.path.join(path, filename)\n", "\n", " if not os.path.exists(path):\n", " if verbose:\n", " unicodePrint(\"Downloading {}: {}{}...\".format(numberStr, filename, byTheWay))\n", " for triesElapsed in range(3):\n", " if verbose and triesElapsed:\n", " unicodePrint(\"Couldn't download {}. Trying again...\".format(filename), file=sys.stderr)\n", " try:\n", " file.download(path)\n", " except (requests.ConnectionError, requests.Timeout):\n", " pass\n", " else:\n", " break\n", " else:\n", " if verbose:\n", " unicodePrint(\"Couldn't download {}. Skipping over.\".format(filename), file=sys.stderr)\n", " return False\n", " else:\n", " if verbose:\n", " unicodePrint(\"Skipping over {}: {}{}. Already exists.\".format(numberStr, filename, byTheWay))\n", "\n", " return True\n", "\n", "class KhinsiderError(Exception):\n", " pass\n", "\n", "class NonexistentSongError(KhinsiderError):\n", " pass\n", "\n", "class SoundtrackError(Exception):\n", " def __init__(self, soundtrack):\n", " self.soundtrack = soundtrack\n", "\n", "class NonexistentSoundtrackError(SoundtrackError, ValueError):\n", " def __str__(self):\n", " ost = '\"{}\" '.format(self.soundtrack.id) if len(self.soundtrack.id) <= 80 else \"\"\n", " s = \"The soundtrack {}does not exist.\".format(ost)\n", " return s\n", "\n", "class NonexistentFormatsError(SoundtrackError, ValueError):\n", " def __init__(self, soundtrack, requestedFormats):\n", " super(NonexistentFormatsError, self).__init__(soundtrack)\n", " self.requestedFormats = requestedFormats\n", " def __str__(self):\n", " ost = '\"{}\" '.format(self.soundtrack.id) if len(self.soundtrack.id) <= 80 else \"\"\n", " s = \"The soundtrack {}is not available in the requested formats ({}).\".format(\n", " ost,\n", " \", \".join('\"{}\"'.format(extension) for extension in self.requestedFormats))\n", " return s\n", "\n", "class Soundtrack(object):\n", " \"\"\"A KHInsider soundtrack. Initialize with a soundtrack ID.\n", "\n", " Properties:\n", " * id: The soundtrack's unique ID, used at the end of its URL.\n", " * url: The full URL of the soundtrack.\n", " * name: The textual title of the soundtrack.\n", " * availableFormats: A list of the formats the soundtrack is available in.\n", " * songs: A list of Song objects representing the songs in the soundtrack.\n", " * images: A list of File objects representing the images in the soundtrack.\n", " \"\"\"\n", "\n", " def __init__(self, soundtrackId):\n", " self.id = soundtrackId\n", " self.url = urljoin(BASE_URL, 'game-soundtracks/album/' + self.id)\n", "\n", " def __repr__(self):\n", " return \"<{}: {}>\".format(self.__class__.__name__, self.id)\n", "\n", " def _isLoaded(self, property):\n", " return hasattr(self, '_lazy_' + property)\n", "\n", " @lazyProperty\n", " def _contentSoup(self):\n", " soup = getSoup(self.url)\n", " contentSoup = soup.find(id='pageContent')\n", " if contentSoup.find('p').string == \"No such album\":\n", " raise NonexistentSoundtrackError(self)\n", " return contentSoup\n", "\n", " @lazyProperty\n", " def name(self):\n", " return next(self._contentSoup.find('h2').stripped_strings)\n", "\n", " @lazyProperty\n", " def availableFormats(self):\n", " table = self._contentSoup.find('table', id='songlist')\n", " header = table.find('tr')\n", " headings = [td.get_text(strip=True) for td in header(['th', 'td'])]\n", " formats = [s.lower() for s in headings if s not in {\"\", \"Track\", \"Song Name\", \"Download\", \"Size\"}]\n", " formats = formats or ['mp3']\n", " return formats\n", "\n", " @lazyProperty\n", " def songs(self):\n", " table = self._contentSoup.find('table', id='songlist')\n", " anchors = [tr.find('a') for tr in table('tr') if not tr.find('th')]\n", " urls = [a['href'] for a in anchors]\n", " songs = [Song(urljoin(self.url, url)) for url in urls]\n", " return songs\n", "\n", " @lazyProperty\n", " def images(self):\n", " table = self._contentSoup.find('table')\n", " if not table:\n", " return []\n", " anchors = [a for a in table('a') if a.find('img')]\n", " urls = [a['href'] for a in anchors]\n", " images = [File(urljoin(self.url, url)) for url in urls]\n", " return images\n", "\n", " def download(self, path='', makeDirs=True, formatOrder=None, verbose=False):\n", " path = os.path.join(getcwd(), path)\n", " path = os.path.abspath(os.path.realpath(path))\n", " if formatOrder:\n", " formatOrder = [extension.lower() for extension in formatOrder]\n", " if not set(self.availableFormats) & set(formatOrder):\n", " raise NonexistentFormatsError(self, formatOrder)\n", "\n", " if verbose and not self._isLoaded('songs'):\n", " print(\"Getting song list...\")\n", " files = []\n", " for song in self.songs:\n", " try:\n", " files.append(getAppropriateFile(song, formatOrder))\n", " except NonexistentSongError:\n", " files.append(None)\n", " files.extend(self.images)\n", " totalFiles = len(files)\n", "\n", " if makeDirs and not os.path.isdir(path):\n", " os.makedirs(os.path.abspath(os.path.realpath(path)))\n", "\n", " success = True\n", " for fileNumber, file in enumerate(files, 1):\n", " if not friendlyDownloadFile(file, path, fileNumber, totalFiles, verbose):\n", " success = False\n", "\n", " return success\n", "\n", "class Song(object):\n", " \"\"\"A song on KHInsider.\n", "\n", " Properties:\n", " * url: The full URL of the song page.\n", " * name: The name of the song.\n", " * files: A list of the song's files - there may be several if the song\n", " is available in more than one format.\n", " \"\"\"\n", "\n", " def __init__(self, url):\n", " self.url = url\n", "\n", " def __repr__(self):\n", " return \"<{}: {}>\".format(self.__class__.__name__, self.url)\n", "\n", " @lazyProperty\n", " def _soup(self):\n", " r = requests.get(self.url, timeout=10)\n", " if r.url.rsplit('/', 1)[-1] == '404':\n", " raise NonexistentSongError(\"Nonexistent song page (404).\")\n", " return getSoup(self.url)\n", "\n", " @lazyProperty\n", " def name(self):\n", " return self._soup('p')[2]('b')[1].get_text()\n", "\n", " @lazyProperty\n", " def files(self):\n", " anchors = self._soup('a', href=re.compile(r'^https?://[^/]+/(?:soundtracks|ost)/.+$'))\n", " return [File(urljoin(self.url, a['href'])) for a in anchors]\n", "\n", "class File(object):\n", " \"\"\"A file belonging to a soundtrack on KHInsider.\n", "\n", " Properties:\n", " * url: The full URL of the file.\n", " * filename: The file's... filename. You got it.\n", " \"\"\"\n", "\n", " def __init__(self, url):\n", " self.url = url\n", "\n", " try:\n", " url = str(url)\n", " except UnicodeError:\n", " url = url.encode('utf-8')\n", " self.filename = unquote(url.rsplit(str('/'), 1)[-1])\n", " try:\n", " self.filename = self.filename.decode('utf-8')\n", " except AttributeError:\n", " pass\n", "\n", " def __repr__(self):\n", " return \"<{}: {}>\".format(self.__class__.__name__, self.url)\n", "\n", " def download(self, path):\n", " \"\"\"Download the file to `path`.\"\"\"\n", " response = requests.get(self.url, timeout=10)\n", " with open(path, 'wb') as outFile:\n", " outFile.write(response.content)\n", "\n", "def download_soundtrack(soundtrack_id, path='', make_dirs=True, format_order=None, verbose=False):\n", " \"\"\"Download the soundtrack with the ID `soundtrackId`.\n", " See Soundtrack.download for more information.\n", " \"\"\"\n", " soundtrack = Soundtrack(soundtrack_id)\n", " soundtrack.name # Load content in advance.\n", " path = to_valid_filename(soundtrack.name) if path is None else path\n", " if verbose:\n", " print(f\"Downloading to \\\"{path}\\\".\")\n", " return soundtrack.download(path, make_dirs, format_order, verbose)\n", "\n", "def search_soundtrack(term):\n", " \"\"\"Return a tuple of two lists of Soundtrack objects for the search term\n", " `term`. The first tuple contains album name results, and the second song\n", " name results.\n", " \"\"\"\n", " r = requests.get(urljoin(BASE_URL, 'search'), params={'search': term})\n", " path = urlsplit(r.url).path\n", " if path.split('/', 2)[1] == 'game-soundtracks':\n", " return [Soundtrack(path.rsplit('/', 1)[-1])]\n", "\n", " soup = toSoup(r)\n", "\n", " tables = soup('table', class_='albumList')\n", " if not tables:\n", " raise SearchError(soup.find('p').get_text(strip=True))\n", "\n", " soundtracks = [soundtracksInSearchTable(table) for table in tables]\n", " if len(soundtracks) == 1:\n", " if \"song\" in soup.find(id='pageContent').find('p').get_text():\n", " soundtracks.insert(0, [])\n", " else:\n", " soundtracks.append([])\n", "\n", " return soundtracks\n", "\n", "def soundtracksInSearchTable(table):\n", " anchors = (tr('td')[1].find('a') for tr in table('tr')[1:])\n", " soundtrackParams = [(a['href'].split('/')[-1], a.get_text(strip=True)) for a in anchors]\n", "\n", " soundtracks = []\n", " for id, name in soundtrackParams:\n", " curSoundtrack = Soundtrack(id)\n", " curSoundtrack._lazy_name = name\n", " soundtracks.append(curSoundtrack)\n", "\n", " return soundtracks\n", "\n", "def print_search_results(search_results):\n", " pad_len = max(len(x.id) for x in chain(*search_results))\n", " result_str = \"\"\n", " has_previous_list = False\n", " for heading, soundtracks in zip((\"Album title results:\", \"Song name results:\"), search_results):\n", " if soundtracks:\n", " if has_previous_list:\n", " result_str += \"\\n\"\n", " result_str += heading + \"\\n\"\n", " for soundtrack in soundtracks:\n", " result_str += \"{} {}. {}\\n\".format(soundtrack.id, '.' * (pad_len - len(soundtrack.id)), soundtrack.name)\n", " has_previous_list = True\n", " print(result_str)\n", "\n", "\n", "ALBUM_NAME = \"\" # @param {type: \"string\"}\n", "DOWNLOAD_PATH = \"output/\"+ALBUM_NAME\n", "if download_soundtrack(ALBUM_NAME, path=DOWNLOAD_PATH, format_order=['mp3'], verbose=True):\n", " print(\"Download was successful! Go ahead and proceed\")\n", "else:\n", " print(\"Download was unsuccessful. Please check the logs above\")" ], "metadata": { "id": "L2TvwtMkQaD6", "cellView": "form" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "# Connect\n", "Connect and see if there are any problems with your credentials" ], "metadata": { "id": "9N7JL34GGHaZ" } }, { "cell_type": "code", "source": [ "!pip install webdavclient3\n", "from webdav3.client import Client\n", "\n", "HOSTNAME = \"\" # @param {type: \"string\"}\n", "USERNAME = \"\" # @param {type: \"string\"}\n", "PASSWORD = \"\" # @param {type: \"string\"}\n", "\n", "options = {\n", " 'webdav_hostname': HOSTNAME,\n", " 'webdav_login': USERNAME,\n", " 'webdav_password': PASSWORD\n", "}\n", "client = Client(options)\n", "client.verify = False # To not check SSL certificates (Default = True)\n", "files1 = client.list()\n", "files1" ], "metadata": { "id": "aU33YKZUF-JN" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "TARGET_PATH = \"Music/Video Game OST/\" # @param {type: \"string\"}\n", "import os\n", "folders = [f for f in os.listdir(\"output\") if os.path.isdir(os.path.join(\"output\", f))]\n", "for folder in folders:\n", " print(\"Processing \" + folder)\n", " target_path = os.path.join(TARGET_PATH, folder)\n", " client.upload_sync(target_path, os.path.join(\"output\", folder))" ], "metadata": { "id": "hTJxy2y21HRo" }, "execution_count": null, "outputs": [] } ] }