From a4588ab0c505b79caf8858cac06a9b8fb58ac81c Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Fri, 22 Jul 2016 18:42:07 +0200 Subject: new uploadWeb mostly done diff --git a/src/rhimportd/ctrlWebSocket.go b/src/rhimportd/ctrlWebSocket.go index e764085..1ce0f80 100644 --- a/src/rhimportd/ctrlWebSocket.go +++ b/src/rhimportd/ctrlWebSocket.go @@ -25,17 +25,18 @@ package main import ( - "code.helsinki.at/rhrd-go/rddb" - "code.helsinki.at/rhrd-go/rhimport" "encoding/json" "fmt" - "github.com/gorilla/websocket" "html" "io" "io/ioutil" "math" "net/http" "time" + + "code.helsinki.at/rhrd-go/rddb" + "code.helsinki.at/rhrd-go/rhimport" + "github.com/gorilla/websocket" ) type webSocketRequestData struct { @@ -224,7 +225,7 @@ func webSocketDone(res rhimport.Result, userdata interface{}) bool { return true } -func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, binchan <-chan []byte, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) (int, string) { +func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) (int, string) { ctx := rhimport.NewContext(conf, nil) ctx.UserName = reqdata.UserName ctx.Password = reqdata.Password @@ -240,7 +241,6 @@ func (self *webSocketSession) startNewSession(reqdata *webSocketRequestData, bin ctx.AutotrimLevel = reqdata.AutotrimLevel ctx.UseMetaData = reqdata.UseMetaData ctx.SourceUri = reqdata.SourceUri - ctx.AttachmentChan = binchan id, s, code, errstring := sessions.New(ctx, reqdata.RefId) if code != http.StatusOK { @@ -302,7 +302,7 @@ func webSocketListUpdate(added, removed map[string]string, userdata interface{}) return true } -func webSocketSessionHandler(reqchan <-chan webSocketRequestData, binchan <-chan []byte, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) { +func webSocketSessionHandler(reqchan <-chan webSocketRequestData, ws *websocket.Conn, conf *rhimport.Config, sessions *rhimport.SessionStoreChan) { defer ws.Close() closed := make(chan bool, 1) @@ -325,7 +325,7 @@ func webSocketSessionHandler(reqchan <-chan webSocketRequestData, binchan <-chan if session.id != "" { sendWebSocketErrorResponse(ws, http.StatusBadRequest, "This connection already handles a session") } else { - code, errstring := session.startNewSession(&reqdata, binchan, conf, sessions) + code, errstring := session.startNewSession(&reqdata, conf, sessions) if code != http.StatusOK { sendWebSocketErrorResponse(ws, code, errstring) } else { @@ -389,10 +389,8 @@ func webSocketHandler(conf *rhimport.Config, db *rddb.DBChan, sessions *rhimport } rhdl.Println("WebSocket Client", ws.RemoteAddr(), "connected") reqchan := make(chan webSocketRequestData) - binchan := make(chan []byte) - go webSocketSessionHandler(reqchan, binchan, ws, conf, sessions) + go webSocketSessionHandler(reqchan, ws, conf, sessions) defer close(reqchan) - defer close(binchan) for { t, r, err := ws.NextReader() diff --git a/src/rhimportd/uploadWeb.go b/src/rhimportd/uploadWeb.go index 412df12..224e91d 100644 --- a/src/rhimportd/uploadWeb.go +++ b/src/rhimportd/uploadWeb.go @@ -27,9 +27,7 @@ package main import ( "bytes" "encoding/json" - "fmt" "io" - "io/ioutil" "mime/multipart" "net/http" @@ -62,7 +60,7 @@ const ( webUploadMaxRequestSize = (2 << 30) - 1 // 2GB, (2 << 30) overflows int on 32-bit systems therefore we use 2GB - 1 Byte ) -func webUploadParseForm(w http.ResponseWriter, r *http.Request) (username, sessionid, srcfile string, src *multipart.Part, ok bool) { +func webUploadParseForm(w http.ResponseWriter, r *http.Request) (username, sessionid, srcfile string, src *multipart.Part) { mpr, err := r.MultipartReader() if err != nil { rhl.Printf("WebUploadHandler: error while parsing multipart-form: %v", err) @@ -72,6 +70,9 @@ func webUploadParseForm(w http.ResponseWriter, r *http.Request) (username, sessi for { p, err := mpr.NextPart() + if err == io.EOF { + return + } if err != nil { rhl.Printf("WebUploadHandler: error while parsing multipart-form: %v", err) webUploadErrorResponse(w, http.StatusBadRequest, err.Error()) @@ -86,12 +87,10 @@ func webUploadParseForm(w http.ResponseWriter, r *http.Request) (username, sessi case "FILENAME": srcfile = p.FileName() src = p - ok = true - return + return // don't read any fileds beyond this point because we would need to load the whole file in order to continue parsing default: - rhl.Printf("WebUploadHandler: unknown form field: '%s'", p.FormName()) - webUploadErrorResponse(w, http.StatusBadRequest, fmt.Sprintf("unknown field %s", p.FormName())) - return + rhdl.Printf("WebUploadHandler: ingoring unknown form field: '%s'", p.FormName()) + continue } var buf bytes.Buffer @@ -125,10 +124,7 @@ func webUploadHandler(conf *rhimport.Config, db *rddb.DBChan, sessions *rhimport } r.Body = http.MaxBytesReader(w, r.Body, webUploadMaxRequestSize) - username, sessionid, srcfile, src, ok := webUploadParseForm(w, r) - if !ok { - return - } + username, sessionid, srcfile, src := webUploadParseForm(w, r) if username == "" { webUploadErrorResponse(w, http.StatusBadRequest, "missing field LOGIN_NAME") return @@ -137,13 +133,53 @@ func webUploadHandler(conf *rhimport.Config, db *rddb.DBChan, sessions *rhimport webUploadErrorResponse(w, http.StatusBadRequest, "missing field SESSION_ID") return } + if srcfile == "" || src == nil { + webUploadErrorResponse(w, http.StatusBadRequest, "missing field FILENAME") + return + } + + s, _, code, _ := sessions.Get(username, sessionid) + if code != http.StatusOK { + webUploadErrorResponse(w, http.StatusUnauthorized, "session not found") + return + } + + cancel, attachmentChan := s.AttachUploader() + if attachmentChan == nil || cancel == nil { + webUploadErrorResponse(w, http.StatusForbidden, "upload already in progress") + return + } + defer close(attachmentChan) - // TODO: get session->attachmentChan from store -> 401 if not found - // TODO: take session -> 403 if already taken + rhl.Printf("WebUploadHandler: starting upload for file '%s'", srcfile) + for { + chunk := rhimport.AttachmentChunk{} + + var data [128 * 1024]byte + n, err := src.Read(data[:]) + if n > 0 { + chunk.Data = data[:n] + if err == io.EOF { + err = nil + } + } + chunk.Error = err + if err == io.EOF { + webUploadSuccessResponse(w) + return + } - // TODO: fetch file (src) in chunks and send it to attachmentChan && check for canceled - rhl.Printf("WebUploadHandler: fetching file from '%s' (%v)", srcfile, src) - io.Copy(ioutil.Discard, src) + select { + case <-cancel: + rhl.Printf("WebUploadHandler: upload for file '%s' got canceld", srcfile) + webUploadErrorResponse(w, http.StatusNoContent, "canceled") + return + case attachmentChan <- chunk: + } - webUploadSuccessResponse(w) + if err != nil { + webUploadErrorResponse(w, http.StatusInternalServerError, err.Error()) + return + } + } } diff --git a/web-static/upload-form.html b/web-static/upload-form.html index c38037e..5353f95 100644 --- a/web-static/upload-form.html +++ b/web-static/upload-form.html @@ -19,7 +19,7 @@
- +
Username
Session
Session
File
 
-- cgit v0.10.2