From ede1e465039c0cb0b6517a438e12477ab0630722 Mon Sep 17 00:00:00 2001 From: dec05eba Date: Wed, 21 Jan 2026 02:24:30 +0100 Subject: [PATCH] Fix possible replay race condition, move mutex out of replay --- include/encoder/encoder.h | 6 ++++- include/replay_buffer/replay_buffer.h | 7 ------ src/encoder/encoder.c | 29 +++++++++++++++++----- src/main.cpp | 29 ++++++++++++++-------- src/replay_buffer/replay_buffer.c | 34 -------------------------- src/replay_buffer/replay_buffer_disk.c | 14 ----------- src/replay_buffer/replay_buffer_ram.c | 21 +--------------- 7 files changed, 48 insertions(+), 92 deletions(-) diff --git a/include/encoder/encoder.h b/include/encoder/encoder.h index 7e550f6..22f8e9d 100644 --- a/include/encoder/encoder.h +++ b/include/encoder/encoder.h @@ -24,8 +24,12 @@ typedef struct { typedef struct { gsr_replay_buffer *replay_buffer; + pthread_mutex_t file_write_mutex; - bool mutex_created; + bool file_write_mutex_created; + + pthread_mutex_t replay_mutex; + bool replay_mutex_created; gsr_encoder_recording_destination recording_destinations[GSR_MAX_RECORDING_DESTINATIONS]; size_t num_recording_destinations; diff --git a/include/replay_buffer/replay_buffer.h b/include/replay_buffer/replay_buffer.h index a04a3be..6e91103 100644 --- a/include/replay_buffer/replay_buffer.h +++ b/include/replay_buffer/replay_buffer.h @@ -2,7 +2,6 @@ #define GSR_REPLAY_BUFFER_H #include "../defs.h" -#include #include #include @@ -27,17 +26,11 @@ struct gsr_replay_buffer { /* Returns {-1, 0} if not found */ gsr_replay_buffer_iterator (*find_keyframe)(gsr_replay_buffer *self, gsr_replay_buffer_iterator start_iterator, int stream_index, bool invert_stream_index); bool (*iterator_next)(gsr_replay_buffer *self, gsr_replay_buffer_iterator *iterator); - - pthread_mutex_t mutex; - bool mutex_initialized; - gsr_replay_buffer *original_replay_buffer; }; gsr_replay_buffer* gsr_replay_buffer_create(gsr_replay_storage replay_storage, const char *replay_directory, double replay_buffer_time, size_t replay_buffer_num_packets); void gsr_replay_buffer_destroy(gsr_replay_buffer *self); -void gsr_replay_buffer_lock(gsr_replay_buffer *self); -void gsr_replay_buffer_unlock(gsr_replay_buffer *self); bool gsr_replay_buffer_append(gsr_replay_buffer *self, const AVPacket *av_packet, double timestamp); void gsr_replay_buffer_clear(gsr_replay_buffer *self); AVPacket* gsr_replay_buffer_iterator_get_packet(gsr_replay_buffer *self, gsr_replay_buffer_iterator iterator); diff --git a/src/encoder/encoder.c b/src/encoder/encoder.c index 0f8eda5..5aa3959 100644 --- a/src/encoder/encoder.c +++ b/src/encoder/encoder.c @@ -14,9 +14,17 @@ bool gsr_encoder_init(gsr_encoder *self, gsr_replay_storage replay_storage, size if(pthread_mutex_init(&self->file_write_mutex, NULL) != 0) { fprintf(stderr, "gsr error: gsr_encoder_init: failed to create mutex\n"); + gsr_encoder_deinit(self); return false; } - self->mutex_created = true; + self->file_write_mutex_created = true; + + if(pthread_mutex_init(&self->replay_mutex, NULL) != 0) { + fprintf(stderr, "gsr error: gsr_encoder_init: failed to create mutex\n"); + gsr_encoder_deinit(self); + return false; + } + self->replay_mutex_created = true; if(replay_buffer_num_packets > 0) { self->replay_buffer = gsr_replay_buffer_create(replay_storage, replay_directory, replay_buffer_time, replay_buffer_num_packets); @@ -31,14 +39,21 @@ bool gsr_encoder_init(gsr_encoder *self, gsr_replay_storage replay_storage, size } void gsr_encoder_deinit(gsr_encoder *self) { - if(self->mutex_created) { - self->mutex_created = false; + if(self->replay_buffer) { + pthread_mutex_lock(&self->replay_mutex); + gsr_replay_buffer_destroy(self->replay_buffer); + self->replay_buffer = NULL; + pthread_mutex_unlock(&self->replay_mutex); + } + + if(self->file_write_mutex_created) { + self->file_write_mutex_created = false; pthread_mutex_destroy(&self->file_write_mutex); } - if(self->replay_buffer) { - gsr_replay_buffer_destroy(self->replay_buffer); - self->replay_buffer = NULL; + if(self->replay_mutex_created) { + self->replay_mutex_created = false; + pthread_mutex_destroy(&self->replay_mutex); } self->num_recording_destinations = 0; @@ -60,9 +75,11 @@ void gsr_encoder_receive_packets(gsr_encoder *self, AVCodecContext *codec_contex av_packet->dts = pts; if(self->replay_buffer) { + pthread_mutex_lock(&self->replay_mutex); const double time_now = clock_get_monotonic_seconds(); if(!gsr_replay_buffer_append(self->replay_buffer, av_packet, time_now)) fprintf(stderr, "gsr error: gsr_encoder_receive_packets: failed to add replay buffer data\n"); + pthread_mutex_unlock(&self->replay_mutex); } pthread_mutex_lock(&self->file_write_mutex); diff --git a/src/main.cpp b/src/main.cpp index 4c867f9..bf1d71e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1293,33 +1293,37 @@ struct AudioPtsOffset { int stream_index = 0; }; -static void save_replay_async(AVCodecContext *video_codec_context, int video_stream_index, const std::vector &audio_tracks, gsr_replay_buffer *replay_buffer, const args_parser &arg_parser, const std::string &file_extension, bool date_folders, bool hdr, std::vector &video_sources, int current_save_replay_seconds) { +static void save_replay_async(AVCodecContext *video_codec_context, int video_stream_index, const std::vector &audio_tracks, gsr_encoder *encoder, const args_parser &arg_parser, const std::string &file_extension, bool date_folders, bool hdr, std::vector &video_sources, int current_save_replay_seconds) { if(save_replay_thread.valid()) return; - const gsr_replay_buffer_iterator search_start_iterator = current_save_replay_seconds == save_replay_seconds_full ? gsr_replay_buffer_iterator{0, 0} : gsr_replay_buffer_find_packet_index_by_time_passed(replay_buffer, current_save_replay_seconds); - const gsr_replay_buffer_iterator video_start_iterator = gsr_replay_buffer_find_keyframe(replay_buffer, search_start_iterator, video_stream_index, false); + pthread_mutex_lock(&encoder->replay_mutex); + const gsr_replay_buffer_iterator search_start_iterator = current_save_replay_seconds == save_replay_seconds_full ? gsr_replay_buffer_iterator{0, 0} : gsr_replay_buffer_find_packet_index_by_time_passed(encoder->replay_buffer, current_save_replay_seconds); + const gsr_replay_buffer_iterator video_start_iterator = gsr_replay_buffer_find_keyframe(encoder->replay_buffer, search_start_iterator, video_stream_index, false); if(video_start_iterator.packet_index == (size_t)-1) { fprintf(stderr, "gsr error: failed to save replay: failed to find a video keyframe. perhaps replay was saved too fast, before anything has been recorded\n"); + pthread_mutex_unlock(&encoder->replay_mutex); return; } - const int64_t video_pts_offset = gsr_replay_buffer_iterator_get_packet(replay_buffer, video_start_iterator)->pts; + const int64_t video_pts_offset = gsr_replay_buffer_iterator_get_packet(encoder->replay_buffer, video_start_iterator)->pts; std::vector audio_pts_offsets; audio_pts_offsets.reserve(audio_tracks.size()); for(const AudioTrack &audio_track : audio_tracks) { - const gsr_replay_buffer_iterator audio_start_iterator = gsr_replay_buffer_find_keyframe(replay_buffer, video_start_iterator, audio_track.stream_index, false); - const int64_t audio_pts_offset = audio_start_iterator.packet_index == (size_t)-1 ? 0 : gsr_replay_buffer_iterator_get_packet(replay_buffer, audio_start_iterator)->pts; + const gsr_replay_buffer_iterator audio_start_iterator = gsr_replay_buffer_find_keyframe(encoder->replay_buffer, video_start_iterator, audio_track.stream_index, false); + const int64_t audio_pts_offset = audio_start_iterator.packet_index == (size_t)-1 ? 0 : gsr_replay_buffer_iterator_get_packet(encoder->replay_buffer, audio_start_iterator)->pts; audio_pts_offsets.push_back(AudioPtsOffset{audio_pts_offset, audio_track.stream_index}); } - gsr_replay_buffer *cloned_replay_buffer = gsr_replay_buffer_clone(replay_buffer); + gsr_replay_buffer *cloned_replay_buffer = gsr_replay_buffer_clone(encoder->replay_buffer); if(!cloned_replay_buffer) { // TODO: Return this error to mark the replay as failed fprintf(stderr, "gsr error: failed to save replay: failed to clone replay buffer\n"); + pthread_mutex_unlock(&encoder->replay_mutex); return; } + pthread_mutex_unlock(&encoder->replay_mutex); std::string output_filepath = create_new_recording_filepath_from_timestamp(arg_parser.filename, "Replay", file_extension, date_folders); RecordingStartResult recording_start_result = start_recording_create_streams(output_filepath.c_str(), arg_parser, video_codec_context, audio_tracks, hdr, video_sources); @@ -1328,7 +1332,7 @@ static void save_replay_async(AVCodecContext *video_codec_context, int video_str save_replay_output_filepath = std::move(output_filepath); - save_replay_thread = std::async(std::launch::async, [video_stream_index, recording_start_result, video_start_iterator, video_pts_offset, audio_pts_offsets{std::move(audio_pts_offsets)}, video_codec_context, cloned_replay_buffer]() mutable { + save_replay_thread = std::async(std::launch::async, [video_stream_index, recording_start_result, video_start_iterator, video_pts_offset, audio_pts_offsets{std::move(audio_pts_offsets)}, video_codec_context, cloned_replay_buffer, encoder]() mutable { gsr_replay_buffer_iterator replay_iterator = video_start_iterator; for(;;) { AVPacket *replay_packet = gsr_replay_buffer_iterator_get_packet(cloned_replay_buffer, replay_iterator); @@ -1397,7 +1401,9 @@ static void save_replay_async(AVCodecContext *video_codec_context, int video_str } stop_recording_close_streams(recording_start_result.av_format_context); + pthread_mutex_lock(&encoder->replay_mutex); gsr_replay_buffer_destroy(cloned_replay_buffer); + pthread_mutex_unlock(&encoder->replay_mutex); }); } @@ -4590,10 +4596,13 @@ int main(int argc, char **argv) { save_replay_seconds = 0; save_replay_output_filepath.clear(); - save_replay_async(video_codec_context, VIDEO_STREAM_INDEX, audio_tracks, encoder.replay_buffer, arg_parser, file_extension, arg_parser.date_folders, hdr, video_sources, current_save_replay_seconds); + save_replay_async(video_codec_context, VIDEO_STREAM_INDEX, audio_tracks, &encoder, arg_parser, file_extension, arg_parser.date_folders, hdr, video_sources, current_save_replay_seconds); - if(arg_parser.restart_replay_on_save && current_save_replay_seconds == save_replay_seconds_full) + if(arg_parser.restart_replay_on_save && current_save_replay_seconds == save_replay_seconds_full) { + pthread_mutex_lock(&encoder.replay_mutex); gsr_replay_buffer_clear(encoder.replay_buffer); + pthread_mutex_unlock(&encoder.replay_mutex); + } } const double time_at_frame_end = clock_get_monotonic_seconds() - paused_time_offset; diff --git a/src/replay_buffer/replay_buffer.c b/src/replay_buffer/replay_buffer.c index 92aa645..56549ee 100644 --- a/src/replay_buffer/replay_buffer.c +++ b/src/replay_buffer/replay_buffer.c @@ -16,48 +16,14 @@ gsr_replay_buffer* gsr_replay_buffer_create(gsr_replay_storage replay_storage, c replay_buffer = gsr_replay_buffer_disk_create(replay_directory, replay_buffer_time); break; } - - replay_buffer->mutex_initialized = false; - replay_buffer->original_replay_buffer = NULL; - if(pthread_mutex_init(&replay_buffer->mutex, NULL) != 0) { - gsr_replay_buffer_destroy(replay_buffer); - return NULL; - } - - replay_buffer->mutex_initialized = true; return replay_buffer; } void gsr_replay_buffer_destroy(gsr_replay_buffer *self) { self->destroy(self); - if(self->mutex_initialized && !self->original_replay_buffer) { - pthread_mutex_destroy(&self->mutex); - self->mutex_initialized = false; - } - self->original_replay_buffer = NULL; free(self); } -void gsr_replay_buffer_lock(gsr_replay_buffer *self) { - if(self->original_replay_buffer) { - gsr_replay_buffer_lock(self->original_replay_buffer); - return; - } - - if(self->mutex_initialized) - pthread_mutex_lock(&self->mutex); -} - -void gsr_replay_buffer_unlock(gsr_replay_buffer *self) { - if(self->original_replay_buffer) { - gsr_replay_buffer_unlock(self->original_replay_buffer); - return; - } - - if(self->mutex_initialized) - pthread_mutex_unlock(&self->mutex); -} - bool gsr_replay_buffer_append(gsr_replay_buffer *self, const AVPacket *av_packet, double timestamp) { return self->append(self, av_packet, timestamp); } diff --git a/src/replay_buffer/replay_buffer_disk.c b/src/replay_buffer/replay_buffer_disk.c index 3fff9f3..ce42d93 100644 --- a/src/replay_buffer/replay_buffer_disk.c +++ b/src/replay_buffer/replay_buffer_disk.c @@ -94,7 +94,6 @@ static void gsr_replay_buffer_file_unref(gsr_replay_buffer_file *self, const cha static void gsr_replay_buffer_disk_clear(gsr_replay_buffer *replay_buffer) { gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; - gsr_replay_buffer_lock(&self->replay_buffer); for(size_t i = 0; i < self->num_files; ++i) { gsr_replay_buffer_file_unref(self->files[i], self->replay_directory); @@ -107,7 +106,6 @@ static void gsr_replay_buffer_disk_clear(gsr_replay_buffer *replay_buffer) { } self->storage_num_bytes_written = 0; - gsr_replay_buffer_unlock(&self->replay_buffer); } static void gsr_replay_buffer_disk_destroy(gsr_replay_buffer *replay_buffer) { @@ -197,7 +195,6 @@ static void gsr_replay_buffer_disk_remove_first_file(gsr_replay_buffer_disk *sel static bool gsr_replay_buffer_disk_append(gsr_replay_buffer *replay_buffer, const AVPacket *av_packet, double timestamp) { gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; bool success = false; - gsr_replay_buffer_lock(&self->replay_buffer); if(self->storage_fd <= 0) { if(!gsr_replay_buffer_disk_create_next_file(self, timestamp)) @@ -215,7 +212,6 @@ static bool gsr_replay_buffer_disk_append(gsr_replay_buffer *replay_buffer, cons success = data_written; done: - gsr_replay_buffer_unlock(&self->replay_buffer); return success; } @@ -265,11 +261,7 @@ static gsr_replay_buffer* gsr_replay_buffer_disk_clone(gsr_replay_buffer *replay return NULL; gsr_replay_buffer_disk_set_impl_funcs(destination); - gsr_replay_buffer_lock(&self->replay_buffer); - destination->replay_buffer.original_replay_buffer = replay_buffer; - destination->replay_buffer.mutex = self->replay_buffer.mutex; - destination->replay_buffer.mutex_initialized = self->replay_buffer.mutex_initialized; destination->replay_buffer_time = self->replay_buffer_time; destination->storage_counter = self->storage_counter; destination->storage_num_bytes_written = self->storage_num_bytes_written; @@ -283,7 +275,6 @@ static gsr_replay_buffer* gsr_replay_buffer_disk_clone(gsr_replay_buffer *replay snprintf(destination->replay_directory, sizeof(destination->replay_directory), "%s", self->replay_directory); destination->owns_directory = false; - gsr_replay_buffer_unlock(&self->replay_buffer); return (gsr_replay_buffer*)destination; } @@ -319,11 +310,9 @@ static size_t gsr_replay_buffer_file_find_packet_index_by_time_passed(const gsr_ /* Binary search */ static gsr_replay_buffer_iterator gsr_replay_buffer_disk_find_file_index_by_time_passed(gsr_replay_buffer *replay_buffer, int seconds) { gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; - gsr_replay_buffer_lock(&self->replay_buffer); const double now = clock_get_monotonic_seconds(); if(self->num_files == 0) { - gsr_replay_buffer_unlock(&self->replay_buffer); return (gsr_replay_buffer_iterator){0, 0}; } @@ -352,14 +341,12 @@ static gsr_replay_buffer_iterator gsr_replay_buffer_disk_find_file_index_by_time const gsr_replay_buffer_file *file = self->files[file_index]; const size_t packet_index = gsr_replay_buffer_file_find_packet_index_by_time_passed(file, seconds); - gsr_replay_buffer_unlock(&self->replay_buffer); return (gsr_replay_buffer_iterator){packet_index, file_index}; } static gsr_replay_buffer_iterator gsr_replay_buffer_disk_find_keyframe(gsr_replay_buffer *replay_buffer, gsr_replay_buffer_iterator start_iterator, int stream_index, bool invert_stream_index) { gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; gsr_replay_buffer_iterator keyframe_iterator = {(size_t)-1, 0}; - gsr_replay_buffer_lock(&self->replay_buffer); size_t packet_index = start_iterator.packet_index; for(size_t file_index = start_iterator.file_index; file_index < self->num_files; ++file_index) { const gsr_replay_buffer_file *file = self->files[file_index]; @@ -374,7 +361,6 @@ static gsr_replay_buffer_iterator gsr_replay_buffer_disk_find_keyframe(gsr_repla packet_index = 0; } done: - gsr_replay_buffer_unlock(&self->replay_buffer); return keyframe_iterator; } diff --git a/src/replay_buffer/replay_buffer_ram.c b/src/replay_buffer/replay_buffer_ram.c index 890588f..c5df328 100644 --- a/src/replay_buffer/replay_buffer_ram.c +++ b/src/replay_buffer/replay_buffer_ram.c @@ -54,7 +54,6 @@ static void gsr_av_packet_ram_unref(gsr_av_packet_ram *self) { static void gsr_replay_buffer_ram_destroy(gsr_replay_buffer *replay_buffer) { gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; - gsr_replay_buffer_lock(&self->replay_buffer); for(size_t i = 0; i < self->num_packets; ++i) { if(self->packets[i]) { gsr_av_packet_ram_unref(self->packets[i]); @@ -62,7 +61,6 @@ static void gsr_replay_buffer_ram_destroy(gsr_replay_buffer *replay_buffer) { } } self->num_packets = 0; - gsr_replay_buffer_unlock(&self->replay_buffer); if(self->packets) { free(self->packets); @@ -75,12 +73,9 @@ static void gsr_replay_buffer_ram_destroy(gsr_replay_buffer *replay_buffer) { static bool gsr_replay_buffer_ram_append(gsr_replay_buffer *replay_buffer, const AVPacket *av_packet, double timestamp) { gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; - gsr_replay_buffer_lock(&self->replay_buffer); gsr_av_packet_ram *packet = gsr_av_packet_ram_create(av_packet, timestamp); - if(!packet) { - gsr_replay_buffer_unlock(&self->replay_buffer); + if(!packet) return false; - } if(self->packets[self->index]) { gsr_av_packet_ram_unref(self->packets[self->index]); @@ -93,13 +88,11 @@ static bool gsr_replay_buffer_ram_append(gsr_replay_buffer *replay_buffer, const if(self->num_packets > self->capacity_num_packets) self->num_packets = self->capacity_num_packets; - gsr_replay_buffer_unlock(&self->replay_buffer); return true; } static void gsr_replay_buffer_ram_clear(gsr_replay_buffer *replay_buffer) { gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; - gsr_replay_buffer_lock(&self->replay_buffer); for(size_t i = 0; i < self->num_packets; ++i) { if(self->packets[i]) { gsr_av_packet_ram_unref(self->packets[i]); @@ -108,7 +101,6 @@ static void gsr_replay_buffer_ram_clear(gsr_replay_buffer *replay_buffer) { } self->num_packets = 0; self->index = 0; - gsr_replay_buffer_unlock(&self->replay_buffer); } static gsr_av_packet_ram* gsr_replay_buffer_ram_get_packet_at_index(gsr_replay_buffer *replay_buffer, size_t index) { @@ -141,17 +133,12 @@ static gsr_replay_buffer* gsr_replay_buffer_ram_clone(gsr_replay_buffer *replay_ return NULL; gsr_replay_buffer_ram_set_impl_funcs(destination); - gsr_replay_buffer_lock(&self->replay_buffer); - destination->replay_buffer.original_replay_buffer = replay_buffer; - destination->replay_buffer.mutex = self->replay_buffer.mutex; - destination->replay_buffer.mutex_initialized = self->replay_buffer.mutex_initialized; destination->capacity_num_packets = self->capacity_num_packets; destination->index = self->index; destination->packets = calloc(destination->capacity_num_packets, sizeof(gsr_av_packet_ram*)); if(!destination->packets) { free(destination); - gsr_replay_buffer_unlock(&self->replay_buffer); return NULL; } @@ -160,18 +147,15 @@ static gsr_replay_buffer* gsr_replay_buffer_ram_clone(gsr_replay_buffer *replay_ destination->packets[i] = gsr_av_packet_ram_ref(self->packets[i]); } - gsr_replay_buffer_unlock(&self->replay_buffer); return (gsr_replay_buffer*)destination; } /* Binary search */ static gsr_replay_buffer_iterator gsr_replay_buffer_ram_find_packet_index_by_time_passed(gsr_replay_buffer *replay_buffer, int seconds) { gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; - gsr_replay_buffer_lock(&self->replay_buffer); const double now = clock_get_monotonic_seconds(); if(self->num_packets == 0) { - gsr_replay_buffer_unlock(&self->replay_buffer); return (gsr_replay_buffer_iterator){0, 0}; } @@ -194,14 +178,12 @@ static gsr_replay_buffer_iterator gsr_replay_buffer_ram_find_packet_index_by_tim } } - gsr_replay_buffer_unlock(&self->replay_buffer); return (gsr_replay_buffer_iterator){index, 0}; } static gsr_replay_buffer_iterator gsr_replay_buffer_ram_find_keyframe(gsr_replay_buffer *replay_buffer, gsr_replay_buffer_iterator start_iterator, int stream_index, bool invert_stream_index) { gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; size_t keyframe_index = (size_t)-1; - gsr_replay_buffer_lock(&self->replay_buffer); for(size_t i = start_iterator.packet_index; i < self->num_packets; ++i) { const gsr_av_packet_ram *packet = gsr_replay_buffer_ram_get_packet_at_index(replay_buffer, i); if((packet->packet.flags & AV_PKT_FLAG_KEY) && (invert_stream_index ? packet->packet.stream_index != stream_index : packet->packet.stream_index == stream_index)) { @@ -209,7 +191,6 @@ static gsr_replay_buffer_iterator gsr_replay_buffer_ram_find_keyframe(gsr_replay break; } } - gsr_replay_buffer_unlock(&self->replay_buffer); return (gsr_replay_buffer_iterator){keyframe_index, 0}; }