X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftoss.go;h=6abf278af8ab18cc61e785a80fb0c5565890bcca;hb=1d2ce674b042d07fd9b37a46578c8b62bb0345b7;hp=5bc8b7960b79b8d2e904405b1e1aacd7282672d8;hpb=11516a2bbce2b39bda71cf716eaeaf1ffb62314a;p=nncp.git diff --git a/src/toss.go b/src/toss.go index 5bc8b79..6abf278 100644 --- a/src/toss.go +++ b/src/toss.go @@ -21,6 +21,7 @@ import ( "bufio" "bytes" "encoding/base64" + "errors" "fmt" "io" "io/ioutil" @@ -78,9 +79,7 @@ func (ctx *Ctx) Toss( pktName := filepath.Base(job.Fd.Name()) sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName} if job.PktEnc.Nice > nice { - ctx.LogD("rx", SdsAdd(sds, SDS{ - "nice": strconv.Itoa(int(job.PktEnc.Nice)), - }), "too nice") + ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice") continue } pipeR, pipeW := io.Pipe() @@ -98,7 +97,7 @@ func (ctx *Ctx) Toss( pipeW.Close() job.Fd.Close() if err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption") + ctx.LogE("rx", sds, err, "decryption") } }(job) var pkt Pkt @@ -106,7 +105,7 @@ func (ctx *Ctx) Toss( var pktSize int64 var pktSizeBlocks int64 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal") + ctx.LogE("rx", sds, err, "unmarshal") isBad = true goto Closing } @@ -116,7 +115,7 @@ func (ctx *Ctx) Toss( pktSize -= poly1305.TagSize } pktSize -= pktSizeBlocks * poly1305.TagSize - sds["size"] = strconv.FormatInt(pktSize, 10) + sds["size"] = pktSize ctx.LogD("rx", sds, "taken") switch pkt.Type { case PktTypeExec: @@ -137,7 +136,7 @@ func (ctx *Ctx) Toss( sender := ctx.Neigh[*job.PktEnc.Sender] cmdline, exists := sender.Exec[handle] if !exists || len(cmdline) == 0 { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No handle found"}), "") + ctx.LogE("rx", sds, errors.New("No handle found"), "") isBad = true goto Closing } @@ -158,7 +157,7 @@ func (ctx *Ctx) Toss( cmd.Stdin = decompressor output, err := cmd.Output() if err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle") + ctx.LogE("rx", sds, err, "handle") isBad = true goto Closing } @@ -187,7 +186,7 @@ func (ctx *Ctx) Toss( } } if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove") + ctx.LogE("rx", sds, err, "remove") isBad = true } } @@ -198,46 +197,51 @@ func (ctx *Ctx) Toss( dst := string(pkt.Path[:int(pkt.PathLen)]) sds := SdsAdd(sds, SDS{"type": "file", "dst": dst}) if filepath.IsAbs(dst) { - ctx.LogE("rx", sds, "non-relative destination path") + ctx.LogE("rx", sds, errors.New("non-relative destination path"), "") isBad = true goto Closing } incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming if incoming == nil { - ctx.LogE("rx", sds, "incoming is not allowed") + ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "") isBad = true goto Closing } dir := filepath.Join(*incoming, path.Dir(dst)) if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir") + ctx.LogE("rx", sds, err, "mkdir") isBad = true goto Closing } if !dryRun { tmp, err := TempFile(dir, "file") if err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp") + ctx.LogE("rx", sds, err, "mktemp") isBad = true goto Closing } sds["tmp"] = tmp.Name() ctx.LogD("rx", sds, "created") bufW := bufio.NewWriter(tmp) - if _, err = io.Copy(bufW, pipeR); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy") + if _, err = CopyProgressed( + bufW, + pipeR, + SdsAdd(sds, SDS{"fullsize": sds["size"]}), + ctx.ShowPrgrs, + ); err != nil { + ctx.LogE("rx", sds, err, "copy") isBad = true goto Closing } if err = bufW.Flush(); err != nil { tmp.Close() - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy") + ctx.LogE("rx", sds, err, "copy") isBad = true goto Closing } if err = tmp.Sync(); err != nil { tmp.Close() - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy") + ctx.LogE("rx", sds, err, "copy") isBad = true goto Closing } @@ -250,7 +254,7 @@ func (ctx *Ctx) Toss( if os.IsNotExist(err) { break } - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat") + ctx.LogE("rx", sds, err, "stat") isBad = true goto Closing } @@ -258,11 +262,11 @@ func (ctx *Ctx) Toss( dstPathCtr++ } if err = os.Rename(tmp.Name(), dstPath); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename") + ctx.LogE("rx", sds, err, "rename") isBad = true } if err = DirSync(*incoming); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sync") + ctx.LogE("rx", sds, err, "sync") isBad = true } delete(sds, "tmp") @@ -275,7 +279,7 @@ func (ctx *Ctx) Toss( } } if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove") + ctx.LogE("rx", sds, err, "remove") isBad = true } if len(sendmail) > 0 && ctx.NotifyFile != nil { @@ -298,14 +302,14 @@ func (ctx *Ctx) Toss( } src := string(pkt.Path[:int(pkt.PathLen)]) if filepath.IsAbs(src) { - ctx.LogE("rx", sds, "non-relative source path") + ctx.LogE("rx", sds, errors.New("non-relative source path"), "") isBad = true goto Closing } sds := SdsAdd(sds, SDS{"type": "freq", "src": src}) dstRaw, err := ioutil.ReadAll(pipeR) if err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read") + ctx.LogE("rx", sds, err, "read") isBad = true goto Closing } @@ -314,7 +318,7 @@ func (ctx *Ctx) Toss( sender := ctx.Neigh[*job.PktEnc.Sender] freqPath := sender.FreqPath if freqPath == nil { - ctx.LogE("rx", sds, "freqing is not allowed") + ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "") isBad = true goto Closing } @@ -329,7 +333,7 @@ func (ctx *Ctx) Toss( sender.FreqMaxSize, ) if err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file") + ctx.LogE("rx", sds, err, "tx file") isBad = true goto Closing } @@ -342,7 +346,7 @@ func (ctx *Ctx) Toss( } } if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove") + ctx.LogE("rx", sds, err, "remove") isBad = true } if len(sendmail) > 0 && ctx.NotifyFreq != nil { @@ -366,14 +370,14 @@ func (ctx *Ctx) Toss( node, known := ctx.Neigh[nodeId] sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId}) if !known { - ctx.LogE("rx", sds, "unknown node") + ctx.LogE("rx", sds, errors.New("unknown node"), "") isBad = true goto Closing } ctx.LogD("rx", sds, "taken") if !dryRun { if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns") + ctx.LogE("rx", sds, err, "tx trns") isBad = true goto Closing } @@ -386,12 +390,12 @@ func (ctx *Ctx) Toss( } } if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove") + ctx.LogE("rx", sds, err, "remove") isBad = true } } default: - ctx.LogE("rx", sds, "unknown type") + ctx.LogE("rx", sds, errors.New("unknown type"), "") isBad = true } Closing: