From 0592564264fff57ccfd9677957196951f9f1c6cf Mon Sep 17 00:00:00 2001 From: RblSb Date: Sun, 26 Jan 2025 23:22:33 +0300 Subject: Video upload feature And you can play video as soon as upload starts! This is pretty useful for thicc ones. Video will be keeped in cache and will comply cache size limit. I'm also implemented system storage check to change cache limit if it's lower than config value, so upload will be blocked if there is lower than 10MiB available on disk. --- src/server/Cache.hx | 139 +++++++++++++++++++++++------- src/server/HttpServer.hx | 216 ++++++++++++++++++++++++++++++++++++++--------- src/server/Main.hx | 34 +++++--- src/server/Utils.hx | 6 +- 4 files changed, 310 insertions(+), 85 deletions(-) (limited to 'src/server') diff --git a/src/server/Cache.hx b/src/server/Cache.hx index 450a893..fa5889c 100644 --- a/src/server/Cache.hx +++ b/src/server/Cache.hx @@ -1,5 +1,6 @@ package server; +import haxe.io.Path; import js.lib.Promise; import js.node.ChildProcess; import js.node.Fs.Fs; @@ -16,7 +17,8 @@ class Cache { public final isYtReady = false; /** In bytes **/ - public var storageLimit = 3 * 1024 * 1024 * 1024; + var storageLimit = 3 * 1024 * 1024 * 1024; + final freeSpaceBlock = 10 * 1024 * 1024; // 10MB public function new(main:Main, cacheDir:String) { this.main = main; @@ -55,16 +57,22 @@ class Cache { return; } final outName = videoId + ".mp4"; - if (cachedFiles.contains(outName) && FileSystem.exists('$cacheDir/$outName')) { + if (cachedFiles.contains(outName) && isFileExists(outName)) { callback(outName); return; } final ytdl:Dynamic = untyped require("@distube/ytdl-core"); - log(client, 'Caching $url to $outName...'); - // final opts = {playerClients: ["IOS", "WEB_CREATOR"]}; + trace('Caching $url to $outName...'); + main.send(client, { + type: Progress, + progress: { + type: Caching, + ratio: 0, + data: outName + } + }); final promise:Promise = ytdl.getInfo(url); promise.then(info -> { - // trace(info.formats.filter(item -> item.audioCodec != null)); trace('Get info with ${info.formats.length} formats'); final audioFormat:YoutubeVideoFormat = try { ytdl.chooseFormat(info.formats.filter(item -> { @@ -73,26 +81,23 @@ class Cache { } catch (e) { log(client, "Error: audio format not found"); trace(e); - trace(info.formats); + trace(info.formats.filter(item -> item.hasAudio)); return; } final videoFormat = getBestYoutubeVideoFormat(info.formats) ?? { log(client, "Error: video format not found"); - trace(info.formats); + trace(info.formats.filter(item -> item.hasVideo)); return; } - trace("Picked audio and video formats"); final dlVideo:Readable = ytdl(url, { format: videoFormat, - // playerClients: opts.playerClients }); dlVideo.pipe(Fs.createWriteStream('$cacheDir/input-video')); dlVideo.on("error", err -> log(client, "Error during video download: " + err)); final dlAudio:Readable = ytdl(url, { format: audioFormat, - // playerClients: opts.playerClients }); dlAudio.pipe(Fs.createWriteStream('$cacheDir/input-audio')); dlAudio.on("error", err -> log(client, "Error during audio download: " + err)); @@ -100,8 +105,13 @@ class Cache { var count = 0; function onComplete(type:String):Void { count++; - log(client, '$type track downloaded ($count/2)'); + trace('$type track downloaded ($count/2)'); if (count < 2) return; + var size = FileSystem.stat('$cacheDir/input-video').size; + size += FileSystem.stat('$cacheDir/input-audio').size; + // clean some space for full mp4 + removeOlderCache(size + freeSpaceBlock); + final args = '-y -i input-video -i input-audio -c copy -map 0:v -map 1:a ./$outName'.split(" "); final process = ChildProcess.spawn("ffmpeg", args, { cwd: cacheDir, @@ -117,40 +127,111 @@ class Cache { } final inVideo = '$cacheDir/input-video'; final inAudio = '$cacheDir/input-audio'; - if (FileSystem.exists(inVideo)) FileSystem.deleteFile(inVideo); - if (FileSystem.exists(inAudio)) FileSystem.deleteFile(inAudio); + FileSystem.deleteFile(inVideo); + FileSystem.deleteFile(inAudio); - if (!cachedFiles.contains(outName)) { - cachedFiles.unshift(outName); - } - removeOlderCache(); + add(outName); callback(outName); }); } dlVideo.on("finish", () -> onComplete("Video")); dlAudio.on("finish", () -> onComplete("Audio")); - // dlVideo.on('progress', (c, d, t) -> { - // final progress = Std.int((d / t * 100) * 10) / 10; - // trace(progress); - // }); + var isAudioStart = true; + dlAudio.on("progress", (chunkLength:Int, downloaded:Int, contentLength:Int) -> { + if (isAudioStart) { + isAudioStart = false; + removeOlderCache(contentLength); + } + }); + var isVideoStart = true; + dlVideo.on("progress", (chunkLength:Int, downloaded:Int, contentLength:Int) -> { + if (isVideoStart) { + isVideoStart = false; + removeOlderCache(contentLength); + } + final ratio = (downloaded / contentLength).clamp(0, 1); + main.send(client, { + type: Progress, + progress: { + type: Downloading, + ratio: ratio + } + }); + }); }).catchError(err -> { log(client, "" + err); }); } - function removeOlderCache():Void { - while (getUsedSpace() > storageLimit) { - final name = cachedFiles.pop(); - final path = '$cacheDir/$name'; + public function setStorageLimit(bytes:Int) { + storageLimit = cast bytes; + storageLimit = storageLimit.limitMin(0); + final statfs = (Fs : Dynamic).statfs ?? return; + statfs("/", (err, stats) -> { + if (err != null) { + trace(err); + return; + } + final availSpace = (stats.bsize * stats.bavail - freeSpaceBlock).limitMin(0); + removeOlderCache(); + final freeSpace = getFreeSpace(); + if (availSpace < freeSpace) { + // shrink limit lower than disk space + storageLimit += availSpace - freeSpace; + storageLimit = storageLimit.limitMin(0); + removeOlderCache(); + } + }); + } + + public function add(name:String) { + if (!cachedFiles.contains(name)) { + cachedFiles.unshift(name); + } + } + + public function removeOlderCache(addFileSize = 0):Void { + var space = getUsedSpace(addFileSize); + while (space > storageLimit) { + final name = cachedFiles.pop() ?? break; + final path = getFilePath(name); if (FileSystem.exists(path)) FileSystem.deleteFile(path); + space = getUsedSpace(addFileSize); } } - function getUsedSpace():Int { - var total = 0; + public function getFreeFileName(baseName = "video"):String { + var i = 1; + while (true) { + final n = i == 1 ? "" : '$i'; + final name = '$baseName$n.mp4'; + if (!isFileExists(name)) return name; + i++; + } + } + + public function getFilePath(name:String):String { + return '$cacheDir/$name'; + } + + public function getFileUrl(name:String):String { + final folder = Path.withoutDirectory(cacheDir); + return '/$folder/$name'; + } + + public function isFileExists(name:String):Bool { + return FileSystem.exists(getFilePath(name)); + } + + public function getFreeSpace():Int { + return storageLimit - getUsedSpace(); + } + + public function getUsedSpace(addFileSize = 0):Int { + var total = addFileSize.limitMin(0); for (name in cachedFiles.reversed()) { - final path = '$cacheDir/$name'; + final path = getFilePath(name); if (!FileSystem.exists(path)) { cachedFiles.remove(name); continue; @@ -161,7 +242,7 @@ class Cache { } function getBestYoutubeVideoFormat(formats:Array):Null { - final qPriority = [1080, 720, 480, 360, 240]; + final qPriority = [1080, 720, 480, 360, 240, 144]; for (q in qPriority) { final quality = '${q}p'; for (format in formats) { diff --git a/src/server/HttpServer.hx b/src/server/HttpServer.hx index 7622708..6602e86 100644 --- a/src/server/HttpServer.hx +++ b/src/server/HttpServer.hx @@ -1,8 +1,10 @@ package server; +import Types.UploadResponse; +import haxe.Json; import haxe.io.Path; import js.node.Buffer; -import js.node.Fs; +import js.node.Fs.Fs; import js.node.Http; import js.node.Https; import js.node.Path as JsPath; @@ -12,6 +14,14 @@ import js.node.http.ServerResponse; import js.node.url.URL; import sys.FileSystem; +@:structInit +private class HttpServerConfig { + public final dir:String; + public final customDir:String = null; + public final allowLocalRequests = false; + public final cache:Cache = null; +} + class HttpServer { static final mimeTypes = [ "html" => "text/html", @@ -36,31 +46,49 @@ class HttpServer { "wasm" => "application/wasm" ]; - static var dir:String; - static var customDir:String; - static var hasCustomRes = false; - static var allowedLocalFiles:Map = []; - static var allowLocalRequests = false; - static final CHUNK_SIZE = 1024 * 1024 * 5; // 5 MB - - public static function init(dir:String, ?customDir:String, allowLocalRequests:Bool):Void { - HttpServer.dir = dir; - if (customDir == null) return; - HttpServer.customDir = customDir; - hasCustomRes = FileSystem.exists(customDir); - HttpServer.allowLocalRequests = allowLocalRequests; + final main:Main; + final dir:String; + final customDir:String; + final hasCustomRes = false; + final allowedLocalFiles:Map = []; + final allowLocalRequests = false; + final cache:Cache = null; + final CHUNK_SIZE = 1024 * 1024 * 5; // 5 MB + final uploadingFiles:Map = []; + final uploadingFilesLastChunks:Map = []; + + public function new(main:Main, config:HttpServerConfig):Void { + this.main = main; + dir = config.dir; + customDir = config.customDir; + allowLocalRequests = config.allowLocalRequests; + cache = config.cache; + + if (customDir != null) hasCustomRes = FileSystem.exists(customDir); } - public static function serveFiles(req:IncomingMessage, res:ServerResponse):Void { + public function serveFiles(req:IncomingMessage, res:ServerResponse):Void { final url = try { new URL(safeDecodeURI(req.url), "http://localhost"); - } catch (e) new URL("/", "http://localhost"); + } catch (e) { + new URL("/", "http://localhost"); + } var filePath = getPath(dir, url); final ext = Path.extension(filePath).toLowerCase(); res.setHeader("Accept-Ranges", "bytes"); res.setHeader("Content-Type", getMimeType(ext)); + if (cache != null && req.method == "POST") { + switch url.pathname { + case "/upload-last-chunk": + uploadFileLastChunk(req, res); + case "/upload": + uploadFile(req, res); + } + return; + } + if (allowLocalRequests && req.socket.remoteAddress == req.socket.localAddress || allowedLocalFiles[url.pathname]) { if (isMediaExtension(ext)) { @@ -103,14 +131,104 @@ class HttpServer { }); } - static function getPath(dir:String, url:URL):String { + function uploadFileLastChunk(req:IncomingMessage, res:ServerResponse) { + final name = cache.getFreeFileName(req.headers["content-name"]); + final filePath = cache.getFilePath(name); + final body:Array = []; + req.on("data", chunk -> body.push(chunk)); + req.on("end", () -> { + final buffer = Buffer.concat(body); + uploadingFilesLastChunks[filePath] = buffer; + res.writeHead(200, { + 'Content-Type': 'application/json', + }); + final json:UploadResponse = { + info: "File last chunk uploaded" + } + res.end(Json.stringify(json)); + }); + } + + function uploadFile(req:IncomingMessage, res:ServerResponse) { + final name = cache.getFreeFileName(req.headers["content-name"]); + final clientName = req.headers["client-name"]; + final filePath = cache.getFilePath(name); + final size = Std.parseInt(req.headers["content-length"]) ?? return; + var written = 0; + + inline function end(json:UploadResponse):Void { + uploadingFiles.remove(name); + uploadingFilesLastChunks.remove(name); + + res.statusCode = 200; + res.end(Json.stringify(json)); + } + + if (cache.getFreeSpace() < size) { + end({ + info: "Error: Not enough free space on server or file size is out of cache storage limit.", + errorId: "freeSpace" + }); + return; + } + + final stream = Fs.createWriteStream(filePath); + req.pipe(stream); + + inline function onStart() { + cache.removeOlderCache(size); + cache.add(name); + uploadingFiles[filePath] = size; + } + var isStart = true; + req.on("data", chunk -> { + var url:String = null; + if (isStart) { + isStart = false; + onStart(); + url = cache.getFileUrl(name); + } + written += chunk.length; + final ratio = (written / size).clamp(0, 1); + final percent = (ratio * 100).toFixed(2); + final client = main.clients.getByName(clientName) ?? return; + main.send(client, { + type: Progress, + progress: { + type: Uploading, + ratio: ratio, + data: url + } + }); + }); + stream.on("close", () -> { + end({ + info: "File write stream closed.", + }); + }); + stream.on("error", err -> { + trace(err); + end({ + info: "File write stream error.", + }); + }); + req.on("error", err -> { + trace("Request Error:", err); + stream.destroy(); + end({ + info: "File request error.", + }); + }); + } + + function getPath(dir:String, url:URL):String { var filePath = dir + url.pathname; filePath = filePath.urlDecode(); if (!FileSystem.isDirectory(filePath)) return filePath; return Path.addTrailingSlash(filePath) + "index.html"; } - static function readFileError(err:Dynamic, res:ServerResponse, filePath:String):Void { + function readFileError(err:Dynamic, res:ServerResponse, filePath:String):Void { res.setHeader("Content-Type", getMimeType("html")); if (err.code == "ENOENT") { res.statusCode = 404; @@ -122,9 +240,13 @@ class HttpServer { } } - static function serveMedia(req:IncomingMessage, res:ServerResponse, filePath:String):Bool { + function serveMedia(req:IncomingMessage, res:ServerResponse, filePath:String):Bool { if (!Fs.existsSync(filePath)) return false; - final videoSize = Fs.statSync(filePath).size; + var videoSize:Int = cast Fs.statSync(filePath).size; + // use future content length to start playing it before uploaded + if (uploadingFiles.exists(filePath)) { + videoSize = uploadingFiles[filePath]; + } final rangeHeader:String = req.headers["range"]; if (rangeHeader == null) { res.statusCode = 200; @@ -142,23 +264,33 @@ class HttpServer { res.setHeader("Content-Range", 'bytes $start-$end/$videoSize'); res.setHeader("Content-Length", '$contentLength'); - // HTTP Status 206 for Partial Content - res.statusCode = 206; - // create video read stream for this particular chunk - final videoStream = Fs.createReadStream(filePath, {start: cast start, end: cast end}); + res.statusCode = 206; // partial content + + // check for last chunk cache for instant play while uploading + final buffer = uploadingFilesLastChunks[filePath]; + if (buffer != null && end == videoSize - 1 && contentLength < buffer.byteLength) { + final bufferStart = (buffer.byteLength - contentLength).limitMin(0); + res.end(buffer.slice(bufferStart)); + return true; + } + // stream the video chunk to the client + final videoStream = Fs.createReadStream( + filePath, + {start: start, end: end} + ); videoStream.pipe(res); res.on("error", () -> videoStream.destroy()); res.on("close", () -> videoStream.destroy()); return true; } - static function parseRangeHeader(rangeHeader:String, videoSize:Float):{start:Float, end:Float} { + function parseRangeHeader(rangeHeader:String, videoSize:Int):{start:Int, end:Int} { final ranges = ~/[-=]/g.split(rangeHeader); - var start = Std.parseFloat(ranges[1]); + var start = Std.parseInt(ranges[1]); if (Utils.isOutOfRange(start, 0, videoSize - 1)) start = 0; - var end = Std.parseFloat(ranges[2]); - if (Math.isNaN(end)) end = start + CHUNK_SIZE; + var end = Std.parseInt(ranges[2]); + if (end == null) end = start + CHUNK_SIZE; if (Utils.isOutOfRange(end, start, videoSize - 1)) end = videoSize - 1; return { start: start, @@ -166,14 +298,14 @@ class HttpServer { }; } - static function isMediaExtension(ext:String):Bool { + function isMediaExtension(ext:String):Bool { return ext == "mp4" || ext == "webm" || ext == "mp3" || ext == "wav"; } - static final matchLang = ~/^[A-z]+/; - static final matchVarString = ~/\${([A-z_]+)}/g; + final matchLang = ~/^[A-z]+/; + final matchVarString = ~/\${([A-z_]+)}/g; - static function localizeHtml(data:String, lang:String):String { + function localizeHtml(data:String, lang:String):String { if (lang != null && matchLang.match(lang)) { lang = matchLang.matched(0); } else lang = "en"; @@ -184,7 +316,7 @@ class HttpServer { return data; } - static function proxyUrl(req:IncomingMessage, res:ServerResponse):Bool { + function proxyUrl(req:IncomingMessage, res:ServerResponse):Bool { final url = req.url.replace("/proxy?url=", ""); final proxy = proxyRequest(url, req, res, proxyRes -> { final url = proxyRes.headers["location"] ?? return false; @@ -201,7 +333,7 @@ class HttpServer { return true; } - static function proxyRequest( + function proxyRequest( url:String, req:IncomingMessage, res:ServerResponse, @@ -209,7 +341,9 @@ class HttpServer { ):Null { final url = try { new URL(safeDecodeURI(url)); - } catch (e) return null; + } catch (e) { + return null; + } if (url.host == req.headers["host"]) return null; final options = { host: url.hostname, @@ -222,7 +356,7 @@ class HttpServer { final request = url.protocol == "https:" ? Https.request : Http.request; final proxy = request(options, proxyRes -> { if (cancelProxyRequest(proxyRes)) return; - proxyRes.headers["Content-Type"] = "application/octet-stream"; + proxyRes.headers["content-type"] = "application/octet-stream"; res.writeHead(proxyRes.statusCode, proxyRes.headers); proxyRes.pipe(res); }); @@ -232,18 +366,18 @@ class HttpServer { return proxy; } - static function isChildOf(parent:String, child:String):Bool { + function isChildOf(parent:String, child:String):Bool { final rel = JsPath.relative(parent, child); return rel.length > 0 && !rel.startsWith('..') && !JsPath.isAbsolute(rel); } - static function getMimeType(ext:String):String { + function getMimeType(ext:String):String { return mimeTypes[ext] ?? return "application/octet-stream"; } - static final ctrlCharacters = ~/[\u0000-\u001F\u007F-\u009F\u2000-\u200D\uFEFF]/g; + final ctrlCharacters = ~/[\u0000-\u001F\u007F-\u009F\u2000-\u200D\uFEFF]/g; - static function safeDecodeURI(data:String):String { + function safeDecodeURI(data:String):String { try { data = decodeURI(data); } catch (err) { @@ -253,7 +387,7 @@ class HttpServer { return data; } - static inline function decodeURI(data:String):String { + inline function decodeURI(data:String):String { return js.Syntax.code("decodeURI({0})", data); } } diff --git a/src/server/Main.hx b/src/server/Main.hx index c863cce..4f0fd03 100644 --- a/src/server/Main.hx +++ b/src/server/Main.hx @@ -25,8 +25,6 @@ import json2object.JsonParser; import sys.FileSystem; import sys.io.File; -using ClientTools; - private typedef MainOptions = { loadState:Bool } @@ -52,11 +50,14 @@ class Main { final playersCacheSupport:Array = []; var port:Int; final userList:UserList; - final clients:Array = []; + + public final clients:Array = []; + final freeIds:Array = []; final wsEventParser = new JsonParser(); final consoleInput:ConsoleInput; final cache:Cache; + final cacheDir:String; final videoList = new VideoList(); final videoTimer = new VideoTimer(); final messages:Array = []; @@ -84,6 +85,8 @@ class Main { verbose = args.exists("verbose"); statePath = '$rootDir/user/state.json'; logsDir = '$rootDir/user/logs'; + cacheDir = '$rootDir/user/res/cache'; + // process.on("exit", exit); process.on("SIGINT", exit); // ctrl+c process.on("SIGUSR1", exit); // kill pid @@ -100,15 +103,16 @@ class Main { logError("unhandledRejection", reason); exit(); }); + logger = new Logger(logsDir, 10, verbose); consoleInput = new ConsoleInput(this); consoleInput.initConsoleInput(); - cache = new Cache(this, '$rootDir/user/res/cache'); + cache = new Cache(this, cacheDir); if (cache.isYtReady) playersCacheSupport.push(YoutubeType); initIntergationHandlers(); loadState(); config = loadUserConfig(); - cache.storageLimit = cast config.cacheStorageLimitGiB * 1024 * 1024 * 1024; + cache.setStorageLimit(cast config.cacheStorageLimitGiB * 1024 * 1024 * 1024); userList = loadUsers(); config.isVerbose = verbose; config.salt = generateConfigSalt(); @@ -154,11 +158,16 @@ class Main { } final dir = '$rootDir/res'; - HttpServer.init(dir, '$rootDir/user/res', config.localAdmins); + final httpServer = new HttpServer(this, { + dir: dir, + customDir: '$rootDir/user/res', + allowLocalRequests: config.localAdmins, + cache: cache, + }); Lang.init('$dir/langs'); final server = Http.createServer((req, res) -> { - HttpServer.serveFiles(req, res); + httpServer.serveFiles(req, res); }); wss = new WSServer({server: server}); wss.on("connection", onConnect); @@ -330,7 +339,7 @@ class Main { if (isHeroku && process.env["APP_URL"] != null) { var url = process.env["APP_URL"]; if (!url.startsWith("http")) url = 'http://$url'; - new Timer(10 * 60 * 1000).run = function() { + new Timer(10 * 60 * 1000).run = () -> { if (clients.length == 0) return; trace('Ping $url'); Http.get(url, r -> {}); @@ -644,6 +653,7 @@ class Main { broadcast(data); case ServerMessage: + case Progress: case AddVideo: if (isPlaylistLockedFor(client)) return; if (!checkPermission(client, AddVideoPerm)) return; @@ -682,7 +692,7 @@ class Main { addVideo(); } else { cache.cacheYoutubeVideo(client, item.url, (name) -> { - item = item.withUrl('/cache/$name'); + item = item.withUrl(cache.getFileUrl(name)); if (item.duration > 1) item.duration -= 1; addVideo(); }); @@ -973,17 +983,17 @@ class Main { }); } - function send(client:Client, data:WsEvent):Void { + public function send(client:Client, data:WsEvent):Void { client.ws.send(Json.stringify(data), null); } - function broadcast(data:WsEvent):Void { + public function broadcast(data:WsEvent):Void { final json = Json.stringify(data); for (client in clients) client.ws.send(json, null); } - function broadcastExcept(skipped:Client, data:WsEvent):Void { + public function broadcastExcept(skipped:Client, data:WsEvent):Void { final json = Json.stringify(data); for (client in clients) { if (client == skipped) continue; diff --git a/src/server/Utils.hx b/src/server/Utils.hx index 8fcc4c6..eb28115 100644 --- a/src/server/Utils.hx +++ b/src/server/Utils.hx @@ -70,7 +70,7 @@ class Utils { r.setEncoding("utf8"); final data = new StringBuf(); r.on("data", chunk -> data.add(chunk)); - r.on("end", _ -> callback(data.toString())); + r.on("end", () -> callback(data.toString())); }).on("error", onError).on("timeout", onError); } @@ -90,8 +90,8 @@ class Utils { return "127.0.0.1"; } - public static function isOutOfRange(value:Float, min:Float, max:Float):Bool { - return value == null || Math.isNaN(value) || value < min || value > max; + public static function isOutOfRange(value:Int, min:Int, max:Int):Bool { + return value == null || value < min || value > max; } public static function sortedPush(ids:Array, id:Int):Void { -- cgit v1.2.3