diff options
Diffstat (limited to 'src/server/cache/RawCache.hx')
| -rw-r--r-- | src/server/cache/RawCache.hx | 469 |
1 files changed, 469 insertions, 0 deletions
diff --git a/src/server/cache/RawCache.hx b/src/server/cache/RawCache.hx new file mode 100644 index 0000000..ed8679c --- /dev/null +++ b/src/server/cache/RawCache.hx @@ -0,0 +1,469 @@ +package server.cache; + +import js.node.Buffer; +import js.node.ChildProcess; +import js.node.Fs.Fs; +import js.node.Http; +import js.node.Https; +import js.node.http.ClientRequest; +import js.node.http.IncomingMessage; +import sys.FileSystem; +import sys.io.File; + +typedef Segment = { + i:Int, + url:String, + started:Bool, + completed:Bool, + name:String +} + +class RawCache { + final main:Main; + final cache:Cache; + + public function new(main:Main, cache:Cache):Void { + this.main = main; + this.cache = cache; + } + + public function cacheRawVideo(client:Client, url:String, callback:(name:String) -> Void) { + final isM3U8 = url.contains(".m3u8"); + final ext = isM3U8 ? "m3u8" : "mp4"; + + final matchName = ~/^([^:.]+)\.(.+)/; + final decodedUrl = try url.urlDecode() catch (e) url; + final lastPart = decodedUrl.substr(decodedUrl.lastIndexOf("/") + 1); + var outName = matchName.match(lastPart) ? matchName.matched(1) + + '.$ext' : 'video.$ext'; + outName = cache.getFreeFileName(outName); + + if (cache.exists(outName)) { + callback(outName); + return; + } + + trace('Caching $url to $outName...'); + main.send(client, { + type: Progress, + progress: { + type: Caching, + ratio: 0, + data: outName + } + }); + + if (isM3U8) { + handleM3u8(client, url, outName, callback); + } else { + handleMp4(client, url, outName, callback); + } + } + + function handleMp4(client:Client, url:String, outName:String, callback:(name:String) -> Void) { + downloadFile(client, url, outName, (downloaded, total) -> { + main.send(client, { + type: Progress, + progress: { + type: Downloading, + ratio: (downloaded / total).clamp(0, 1) + } + }); + }, () -> { + cache.add(outName); + callback(outName); + }, (err) -> { + log(client, 'Mp4 download failed: $err'); + cancelProgress(client); + }); + } + + function handleM3u8(client:Client, url:String, outName:String, callback:(name:String) -> Void):Void { + final useProxy = true; + downloadM3u8Playlist(client, url, useProxy, (playlist, totalSize, segments) -> { + // only playlist file donwloaded + if (useProxy) totalSize = playlist.length; + + if (!cache.removeOlderCache(totalSize + cache.freeSpaceBlock)) { + log(client, cache.notEnoughSpaceErrorText); + cancelProgress(client); + return; + } + + if (useProxy) { + main.send(client, { + type: Progress, + progress: { + type: Caching, + ratio: 1, + data: outName + } + }); + File.saveContent('${cache.cacheDir}/$outName', playlist); + cache.add(outName); + callback(outName); + return; + } + + var activeDownloads = 0; + final maxParallelDownloads = 10; + var downloaded = 0; + + function downloadNextBatch():Void { + for (segment in segments) { + if (activeDownloads >= maxParallelDownloads) break; + if (segment.started) continue; + segment.started = true; + activeDownloads++; + trace("download segment", segment.i); + + downloadFile(client, segment.url, segment.name, + (downloadedBytes, totalBytes) -> {}, + + () -> { + activeDownloads--; + segment.completed = true; + downloaded++; + + final progress = downloaded / segments.length; + main.send(client, { + type: Progress, + progress: { + type: Downloading, + ratio: progress.clamp(0, 1) + } + }); + + if (downloaded == segments.length) { + trace('All ${downloaded}/${segments.length} segments downloaded'); + + File.saveContent('${cache.cacheDir}/$outName', playlist); + cache.add(outName); + callback(outName); + // buildTsFiles( + // segments.map(item -> item.name), + // outName, + // client, + // callback + // ); + } else { + downloadNextBatch(); + } + }, + + (err) -> { + activeDownloads--; + downloaded++; + log(client, 'TS segment ${segment.i} download failed: $err'); + cancelProgress(client); + cleanupFiles(segments.map(item -> item.name)); + } + ); + } + } + + // Start the initial batch of downloads + downloadNextBatch(); + }, (err) -> { + log(client, 'M3U8 processing failed: $err'); + cancelProgress(client); + }); + } + + function request(url:String, ?options:Null<HttpsRequestOptions>, ?callback:Null<IncomingMessage-> + Void>):ClientRequest { + final httpsOptions:HttpsRequestOptions = options ?? {}; + // Allow self-signed certificates + httpsOptions.rejectUnauthorized = false; + httpsOptions.headers ??= {}; + + if (url.startsWith("https:")) { + return Https.request(url, httpsOptions, callback); + } else { + return Http.request(url, httpsOptions, callback); + } + } + + function downloadM3u8Playlist( + client:Client, + url:String, + useProxy:Bool, + onSuccess:(playlist:String, totalSize:Int, segments:Array<Segment>) -> Void, + onError:(err:String) -> Void + ) { + final options:HttpsRequestOptions = { + headers: { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", + "Accept": "*/*", + } + }; + + final req = request(url, options, (res:IncomingMessage) -> { + if (res.statusCode >= 300 && res.statusCode < 400) { + final redirectUrl = res.headers.get("location"); + if (redirectUrl != null) { + downloadM3u8Playlist(client, redirectUrl, useProxy, onSuccess, onError); + return; + } + } + + final body:Array<Any> = []; + res.on("data", chunk -> body.push(chunk)); + res.on("end", () -> { + try { + final buffer = Buffer.concat(body); + final content = buffer.toString(); + if (!~/^#EXTM3U/.match(content)) { + onError("Invalid M3U8 playlist"); + return; + } + + final baseUrl = url.substring(0, url.lastIndexOf("/") + 1); + final segments:Array<Segment> = []; + + final lines = content.split("\n"); + for (lineI => line in lines) { + final line = line.trim(); + if (line.length == 0) continue; + if (line.startsWith("#")) continue; + final segmentUrl = !line.contains("://") ? baseUrl + line : line; + final i = segments.length; + final segment:Segment = { + i: i, + url: segmentUrl, + started: false, + completed: false, + name: 'segment$i.ts', + } + segments.push(segment); + lines[lineI] = './${segment.name}'; + if (useProxy) { + lines[lineI] = '/proxy?url=$segmentUrl'; + } + } + + // Head request can return full stream size, so lets do loose assumption + final req = request(segments[0].url, {method: Get}, (res:IncomingMessage) -> { + final contentLength = Std.parseInt(res.headers["content-length"]) ?? 0; + final totalSize = contentLength * (segments.length + 1); + if (totalSize == 0) { + onError("Failed to get segment sizes: no content-length"); + return; + } + onSuccess(lines.join("\n"), totalSize, segments); + }); + req.on("error", (err) -> { + onError("Request error: failed to get segment sizes"); + }); + req.end(); + } catch (e) { + onError('Playlist processing error: $e'); + } + }); + }); + + req.on("error", onError); + req.end(); + } + + function downloadFile( + client:Client, url:String, fileName:String, + onProgress:(downloaded:Int, total:Int) -> Void, + onComplete:() -> Void, + onError:(err:String) -> Void + ):Void { + final outPath = '${cache.cacheDir}/$fileName'; + final file = Fs.createWriteStream(outPath); + final options:HttpsRequestOptions = { + method: Get, + headers: { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537." + + Std.random(100), + "Accept": "*/*", + } + }; + final req = request(url, options, (res:IncomingMessage) -> { + final total = Std.parseInt(res.headers["content-length"]) ?? 0; + var downloaded = 0; + + // Handle response data chunks + res.on("data", (chunk) -> { + downloaded += chunk.length; + onProgress(downloaded, total); + + // Handle backpressure + if (!file.write(chunk)) { + res.pause(); + file.once("drain", () -> res.resume()); + } + }); + + // Handle response completion + res.on("end", () -> file.end()); + + // Handle response errors + res.on("error", (err) -> { + file.destroy(); + onError('Response error: $err'); + }); + }); + + // Handle file write completion + file.on("finish", onComplete); + + // Handle file system errors + file.on("error", (err) -> { + req.destroy(); + onError('File error: $err'); + }); + + // Handle request errors + req.on("error", (err) -> { + file.destroy(); + onError('Request failed: $err'); + }); + + req.end(); + } + + function buildTsFiles(tempFiles:Array<String>, outName:String, client:Client, callback:String->Void) { + final missingFiles = tempFiles.filter(f -> + !FileSystem.exists('${cache.cacheDir}/$f')); + if (missingFiles.length > 0) { + log(client, 'Concatenation failed: ${missingFiles.length} segments are missing'); + main.send(client, { + type: Progress, + progress: { + type: Canceled, + ratio: 1 + } + }); + cleanupFiles(tempFiles); + return; + } + + // Create concat file with absolute paths and proper escaping + final concatFile = 'concat_list.txt'; + final concatContent = tempFiles.map(f -> { + final path = './$f'; + return 'file \'${path}\''; + }).join("\n"); + + File.saveContent('${cache.cacheDir}/$concatFile', concatContent); + + // Prepare FFmpeg args with proper bitstream filters for TS files + final args = [ + "-y", // Overwrite output without asking + "-f", "concat", // Use concat format + "-safe", "0", // Allow absolute paths + "-i", concatFile, // Input file list + "-c", "copy", // Copy streams without re-encoding + "-bsf:a", "aac_adtstoasc", // Fix AAC audio streams from TS files + "-movflags", "+faststart", // Optimize for web streaming + outName // Output filename + ]; + + trace('Executing FFmpeg with args: ${args.join(" ")}'); + + // Create process with proper error capturing + final process = ChildProcess.spawn("ffmpeg", args, { + cwd: cache.cacheDir, + // stderr: "pipe" // Capture stderr for error reporting + }); + + final errorOutput:Array<Buffer> = []; + process.stderr.on("data", (data) -> errorOutput.push(data)); + + // Set a reasonable timeout + final timeout = 5 * 60 * 1000; // 5 minutes + final timeoutId = js.Node.setTimeout(() -> { + process.kill(); + log(client, 'FFmpeg process timed out after ${timeout / 1000} seconds'); + main.send(client, { + type: Progress, + progress: { + type: Canceled, + ratio: 1 + } + }); + cleanupFiles(tempFiles.concat([concatFile])); + }, timeout); + + process.on("close", (code:Int) -> { + js.Node.clearTimeout(timeoutId); + + if (code != 0) { + final errorMsg = Buffer.concat(errorOutput).toString(); + log(client, 'FFmpeg concatenation failed with code $code'); + trace('FFmpeg error output: $errorMsg'); + + // Log detailed error to admins + final admins = main.clients.filter(client -> client.isAdmin); + for (admin in admins) { + log(admin, 'FFmpeg error: $errorMsg'); + } + + main.send(client, { + type: Progress, + progress: { + type: Canceled, + ratio: 1 + } + }); + } else { + // Verify the output file exists and has content + if (FileSystem.exists('${cache.cacheDir}/$outName') + && FileSystem.stat('${cache.cacheDir}/$outName').size > 0) { + cache.add(outName); + callback(outName); + } else { + log(client, 'FFmpeg process completed but output file is missing or empty'); + main.send(client, { + type: Progress, + progress: { + type: Canceled, + ratio: 1 + } + }); + } + } + + // Clean up temporary files after everything is done + cleanupFiles(tempFiles.concat([concatFile])); + }); + + // Handle process errors (like if FFmpeg isn't found) + process.on("error", (err) -> { + js.Node.clearTimeout(timeoutId); + log(client, 'Failed to start FFmpeg: $err'); + main.send(client, { + type: Progress, + progress: { + type: Canceled, + ratio: 1 + } + }); + cleanupFiles(tempFiles.concat([concatFile])); + }); + } + + function cleanupFiles(files:Array<String>):Void { + for (file in files) { + if (FileSystem.exists(file)) FileSystem.deleteFile(file); + } + } + + function log(client:Client, msg:String):Void { + cache.log(client, msg); + } + + function cancelProgress(client:Client):Void { + main.send(client, { + type: Progress, + progress: { + type: Canceled, + ratio: 0 + } + }); + } +} |
