diff options
author | Christian Pointner <equinox@helsinki.at> | 2011-04-20 13:40:15 (GMT) |
---|---|---|
committer | Christian Pointner <equinox@helsinki.at> | 2011-04-20 13:40:15 (GMT) |
commit | 336cc3d0101ebc6d7a0e2cd509d351951f804368 (patch) | |
tree | 722d8ce63d5dd3fa75884db808114c1c583aafdb | |
parent | 9bb444e208beac4e25e0cdcbff5904f818b8e8fd (diff) |
switched to multifdsink
no pthread at log (usign GThread instead)
file_list ist now thread safe
-rw-r--r-- | src/file_list.c | 34 | ||||
-rw-r--r-- | src/file_list.h | 8 | ||||
-rw-r--r-- | src/log.c | 22 | ||||
-rw-r--r-- | src/log.h | 4 | ||||
-rw-r--r-- | src/sig_handler.c | 2 | ||||
-rw-r--r-- | src/writer.c | 47 |
6 files changed, 90 insertions, 27 deletions
diff --git a/src/file_list.c b/src/file_list.c index 823c041..c31ecef 100644 --- a/src/file_list.c +++ b/src/file_list.c @@ -35,6 +35,8 @@ #include <fcntl.h> #include <errno.h> +#include <glib.h> + #include "datatypes.h" #include "file_list.h" #include "slist.h" @@ -43,23 +45,30 @@ static void delete_file(void* element) { file_t* deletee = (file_t*)element; + log_printf(INFO, "removing/closing file '%s'", deletee->path_); if(deletee->path_) free(deletee->path_); if(deletee->fd_ >= 0) close(deletee->fd_); } int file_list_init(file_list_t* list) { - return slist_init(list, &delete_file); + list->mutex_ = g_mutex_new(); + if(!list->mutex_) + return -2; + + return slist_init(&(list->list_), &delete_file); } void file_list_clear(file_list_t* list) { - slist_clear(list); + g_mutex_lock(list->mutex_); + slist_clear(&(list->list_)); + g_mutex_unlock(list->mutex_); } file_t* file_list_add(file_list_t* list, struct tm* time, const char* type, const char* format, const char* dir, mode_t mode) { - if(!list) + if(!list || !(list->mutex_)) return NULL; file_t* tmp = malloc(sizeof(file_t)); @@ -80,33 +89,40 @@ file_t* file_list_add(file_list_t* list, struct tm* time, const char* type, cons tmp->fd_ = -1; tmp->mode_ = mode; - if(slist_add(list, tmp) == NULL) { + g_mutex_lock(list->mutex_); + if(slist_add(&(list->list_), tmp) == NULL) { + g_mutex_unlock(list->mutex_); free(tmp->path_); free(tmp); return NULL; } + g_mutex_unlock(list->mutex_); return tmp; } int file_list_remove(file_list_t* list, int fd) { - if(!list) + if(!list || !(list->mutex_)) return -1; - slist_element_t* tmp = list->first_; + g_mutex_lock(list->mutex_); + slist_element_t* tmp = list->list_.first_; while(tmp) { if(((file_t*)tmp->data_)->fd_ == fd) { - slist_remove(list, tmp); + slist_remove(&(list->list_), tmp->data_); break; } tmp = tmp->next_; } + g_mutex_unlock(list->mutex_); + + return 0; } int open_file(file_t* file) { - if(file->fd_ > 0) // file already open! + if(!file || file->fd_ > 0) // file already open! return -1; char* orig_path = file->path_; @@ -141,5 +157,7 @@ int open_file(file_t* file) if(orig_path != file->path_) free(orig_path); + log_printf(INFO, "opened file '%s' -> %d", file->path_, file->fd_); + return 0; } diff --git a/src/file_list.h b/src/file_list.h index fcef137..322df62 100644 --- a/src/file_list.h +++ b/src/file_list.h @@ -31,6 +31,8 @@ #include <sys/types.h> #include <time.h> +#include <glib.h> + #include "slist.h" struct file_struct { @@ -40,7 +42,11 @@ struct file_struct { }; typedef struct file_struct file_t; -typedef slist_t file_list_t; +struct file_list_struct { + slist_t list_; + GMutex* mutex_; +}; +typedef struct file_list_struct file_list_t; int file_list_init(file_list_t* list); void file_list_clear(file_list_t* list); @@ -32,7 +32,8 @@ #include <stdarg.h> #include <stdlib.h> #include <stdio.h> -#include <pthread.h> + +#include <glib.h> #define SYSLOG_NAMES #include <syslog.h> @@ -182,14 +183,15 @@ void log_init() { stdlog.max_prio_ = 0; stdlog.targets_.first_ = NULL; - pthread_mutex_init(&(stdlog.log_mutex_), NULL); + g_thread_init(NULL); + stdlog.log_mutex_ = g_mutex_new(); } void log_close() { - pthread_mutex_lock(&(stdlog.log_mutex_)); + g_mutex_lock(stdlog.log_mutex_); log_targets_clear(&stdlog.targets_); - pthread_mutex_unlock(&(stdlog.log_mutex_)); + g_mutex_unlock(stdlog.log_mutex_); } void update_max_prio() @@ -208,10 +210,10 @@ int log_add_target(const char* conf) if(!conf) return -1; - pthread_mutex_lock(&(stdlog.log_mutex_)); + g_mutex_lock(stdlog.log_mutex_); int ret = log_targets_add(&stdlog.targets_, conf); if(!ret) update_max_prio(); - pthread_mutex_unlock(&(stdlog.log_mutex_)); + g_mutex_unlock(stdlog.log_mutex_); return ret; } @@ -227,9 +229,9 @@ void log_printf(log_prio_t prio, const char* fmt, ...) vsnprintf(msg, MSG_LENGTH_MAX, fmt, args); va_end(args); - pthread_mutex_lock(&(stdlog.log_mutex_)); + g_mutex_lock(stdlog.log_mutex_); log_targets_log(&stdlog.targets_, prio, msg); - pthread_mutex_unlock(&(stdlog.log_mutex_)); + g_mutex_unlock(stdlog.log_mutex_); } void log_print_hex_dump(log_prio_t prio, const uint8_t* buf, uint32_t len) @@ -256,7 +258,7 @@ void log_print_hex_dump(log_prio_t prio, const uint8_t* buf, uint32_t len) ptr+=3; } } - pthread_mutex_lock(&(stdlog.log_mutex_)); + g_mutex_lock(stdlog.log_mutex_); log_targets_log(&stdlog.targets_, prio, msg); - pthread_mutex_unlock(&(stdlog.log_mutex_)); + g_mutex_unlock(stdlog.log_mutex_); } @@ -28,7 +28,7 @@ #ifndef RHARCHIVE_log_h_INCLUDED #define RHARCHIVE_log_h_INCLUDED -#include <pthread.h> +#include <glib.h> #define MSG_LENGTH_MAX 1024 @@ -71,7 +71,7 @@ void log_targets_clear(log_targets_t* targets); struct log_struct { log_prio_t max_prio_; log_targets_t targets_; - pthread_mutex_t log_mutex_; + GMutex* log_mutex_; }; typedef struct log_struct log_t; diff --git a/src/sig_handler.c b/src/sig_handler.c index 1c49e57..02150fa 100644 --- a/src/sig_handler.c +++ b/src/sig_handler.c @@ -98,7 +98,7 @@ int signal_start(GMainLoop *loop) { g_assert(!signal_thread); - signal_thread = g_thread_create_full(signal_thread_func, loop, 8192, TRUE, TRUE, G_THREAD_PRIORITY_HIGH, NULL); + signal_thread = g_thread_create_full(signal_thread_func, loop, 8192, FALSE, TRUE, G_THREAD_PRIORITY_HIGH, NULL); if(!signal_thread) return -1; diff --git a/src/writer.c b/src/writer.c index 0f96eb9..4d8d664 100644 --- a/src/writer.c +++ b/src/writer.c @@ -68,17 +68,40 @@ static int init_time_boundaries(writer_t* writer) return 0; } +static void added_cb(GstElement* sink, gint fd, gpointer data) +{ + log_printf(INFO, "fdsink: successfully added client %d", fd); +} + +static void removed_cb(GstElement* sink, gint fd, gpointer data) +{ + log_printf(INFO, "fdsink: successfully removed client %d", fd); +} + +static void fdremoved_cb(GstElement* sink, gint fd, gpointer data) +{ + writer_t *writer = (writer_t*)data; + log_printf(INFO, "fdsink: successfully removed fd %d", fd); + + // call post processing script + file_list_remove(&(writer->files_), fd); +} + int writer_init(writer_t* writer, GMainLoop *loop, const char* name_format, mode_t mode, const char* output_dir, int interval, int offset) { if(!writer) return -1; writer->loop_ = loop; - writer->sink_ = gst_element_factory_make("fdsink", "writer"); + writer->sink_ = gst_element_factory_make("multifdsink", "writer"); if(!writer->sink_) { log_printf(ERROR, "the writer object could not be created. Exiting."); return -1; } + g_signal_connect(G_OBJECT(writer->sink_), "client-added", G_CALLBACK(added_cb), writer); + g_signal_connect(G_OBJECT(writer->sink_), "client-removed", G_CALLBACK(removed_cb), writer); + g_signal_connect(G_OBJECT(writer->sink_), "client-fd-removed", G_CALLBACK(fdremoved_cb), writer); + writer->clock_ = gst_system_clock_obtain(); if(!writer->clock_) { log_printf(ERROR, "unable to obtain the system clock"); @@ -97,6 +120,21 @@ int writer_init(writer_t* writer, GMainLoop *loop, const char* name_format, mode return init_time_boundaries(writer); } +static void add_fd(writer_t* writer, int fd) +{ + log_printf(INFO, "adding fd %d to fdsink", fd); +// g_object_set(G_OBJECT(writer->sink_), "fd", fd, NULL); + + g_signal_emit_by_name(G_OBJECT(writer->sink_), "add", fd, NULL); +} + +static void remove_fd(writer_t* writer, int fd) +{ + log_printf(INFO, "removing fd %d from fdsink", fd); + // nothing yet + g_signal_emit_by_name(G_OBJECT(writer->sink_), "remove-flush", fd, NULL); +} + static int check_boundaries(writer_t* writer) { struct timespec now; @@ -113,7 +151,8 @@ static int check_boundaries(writer_t* writer) int ret = open_file(writer->next_); if(ret) return ret; // TODO: stop writer on open_file error ??? - g_object_set(G_OBJECT(writer->sink_), "fd", writer->next_->fd_, NULL); + add_fd(writer, writer->next_->fd_); + remove_fd(writer, writer->current_->fd_); int old_fd = writer->current_->fd_; writer->current_ = writer->next_; @@ -124,8 +163,6 @@ static int check_boundaries(writer_t* writer) writer->next_ = file_list_add(&(writer->files_), &bd_time, "next", writer->name_format_, writer->output_dir_, writer->mode_); if(writer->next_ == NULL) return -2; - - file_list_remove(&(writer->files_), old_fd); } return 0; @@ -164,7 +201,7 @@ int writer_start(writer_t* writer) if(ret) return ret; - g_object_set(G_OBJECT(writer->sink_), "fd", writer->current_->fd_, NULL); + add_fd(writer, writer->current_->fd_); writer->clock_id_ = gst_clock_new_periodic_id(writer->clock_, 0, writer->interval_); if(!writer->clock_id_) { |