fftools/ffmpeg: convert to a threaded architecture

Change the main loop and every component (demuxers, decoders, filters,
encoders, muxers) to use the previously added transcode scheduler. Every
instance of every such component was already running in a separate
thread, but now they can actually run in parallel.

Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by
JEEB to be more correct and deterministic.
This commit is contained in:
Anton Khirnov
2023-07-18 16:37:52 +02:00
parent 9b8cc36ce0
commit d119ae2fd8
12 changed files with 609 additions and 2030 deletions

View File

@ -54,24 +54,6 @@ struct Decoder {
Scheduler *sch;
unsigned sch_idx;
pthread_t thread;
/**
* Queue for sending coded packets from the main thread to
* the decoder thread.
*
* An empty packet is sent to flush the decoder without terminating
* decoding.
*/
ThreadQueue *queue_in;
/**
* Queue for sending decoded frames from the decoder thread
* to the main thread.
*
* An empty frame is sent to signal that a single packet has been fully
* processed.
*/
ThreadQueue *queue_out;
};
// data that is local to the decoder thread and not visible outside of it
@ -80,24 +62,6 @@ typedef struct DecThreadContext {
AVPacket *pkt;
} DecThreadContext;
static int dec_thread_stop(Decoder *d)
{
void *ret;
if (!d->queue_in)
return 0;
tq_send_finish(d->queue_in, 0);
tq_receive_finish(d->queue_out, 0);
pthread_join(d->thread, &ret);
tq_free(&d->queue_in);
tq_free(&d->queue_out);
return (intptr_t)ret;
}
void dec_free(Decoder **pdec)
{
Decoder *dec = *pdec;
@ -105,8 +69,6 @@ void dec_free(Decoder **pdec)
if (!dec)
return;
dec_thread_stop(dec);
av_frame_free(&dec->frame);
av_packet_free(&dec->pkt);
@ -148,25 +110,6 @@ fail:
return AVERROR(ENOMEM);
}
static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
{
int i, ret = 0;
for (i = 0; i < ist->nb_filters; i++) {
ret = ifilter_send_frame(ist->filters[i], decoded_frame,
i < ist->nb_filters - 1 ||
ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
if (ret == AVERROR_EOF)
ret = 0; /* ignore */
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR,
"Failed to inject frame into filter network: %s\n", av_err2str(ret));
break;
}
}
return ret;
}
static AVRational audio_samplerate_update(void *logctx, Decoder *d,
const AVFrame *frame)
{
@ -421,28 +364,14 @@ static int process_subtitle(InputStream *ist, AVFrame *frame)
if (!subtitle)
return 0;
ret = send_frame_to_filters(ist, frame);
ret = sch_dec_send(d->sch, d->sch_idx, frame);
if (ret < 0)
return ret;
av_frame_unref(frame);
subtitle = (AVSubtitle*)frame->buf[0]->data;
if (!subtitle->num_rects)
return 0;
for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
OutputStream *ost = ist->outputs[oidx];
if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE)
continue;
ret = enc_subtitle(output_files[ost->file_index], ost, subtitle);
if (ret < 0)
return ret;
}
return 0;
return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
}
int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
static int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
{
Decoder *d = ist->decoder;
int ret = AVERROR_BUG;
@ -468,12 +397,24 @@ int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
AVFrame *frame)
{
Decoder *d = ist->decoder;
Decoder *d = ist->decoder;
AVPacket *flush_pkt = NULL;
AVSubtitle subtitle;
int got_output;
int ret;
if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) {
frame->pts = pkt->pts;
frame->time_base = pkt->time_base;
frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT;
ret = sch_dec_send(d->sch, d->sch_idx, frame);
return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
} else if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION) {
return fix_sub_duration_heartbeat(ist, av_rescale_q(pkt->pts, pkt->time_base,
AV_TIME_BASE_Q));
}
if (!pkt) {
flush_pkt = av_packet_alloc();
if (!flush_pkt)
@ -496,7 +437,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
ist->frames_decoded++;
// XXX the queue for transferring data back to the main thread runs
// XXX the queue for transferring data to consumers runs
// on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
// inside the frame
// eventually, subtitles should be switched to use AVFrames natively
@ -509,26 +450,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
frame->width = ist->dec_ctx->width;
frame->height = ist->dec_ctx->height;
ret = tq_send(d->queue_out, 0, frame);
if (ret < 0)
av_frame_unref(frame);
return ret;
}
static int send_filter_eof(InputStream *ist)
{
Decoder *d = ist->decoder;
int i, ret;
for (i = 0; i < ist->nb_filters; i++) {
int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
d->last_frame_pts + d->last_frame_duration_est;
ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb);
if (ret < 0)
return ret;
}
return 0;
return process_subtitle(ist, frame);
}
static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
@ -635,9 +557,11 @@ static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
ist->frames_decoded++;
ret = tq_send(d->queue_out, 0, frame);
if (ret < 0)
return ret;
ret = sch_dec_send(d->sch, d->sch_idx, frame);
if (ret < 0) {
av_frame_unref(frame);
return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
}
}
}
@ -679,7 +603,6 @@ fail:
void *decoder_thread(void *arg)
{
InputStream *ist = arg;
InputFile *ifile = input_files[ist->file_index];
Decoder *d = ist->decoder;
DecThreadContext dt;
int ret = 0, input_status = 0;
@ -691,19 +614,31 @@ void *decoder_thread(void *arg)
dec_thread_set_name(ist);
while (!input_status) {
int dummy, flush_buffers;
int flush_buffers, have_data;
input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
flush_buffers = input_status >= 0 && !dt.pkt->buf;
if (!dt.pkt->buf)
input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt);
have_data = input_status >= 0 &&
(dt.pkt->buf || dt.pkt->side_data_elems ||
(intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT ||
(intptr_t)dt.pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION);
flush_buffers = input_status >= 0 && !have_data;
if (!have_data)
av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
flush_buffers ? "flush" : "EOF");
ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame);
av_packet_unref(dt.pkt);
av_frame_unref(dt.frame);
// AVERROR_EOF - EOF from the decoder
// AVERROR_EXIT - EOF from the scheduler
// we treat them differently when flushing
if (ret == AVERROR_EXIT) {
ret = AVERROR_EOF;
flush_buffers = 0;
}
if (ret == AVERROR_EOF) {
av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
flush_buffers ? "resetting" : "finishing");
@ -711,11 +646,10 @@ void *decoder_thread(void *arg)
if (!flush_buffers)
break;
/* report last frame duration to the demuxer thread */
/* report last frame duration to the scheduler */
if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
Timestamp ts = { .ts = d->last_frame_pts + d->last_frame_duration_est,
.tb = d->last_frame_tb };
av_thread_message_queue_send(ifile->audio_ts_queue, &ts, 0);
dt.pkt->pts = d->last_frame_pts + d->last_frame_duration_est;
dt.pkt->time_base = d->last_frame_tb;
}
avcodec_flush_buffers(ist->dec_ctx);
@ -724,149 +658,47 @@ void *decoder_thread(void *arg)
av_err2str(ret));
break;
}
// signal to the consumer thread that the entire packet was processed
ret = tq_send(d->queue_out, 0, dt.frame);
if (ret < 0) {
if (ret != AVERROR_EOF)
av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
break;
}
}
// EOF is normal thread termination
if (ret == AVERROR_EOF)
ret = 0;
// on success send EOF timestamp to our downstreams
if (ret >= 0) {
float err_rate;
av_frame_unref(dt.frame);
dt.frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_EOF;
dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
d->last_frame_pts + d->last_frame_duration_est;
dt.frame->time_base = d->last_frame_tb;
ret = sch_dec_send(d->sch, d->sch_idx, dt.frame);
if (ret < 0 && ret != AVERROR_EOF) {
av_log(NULL, AV_LOG_FATAL,
"Error signalling EOF timestamp: %s\n", av_err2str(ret));
goto finish;
}
ret = 0;
err_rate = (ist->frames_decoded || ist->decode_errors) ?
ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
if (err_rate > max_error_rate) {
av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
err_rate, max_error_rate);
ret = FFMPEG_ERROR_RATE_EXCEEDED;
} else if (err_rate)
av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
}
finish:
tq_receive_finish(d->queue_in, 0);
tq_send_finish (d->queue_out, 0);
// make sure the demuxer does not get stuck waiting for audio durations
// that will never arrive
if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
av_thread_message_queue_set_err_recv(ifile->audio_ts_queue, AVERROR_EOF);
dec_thread_uninit(&dt);
av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
return (void*)(intptr_t)ret;
}
int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
{
Decoder *d = ist->decoder;
int ret = 0, thread_ret;
// thread already joined
if (!d->queue_in)
return AVERROR_EOF;
// send the packet/flush request/EOF to the decoder thread
if (pkt || no_eof) {
av_packet_unref(d->pkt);
if (pkt) {
ret = av_packet_ref(d->pkt, pkt);
if (ret < 0)
goto finish;
}
ret = tq_send(d->queue_in, 0, d->pkt);
if (ret < 0)
goto finish;
} else
tq_send_finish(d->queue_in, 0);
// retrieve all decoded data for the packet
while (1) {
int dummy;
ret = tq_receive(d->queue_out, &dummy, d->frame);
if (ret < 0)
goto finish;
// packet fully processed
if (!d->frame->buf[0])
return 0;
// process the decoded frame
if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
ret = process_subtitle(ist, d->frame);
} else {
ret = send_frame_to_filters(ist, d->frame);
}
av_frame_unref(d->frame);
if (ret < 0)
goto finish;
}
finish:
thread_ret = dec_thread_stop(d);
if (thread_ret < 0) {
av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
av_err2str(thread_ret));
ret = err_merge(ret, thread_ret);
}
// non-EOF errors here are all fatal
if (ret < 0 && ret != AVERROR_EOF)
return ret;
// signal EOF to our downstreams
ret = send_filter_eof(ist);
if (ret < 0) {
av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
return ret;
}
return AVERROR_EOF;
}
static int dec_thread_start(InputStream *ist)
{
Decoder *d = ist->decoder;
ObjPool *op;
int ret = 0;
op = objpool_alloc_packets();
if (!op)
return AVERROR(ENOMEM);
d->queue_in = tq_alloc(1, 1, op, pkt_move);
if (!d->queue_in) {
objpool_free(&op);
return AVERROR(ENOMEM);
}
op = objpool_alloc_frames();
if (!op)
goto fail;
d->queue_out = tq_alloc(1, 4, op, frame_move);
if (!d->queue_out) {
objpool_free(&op);
goto fail;
}
ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
if (ret) {
ret = AVERROR(ret);
av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
av_err2str(ret));
goto fail;
}
return 0;
fail:
if (ret >= 0)
ret = AVERROR(ENOMEM);
tq_free(&d->queue_in);
tq_free(&d->queue_out);
return ret;
}
static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts)
{
InputStream *ist = s->opaque;
@ -1118,12 +950,5 @@ int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx)
if (ret < 0)
return ret;
ret = dec_thread_start(ist);
if (ret < 0) {
av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
av_err2str(ret));
return ret;
}
return 0;
}