diff options
Diffstat (limited to 'src/server')
| -rw-r--r-- | src/server/Cache.hx | 139 | ||||
| -rw-r--r-- | src/server/HttpServer.hx | 214 | ||||
| -rw-r--r-- | src/server/Main.hx | 34 | ||||
| -rw-r--r-- | src/server/Utils.hx | 6 |
4 files changed, 309 insertions, 84 deletions
diff --git a/src/server/Cache.hx b/src/server/Cache.hx index 450a893..fa5889c 100644 --- a/src/server/Cache.hx +++ b/src/server/Cache.hx @@ -1,5 +1,6 @@ package server; +import haxe.io.Path; import js.lib.Promise; import js.node.ChildProcess; import js.node.Fs.Fs; @@ -16,7 +17,8 @@ class Cache { public final isYtReady = false; /** In bytes **/ - public var storageLimit = 3 * 1024 * 1024 * 1024; + var storageLimit = 3 * 1024 * 1024 * 1024; + final freeSpaceBlock = 10 * 1024 * 1024; // 10MB public function new(main:Main, cacheDir:String) { this.main = main; @@ -55,16 +57,22 @@ class Cache { return; } final outName = videoId + ".mp4"; - if (cachedFiles.contains(outName) && FileSystem.exists('$cacheDir/$outName')) { + if (cachedFiles.contains(outName) && isFileExists(outName)) { callback(outName); return; } final ytdl:Dynamic = untyped require("@distube/ytdl-core"); - log(client, 'Caching $url to $outName...'); - // final opts = {playerClients: ["IOS", "WEB_CREATOR"]}; + trace('Caching $url to $outName...'); + main.send(client, { + type: Progress, + progress: { + type: Caching, + ratio: 0, + data: outName + } + }); final promise:Promise<YouTubeVideoInfo> = ytdl.getInfo(url); promise.then(info -> { - // trace(info.formats.filter(item -> item.audioCodec != null)); trace('Get info with ${info.formats.length} formats'); final audioFormat:YoutubeVideoFormat = try { ytdl.chooseFormat(info.formats.filter(item -> { @@ -73,26 +81,23 @@ class Cache { } catch (e) { log(client, "Error: audio format not found"); trace(e); - trace(info.formats); + trace(info.formats.filter(item -> item.hasAudio)); return; } final videoFormat = getBestYoutubeVideoFormat(info.formats) ?? { log(client, "Error: video format not found"); - trace(info.formats); + trace(info.formats.filter(item -> item.hasVideo)); return; } - trace("Picked audio and video formats"); final dlVideo:Readable<Dynamic> = ytdl(url, { format: videoFormat, - // playerClients: opts.playerClients }); dlVideo.pipe(Fs.createWriteStream('$cacheDir/input-video')); dlVideo.on("error", err -> log(client, "Error during video download: " + err)); final dlAudio:Readable<Dynamic> = ytdl(url, { format: audioFormat, - // playerClients: opts.playerClients }); dlAudio.pipe(Fs.createWriteStream('$cacheDir/input-audio')); dlAudio.on("error", err -> log(client, "Error during audio download: " + err)); @@ -100,8 +105,13 @@ class Cache { var count = 0; function onComplete(type:String):Void { count++; - log(client, '$type track downloaded ($count/2)'); + trace('$type track downloaded ($count/2)'); if (count < 2) return; + var size = FileSystem.stat('$cacheDir/input-video').size; + size += FileSystem.stat('$cacheDir/input-audio').size; + // clean some space for full mp4 + removeOlderCache(size + freeSpaceBlock); + final args = '-y -i input-video -i input-audio -c copy -map 0:v -map 1:a ./$outName'.split(" "); final process = ChildProcess.spawn("ffmpeg", args, { cwd: cacheDir, @@ -117,40 +127,111 @@ class Cache { } final inVideo = '$cacheDir/input-video'; final inAudio = '$cacheDir/input-audio'; - if (FileSystem.exists(inVideo)) FileSystem.deleteFile(inVideo); - if (FileSystem.exists(inAudio)) FileSystem.deleteFile(inAudio); + FileSystem.deleteFile(inVideo); + FileSystem.deleteFile(inAudio); - if (!cachedFiles.contains(outName)) { - cachedFiles.unshift(outName); - } - removeOlderCache(); + add(outName); callback(outName); }); } dlVideo.on("finish", () -> onComplete("Video")); dlAudio.on("finish", () -> onComplete("Audio")); - // dlVideo.on('progress', (c, d, t) -> { - // final progress = Std.int((d / t * 100) * 10) / 10; - // trace(progress); - // }); + var isAudioStart = true; + dlAudio.on("progress", (chunkLength:Int, downloaded:Int, contentLength:Int) -> { + if (isAudioStart) { + isAudioStart = false; + removeOlderCache(contentLength); + } + }); + var isVideoStart = true; + dlVideo.on("progress", (chunkLength:Int, downloaded:Int, contentLength:Int) -> { + if (isVideoStart) { + isVideoStart = false; + removeOlderCache(contentLength); + } + final ratio = (downloaded / contentLength).clamp(0, 1); + main.send(client, { + type: Progress, + progress: { + type: Downloading, + ratio: ratio + } + }); + }); }).catchError(err -> { log(client, "" + err); }); } - function removeOlderCache():Void { - while (getUsedSpace() > storageLimit) { - final name = cachedFiles.pop(); - final path = '$cacheDir/$name'; + public function setStorageLimit(bytes:Int) { + storageLimit = cast bytes; + storageLimit = storageLimit.limitMin(0); + final statfs = (Fs : Dynamic).statfs ?? return; + statfs("/", (err, stats) -> { + if (err != null) { + trace(err); + return; + } + final availSpace = (stats.bsize * stats.bavail - freeSpaceBlock).limitMin(0); + removeOlderCache(); + final freeSpace = getFreeSpace(); + if (availSpace < freeSpace) { + // shrink limit lower than disk space + storageLimit += availSpace - freeSpace; + storageLimit = storageLimit.limitMin(0); + removeOlderCache(); + } + }); + } + + public function add(name:String) { + if (!cachedFiles.contains(name)) { + cachedFiles.unshift(name); + } + } + + public function removeOlderCache(addFileSize = 0):Void { + var space = getUsedSpace(addFileSize); + while (space > storageLimit) { + final name = cachedFiles.pop() ?? break; + final path = getFilePath(name); if (FileSystem.exists(path)) FileSystem.deleteFile(path); + space = getUsedSpace(addFileSize); } } - function getUsedSpace():Int { - var total = 0; + public function getFreeFileName(baseName = "video"):String { + var i = 1; + while (true) { + final n = i == 1 ? "" : '$i'; + final name = '$baseName$n.mp4'; + if (!isFileExists(name)) return name; + i++; + } + } + + public function getFilePath(name:String):String { + return '$cacheDir/$name'; + } + + public function getFileUrl(name:String):String { + final folder = Path.withoutDirectory(cacheDir); + return '/$folder/$name'; + } + + public function isFileExists(name:String):Bool { + return FileSystem.exists(getFilePath(name)); + } + + public function getFreeSpace():Int { + return storageLimit - getUsedSpace(); + } + + public function getUsedSpace(addFileSize = 0):Int { + var total = addFileSize.limitMin(0); for (name in cachedFiles.reversed()) { - final path = '$cacheDir/$name'; + final path = getFilePath(name); if (!FileSystem.exists(path)) { cachedFiles.remove(name); continue; @@ -161,7 +242,7 @@ class Cache { } function getBestYoutubeVideoFormat(formats:Array<YoutubeVideoFormat>):Null<YoutubeVideoFormat> { - final qPriority = [1080, 720, 480, 360, 240]; + final qPriority = [1080, 720, 480, 360, 240, 144]; for (q in qPriority) { final quality = '${q}p'; for (format in formats) { diff --git a/src/server/HttpServer.hx b/src/server/HttpServer.hx index 7622708..6602e86 100644 --- a/src/server/HttpServer.hx +++ b/src/server/HttpServer.hx @@ -1,8 +1,10 @@ package server; +import Types.UploadResponse; +import haxe.Json; import haxe.io.Path; import js.node.Buffer; -import js.node.Fs; +import js.node.Fs.Fs; import js.node.Http; import js.node.Https; import js.node.Path as JsPath; @@ -12,6 +14,14 @@ import js.node.http.ServerResponse; import js.node.url.URL; import sys.FileSystem; +@:structInit +private class HttpServerConfig { + public final dir:String; + public final customDir:String = null; + public final allowLocalRequests = false; + public final cache:Cache = null; +} + class HttpServer { static final mimeTypes = [ "html" => "text/html", @@ -36,31 +46,49 @@ class HttpServer { "wasm" => "application/wasm" ]; - static var dir:String; - static var customDir:String; - static var hasCustomRes = false; - static var allowedLocalFiles:Map<String, Bool> = []; - static var allowLocalRequests = false; - static final CHUNK_SIZE = 1024 * 1024 * 5; // 5 MB + final main:Main; + final dir:String; + final customDir:String; + final hasCustomRes = false; + final allowedLocalFiles:Map<String, Bool> = []; + final allowLocalRequests = false; + final cache:Cache = null; + final CHUNK_SIZE = 1024 * 1024 * 5; // 5 MB + final uploadingFiles:Map<String, Int> = []; + final uploadingFilesLastChunks:Map<String, Buffer> = []; + + public function new(main:Main, config:HttpServerConfig):Void { + this.main = main; + dir = config.dir; + customDir = config.customDir; + allowLocalRequests = config.allowLocalRequests; + cache = config.cache; - public static function init(dir:String, ?customDir:String, allowLocalRequests:Bool):Void { - HttpServer.dir = dir; - if (customDir == null) return; - HttpServer.customDir = customDir; - hasCustomRes = FileSystem.exists(customDir); - HttpServer.allowLocalRequests = allowLocalRequests; + if (customDir != null) hasCustomRes = FileSystem.exists(customDir); } - public static function serveFiles(req:IncomingMessage, res:ServerResponse):Void { + public function serveFiles(req:IncomingMessage, res:ServerResponse):Void { final url = try { new URL(safeDecodeURI(req.url), "http://localhost"); - } catch (e) new URL("/", "http://localhost"); + } catch (e) { + new URL("/", "http://localhost"); + } var filePath = getPath(dir, url); final ext = Path.extension(filePath).toLowerCase(); res.setHeader("Accept-Ranges", "bytes"); res.setHeader("Content-Type", getMimeType(ext)); + if (cache != null && req.method == "POST") { + switch url.pathname { + case "/upload-last-chunk": + uploadFileLastChunk(req, res); + case "/upload": + uploadFile(req, res); + } + return; + } + if (allowLocalRequests && req.socket.remoteAddress == req.socket.localAddress || allowedLocalFiles[url.pathname]) { if (isMediaExtension(ext)) { @@ -103,14 +131,104 @@ class HttpServer { }); } - static function getPath(dir:String, url:URL):String { + function uploadFileLastChunk(req:IncomingMessage, res:ServerResponse) { + final name = cache.getFreeFileName(req.headers["content-name"]); + final filePath = cache.getFilePath(name); + final body:Array<Any> = []; + req.on("data", chunk -> body.push(chunk)); + req.on("end", () -> { + final buffer = Buffer.concat(body); + uploadingFilesLastChunks[filePath] = buffer; + res.writeHead(200, { + 'Content-Type': 'application/json', + }); + final json:UploadResponse = { + info: "File last chunk uploaded" + } + res.end(Json.stringify(json)); + }); + } + + function uploadFile(req:IncomingMessage, res:ServerResponse) { + final name = cache.getFreeFileName(req.headers["content-name"]); + final clientName = req.headers["client-name"]; + final filePath = cache.getFilePath(name); + final size = Std.parseInt(req.headers["content-length"]) ?? return; + var written = 0; + + inline function end(json:UploadResponse):Void { + uploadingFiles.remove(name); + uploadingFilesLastChunks.remove(name); + + res.statusCode = 200; + res.end(Json.stringify(json)); + } + + if (cache.getFreeSpace() < size) { + end({ + info: "Error: Not enough free space on server or file size is out of cache storage limit.", + errorId: "freeSpace" + }); + return; + } + + final stream = Fs.createWriteStream(filePath); + req.pipe(stream); + + inline function onStart() { + cache.removeOlderCache(size); + cache.add(name); + uploadingFiles[filePath] = size; + } + var isStart = true; + req.on("data", chunk -> { + var url:String = null; + if (isStart) { + isStart = false; + onStart(); + url = cache.getFileUrl(name); + } + written += chunk.length; + final ratio = (written / size).clamp(0, 1); + final percent = (ratio * 100).toFixed(2); + final client = main.clients.getByName(clientName) ?? return; + main.send(client, { + type: Progress, + progress: { + type: Uploading, + ratio: ratio, + data: url + } + }); + }); + stream.on("close", () -> { + end({ + info: "File write stream closed.", + }); + }); + stream.on("error", err -> { + trace(err); + end({ + info: "File write stream error.", + }); + }); + req.on("error", err -> { + trace("Request Error:", err); + stream.destroy(); + end({ + info: "File request error.", + }); + }); + } + + function getPath(dir:String, url:URL):String { var filePath = dir + url.pathname; filePath = filePath.urlDecode(); if (!FileSystem.isDirectory(filePath)) return filePath; return Path.addTrailingSlash(filePath) + "index.html"; } - static function readFileError(err:Dynamic, res:ServerResponse, filePath:String):Void { + function readFileError(err:Dynamic, res:ServerResponse, filePath:String):Void { res.setHeader("Content-Type", getMimeType("html")); if (err.code == "ENOENT") { res.statusCode = 404; @@ -122,9 +240,13 @@ class HttpServer { } } - static function serveMedia(req:IncomingMessage, res:ServerResponse, filePath:String):Bool { + function serveMedia(req:IncomingMessage, res:ServerResponse, filePath:String):Bool { if (!Fs.existsSync(filePath)) return false; - final videoSize = Fs.statSync(filePath).size; + var videoSize:Int = cast Fs.statSync(filePath).size; + // use future content length to start playing it before uploaded + if (uploadingFiles.exists(filePath)) { + videoSize = uploadingFiles[filePath]; + } final rangeHeader:String = req.headers["range"]; if (rangeHeader == null) { res.statusCode = 200; @@ -142,23 +264,33 @@ class HttpServer { res.setHeader("Content-Range", 'bytes $start-$end/$videoSize'); res.setHeader("Content-Length", '$contentLength'); - // HTTP Status 206 for Partial Content - res.statusCode = 206; - // create video read stream for this particular chunk - final videoStream = Fs.createReadStream(filePath, {start: cast start, end: cast end}); + res.statusCode = 206; // partial content + + // check for last chunk cache for instant play while uploading + final buffer = uploadingFilesLastChunks[filePath]; + if (buffer != null && end == videoSize - 1 && contentLength < buffer.byteLength) { + final bufferStart = (buffer.byteLength - contentLength).limitMin(0); + res.end(buffer.slice(bufferStart)); + return true; + } + // stream the video chunk to the client + final videoStream = Fs.createReadStream( + filePath, + {start: start, end: end} + ); videoStream.pipe(res); res.on("error", () -> videoStream.destroy()); res.on("close", () -> videoStream.destroy()); return true; } - static function parseRangeHeader(rangeHeader:String, videoSize:Float):{start:Float, end:Float} { + function parseRangeHeader(rangeHeader:String, videoSize:Int):{start:Int, end:Int} { final ranges = ~/[-=]/g.split(rangeHeader); - var start = Std.parseFloat(ranges[1]); + var start = Std.parseInt(ranges[1]); if (Utils.isOutOfRange(start, 0, videoSize - 1)) start = 0; - var end = Std.parseFloat(ranges[2]); - if (Math.isNaN(end)) end = start + CHUNK_SIZE; + var end = Std.parseInt(ranges[2]); + if (end == null) end = start + CHUNK_SIZE; if (Utils.isOutOfRange(end, start, videoSize - 1)) end = videoSize - 1; return { start: start, @@ -166,14 +298,14 @@ class HttpServer { }; } - static function isMediaExtension(ext:String):Bool { + function isMediaExtension(ext:String):Bool { return ext == "mp4" || ext == "webm" || ext == "mp3" || ext == "wav"; } - static final matchLang = ~/^[A-z]+/; - static final matchVarString = ~/\${([A-z_]+)}/g; + final matchLang = ~/^[A-z]+/; + final matchVarString = ~/\${([A-z_]+)}/g; - static function localizeHtml(data:String, lang:String):String { + function localizeHtml(data:String, lang:String):String { if (lang != null && matchLang.match(lang)) { lang = matchLang.matched(0); } else lang = "en"; @@ -184,7 +316,7 @@ class HttpServer { return data; } - static function proxyUrl(req:IncomingMessage, res:ServerResponse):Bool { + function proxyUrl(req:IncomingMessage, res:ServerResponse):Bool { final url = req.url.replace("/proxy?url=", ""); final proxy = proxyRequest(url, req, res, proxyRes -> { final url = proxyRes.headers["location"] ?? return false; @@ -201,7 +333,7 @@ class HttpServer { return true; } - static function proxyRequest( + function proxyRequest( url:String, req:IncomingMessage, res:ServerResponse, @@ -209,7 +341,9 @@ class HttpServer { ):Null<ClientRequest> { final url = try { new URL(safeDecodeURI(url)); - } catch (e) return null; + } catch (e) { + return null; + } if (url.host == req.headers["host"]) return null; final options = { host: url.hostname, @@ -222,7 +356,7 @@ class HttpServer { final request = url.protocol == "https:" ? Https.request : Http.request; final proxy = request(options, proxyRes -> { if (cancelProxyRequest(proxyRes)) return; - proxyRes.headers["Content-Type"] = "application/octet-stream"; + proxyRes.headers["content-type"] = "application/octet-stream"; res.writeHead(proxyRes.statusCode, proxyRes.headers); proxyRes.pipe(res); }); @@ -232,18 +366,18 @@ class HttpServer { return proxy; } - static function isChildOf(parent:String, child:String):Bool { + function isChildOf(parent:String, child:String):Bool { final rel = JsPath.relative(parent, child); return rel.length > 0 && !rel.startsWith('..') && !JsPath.isAbsolute(rel); } - static function getMimeType(ext:String):String { + function getMimeType(ext:String):String { return mimeTypes[ext] ?? return "application/octet-stream"; } - static final ctrlCharacters = ~/[\u0000-\u001F\u007F-\u009F\u2000-\u200D\uFEFF]/g; + final ctrlCharacters = ~/[\u0000-\u001F\u007F-\u009F\u2000-\u200D\uFEFF]/g; - static function safeDecodeURI(data:String):String { + function safeDecodeURI(data:String):String { try { data = decodeURI(data); } catch (err) { @@ -253,7 +387,7 @@ class HttpServer { return data; } - static inline function decodeURI(data:String):String { + inline function decodeURI(data:String):String { return js.Syntax.code("decodeURI({0})", data); } } diff --git a/src/server/Main.hx b/src/server/Main.hx index c863cce..4f0fd03 100644 --- a/src/server/Main.hx +++ b/src/server/Main.hx @@ -25,8 +25,6 @@ import json2object.JsonParser; import sys.FileSystem; import sys.io.File; -using ClientTools; - private typedef MainOptions = { loadState:Bool } @@ -52,11 +50,14 @@ class Main { final playersCacheSupport:Array<PlayerType> = []; var port:Int; final userList:UserList; - final clients:Array<Client> = []; + + public final clients:Array<Client> = []; + final freeIds:Array<Int> = []; final wsEventParser = new JsonParser<WsEvent>(); final consoleInput:ConsoleInput; final cache:Cache; + final cacheDir:String; final videoList = new VideoList(); final videoTimer = new VideoTimer(); final messages:Array<Message> = []; @@ -84,6 +85,8 @@ class Main { verbose = args.exists("verbose"); statePath = '$rootDir/user/state.json'; logsDir = '$rootDir/user/logs'; + cacheDir = '$rootDir/user/res/cache'; + // process.on("exit", exit); process.on("SIGINT", exit); // ctrl+c process.on("SIGUSR1", exit); // kill pid @@ -100,15 +103,16 @@ class Main { logError("unhandledRejection", reason); exit(); }); + logger = new Logger(logsDir, 10, verbose); consoleInput = new ConsoleInput(this); consoleInput.initConsoleInput(); - cache = new Cache(this, '$rootDir/user/res/cache'); + cache = new Cache(this, cacheDir); if (cache.isYtReady) playersCacheSupport.push(YoutubeType); initIntergationHandlers(); loadState(); config = loadUserConfig(); - cache.storageLimit = cast config.cacheStorageLimitGiB * 1024 * 1024 * 1024; + cache.setStorageLimit(cast config.cacheStorageLimitGiB * 1024 * 1024 * 1024); userList = loadUsers(); config.isVerbose = verbose; config.salt = generateConfigSalt(); @@ -154,11 +158,16 @@ class Main { } final dir = '$rootDir/res'; - HttpServer.init(dir, '$rootDir/user/res', config.localAdmins); + final httpServer = new HttpServer(this, { + dir: dir, + customDir: '$rootDir/user/res', + allowLocalRequests: config.localAdmins, + cache: cache, + }); Lang.init('$dir/langs'); final server = Http.createServer((req, res) -> { - HttpServer.serveFiles(req, res); + httpServer.serveFiles(req, res); }); wss = new WSServer({server: server}); wss.on("connection", onConnect); @@ -330,7 +339,7 @@ class Main { if (isHeroku && process.env["APP_URL"] != null) { var url = process.env["APP_URL"]; if (!url.startsWith("http")) url = 'http://$url'; - new Timer(10 * 60 * 1000).run = function() { + new Timer(10 * 60 * 1000).run = () -> { if (clients.length == 0) return; trace('Ping $url'); Http.get(url, r -> {}); @@ -644,6 +653,7 @@ class Main { broadcast(data); case ServerMessage: + case Progress: case AddVideo: if (isPlaylistLockedFor(client)) return; if (!checkPermission(client, AddVideoPerm)) return; @@ -682,7 +692,7 @@ class Main { addVideo(); } else { cache.cacheYoutubeVideo(client, item.url, (name) -> { - item = item.withUrl('/cache/$name'); + item = item.withUrl(cache.getFileUrl(name)); if (item.duration > 1) item.duration -= 1; addVideo(); }); @@ -973,17 +983,17 @@ class Main { }); } - function send(client:Client, data:WsEvent):Void { + public function send(client:Client, data:WsEvent):Void { client.ws.send(Json.stringify(data), null); } - function broadcast(data:WsEvent):Void { + public function broadcast(data:WsEvent):Void { final json = Json.stringify(data); for (client in clients) client.ws.send(json, null); } - function broadcastExcept(skipped:Client, data:WsEvent):Void { + public function broadcastExcept(skipped:Client, data:WsEvent):Void { final json = Json.stringify(data); for (client in clients) { if (client == skipped) continue; diff --git a/src/server/Utils.hx b/src/server/Utils.hx index 8fcc4c6..eb28115 100644 --- a/src/server/Utils.hx +++ b/src/server/Utils.hx @@ -70,7 +70,7 @@ class Utils { r.setEncoding("utf8"); final data = new StringBuf(); r.on("data", chunk -> data.add(chunk)); - r.on("end", _ -> callback(data.toString())); + r.on("end", () -> callback(data.toString())); }).on("error", onError).on("timeout", onError); } @@ -90,8 +90,8 @@ class Utils { return "127.0.0.1"; } - public static function isOutOfRange(value:Float, min:Float, max:Float):Bool { - return value == null || Math.isNaN(value) || value < min || value > max; + public static function isOutOfRange(value:Int, min:Int, max:Int):Bool { + return value == null || value < min || value > max; } public static function sortedPush(ids:Array<Int>, id:Int):Void { |
