//
//  rhimportd
//
//  The Radio Helsinki Rivendell Import Daemon
//
//
//  Copyright (C) 2015-2016 Christian Pointner <equinox@helsinki.at>
//
//  This file is part of rhimportd.
//
//  rhimportd is free software: you can redistribute it and/or modify
//  it under the terms of the GNU General Public License as published by
//  the Free Software Foundation, either version 3 of the License, or
//  any later version.
//
//  rhimportd is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License
//  along with rhimportd. If not, see <http://www.gnu.org/licenses/>.
//

package main

import (
	"encoding/json"
	"fmt"
	"html"
	"io"
	"io/ioutil"
	"math"
	"net/http"
	"time"

	"code.helsinki.at/rhrd-go/rddb"
	"code.helsinki.at/rhrd-go/rhimport"
	"github.com/gorilla/websocket"
)

// webSocketRequestData is the JSON message a websocket client sends to the
// daemon. The struct tags give the JSON key of each field.
//
// Command selects the action; the session handler in this file understands
// "new", "cancel", "reconnect" and "list". Id names an existing session
// (used by "reconnect"), RefId is a client supplied reference that is passed
// on when a new session is created. UserName and Password are the
// credentials forwarded to the import context respectively the session
// store. ShowId, ClearShowCarts, MusicPoolGroup, Cart, ClearCart, Cut,
// Channels, NormalizationLevel, AutotrimLevel, UseMetaData and SourceUri are
// copied verbatim into the import context of a new session. Timeout is
// interpreted as a number of seconds when the session is run.
type webSocketRequestData struct {
	Command            string `json:"COMMAND"`
	Id                 string `json:"ID"`
	RefId              string `json:"REFERENCE_ID"`
	UserName           string `json:"LOGIN_NAME"`
	Password           string `json:"PASSWORD"`
	ShowId             uint   `json:"SHOW_ID"`
	ClearShowCarts     bool   `json:"CLEAR_SHOW_CARTS"`
	MusicPoolGroup     string `json:"MUSIC_POOL_GROUP"`
	Cart               uint   `json:"CART_NUMBER"`
	ClearCart          bool   `json:"CLEAR_CART"`
	Cut                uint   `json:"CUT_NUMBER"`
	Channels           uint   `json:"CHANNELS"`
	NormalizationLevel int    `json:"NORMALIZATION_LEVEL"`
	AutotrimLevel      int    `json:"AUTOTRIM_LEVEL"`
	UseMetaData        bool   `json:"USE_METADATA"`
	SourceUri          string `json:"SOURCE_URI"`
	Timeout            uint   `json:"TIMEOUT"`
}

// newWebSocketRequestData returns a fresh request whose import parameters
// (channels, normalization level, autotrim level and metadata usage) are
// preset from conf.ImportParamDefaults. All remaining fields keep their zero
// value, so keys missing from a decoded JSON message fall back to either the
// configured default or the zero value.
func newWebSocketRequestData(conf *rhimport.Config) *webSocketRequestData {
	return &webSocketRequestData{
		Channels:           conf.ImportParamDefaults.Channels,
		NormalizationLevel: conf.ImportParamDefaults.NormalizationLevel,
		AutotrimLevel:      conf.ImportParamDefaults.AutotrimLevel,
		UseMetaData:        conf.ImportParamDefaults.UseMetaData,
	}
}

// webSocketResponseBaseData holds the fields shared by every message the
// daemon sends to a websocket client. It is sent on its own for "error" and
// "ack" messages and embedded into the more specific response types.
//
// ResponseCode carries an HTTP-style status code, Type names the kind of
// message ("error", "ack", "list", "progress" or "done" in this file),
// ErrorString is the error text or "OK", and Id/RefId identify the session
// the message belongs to (left empty where no session applies).
type webSocketResponseBaseData struct {
	ResponseCode int    `json:"RESPONSE_CODE"`
	Type         string `json:"TYPE"`
	ErrorString  string `json:"ERROR_STRING"`
	Id           string `json:"ID"`
	RefId        string `json:"REFERENCE_ID"`
}

// webSocketResponseListData is the "list" message: the common response
// fields plus the sessions that were added and removed. Both maps are keyed
// and valued by strings.
// NOTE(review): presumably session id -> reference id; confirm against the
// session store's List implementation.
type webSocketResponseListData struct {
	webSocketResponseBaseData
	Added   map[string]string `json:"SESSIONS_ADDED"`
	Removed map[string]string `json:"SESSIONS_REMOVED"`
}

// webSocketResponseProgressData is the "progress" message: the common
// response fields plus the state of the running import. Step and StepName
// identify the current step, Current and Total are the raw progress values
// reported for that step and Progress is Current/Total expressed in percent.
// Title, Cart and Cut are passed through from the progress report.
type webSocketResponseProgressData struct {
	webSocketResponseBaseData
	Step     int     `json:"PROGRESS_STEP"`
	StepName string  `json:"PROGRESS_STEP_NAME"`
	Current  float64 `json:"CURRENT"`
	Total    float64 `json:"TOTAL"`
	Progress float64 `json:"PROGRESS"`
	Title    string  `json:"TITLE"`
	Cart     uint    `json:"CART_NUMBER"`
	Cut      uint    `json:"CUT_NUMBER"`
}

// webSocketResponseDoneData is the "done" message sent once a session has
// finished: the common response fields (carrying the result code and error
// string of the import) plus the cart and cut number taken from the result.
type webSocketResponseDoneData struct {
	webSocketResponseBaseData
	Cart uint `json:"CART_NUMBER"`
	Cut  uint `json:"CUT_NUMBER"`
}

// sendWebSocketResponse encodes rd as JSON and writes it to the client as a
// single websocket message. A write error is logged and otherwise ignored;
// the caller is not informed about the failure.
func sendWebSocketResponse(ws *websocket.Conn, rd interface{}) {
	if err := ws.WriteJSON(rd); err != nil {
		// The prefix used to be misspelled "WebScoket Client", which made
		// these lines impossible to find when grepping for the prefix used
		// by all other log messages of this file.
		rhdl.Println("WebSocket Client", ws.RemoteAddr(), "write error:", err)
	}
}

// sendWebSocketErrorResponse sends a message of type "error" carrying the
// given status code and error text. The session id fields are left empty.
func sendWebSocketErrorResponse(ws *websocket.Conn, code int, errStr string) {
	sendWebSocketResponse(ws, &webSocketResponseBaseData{
		ResponseCode: code,
		Type:         "error",
		ErrorString:  errStr,
	})
}

// sendWebSocketAckResponse sends a message of type "ack" with the given
// status code for the session identified by id and refid. The error string
// of an ack is always "OK".
func sendWebSocketAckResponse(ws *websocket.Conn, code int, id, refid string) {
	sendWebSocketResponse(ws, &webSocketResponseBaseData{
		ResponseCode: code,
		Type:         "ack",
		ErrorString:  "OK",
		Id:           id,
		RefId:        refid,
	})
}

// sendWebSocketListResponse sends a message of type "list" containing the
// added and removed sessions. A nil map is replaced by an empty one so that
// the corresponding JSON member is encoded as an object instead of null.
func sendWebSocketListResponse(ws *websocket.Conn, added map[string]string, removed map[string]string) {
	if added == nil {
		added = map[string]string{}
	}
	if removed == nil {
		removed = map[string]string{}
	}

	sendWebSocketResponse(ws, &webSocketResponseListData{
		webSocketResponseBaseData: webSocketResponseBaseData{
			ResponseCode: http.StatusOK,
			Type:         "list",
			ErrorString:  "OK",
		},
		Added:   added,
		Removed: removed,
	})
}

// sendWebSocketProgressResponse sends a message of type "progress" for the
// session identified by id and refid. Besides the raw current and total
// values it reports current/total in percent; when that ratio is not a
// finite number (e.g. total is zero) the percentage is reported as 0.
func sendWebSocketProgressResponse(ws *websocket.Conn, id, refid string, step int, stepName string, current, total float64, title string, cart, cut uint) {
	ratio := current / total
	if finite := !math.IsNaN(ratio) && !math.IsInf(ratio, 0); !finite {
		ratio = 0.0
	}

	sendWebSocketResponse(ws, &webSocketResponseProgressData{
		webSocketResponseBaseData: webSocketResponseBaseData{
			ResponseCode: http.StatusOK,
			Type:         "progress",
			ErrorString:  "OK",
			Id:           id,
			RefId:        refid,
		},
		Step:     step,
		StepName: stepName,
		Current:  current,
		Total:    total,
		Progress: ratio * 100,
		Title:    title,
		Cart:     cart,
		Cut:      cut,
	})
}

// sendWebSocketDoneResponse sends a message of type "done" for the session
// identified by id and refid, reporting the final status code, error text
// and the cart/cut the import ended up with.
func sendWebSocketDoneResponse(ws *websocket.Conn, code int, errStr, id, refid string, cart, cut uint) {
	sendWebSocketResponse(ws, &webSocketResponseDoneData{
		webSocketResponseBaseData: webSocketResponseBaseData{
			ResponseCode: code,
			Type:         "done",
			ErrorString:  errStr,
			Id:           id,
			RefId:        refid,
		},
		Cart: cart,
		Cut:  cut,
	})
}

// webSocketSession is the per-connection state of the import session a
// websocket client is attached to. id is empty as long as the connection
// does not handle a session; id, refId and session are filled in when a
// session is started or reconnected. progresschan and donechan receive the
// progress updates and the final result from the session's callbacks.
type webSocketSession struct {
	id           string
	refId        string
	session      *rhimport.SessionChan
	progresschan chan rhimport.ProgressData
	donechan     chan rhimport.Result
}

// newWebSocketSession returns a session state that is not yet attached to
// any import session. The progress channel buffers up to 10 updates and the
// done channel buffers the single final result.
func newWebSocketSession() *webSocketSession {
	return &webSocketSession{
		progresschan: make(chan rhimport.ProgressData, 10),
		donechan:     make(chan rhimport.Result, 1),
	}
}

// webSocketProgress is the progress callback registered with a session.
// userdata must be the chan<- rhimport.ProgressData it was registered with
// (any other type makes the type assertion panic). Non-finite current/total
// values are replaced by 0 before the update is queued. The send never
// blocks: if the channel's buffer is full the update is dropped. The
// function always returns true.
func webSocketProgress(step int, stepName string, current, total float64, title string, cart, cut uint, userdata interface{}) bool {
	finiteOrZero := func(v float64) float64 {
		if math.IsNaN(v) || math.IsInf(v, 0) {
			return 0.0
		}
		return v
	}

	update := rhimport.ProgressData{
		Step:     step,
		StepName: stepName,
		Current:  finiteOrZero(current),
		Total:    finiteOrZero(total),
		Title:    title,
		Cart:     cart,
		Cut:      cut,
	}

	sink := userdata.(chan<- rhimport.ProgressData)
	select {
	case sink <- update:
	default:
		// receiver is lagging behind: skip this update instead of blocking
	}
	return true
}

// webSocketDone is the done callback registered with a session. It forwards
// the result to the chan<- rhimport.Result passed as userdata (any other
// type makes the type assertion panic) using a regular, blocking send and
// always returns true.
func webSocketDone(res rhimport.Result, userdata interface{}) bool {
	userdata.(chan<- rhimport.Result) <- res
	return true
}

// startNewSession builds an import context from the client's request,
// creates a new session for it in the session store, hooks this connection's
// done and progress channels into the session and finally runs it with the
// requested timeout (in seconds).
//
// It returns http.StatusOK and "SUCCESS" on success. If the store refuses to
// create the session its code and error string are returned unchanged; a
// failure to register one of the handlers yields
// http.StatusInternalServerError. Note that id, refId and session are
// already set on the receiver when handler registration fails.
func (wss *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) (int, string) {
	ctx := rhimport.NewContext(conf, nil)

	// credentials; websocket clients are never treated as trusted
	ctx.UserName, ctx.Password = reqdata.UserName, reqdata.Password
	ctx.Trusted = false

	// import destination
	ctx.ShowId, ctx.ClearShowCarts = reqdata.ShowId, reqdata.ClearShowCarts
	ctx.GroupName = reqdata.MusicPoolGroup
	ctx.Cart, ctx.ClearCart = reqdata.Cart, reqdata.ClearCart
	ctx.Cut = reqdata.Cut

	// import parameters and source
	ctx.Channels = reqdata.Channels
	ctx.NormalizationLevel = reqdata.NormalizationLevel
	ctx.AutotrimLevel = reqdata.AutotrimLevel
	ctx.UseMetaData = reqdata.UseMetaData
	ctx.SourceUri = reqdata.SourceUri

	id, sess, code, errstring := sessions.New(ctx, reqdata.RefId)
	if code != http.StatusOK {
		return code, errstring
	}
	wss.id, wss.refId, wss.session = id, reqdata.RefId, sess

	if err := sess.AddDoneHandler((chan<- rhimport.Result)(wss.donechan), webSocketDone); err != nil {
		return http.StatusInternalServerError, err.Error()
	}
	if err := sess.AddProgressHandler((chan<- rhimport.ProgressData)(wss.progresschan), webSocketProgress); err != nil {
		return http.StatusInternalServerError, err.Error()
	}

	timeout := time.Duration(reqdata.Timeout) * time.Second
	sess.Run(timeout)
	return http.StatusOK, "SUCCESS"
}

// reconnectSession attaches this connection to an already existing session,
// looked up in the session store by the request's user name and session id.
// The connection's done and progress channels are registered with the
// session and Run is called with the requested timeout (in seconds).
//
// It returns http.StatusOK and "SUCCESS" on success. If the lookup fails the
// store's code and error string are returned unchanged; a failure to
// register one of the handlers yields http.StatusInternalServerError.
func (wss *webSocketSession) reconnectSession(reqdata *webSocketRequestData, sessions *rhimport.SessionStoreChan) (int, string) {
	sess, refId, code, errstring := sessions.Get(reqdata.UserName, reqdata.Id)
	if code != http.StatusOK {
		return code, errstring
	}
	wss.id, wss.refId, wss.session = reqdata.Id, refId, sess

	if err := sess.AddDoneHandler((chan<- rhimport.Result)(wss.donechan), webSocketDone); err != nil {
		return http.StatusInternalServerError, err.Error()
	}
	if err := sess.AddProgressHandler((chan<- rhimport.ProgressData)(wss.progresschan), webSocketProgress); err != nil {
		return http.StatusInternalServerError, err.Error()
	}

	timeout := time.Duration(reqdata.Timeout) * time.Second
	sess.Run(timeout)
	return http.StatusOK, "SUCCESS"
}

// webSocketSessionListUpdate is one change of the session list as handed to
// the list-update callback: the sessions that were added and those that were
// removed.
type webSocketSessionListUpdate struct {
	added   map[string]string
	removed map[string]string
}

// webSocketSessionListUpdateCBArg is the userdata handed to the list-update
// callback. closed is checked by the callback to find out whether the
// connection's handler has terminated (a value is queued on it in that
// case); updates is where the callback delivers session list changes.
type webSocketSessionListUpdateCBArg struct {
	closed  <-chan bool
	updates chan<- webSocketSessionListUpdate
}

// webSocketListUpdate is the session list callback. userdata must be a
// webSocketSessionListUpdateCBArg (any other type makes the type assertion
// panic).
//
// It returns false once a value is pending on the closed channel, i.e. the
// connection handler has terminated, and true in all other cases. A call
// with both maps nil delivers nothing. Otherwise the update is queued
// without blocking; it is dropped when the updates channel is full.
func webSocketListUpdate(added, removed map[string]string, userdata interface{}) bool {
	cb := userdata.(webSocketSessionListUpdateCBArg)

	switch {
	case len(cb.closed) > 0:
		return false
	case added == nil && removed == nil:
		return true
	}

	select {
	case cb.updates <- webSocketSessionListUpdate{added: added, removed: removed}:
	default:
		// receiver is lagging behind: skip this update instead of blocking
	}
	return true
}

// webSocketSessionHandler is the per-connection goroutine that executes the
// client's commands and relays session events back to the client. It
// multiplexes four sources:
//
//   - reqchan: decoded client requests ("new", "cancel", "reconnect",
//     "list"); the handler terminates when this channel is closed
//   - listUpdates: session list changes, active after a successful "list"
//   - session.progresschan: progress reports of the attached session
//   - session.donechan: the final result of the attached session, after
//     which a websocket close message is sent to the client
//
// A connection handles at most one session and at most one list
// subscription. On return the websocket is closed and a value is queued on
// the closed channel, which makes webSocketListUpdate return false on its
// next invocation.
func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) {
	defer ws.Close()

	// buffered so that the deferred send below can never block
	closed := make(chan bool, 1)
	defer func() {
		closed <- true
	}()
	listUpdates := make(chan webSocketSessionListUpdate, 5)
	listUpdateCBArg := webSocketSessionListUpdateCBArg{closed, listUpdates}
	alreadyListening := false

	session := newWebSocketSession()
	for {
		select {
		case reqdata, ok := <-reqchan:
			if !ok {
				return
			}
			switch reqdata.Command {
			case "new":
				if session.id != "" {
					sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session")
				} else {
					code, errstring := session.startNewSession(&reqdata, conf, sessions)
					if code != http.StatusOK {
						sendWebSocketErrorResponse(ws, code, errstring)
					} else {
						sendWebSocketAckResponse(ws, code, session.id, session.refId)
					}
				}
			case "cancel":
				if session.id == "" {
					sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection doesn't handle any session")
				} else {
					session.session.Cancel()
				}
			case "reconnect":
				if session.id != "" {
					sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session")
				} else {
					code, errstring := session.reconnectSession(&reqdata, sessions)
					if code != http.StatusOK {
						sendWebSocketErrorResponse(ws, code, errstring)
					} else {
						sendWebSocketAckResponse(ws, code, session.id, session.refId)
					}
				}
			case "list":
				if alreadyListening {
					sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already listens for session-list updates")
				} else {
					// named 'list' so the result does not shadow the
					// session store parameter 'sessions'
					list, code, errstring := sessions.List(reqdata.UserName, reqdata.Password, false, listUpdateCBArg, webSocketListUpdate)
					if code != http.StatusOK {
						sendWebSocketErrorResponse(ws, code, errstring)
					} else {
						alreadyListening = true
						sendWebSocketListResponse(ws, list, nil)
					}
				}
			default:
				sendWebSocketErrorResponse(ws, http.StatusBadRequest, fmt.Sprintf("unknown command '%s'", reqdata.Command))
			}
		case u := <-listUpdates:
			sendWebSocketListResponse(ws, u.added, u.removed)
		case p := <-session.progresschan:
			sendWebSocketProgressResponse(ws, session.id, session.refId, p.Step, p.StepName, p.Current, p.Total, p.Title, p.Cart, p.Cut)
		case d := <-session.donechan:
			sendWebSocketDoneResponse(ws, d.ResponseCode, d.ErrorString, session.id, session.refId, d.Cart, d.Cut)
			rhdl.Println("WebSocket Client", ws.RemoteAddr(), "done sent: sending close message.")
			// The close message is best effort, but a failure should at
			// least be visible in the log instead of vanishing silently.
			if err := ws.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Minute)); err != nil {
				rhdl.Println("WebSocket Client", ws.RemoteAddr(), "sending close message failed:", err)
			}
		}
	}
}

// webSocketHandler upgrades the HTTP request to a websocket connection and
// then acts as the connection's reader: every text message is decoded as a
// JSON request (preset with the defaults from conf) and handed to a
// webSocketSessionHandler goroutine started for this connection. Binary
// messages are rejected with an error response and discarded.
//
// The function returns, and thereby closes the request channel which in turn
// stops the session handler, when the client disconnects or sends a message
// that can't be decoded. The db and trusted parameters are accepted but not
// used by this handler.
func webSocketHandler(conf *rhimport.Config, db *rddb.DBChan, sessions *rhimport.SessionStoreChan, trusted bool, w http.ResponseWriter, r *http.Request) {
	rhdl.Printf("WebSocketHandler: request for '%s'", html.EscapeString(r.URL.Path))

	ws, err := websocket.Upgrade(w, r, nil, 64*1024, 64*1024)
	if err != nil {
		if _, ok := err.(websocket.HandshakeError); ok {
			http.Error(w, "Not a websocket handshake", http.StatusBadRequest)
		} else {
			// ws is nil when the upgrade failed, so the peer address has to
			// be taken from the request; calling ws.RemoteAddr() here would
			// dereference a nil pointer.
			rhdl.Println("WebSocket Client", r.RemoteAddr, "error:", err)
		}
		return
	}
	rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected")
	reqchan := make(chan webSocketRequestData)
	go webSocketSessionHandler(reqchan, ws, conf, sessions)
	defer close(reqchan)

	// NOTE(review): the error responses below are written from this
	// goroutine while webSocketSessionHandler writes to the same connection
	// from its own goroutine. gorilla/websocket documents support for only
	// one concurrent writer - consider routing these responses through the
	// session handler.
	for {
		msgType, msg, err := ws.NextReader()
		if err != nil {
			rhdl.Println("WebSocket Client", ws.RemoteAddr(), "disconnected:", err)
			return
		}

		switch msgType {
		case websocket.TextMessage:
			reqdata := newWebSocketRequestData(conf)
			// Decode into the struct the pointer refers to, not into the
			// pointer variable itself: with &reqdata a JSON 'null' message
			// would set reqdata to nil and the dereference below would
			// panic. This way 'null' simply leaves the defaults untouched.
			if err := json.NewDecoder(msg).Decode(reqdata); err != nil {
				if err == io.EOF {
					// an empty message is a malformed request, not a clean end
					err = io.ErrUnexpectedEOF
				}
				rhdl.Println("WebSocket Client", ws.RemoteAddr(), "request error:", err)
				sendWebSocketErrorResponse(ws, http.StatusBadRequest, err.Error())
				return
			}
			// rhdl.Printf("Websocket Client %s got: %+v", ws.RemoteAddr(), reqdata)
			reqchan <- *reqdata
		case websocket.BinaryMessage:
			sendWebSocketErrorResponse(ws, http.StatusBadRequest, "binary messages are not allowed")
			io.Copy(ioutil.Discard, msg) // consume all the data
		}
	}
}