X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftoss.go;h=c0aa7bfed2fb3210c42dcff4d71e8494ffcd3ab3;hb=4e08a1c97600e0372680e86a651f916c70e89342;hp=43b72f5c1cffad94620dd8d0abbaaa0cad42510b;hpb=8936da23acb06dc60b7c1f94845d5525f98665aa;p=nncp.git diff --git a/src/toss.go b/src/toss.go index 43b72f5..c0aa7bf 100644 --- a/src/toss.go +++ b/src/toss.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2019 Sergey Matveev +Copyright (C) 2016-2020 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -20,6 +20,8 @@ package nncp import ( "bufio" "bytes" + "encoding/base64" + "errors" "fmt" "io" "io/ioutil" @@ -43,13 +45,22 @@ const ( SeenSuffix = ".seen" ) -func newNotification(fromTo *FromToJSON, subject string) io.Reader { - return strings.NewReader(fmt.Sprintf( - "From: %s\nTo: %s\nSubject: %s\n", - fromTo.From, - fromTo.To, - mime.BEncoding.Encode("UTF-8", subject), - )) +func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader { + lines := []string{ + "From: " + fromTo.From, + "To: " + fromTo.To, + "Subject: " + mime.BEncoding.Encode("UTF-8", subject), + } + if len(body) > 0 { + lines = append(lines, []string{ + "MIME-Version: 1.0", + "Content-Type: text/plain; charset=utf-8", + "Content-Transfer-Encoding: base64", + "", + base64.StdEncoding.EncodeToString(body), + }...) + } + return strings.NewReader(strings.Join(lines, "\n")) } func (ctx *Ctx) Toss( @@ -57,7 +68,14 @@ func (ctx *Ctx) Toss( nice uint8, dryRun, doSeen, noFile, noFreq, noExec, noTrns bool, ) bool { + dirLock, err := ctx.LockDir(nodeId, "toss") + if err != nil { + ctx.LogE("rx", SDS{}, err, "lock") + return false + } + defer ctx.UnlockDir(dirLock) isBad := false + sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"] decompressor, err := zstd.NewReader(nil) if err != nil { panic(err) @@ -67,14 +85,11 @@ func (ctx *Ctx) Toss( pktName := filepath.Base(job.Fd.Name()) sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName} if job.PktEnc.Nice > nice { - ctx.LogD("rx", SdsAdd(sds, SDS{ - "nice": strconv.Itoa(int(job.PktEnc.Nice)), - }), "too nice") + ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice") continue } pipeR, pipeW := io.Pipe() - errs := make(chan error, 1) - go func(job Job) { + go func(job Job) error { pipeWB := bufio.NewWriter(pipeW) _, _, err := PktEncRead( ctx.Self, @@ -82,20 +97,23 @@ func (ctx *Ctx) Toss( bufio.NewReader(job.Fd), pipeWB, ) - errs <- err - pipeWB.Flush() - pipeW.Close() - job.Fd.Close() + job.Fd.Close() // #nosec G104 if err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption") + ctx.LogE("rx", sds, err, "decryption") + return pipeW.CloseWithError(err) + } + if err = pipeWB.Flush(); err != nil { + ctx.LogE("rx", sds, err, "decryption flush") + return pipeW.CloseWithError(err) } + return pipeW.Close() }(job) var pkt Pkt var err error var pktSize int64 var pktSizeBlocks int64 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal") + ctx.LogE("rx", sds, err, "unmarshal") isBad = true goto Closing } @@ -105,7 +123,7 @@ func (ctx *Ctx) Toss( pktSize -= poly1305.TagSize } pktSize -= pktSizeBlocks * poly1305.TagSize - sds["size"] = strconv.FormatInt(pktSize, 10) + sds["size"] = pktSize ctx.LogD("rx", sds, "taken") switch pkt.Type { case PktTypeExec: @@ -118,14 +136,15 @@ func (ctx *Ctx) Toss( for _, p := range path[1:] { args = append(args, string(p)) } + argsStr := strings.Join(append([]string{handle}, args...), " ") sds := SdsAdd(sds, SDS{ "type": "exec", - "dst": strings.Join(append([]string{handle}, args...), " "), + "dst": argsStr, }) sender := ctx.Neigh[*job.PktEnc.Sender] cmdline, exists := sender.Exec[handle] if !exists || len(cmdline) == 0 { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No handle found"}), "") + ctx.LogE("rx", sds, errors.New("No handle found"), "") isBad = true goto Closing } @@ -144,21 +163,40 @@ func (ctx *Ctx) Toss( "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)), ) cmd.Stdin = decompressor - if err = cmd.Run(); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle") + output, err := cmd.Output() + if err != nil { + ctx.LogE("rx", sds, err, "handle") isBad = true goto Closing } + if len(sendmail) > 0 && ctx.NotifyExec != nil { + notify, exists := ctx.NotifyExec[sender.Name+"."+handle] + if !exists { + notify, exists = ctx.NotifyExec["*."+handle] + } + if exists { + cmd := exec.Command( + sendmail[0], + append(sendmail[1:len(sendmail)], notify.To)..., + ) + cmd.Stdin = newNotification(notify, fmt.Sprintf( + "Exec from %s: %s", sender.Name, argsStr, + ), output) + if err = cmd.Run(); err != nil { + ctx.LogE("rx", sds, err, "notify") + } + } + } } ctx.LogI("rx", sds, "") if !dryRun { if doSeen { if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { - fd.Close() + fd.Close() // #nosec G104 } } if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove") + ctx.LogE("rx", sds, err, "remove") isBad = true } } @@ -169,50 +207,58 @@ func (ctx *Ctx) Toss( dst := string(pkt.Path[:int(pkt.PathLen)]) sds := SdsAdd(sds, SDS{"type": "file", "dst": dst}) if filepath.IsAbs(dst) { - ctx.LogE("rx", sds, "non-relative destination path") + ctx.LogE("rx", sds, errors.New("non-relative destination path"), "") isBad = true goto Closing } incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming if incoming == nil { - ctx.LogE("rx", sds, "incoming is not allowed") + ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "") isBad = true goto Closing } dir := filepath.Join(*incoming, path.Dir(dst)) if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir") + ctx.LogE("rx", sds, err, "mkdir") isBad = true goto Closing } if !dryRun { tmp, err := TempFile(dir, "file") if err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp") + ctx.LogE("rx", sds, err, "mktemp") isBad = true goto Closing } sds["tmp"] = tmp.Name() ctx.LogD("rx", sds, "created") bufW := bufio.NewWriter(tmp) - if _, err = io.Copy(bufW, pipeR); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy") + if _, err = CopyProgressed( + bufW, pipeR, "Rx file", + SdsAdd(sds, SDS{"fullsize": sds["size"]}), + ctx.ShowPrgrs, + ); err != nil { + ctx.LogE("rx", sds, err, "copy") isBad = true goto Closing } if err = bufW.Flush(); err != nil { - tmp.Close() - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy") + tmp.Close() // #nosec G104 + ctx.LogE("rx", sds, err, "copy") isBad = true goto Closing } if err = tmp.Sync(); err != nil { - tmp.Close() - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy") + tmp.Close() // #nosec G104 + ctx.LogE("rx", sds, err, "copy") + isBad = true + goto Closing + } + if err = tmp.Close(); err != nil { + ctx.LogE("rx", sds, err, "copy") isBad = true goto Closing } - tmp.Close() dstPathOrig := filepath.Join(*incoming, dst) dstPath := dstPathOrig dstPathCtr := 0 @@ -221,7 +267,7 @@ func (ctx *Ctx) Toss( if os.IsNotExist(err) { break } - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat") + ctx.LogE("rx", sds, err, "stat") isBad = true goto Closing } @@ -229,7 +275,11 @@ func (ctx *Ctx) Toss( dstPathCtr++ } if err = os.Rename(tmp.Name(), dstPath); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename") + ctx.LogE("rx", sds, err, "rename") + isBad = true + } + if err = DirSync(*incoming); err != nil { + ctx.LogE("rx", sds, err, "sync") isBad = true } delete(sds, "tmp") @@ -238,15 +288,14 @@ func (ctx *Ctx) Toss( if !dryRun { if doSeen { if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { - fd.Close() + fd.Close() // #nosec G104 } } if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove") + ctx.LogE("rx", sds, err, "remove") isBad = true } - sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"] - if exists && len(sendmail) > 0 && ctx.NotifyFile != nil { + if len(sendmail) > 0 && ctx.NotifyFile != nil { cmd := exec.Command( sendmail[0], append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)..., @@ -256,8 +305,10 @@ func (ctx *Ctx) Toss( ctx.Neigh[*job.PktEnc.Sender].Name, dst, humanize.IBytes(uint64(pktSize)), - )) - cmd.Run() + ), nil) + if err = cmd.Run(); err != nil { + ctx.LogE("rx", sds, err, "notify") + } } } case PktTypeFreq: @@ -266,47 +317,38 @@ func (ctx *Ctx) Toss( } src := string(pkt.Path[:int(pkt.PathLen)]) if filepath.IsAbs(src) { - ctx.LogE("rx", sds, "non-relative source path") + ctx.LogE("rx", sds, errors.New("non-relative source path"), "") isBad = true goto Closing } sds := SdsAdd(sds, SDS{"type": "freq", "src": src}) dstRaw, err := ioutil.ReadAll(pipeR) if err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read") + ctx.LogE("rx", sds, err, "read") isBad = true goto Closing } dst := string(dstRaw) sds["dst"] = dst sender := ctx.Neigh[*job.PktEnc.Sender] - freq := sender.Freq - if freq == nil { - ctx.LogE("rx", sds, "freqing is not allowed") + freqPath := sender.FreqPath + if freqPath == nil { + ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "") isBad = true goto Closing } if !dryRun { - if sender.FreqChunked == 0 { - err = ctx.TxFile( - sender, - pkt.Nice, - filepath.Join(*freq, src), - dst, - sender.FreqMinSize, - ) - } else { - err = ctx.TxFileChunked( - sender, - pkt.Nice, - filepath.Join(*freq, src), - dst, - sender.FreqMinSize, - sender.FreqChunked, - ) - } + err = ctx.TxFile( + sender, + pkt.Nice, + filepath.Join(*freqPath, src), + dst, + sender.FreqChunked, + sender.FreqMinSize, + sender.FreqMaxSize, + ) if err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file") + ctx.LogE("rx", sds, err, "tx file") isBad = true goto Closing } @@ -315,25 +357,24 @@ func (ctx *Ctx) Toss( if !dryRun { if doSeen { if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { - fd.Close() + fd.Close() // #nosec G104 } } if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove") + ctx.LogE("rx", sds, err, "remove") isBad = true } - sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"] - if exists && len(sendmail) > 0 && ctx.NotifyFreq != nil { + if len(sendmail) > 0 && ctx.NotifyFreq != nil { cmd := exec.Command( sendmail[0], append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)..., ) cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf( - "Freq from %s: %s", - ctx.Neigh[*job.PktEnc.Sender].Name, - src, - )) - cmd.Run() + "Freq from %s: %s", sender.Name, src, + ), nil) + if err = cmd.Run(); err != nil { + ctx.LogE("rx", sds, err, "notify") + } } } case PktTypeTrns: @@ -346,14 +387,14 @@ func (ctx *Ctx) Toss( node, known := ctx.Neigh[nodeId] sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId}) if !known { - ctx.LogE("rx", sds, "unknown node") + ctx.LogE("rx", sds, errors.New("unknown node"), "") isBad = true goto Closing } ctx.LogD("rx", sds, "taken") if !dryRun { if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns") + ctx.LogE("rx", sds, err, "tx trns") isBad = true goto Closing } @@ -362,20 +403,20 @@ func (ctx *Ctx) Toss( if !dryRun { if doSeen { if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { - fd.Close() + fd.Close() // #nosec G104 } } if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove") + ctx.LogE("rx", sds, err, "remove") isBad = true } } default: - ctx.LogE("rx", sds, "unknown type") + ctx.LogE("rx", sds, errors.New("unknown type"), "") isBad = true } Closing: - pipeR.Close() + pipeR.Close() // #nosec G104 } return isBad }