summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@helsinki.at>2011-04-20 13:40:15 (GMT)
committerChristian Pointner <equinox@helsinki.at>2011-04-20 13:40:15 (GMT)
commit336cc3d0101ebc6d7a0e2cd509d351951f804368 (patch)
tree722d8ce63d5dd3fa75884db808114c1c583aafdb
parent9bb444e208beac4e25e0cdcbff5904f818b8e8fd (diff)
switched to multifdsink
no pthread at log (usign GThread instead) file_list ist now thread safe
-rw-r--r--src/file_list.c34
-rw-r--r--src/file_list.h8
-rw-r--r--src/log.c22
-rw-r--r--src/log.h4
-rw-r--r--src/sig_handler.c2
-rw-r--r--src/writer.c47
6 files changed, 90 insertions, 27 deletions
diff --git a/src/file_list.c b/src/file_list.c
index 823c041..c31ecef 100644
--- a/src/file_list.c
+++ b/src/file_list.c
@@ -35,6 +35,8 @@
#include <fcntl.h>
#include <errno.h>
+#include <glib.h>
+
#include "datatypes.h"
#include "file_list.h"
#include "slist.h"
@@ -43,23 +45,30 @@
static void delete_file(void* element)
{
file_t* deletee = (file_t*)element;
+ log_printf(INFO, "removing/closing file '%s'", deletee->path_);
if(deletee->path_) free(deletee->path_);
if(deletee->fd_ >= 0) close(deletee->fd_);
}
int file_list_init(file_list_t* list)
{
- return slist_init(list, &delete_file);
+ list->mutex_ = g_mutex_new();
+ if(!list->mutex_)
+ return -2;
+
+ return slist_init(&(list->list_), &delete_file);
}
void file_list_clear(file_list_t* list)
{
- slist_clear(list);
+ g_mutex_lock(list->mutex_);
+ slist_clear(&(list->list_));
+ g_mutex_unlock(list->mutex_);
}
file_t* file_list_add(file_list_t* list, struct tm* time, const char* type, const char* format, const char* dir, mode_t mode)
{
- if(!list)
+ if(!list || !(list->mutex_))
return NULL;
file_t* tmp = malloc(sizeof(file_t));
@@ -80,33 +89,40 @@ file_t* file_list_add(file_list_t* list, struct tm* time, const char* type, cons
tmp->fd_ = -1;
tmp->mode_ = mode;
- if(slist_add(list, tmp) == NULL) {
+ g_mutex_lock(list->mutex_);
+ if(slist_add(&(list->list_), tmp) == NULL) {
+ g_mutex_unlock(list->mutex_);
free(tmp->path_);
free(tmp);
return NULL;
}
+ g_mutex_unlock(list->mutex_);
return tmp;
}
int file_list_remove(file_list_t* list, int fd)
{
- if(!list)
+ if(!list || !(list->mutex_))
return -1;
- slist_element_t* tmp = list->first_;
+ g_mutex_lock(list->mutex_);
+ slist_element_t* tmp = list->list_.first_;
while(tmp) {
if(((file_t*)tmp->data_)->fd_ == fd) {
- slist_remove(list, tmp);
+ slist_remove(&(list->list_), tmp->data_);
break;
}
tmp = tmp->next_;
}
+ g_mutex_unlock(list->mutex_);
+
+ return 0;
}
int open_file(file_t* file)
{
- if(file->fd_ > 0) // file already open!
+ if(!file || file->fd_ > 0) // file already open!
return -1;
char* orig_path = file->path_;
@@ -141,5 +157,7 @@ int open_file(file_t* file)
if(orig_path != file->path_)
free(orig_path);
+ log_printf(INFO, "opened file '%s' -> %d", file->path_, file->fd_);
+
return 0;
}
diff --git a/src/file_list.h b/src/file_list.h
index fcef137..322df62 100644
--- a/src/file_list.h
+++ b/src/file_list.h
@@ -31,6 +31,8 @@
#include <sys/types.h>
#include <time.h>
+#include <glib.h>
+
#include "slist.h"
struct file_struct {
@@ -40,7 +42,11 @@ struct file_struct {
};
typedef struct file_struct file_t;
-typedef slist_t file_list_t;
+struct file_list_struct {
+ slist_t list_;
+ GMutex* mutex_;
+};
+typedef struct file_list_struct file_list_t;
int file_list_init(file_list_t* list);
void file_list_clear(file_list_t* list);
diff --git a/src/log.c b/src/log.c
index ed13f74..073f216 100644
--- a/src/log.c
+++ b/src/log.c
@@ -32,7 +32,8 @@
#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
-#include <pthread.h>
+
+#include <glib.h>
#define SYSLOG_NAMES
#include <syslog.h>
@@ -182,14 +183,15 @@ void log_init()
{
stdlog.max_prio_ = 0;
stdlog.targets_.first_ = NULL;
- pthread_mutex_init(&(stdlog.log_mutex_), NULL);
+ g_thread_init(NULL);
+ stdlog.log_mutex_ = g_mutex_new();
}
void log_close()
{
- pthread_mutex_lock(&(stdlog.log_mutex_));
+ g_mutex_lock(stdlog.log_mutex_);
log_targets_clear(&stdlog.targets_);
- pthread_mutex_unlock(&(stdlog.log_mutex_));
+ g_mutex_unlock(stdlog.log_mutex_);
}
void update_max_prio()
@@ -208,10 +210,10 @@ int log_add_target(const char* conf)
if(!conf)
return -1;
- pthread_mutex_lock(&(stdlog.log_mutex_));
+ g_mutex_lock(stdlog.log_mutex_);
int ret = log_targets_add(&stdlog.targets_, conf);
if(!ret) update_max_prio();
- pthread_mutex_unlock(&(stdlog.log_mutex_));
+ g_mutex_unlock(stdlog.log_mutex_);
return ret;
}
@@ -227,9 +229,9 @@ void log_printf(log_prio_t prio, const char* fmt, ...)
vsnprintf(msg, MSG_LENGTH_MAX, fmt, args);
va_end(args);
- pthread_mutex_lock(&(stdlog.log_mutex_));
+ g_mutex_lock(stdlog.log_mutex_);
log_targets_log(&stdlog.targets_, prio, msg);
- pthread_mutex_unlock(&(stdlog.log_mutex_));
+ g_mutex_unlock(stdlog.log_mutex_);
}
void log_print_hex_dump(log_prio_t prio, const uint8_t* buf, uint32_t len)
@@ -256,7 +258,7 @@ void log_print_hex_dump(log_prio_t prio, const uint8_t* buf, uint32_t len)
ptr+=3;
}
}
- pthread_mutex_lock(&(stdlog.log_mutex_));
+ g_mutex_lock(stdlog.log_mutex_);
log_targets_log(&stdlog.targets_, prio, msg);
- pthread_mutex_unlock(&(stdlog.log_mutex_));
+ g_mutex_unlock(stdlog.log_mutex_);
}
diff --git a/src/log.h b/src/log.h
index aaf7a1a..29f9fa1 100644
--- a/src/log.h
+++ b/src/log.h
@@ -28,7 +28,7 @@
#ifndef RHARCHIVE_log_h_INCLUDED
#define RHARCHIVE_log_h_INCLUDED
-#include <pthread.h>
+#include <glib.h>
#define MSG_LENGTH_MAX 1024
@@ -71,7 +71,7 @@ void log_targets_clear(log_targets_t* targets);
struct log_struct {
log_prio_t max_prio_;
log_targets_t targets_;
- pthread_mutex_t log_mutex_;
+ GMutex* log_mutex_;
};
typedef struct log_struct log_t;
diff --git a/src/sig_handler.c b/src/sig_handler.c
index 1c49e57..02150fa 100644
--- a/src/sig_handler.c
+++ b/src/sig_handler.c
@@ -98,7 +98,7 @@ int signal_start(GMainLoop *loop)
{
g_assert(!signal_thread);
- signal_thread = g_thread_create_full(signal_thread_func, loop, 8192, TRUE, TRUE, G_THREAD_PRIORITY_HIGH, NULL);
+ signal_thread = g_thread_create_full(signal_thread_func, loop, 8192, FALSE, TRUE, G_THREAD_PRIORITY_HIGH, NULL);
if(!signal_thread)
return -1;
diff --git a/src/writer.c b/src/writer.c
index 0f96eb9..4d8d664 100644
--- a/src/writer.c
+++ b/src/writer.c
@@ -68,17 +68,40 @@ static int init_time_boundaries(writer_t* writer)
return 0;
}
+static void added_cb(GstElement* sink, gint fd, gpointer data)
+{
+ log_printf(INFO, "fdsink: successfully added client %d", fd);
+}
+
+static void removed_cb(GstElement* sink, gint fd, gpointer data)
+{
+ log_printf(INFO, "fdsink: successfully removed client %d", fd);
+}
+
+static void fdremoved_cb(GstElement* sink, gint fd, gpointer data)
+{
+ writer_t *writer = (writer_t*)data;
+ log_printf(INFO, "fdsink: successfully removed fd %d", fd);
+
+ // call post processing script
+ file_list_remove(&(writer->files_), fd);
+}
+
int writer_init(writer_t* writer, GMainLoop *loop, const char* name_format, mode_t mode, const char* output_dir, int interval, int offset)
{
if(!writer)
return -1;
writer->loop_ = loop;
- writer->sink_ = gst_element_factory_make("fdsink", "writer");
+ writer->sink_ = gst_element_factory_make("multifdsink", "writer");
if(!writer->sink_) {
log_printf(ERROR, "the writer object could not be created. Exiting.");
return -1;
}
+ g_signal_connect(G_OBJECT(writer->sink_), "client-added", G_CALLBACK(added_cb), writer);
+ g_signal_connect(G_OBJECT(writer->sink_), "client-removed", G_CALLBACK(removed_cb), writer);
+ g_signal_connect(G_OBJECT(writer->sink_), "client-fd-removed", G_CALLBACK(fdremoved_cb), writer);
+
writer->clock_ = gst_system_clock_obtain();
if(!writer->clock_) {
log_printf(ERROR, "unable to obtain the system clock");
@@ -97,6 +120,21 @@ int writer_init(writer_t* writer, GMainLoop *loop, const char* name_format, mode
return init_time_boundaries(writer);
}
+static void add_fd(writer_t* writer, int fd)
+{
+ log_printf(INFO, "adding fd %d to fdsink", fd);
+// g_object_set(G_OBJECT(writer->sink_), "fd", fd, NULL);
+
+ g_signal_emit_by_name(G_OBJECT(writer->sink_), "add", fd, NULL);
+}
+
+static void remove_fd(writer_t* writer, int fd)
+{
+ log_printf(INFO, "removing fd %d from fdsink", fd);
+ // nothing yet
+ g_signal_emit_by_name(G_OBJECT(writer->sink_), "remove-flush", fd, NULL);
+}
+
static int check_boundaries(writer_t* writer)
{
struct timespec now;
@@ -113,7 +151,8 @@ static int check_boundaries(writer_t* writer)
int ret = open_file(writer->next_);
if(ret) return ret; // TODO: stop writer on open_file error ???
- g_object_set(G_OBJECT(writer->sink_), "fd", writer->next_->fd_, NULL);
+ add_fd(writer, writer->next_->fd_);
+ remove_fd(writer, writer->current_->fd_);
int old_fd = writer->current_->fd_;
writer->current_ = writer->next_;
@@ -124,8 +163,6 @@ static int check_boundaries(writer_t* writer)
writer->next_ = file_list_add(&(writer->files_), &bd_time, "next", writer->name_format_, writer->output_dir_, writer->mode_);
if(writer->next_ == NULL) return -2;
-
- file_list_remove(&(writer->files_), old_fd);
}
return 0;
@@ -164,7 +201,7 @@ int writer_start(writer_t* writer)
if(ret)
return ret;
- g_object_set(G_OBJECT(writer->sink_), "fd", writer->current_->fd_, NULL);
+ add_fd(writer, writer->current_->fd_);
writer->clock_id_ = gst_clock_new_periodic_id(writer->clock_, 0, writer->interval_);
if(!writer->clock_id_) {