// // rhimportd // // The Radio Helsinki Rivendell Import Daemon // // // Copyright (C) 2015 Christian Pointner // // This file is part of rhimportd. // // rhimportd is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // rhimportd is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with rhimportd. If not, see . // package main import ( "flag" "fmt" "helsinki.at/rhimport" "log" "net/http" "os" "os/signal" "sync" "time" // "io/ioutil" ) var ( rhl = log.New(os.Stderr, "[rhimportd]\t", log.LstdFlags) rhdl = log.New(os.Stderr, "[rhimportd-dbg]\t", log.LstdFlags) //rhdl = log.New(ioutil.Discard, "[rhimportd-dbg]\t", log.LstdFlags) ) func session_test_progress1(step int, step_name string, progress float64, userdata interface{}) bool { out := userdata.(chan<- string) select { case out <- fmt.Sprintf("CB1 %d, %s: %3.2f%%", step, step_name, progress*100): default: } if step > 1 { return false } return true } func session_test_progress2(step int, step_name string, progress float64, userdata interface{}) bool { out := userdata.(chan<- string) select { case out <- fmt.Sprintf("CB2 %d, %s: %3.2f%%", step, step_name, progress*100): default: } return true } func session_test_done(res rhimport.ImportResult, userdata interface{}) bool { done := userdata.(chan<- rhimport.ImportResult) done <- res return true } func session_test(conf *rhimport.Config, rddb *rhimport.RdDbChan) { sessions, err := rhimport.NewSessionStore(conf) if err != nil { rhl.Println("Error initializing Session Store:", err) return } defer sessions.Cleanup() store := sessions.GetInterface() ctx := rhimport.NewImportContext(conf, rddb, "heslinki") ctx.Trusted = true ctx.ShowId = 10002 ctx.ClearShowCarts = true ctx.SourceUri = "http://www.tonycuffe.com/mp3/tail%20toddle.mp3" id, _, code, errstring := store.New(ctx) if code != http.StatusOK { rhl.Printf("MAIN: Error SessionStore.New(): %s", errstring) return } var session *rhimport.SessionChan if session, code, errstring = store.Get(ctx.UserName, id); code != http.StatusOK { rhl.Printf("MAIN: Error SessionStore.Get(): %s", errstring) return } dch := make(chan rhimport.ImportResult, 1) if err = session.AddDoneHandler((chan<- rhimport.ImportResult)(dch), session_test_done); err != nil { rhl.Printf("MAIN: Error Session.AddDoneHandler(): %s", err) return } pch := make(chan string, 10) if err = session.AddProgressHandler((chan<- string)(pch), session_test_progress1); err != nil { rhl.Printf("MAIN: Error Session.AddProgressHandler(): %s", err) } go func() { time.Sleep(3 * time.Second) if err = session.AddProgressHandler((chan<- string)(pch), session_test_progress2); err != nil { rhl.Printf("MAIN: Error Session.AddProgressHandler(): %s", err) } }() rhl.Printf("MAIN: calling run for %s/%s", ctx.UserName, id) session.Run(30 * time.Second) for { select { case p := <-pch: fmt.Println(p) case r := <-dch: fmt.Printf("Import finished: %+v\n", r) break } } rhl.Printf("MAIN: calling remove for %s/%s", ctx.UserName, id) if code, errstring = store.Remove(ctx.UserName, id); code != http.StatusOK { rhl.Printf("MAIN: Error SessionStore.Remove(): %s", errstring) } rhl.Printf("MAIN: remove done") } func main() { web_addr_s := flag.String("web-addr", ":4080", "addr:port to listen on") telnet_addr_s := flag.String("telnet-addr", ":4023", "addr:port to listen on") rdconf_s := flag.String("rdconf", "/etc/rd.conf", "path to the Rivendell config file") rdxport_url_s := flag.String("rdxport-url", "http://localhost/rd-bin/rdxport.cgi", "the url to the Rivendell web-api") temp_dir_s := flag.String("tmp-dir", os.TempDir(), "path to temporary files") local_fetch_dir_s := flag.String("local-fetch-dir", os.TempDir(), "path to files that can be imported using local://") help := flag.Bool("help", false, "show usage") flag.Parse() if *help { flag.Usage() return } conf, err := rhimport.NewConfig(rdconf_s, rdxport_url_s, temp_dir_s, local_fetch_dir_s) if err != nil { rhl.Println("Error reading configuration:", err) return } rddb, err := rhimport.NewRdDb(conf) if err != nil { rhl.Println("Error initializing Rivdenll DB:", err) return } defer rddb.Cleanup() go session_test(conf, rddb.GetInterface()) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() rhl.Println("starting web-ctrl") StartControlWeb(*web_addr_s, conf, rddb.GetInterface()) rhl.Println("web-ctrl finished") }() wg.Add(1) go func() { defer wg.Done() rhl.Println("starting telnet-ctrl") StartControlTelnet(*telnet_addr_s, conf, rddb.GetInterface()) rhl.Println("telnet-ctrl finished") }() alldone := make(chan bool) go func() { defer func() { alldone <- true }() wg.Wait() }() c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) select { case <-c: rhl.Println("received interrupt, shutdown") return case <-alldone: return } }