From aa50ae92057db36e033b157bbe393d6c7df9fc05 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 8 Mar 2011 18:07:24 +0000 Subject: added pipe between worker threads diff --git a/nopsyncd/Makefile b/nopsyncd/Makefile index ffba7cb..a540d77 100644 --- a/nopsyncd/Makefile +++ b/nopsyncd/Makefile @@ -25,7 +25,8 @@ endif EXECUTABLE := nopsyncd -C_OBJS := nopsyncd.o +C_OBJS := l_pipe.o \ + nopsyncd.o C_SRCS := $(C_OBJS:%.o=%.c) diff --git a/nopsyncd/l_pipe.c b/nopsyncd/l_pipe.c new file mode 100644 index 0000000..6f5a17a --- /dev/null +++ b/nopsyncd/l_pipe.c @@ -0,0 +1,124 @@ +/* + * rhnop + * + * Copyright (C) 2011 Christian Pointner + * + * This file is part of rhnop. + * + * rhnop is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * rhnop is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with rhnop. If not, see . + */ + +#include +#include + +#include +#include +#include + +#include "l_pipe.h" + +static int pipefds_[2]; + +int pipe_init() +{ + return pipe(pipefds_); +} + +int pipe_close() +{ + close(pipefds_[0]); + close(pipefds_[1]); + return 0; +} + +static int l_pipe_signal(lua_State *L) +{ + char data = 1; + int ret = 0; + + for(;;) { + ret = write(pipefds_[1], &data, 1); + if(ret == 1) break; + + if(!ret) continue; + if(ret == -1 && (errno == EAGAIN || errno == EINTR)) continue; + + break; + } + + lua_pushinteger(L, ret); + return 1; +} + +static int l_pipe_getfd(lua_State *L) +{ + if(!lua_istable(L, -1)) + luaL_error(L, "can't retreive pipe fd"); + + lua_pushliteral(L, "fd"); + lua_gettable(L, -2); + return 1; +} + +static int l_pipe_dirty(lua_State *L) +{ + lua_pushboolean(L, 0); + return 1; +} + +static int l_pipe_getreadfd(lua_State *L) +{ + lua_newtable(L); + lua_pushliteral(L, "fd"); + lua_pushinteger(L, pipefds_[0]); + lua_settable(L, -3); + lua_pushliteral(L, "getfd"); + lua_pushcfunction(L, l_pipe_getfd); + lua_settable(L, -3); + lua_pushliteral(L, "dirty"); + lua_pushcfunction(L, l_pipe_dirty); + lua_settable(L, -3); + return 1; +} + +static int l_pipe_consume(lua_State *L) +{ + char data; + int ret = 0; + + for(;;) { + ret = read(pipefds_[0], &data, 1); + if(ret == 1 || ret == 0) break; + + if(ret == -1 && (errno == EAGAIN || errno == EINTR)) continue; + + break; + } + + lua_pushinteger(L, ret); + return 1; +} + +static const struct luaL_reg pipe_funcs [] = { + { "signal", l_pipe_signal }, + { "getreadfd", l_pipe_getreadfd }, + { "consume", l_pipe_consume }, + { NULL, NULL } +}; + +LUALIB_API int luaopen_pipe(lua_State *L) +{ + luaL_register(L, LUA_PIPELIBNAME, pipe_funcs); + return 1; +} diff --git a/nopsyncd/l_pipe.h b/nopsyncd/l_pipe.h new file mode 100644 index 0000000..b09f33b --- /dev/null +++ b/nopsyncd/l_pipe.h @@ -0,0 +1,33 @@ +/* + * rhnop + * + * Copyright (C) 2011 Christian Pointner + * + * This file is part of rhnop. + * + * rhnop is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * rhnop is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with rhnop. If not, see . + */ + +#ifndef NOPSYNCD_l_pipe_h_INCLUDED +#define NOPSYNCD_l_pipe_h_INCLUDED + +#include + +int pipe_init(); +int pipe_close(); + +#define LUA_PIPELIBNAME "pipe" +LUALIB_API int luaopen_pipe(lua_State *L); + +#endif diff --git a/nopsyncd/nopsyncd.c b/nopsyncd/nopsyncd.c index b8907ee..d478b18 100644 --- a/nopsyncd/nopsyncd.c +++ b/nopsyncd/nopsyncd.c @@ -22,12 +22,15 @@ #include #include #include +#include #include #include #include #include +#include "l_pipe.h" + #define LUA_MAIN_LOOP_FUNC "main_loop" static const luaL_Reg nopsyncd_lualibs[] = { @@ -36,6 +39,7 @@ static const luaL_Reg nopsyncd_lualibs[] = { {LUA_TABLIBNAME, luaopen_table}, {LUA_STRLIBNAME, luaopen_string}, {LUA_MATHLIBNAME, luaopen_math}, + {LUA_MATHLIBNAME, luaopen_pipe}, {NULL, NULL} }; @@ -114,6 +118,8 @@ void* main_loop(void* file) if(!ret) ret = call_main_loop(L, (char*)file); + printf("%s returned with %d\n", (char*)file, ret); + lua_close(L); pthread_exit(NULL); @@ -125,7 +131,13 @@ int main(int argc, char* argv[]) pthread_t qlistener, tcpserver; - int ret = pthread_create(&qlistener, NULL, main_loop, "qlistener.lua"); + int ret = pipe_init(); + if(ret) { + fprintf(stderr, "Error creating pipe: %s\n", strerror(errno)); + return 1; + } + + ret = pthread_create(&qlistener, NULL, main_loop, "qlistener.lua"); if(ret) { fprintf(stderr, "Error creating qlistener thread (code: %d)\n", ret); return 1; @@ -140,6 +152,13 @@ int main(int argc, char* argv[]) pthread_join(qlistener, NULL); pthread_join(tcpserver, NULL); + ret = pipe_close(); + if(ret) { + fprintf(stderr, "Error destroying pipe: %s\n", strerror(errno)); + return 1; + } + + printf("stopping nopsyncd.\n"); return 0; } diff --git a/nopsyncd/qlistener.lua b/nopsyncd/qlistener.lua index c850cc6..7594d68 100755 --- a/nopsyncd/qlistener.lua +++ b/nopsyncd/qlistener.lua @@ -30,7 +30,7 @@ function main_loop() os.exit(1) end - --while true do + while true do local msg, prio = mq.receive(q) if msg == nil then io.stderr:write("recv error: " .. prio .. "\n") @@ -38,5 +38,7 @@ function main_loop() end print("received message '" .. msg .. "' with prio: " .. prio) - --end + + pipe.signal() + end end \ No newline at end of file diff --git a/nopsyncd/tcpserver.lua b/nopsyncd/tcpserver.lua index e7df4e5..ff338ac 100755 --- a/nopsyncd/tcpserver.lua +++ b/nopsyncd/tcpserver.lua @@ -22,5 +22,28 @@ require "socket" function main_loop() - socket.select(nil, nil, 10) + local pipefd = pipe.getreadfd() + + while true do + local readable, _, err = socket.select({ pipefd } , nil) + if(err) then + print("select returned with error: " .. err) + return -1 + else + for _, input in ipairs(readable) do + if(input == pipefd) then + local ret = pipe.consume() + if ret == 1 then + print("pipe was signaled") + else + return ret + end + else + print("select returned unknown file descriptor!?") + end + end + end + end + + return 0 end \ No newline at end of file -- cgit v0.10.2