]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/toss.go
Operations progress
[nncp.git] / src / toss.go
index 5bc8b7960b79b8d2e904405b1e1aacd7282672d8..6abf278af8ab18cc61e785a80fb0c5565890bcca 100644 (file)
@@ -21,6 +21,7 @@ import (
        "bufio"
        "bytes"
        "encoding/base64"
+       "errors"
        "fmt"
        "io"
        "io/ioutil"
@@ -78,9 +79,7 @@ func (ctx *Ctx) Toss(
                pktName := filepath.Base(job.Fd.Name())
                sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
                if job.PktEnc.Nice > nice {
-                       ctx.LogD("rx", SdsAdd(sds, SDS{
-                               "nice": strconv.Itoa(int(job.PktEnc.Nice)),
-                       }), "too nice")
+                       ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice")
                        continue
                }
                pipeR, pipeW := io.Pipe()
@@ -98,7 +97,7 @@ func (ctx *Ctx) Toss(
                        pipeW.Close()
                        job.Fd.Close()
                        if err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
+                               ctx.LogE("rx", sds, err, "decryption")
                        }
                }(job)
                var pkt Pkt
@@ -106,7 +105,7 @@ func (ctx *Ctx) Toss(
                var pktSize int64
                var pktSizeBlocks int64
                if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
-                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
+                       ctx.LogE("rx", sds, err, "unmarshal")
                        isBad = true
                        goto Closing
                }
@@ -116,7 +115,7 @@ func (ctx *Ctx) Toss(
                        pktSize -= poly1305.TagSize
                }
                pktSize -= pktSizeBlocks * poly1305.TagSize
-               sds["size"] = strconv.FormatInt(pktSize, 10)
+               sds["size"] = pktSize
                ctx.LogD("rx", sds, "taken")
                switch pkt.Type {
                case PktTypeExec:
@@ -137,7 +136,7 @@ func (ctx *Ctx) Toss(
                        sender := ctx.Neigh[*job.PktEnc.Sender]
                        cmdline, exists := sender.Exec[handle]
                        if !exists || len(cmdline) == 0 {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No handle found"}), "")
+                               ctx.LogE("rx", sds, errors.New("No handle found"), "")
                                isBad = true
                                goto Closing
                        }
@@ -158,7 +157,7 @@ func (ctx *Ctx) Toss(
                                cmd.Stdin = decompressor
                                output, err := cmd.Output()
                                if err != nil {
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle")
+                                       ctx.LogE("rx", sds, err, "handle")
                                        isBad = true
                                        goto Closing
                                }
@@ -187,7 +186,7 @@ func (ctx *Ctx) Toss(
                                        }
                                }
                                if err = os.Remove(job.Fd.Name()); err != nil {
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                                       ctx.LogE("rx", sds, err, "remove")
                                        isBad = true
                                }
                        }
@@ -198,46 +197,51 @@ func (ctx *Ctx) Toss(
                        dst := string(pkt.Path[:int(pkt.PathLen)])
                        sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
                        if filepath.IsAbs(dst) {
-                               ctx.LogE("rx", sds, "non-relative destination path")
+                               ctx.LogE("rx", sds, errors.New("non-relative destination path"), "")
                                isBad = true
                                goto Closing
                        }
                        incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
                        if incoming == nil {
-                               ctx.LogE("rx", sds, "incoming is not allowed")
+                               ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "")
                                isBad = true
                                goto Closing
                        }
                        dir := filepath.Join(*incoming, path.Dir(dst))
                        if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
+                               ctx.LogE("rx", sds, err, "mkdir")
                                isBad = true
                                goto Closing
                        }
                        if !dryRun {
                                tmp, err := TempFile(dir, "file")
                                if err != nil {
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
+                                       ctx.LogE("rx", sds, err, "mktemp")
                                        isBad = true
                                        goto Closing
                                }
                                sds["tmp"] = tmp.Name()
                                ctx.LogD("rx", sds, "created")
                                bufW := bufio.NewWriter(tmp)
-                               if _, err = io.Copy(bufW, pipeR); err != nil {
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
+                               if _, err = CopyProgressed(
+                                       bufW,
+                                       pipeR,
+                                       SdsAdd(sds, SDS{"fullsize": sds["size"]}),
+                                       ctx.ShowPrgrs,
+                               ); err != nil {
+                                       ctx.LogE("rx", sds, err, "copy")
                                        isBad = true
                                        goto Closing
                                }
                                if err = bufW.Flush(); err != nil {
                                        tmp.Close()
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
+                                       ctx.LogE("rx", sds, err, "copy")
                                        isBad = true
                                        goto Closing
                                }
                                if err = tmp.Sync(); err != nil {
                                        tmp.Close()
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
+                                       ctx.LogE("rx", sds, err, "copy")
                                        isBad = true
                                        goto Closing
                                }
@@ -250,7 +254,7 @@ func (ctx *Ctx) Toss(
                                                if os.IsNotExist(err) {
                                                        break
                                                }
-                                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
+                                               ctx.LogE("rx", sds, err, "stat")
                                                isBad = true
                                                goto Closing
                                        }
@@ -258,11 +262,11 @@ func (ctx *Ctx) Toss(
                                        dstPathCtr++
                                }
                                if err = os.Rename(tmp.Name(), dstPath); err != nil {
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
+                                       ctx.LogE("rx", sds, err, "rename")
                                        isBad = true
                                }
                                if err = DirSync(*incoming); err != nil {
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sync")
+                                       ctx.LogE("rx", sds, err, "sync")
                                        isBad = true
                                }
                                delete(sds, "tmp")
@@ -275,7 +279,7 @@ func (ctx *Ctx) Toss(
                                        }
                                }
                                if err = os.Remove(job.Fd.Name()); err != nil {
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                                       ctx.LogE("rx", sds, err, "remove")
                                        isBad = true
                                }
                                if len(sendmail) > 0 && ctx.NotifyFile != nil {
@@ -298,14 +302,14 @@ func (ctx *Ctx) Toss(
                        }
                        src := string(pkt.Path[:int(pkt.PathLen)])
                        if filepath.IsAbs(src) {
-                               ctx.LogE("rx", sds, "non-relative source path")
+                               ctx.LogE("rx", sds, errors.New("non-relative source path"), "")
                                isBad = true
                                goto Closing
                        }
                        sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
                        dstRaw, err := ioutil.ReadAll(pipeR)
                        if err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
+                               ctx.LogE("rx", sds, err, "read")
                                isBad = true
                                goto Closing
                        }
@@ -314,7 +318,7 @@ func (ctx *Ctx) Toss(
                        sender := ctx.Neigh[*job.PktEnc.Sender]
                        freqPath := sender.FreqPath
                        if freqPath == nil {
-                               ctx.LogE("rx", sds, "freqing is not allowed")
+                               ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "")
                                isBad = true
                                goto Closing
                        }
@@ -329,7 +333,7 @@ func (ctx *Ctx) Toss(
                                        sender.FreqMaxSize,
                                )
                                if err != nil {
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
+                                       ctx.LogE("rx", sds, err, "tx file")
                                        isBad = true
                                        goto Closing
                                }
@@ -342,7 +346,7 @@ func (ctx *Ctx) Toss(
                                        }
                                }
                                if err = os.Remove(job.Fd.Name()); err != nil {
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                                       ctx.LogE("rx", sds, err, "remove")
                                        isBad = true
                                }
                                if len(sendmail) > 0 && ctx.NotifyFreq != nil {
@@ -366,14 +370,14 @@ func (ctx *Ctx) Toss(
                        node, known := ctx.Neigh[nodeId]
                        sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
                        if !known {
-                               ctx.LogE("rx", sds, "unknown node")
+                               ctx.LogE("rx", sds, errors.New("unknown node"), "")
                                isBad = true
                                goto Closing
                        }
                        ctx.LogD("rx", sds, "taken")
                        if !dryRun {
                                if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
+                                       ctx.LogE("rx", sds, err, "tx trns")
                                        isBad = true
                                        goto Closing
                                }
@@ -386,12 +390,12 @@ func (ctx *Ctx) Toss(
                                        }
                                }
                                if err = os.Remove(job.Fd.Name()); err != nil {
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                                       ctx.LogE("rx", sds, err, "remove")
                                        isBad = true
                                }
                        }
                default:
-                       ctx.LogE("rx", sds, "unknown type")
+                       ctx.LogE("rx", sds, errors.New("unknown type"), "")
                        isBad = true
                }
        Closing: