summaryrefslogtreecommitdiff
path: root/src/rhimportd
diff options
context:
space:
mode:
Diffstat (limited to 'src/rhimportd')
-rw-r--r--src/rhimportd/ctrlWebSocket.go18
-rw-r--r--src/rhimportd/uploadWeb.go72
2 files changed, 62 insertions, 28 deletions
diff --git a/src/rhimportd/ctrlWebSocket.go b/src/rhimportd/ctrlWebSocket.go
index e764085..1ce0f80 100644
--- a/src/rhimportd/ctrlWebSocket.go
+++ b/src/rhimportd/ctrlWebSocket.go
@@ -25,17 +25,18 @@
package main
import (
- "code.helsinki.at/rhrd-go/rddb"
- "code.helsinki.at/rhrd-go/rhimport"
"encoding/json"
"fmt"
- "github.com/gorilla/websocket"
"html"
"io"
"io/ioutil"
"math"
"net/http"
"time"
+
+ "code.helsinki.at/rhrd-go/rddb"
+ "code.helsinki.at/rhrd-go/rhimport"
+ "github.com/gorilla/websocket"
)
type webSocketRequestData struct {
@@ -224,7 +225,7 @@ func webSocketDone(res rhimport.Result, userdata interface{}) bool {
return true
}
-func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, binchan <-chan []byte, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) (int, string) {
+func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) (int, string) {
ctx := rhimport.NewContext(conf, nil)
ctx.UserName = reqdata.UserName
ctx.Password = reqdata.Password
@@ -240,7 +241,6 @@ func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, bin
ctx.AutotrimLevel = reqdata.AutotrimLevel
ctx.UseMetaData = reqdata.UseMetaData
ctx.SourceUri = reqdata.SourceUri
- ctx.AttachmentChan = binchan
id, s, code, errstring := sessions.New(ctx, reqdata.RefId)
if code != http.StatusOK {
@@ -302,7 +302,7 @@ func webSocketListUpdate(added, removed map[string]string, userdata interface{})
return true
}
-func webSocketSessionHandler(reqchan <-chan webSocketRequestData, binchan <-chan []byte, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) {
+func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) {
defer ws.Close()
closed := make(chan bool, 1)
@@ -325,7 +325,7 @@ func webSocketSessionHandler(reqchan <-chan webSocketRequestData, binchan <-chan
if session.id != "" {
sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session")
} else {
- code, errstring := session.startNewSession(&reqdata, binchan, conf, sessions)
+ code, errstring := session.startNewSession(&reqdata, conf, sessions)
if code != http.StatusOK {
sendWebSocketErrorResponse(ws, code, errstring)
} else {
@@ -389,10 +389,8 @@ func webSocketHandler(conf *rhimport.Config, db *rddb.DBChan, sessions *rhimport
}
rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected")
reqchan := make(chan webSocketRequestData)
- binchan := make(chan []byte)
- go webSocketSessionHandler(reqchan, binchan, ws, conf, sessions)
+ go webSocketSessionHandler(reqchan, ws, conf, sessions)
defer close(reqchan)
- defer close(binchan)
for {
t, r, err := ws.NextReader()
diff --git a/src/rhimportd/uploadWeb.go b/src/rhimportd/uploadWeb.go
index 412df12..224e91d 100644
--- a/src/rhimportd/uploadWeb.go
+++ b/src/rhimportd/uploadWeb.go
@@ -27,9 +27,7 @@ package main
import (
"bytes"
"encoding/json"
- "fmt"
"io"
- "io/ioutil"
"mime/multipart"
"net/http"
@@ -62,7 +60,7 @@ const (
webUploadMaxRequestSize = (2 << 30) - 1 // 2GB, (2 << 30) overflows int on 32-bit systems therefore we use 2GB - 1 Byte
)
-func webUploadParseForm(w http.ResponseWriter, r *http.Request) (username, sessionid, srcfile string, src *multipart.Part, ok bool) {
+func webUploadParseForm(w http.ResponseWriter, r *http.Request) (username, sessionid, srcfile string, src *multipart.Part) {
mpr, err := r.MultipartReader()
if err != nil {
rhl.Printf("WebUploadHandler: error while parsing multipart-form: %v", err)
@@ -72,6 +70,9 @@ func webUploadParseForm(w http.ResponseWriter, r *http.Request) (username, sessi
for {
p, err := mpr.NextPart()
+ if err == io.EOF {
+ return
+ }
if err != nil {
rhl.Printf("WebUploadHandler: error while parsing multipart-form: %v", err)
webUploadErrorResponse(w, http.StatusBadRequest, err.Error())
@@ -86,12 +87,10 @@ func webUploadParseForm(w http.ResponseWriter, r *http.Request) (username, sessi
case "FILENAME":
srcfile = p.FileName()
src = p
- ok = true
- return
+ return // don't read any fileds beyond this point because we would need to load the whole file in order to continue parsing
default:
- rhl.Printf("WebUploadHandler: unknown form field: '%s'", p.FormName())
- webUploadErrorResponse(w, http.StatusBadRequest, fmt.Sprintf("unknown field %s", p.FormName()))
- return
+ rhdl.Printf("WebUploadHandler: ingoring unknown form field: '%s'", p.FormName())
+ continue
}
var buf bytes.Buffer
@@ -125,10 +124,7 @@ func webUploadHandler(conf *rhimport.Config, db *rddb.DBChan, sessions *rhimport
}
r.Body = http.MaxBytesReader(w, r.Body, webUploadMaxRequestSize)
- username, sessionid, srcfile, src, ok := webUploadParseForm(w, r)
- if !ok {
- return
- }
+ username, sessionid, srcfile, src := webUploadParseForm(w, r)
if username == "" {
webUploadErrorResponse(w, http.StatusBadRequest, "missing field LOGIN_NAME")
return
@@ -137,13 +133,53 @@ func webUploadHandler(conf *rhimport.Config, db *rddb.DBChan, sessions *rhimport
webUploadErrorResponse(w, http.StatusBadRequest, "missing field SESSION_ID")
return
}
+ if srcfile == "" || src == nil {
+ webUploadErrorResponse(w, http.StatusBadRequest, "missing field FILENAME")
+ return
+ }
+
+ s, _, code, _ := sessions.Get(username, sessionid)
+ if code != http.StatusOK {
+ webUploadErrorResponse(w, http.StatusUnauthorized, "session not found")
+ return
+ }
+
+ cancel, attachmentChan := s.AttachUploader()
+ if attachmentChan == nil || cancel == nil {
+ webUploadErrorResponse(w, http.StatusForbidden, "upload already in progress")
+ return
+ }
+ defer close(attachmentChan)
- // TODO: get session->attachmentChan from store -> 401 if not found
- // TODO: take session -> 403 if already taken
+ rhl.Printf("WebUploadHandler: starting upload for file '%s'", srcfile)
+ for {
+ chunk := rhimport.AttachmentChunk{}
+
+ var data [128 * 1024]byte
+ n, err := src.Read(data[:])
+ if n > 0 {
+ chunk.Data = data[:n]
+ if err == io.EOF {
+ err = nil
+ }
+ }
+ chunk.Error = err
+ if err == io.EOF {
+ webUploadSuccessResponse(w)
+ return
+ }
- // TODO: fetch file (src) in chunks and send it to attachmentChan && check for canceled
- rhl.Printf("WebUploadHandler: fetching file from '%s' (%v)", srcfile, src)
- io.Copy(ioutil.Discard, src)
+ select {
+ case <-cancel:
+ rhl.Printf("WebUploadHandler: upload for file '%s' got canceld", srcfile)
+ webUploadErrorResponse(w, http.StatusNoContent, "canceled")
+ return
+ case attachmentChan <- chunk:
+ }
- webUploadSuccessResponse(w)
+ if err != nil {
+ webUploadErrorResponse(w, http.StatusInternalServerError, err.Error())
+ return
+ }
+ }
}