Hello! 欢迎来到小浪资源网!



通过ffmpeg子进程进行视频数据IO


在求职过程中,我需要完成一个处理视频数据的原型项目,这其中涉及到使用ffmpeg进行视频预处理,并将多个视频文件连接在一起播放。由于缺乏经验,我借助生成式ai聊天机器人(google gemini)的帮助完成了这个挑战。

通过ffmpeg子进程进行视频数据IO

项目目标是串联播放多个视频。我采用了最直接的方法——将视频文件连接起来。为此,首先需要将视频文件重新编码成合适的格式。在与gemini的讨论中,它建议使用MPEG-TS格式。

MPEG传输流(MPEG-TS)通过封装分组基本流工作,这些流包括音频、视频和PSIP数据,被打包成小段。每个流被分割成188字节的部分并交织在一起。这种方式确保了更短的延迟和更高的容错能力,使其成为理想的视频会议格式,尤其是在大帧可能导致音频延迟的情况下。 引自https://castr.com/blog/mpeg-transport-stream-mpeg-ts/

经过重新编码后,视频数据将被送入队列,供后续模块处理。确定输入(视频文件链接列表)和输出(重新编码的视频数据)后,我需要找到合适的ffmpeg命令。由于FFmpeg功能复杂,我利用Gemini的帮助,快速得到了正确的命令:

ffmpeg -hwaccel cuda -i pipe:0 -c:v h264_nvenc -b:v 1.5m -c:a aac -b:a 128k -f mpegts -y pipe:1

通过ffmpeg子进程进行视频数据IOGemini对FFmpeg命令的解释

该命令通过标准输入(stdin)接收视频数据,并将重新编码后的视频数据输出到标准输出(stdout)。

为了实现异步读取和写入,我使用了httpx库和asyncio。代码如下:

import httpx import asyncio import subprocess from functools import partial from queue import Queue  # ... (日志记录设置) ...  client = httpx.AsyncClient()  async def write_input(client, video_link, process):     assert isinstance(process.stdin, asyncio.StreamWriter)     async with client.stream("GET", video_link) as response:         logger.info("data: streaming video to queue", link=video_link)         async for chunk in response.aiter_raw(1024):             process.stdin.write(chunk)             await process.stdin.drain()         if process.stdin.can_write_eof():             process.stdin.write_eof()         process.stdin.close()         await process.stdin.wait_closed()     logger.info("data: done downloading video to ffmpeg")  async def video_send(queue, client, video_link):     logger.info("data: fetching video from link", link=video_link)     process = await asyncio.create_subprocess_exec(         "ffmpeg",         "-hwaccel",         "cuda",         "-i",         "pipe:0",         "-c:v",         "h264_nvenc",         "-b:v",         "1.5m",         "-c:a",         "aac",         "-b:a",         "128k",         "-f",         "mpegts",         "-y",         "pipe:1",         stdin=subprocess.PIPE,         stdout=subprocess.PIPE,     )     asyncio.create_task(write_input(client, video_link, process))     assert isinstance(process.stdout, asyncio.StreamReader)     while True:         chunk = await process.stdout.read(1024)         if not chunk:             break         else:             await asyncio.to_thread(partial(queue.put, chunk))     await process.wait()     logger.info("DATA: Done sending video to queue")  # ... (主函数调用video_send) ...

代码实现了异步地从网络下载视频并将其发送到FFmpeg进行处理,然后将处理后的数据放入队列。整个过程充分利用了asyncio的异步特性,避免了因等待下载而造成的延迟。

这个项目花了我一个晚上才完成,期间不断查阅文档并寻求Gemini的帮助。 希望本文对您有所帮助。

相关阅读