X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftoss.go;h=7537d695d08f5edcb874b72a747cf624fea22ad7;hb=0fad171c0d79ad583c0faf5427e22d1d62a0a52d;hp=8e8dc27789b0e741432be27498433faee69827c8;hpb=cda0acc2c336734be25a7cdb3e79000486fc5901;p=nncp.git diff --git a/src/toss.go b/src/toss.go index 8e8dc27..7537d69 100644 --- a/src/toss.go +++ b/src/toss.go @@ -38,7 +38,6 @@ import ( xdr "github.com/davecgh/go-xdr/xdr2" "github.com/dustin/go-humanize" "github.com/klauspost/compress/zstd" - "golang.org/x/crypto/blake2b" "golang.org/x/crypto/poly1305" ) @@ -53,13 +52,14 @@ func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader "Subject: " + mime.BEncoding.Encode("UTF-8", subject), } if len(body) > 0 { - lines = append(lines, []string{ + lines = append( + lines, "MIME-Version: 1.0", "Content-Type: text/plain; charset=utf-8", "Content-Transfer-Encoding: base64", "", base64.StdEncoding.EncodeToString(body), - }...) + ) } return strings.NewReader(strings.Join(lines, "\n")) } @@ -71,7 +71,6 @@ func (ctx *Ctx) Toss( ) bool { dirLock, err := ctx.LockDir(nodeId, "toss") if err != nil { - ctx.LogE("rx", SDS{}, err, "lock") return false } defer ctx.UnlockDir(dirLock) @@ -83,38 +82,57 @@ func (ctx *Ctx) Toss( } defer decompressor.Close() for job := range ctx.Jobs(nodeId, TRx) { - pktName := filepath.Base(job.Fd.Name()) - sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName} + pktName := filepath.Base(job.Path) + les := LEs{ + {"Node", job.PktEnc.Sender}, + {"Pkt", pktName}, + {"Nice", int(job.PktEnc.Nice)}, + } if job.PktEnc.Nice > nice { - ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice") + ctx.LogD("rx-too-nice", les, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s: too nice: %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + NicenessFmt(job.PktEnc.Nice), + ) + }) + continue + } + fd, err := os.Open(job.Path) + if err != nil { + ctx.LogE("rx-open", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s: opening %s", + ctx.NodeName(job.PktEnc.Sender), pktName, job.Path, + ) + }) + isBad = true continue } + pipeR, pipeW := io.Pipe() go func(job Job) error { pipeWB := bufio.NewWriter(pipeW) - _, _, err := PktEncRead( - ctx.Self, - ctx.Neigh, - bufio.NewReader(job.Fd), - pipeWB, - ) - job.Fd.Close() // #nosec G104 + _, _, err := PktEncRead(ctx.Self, ctx.Neigh, bufio.NewReader(fd), pipeWB) + fd.Close() // #nosec G104 if err != nil { - ctx.LogE("rx", sds, err, "decryption") return pipeW.CloseWithError(err) } if err = pipeWB.Flush(); err != nil { - ctx.LogE("rx", sds, err, "decryption flush") return pipeW.CloseWithError(err) } return pipeW.Close() }(job) var pkt Pkt - var err error var pktSize int64 var pktSizeBlocks int64 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil { - ctx.LogE("rx", sds, err, "unmarshal") + ctx.LogE("rx-unmarshal", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s: unmarshal", + ctx.NodeName(job.PktEnc.Sender), pktName, + ) + }) isBad = true goto Closing } @@ -124,8 +142,15 @@ func (ctx *Ctx) Toss( pktSize -= poly1305.TagSize } pktSize -= pktSizeBlocks * poly1305.TagSize - sds["size"] = pktSize - ctx.LogD("rx", sds, "taken") + les = append(les, LE{"Size", pktSize}) + ctx.LogD("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s (%s)", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), + ) + }) + switch pkt.Type { case PktTypeExec, PktTypeExecFat: if noExec { @@ -138,14 +163,20 @@ func (ctx *Ctx) Toss( args = append(args, string(p)) } argsStr := strings.Join(append([]string{handle}, args...), " ") - sds := SdsAdd(sds, SDS{ - "type": "exec", - "dst": argsStr, - }) + les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr}) sender := ctx.Neigh[*job.PktEnc.Sender] cmdline, exists := sender.Exec[handle] if !exists || len(cmdline) == 0 { - ctx.LogE("rx", sds, errors.New("No handle found"), "") + ctx.LogE( + "rx-no-handle", les, errors.New("No handle found"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing exec %s/%s (%s): %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), argsStr, + ) + }, + ) isBad = true goto Closing } @@ -155,10 +186,7 @@ func (ctx *Ctx) Toss( } } if !dryRun { - cmd := exec.Command( - cmdline[0], - append(cmdline[1:len(cmdline)], args...)..., - ) + cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...) cmd.Env = append( cmd.Env, "NNCP_SELF="+ctx.Self.Id.String(), @@ -172,7 +200,13 @@ func (ctx *Ctx) Toss( } output, err := cmd.Output() if err != nil { - ctx.LogE("rx", sds, err, "handle") + ctx.LogE("rx-hande", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing exec %s/%s (%s): %s: handling", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), argsStr, + ) + }) isBad = true goto Closing } @@ -184,85 +218,166 @@ func (ctx *Ctx) Toss( if exists { cmd := exec.Command( sendmail[0], - append(sendmail[1:len(sendmail)], notify.To)..., + append(sendmail[1:], notify.To)..., ) cmd.Stdin = newNotification(notify, fmt.Sprintf( "Exec from %s: %s", sender.Name, argsStr, ), output) if err = cmd.Run(); err != nil { - ctx.LogE("rx", sds, err, "notify") + ctx.LogE("rx-notify", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing exec %s/%s (%s): %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), argsStr, + ) + }) } } } } - ctx.LogI("rx", sds, "") + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Got exec from %s to %s (%s)", + ctx.NodeName(job.PktEnc.Sender), argsStr, + humanize.IBytes(uint64(pktSize)), + ) + }) if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", sds, err, "remove") + if err = os.Remove(job.Path); err != nil { + ctx.LogE("rx-notify", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing exec %s/%s (%s): %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), argsStr, + ) + }) isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } } + case PktTypeFile: if noFile { goto Closing } dst := string(pkt.Path[:int(pkt.PathLen)]) - sds := SdsAdd(sds, SDS{"type": "file", "dst": dst}) + les = append(les, LE{"Type", "file"}, LE{"Dst", dst}) if filepath.IsAbs(dst) { - ctx.LogE("rx", sds, errors.New("non-relative destination path"), "") + ctx.LogE( + "rx-non-rel", les, errors.New("non-relative destination path"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }, + ) isBad = true goto Closing } incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming if incoming == nil { - ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "") + ctx.LogE( + "rx-no-incoming", les, errors.New("incoming is not allowed"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }, + ) isBad = true goto Closing } dir := filepath.Join(*incoming, path.Dir(dst)) if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil { - ctx.LogE("rx", sds, err, "mkdir") + ctx.LogE("rx-mkdir", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: mkdir", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } if !dryRun { tmp, err := TempFile(dir, "file") if err != nil { - ctx.LogE("rx", sds, err, "mktemp") + ctx.LogE("rx-mktemp", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: mktemp", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } - sds["tmp"] = tmp.Name() - ctx.LogD("rx", sds, "created") + les = append(les, LE{"Tmp", tmp.Name()}) + ctx.LogD("rx-tmp-created", les, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: created: %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, tmp.Name(), + ) + }) bufW := bufio.NewWriter(tmp) if _, err = CopyProgressed( bufW, pipeR, "Rx file", - SdsAdd(sds, SDS{"fullsize": sds["size"]}), + append(les, LE{"FullSize", pktSize}), ctx.ShowPrgrs, ); err != nil { - ctx.LogE("rx", sds, err, "copy") + ctx.LogE("rx-copy", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: copying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } if err = bufW.Flush(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("rx", sds, err, "copy") + ctx.LogE("rx-flush", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: flushing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } if err = tmp.Sync(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("rx", sds, err, "copy") + ctx.LogE("rx-sync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: syncing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } if err = tmp.Close(); err != nil { - ctx.LogE("rx", sds, err, "copy") + ctx.LogE("rx-close", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: closing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } @@ -274,7 +389,13 @@ func (ctx *Ctx) Toss( if os.IsNotExist(err) { break } - ctx.LogE("rx", sds, err, "stat") + ctx.LogE("rx-stat", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: stating: %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, dstPath, + ) + }) isBad = true goto Closing } @@ -282,30 +403,56 @@ func (ctx *Ctx) Toss( dstPathCtr++ } if err = os.Rename(tmp.Name(), dstPath); err != nil { - ctx.LogE("rx", sds, err, "rename") + ctx.LogE("rx-rename", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: renaming", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true } if err = DirSync(*incoming); err != nil { - ctx.LogE("rx", sds, err, "sync") + ctx.LogE("rx-dirsync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: dirsyncing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true } - delete(sds, "tmp") + les = les[:len(les)-1] // delete Tmp } - ctx.LogI("rx", sds, "") + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Got file %s (%s) from %s", + dst, humanize.IBytes(uint64(pktSize)), + ctx.NodeName(job.PktEnc.Sender), + ) + }) if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", sds, err, "remove") + if err = os.Remove(job.Path); err != nil { + ctx.LogE("rx-remove", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: removing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } if len(sendmail) > 0 && ctx.NotifyFile != nil { cmd := exec.Command( sendmail[0], - append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)..., + append(sendmail[1:], ctx.NotifyFile.To)..., ) cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf( "File from %s: %s (%s)", @@ -314,33 +461,64 @@ func (ctx *Ctx) Toss( humanize.IBytes(uint64(pktSize)), ), nil) if err = cmd.Run(); err != nil { - ctx.LogE("rx", sds, err, "notify") + ctx.LogE("rx-notify", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) } } } + case PktTypeFreq: if noFreq { goto Closing } src := string(pkt.Path[:int(pkt.PathLen)]) + les := append(les, LE{"Type", "freq"}, LE{"Src", src}) if filepath.IsAbs(src) { - ctx.LogE("rx", sds, errors.New("non-relative source path"), "") + ctx.LogE( + "rx-non-rel", les, errors.New("non-relative source path"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, + ) + }, + ) isBad = true goto Closing } - sds := SdsAdd(sds, SDS{"type": "freq", "src": src}) dstRaw, err := ioutil.ReadAll(pipeR) if err != nil { - ctx.LogE("rx", sds, err, "read") + ctx.LogE("rx-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s: reading", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, + ) + }) isBad = true goto Closing } dst := string(dstRaw) - sds["dst"] = dst + les = append(les, LE{"Dst", dst}) sender := ctx.Neigh[*job.PktEnc.Sender] freqPath := sender.FreqPath if freqPath == nil { - ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "") + ctx.LogE( + "rx-no-freq", les, errors.New("freqing is not allowed"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s -> %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, dst, + ) + }, + ) isBad = true goto Closing } @@ -355,71 +533,136 @@ func (ctx *Ctx) Toss( sender.FreqMaxSize, ) if err != nil { - ctx.LogE("rx", sds, err, "tx file") + ctx.LogE("rx-tx", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s -> %s: txing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, dst, + ) + }) isBad = true goto Closing } } - ctx.LogI("rx", sds, "") + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Got file request %s to %s", + src, ctx.NodeName(job.PktEnc.Sender), + ) + }) if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", sds, err, "remove") + if err = os.Remove(job.Path); err != nil { + ctx.LogE("rx-remove", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s -> %s: removing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, dst, + ) + }) isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } if len(sendmail) > 0 && ctx.NotifyFreq != nil { cmd := exec.Command( sendmail[0], - append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)..., + append(sendmail[1:], ctx.NotifyFreq.To)..., ) cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf( "Freq from %s: %s", sender.Name, src, ), nil) if err = cmd.Run(); err != nil { - ctx.LogE("rx", sds, err, "notify") + ctx.LogE("rx-notify", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s -> %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, dst, + ) + }) } } } + case PktTypeTrns: if noTrns { goto Closing } - dst := new([blake2b.Size256]byte) + dst := new([MTHSize]byte) copy(dst[:], pkt.Path[:int(pkt.PathLen)]) nodeId := NodeId(*dst) node, known := ctx.Neigh[nodeId] - sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId}) + les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId}) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Tossing trns %s/%s (%s): %s", + ctx.NodeName(job.PktEnc.Sender), + pktName, + humanize.IBytes(uint64(pktSize)), + nodeId.String(), + ) + } if !known { - ctx.LogE("rx", sds, errors.New("unknown node"), "") + ctx.LogE("rx-unknown", les, errors.New("unknown node"), logMsg) isBad = true goto Closing } - ctx.LogD("rx", sds, "taken") + ctx.LogD("rx-tx", les, logMsg) if !dryRun { if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil { - ctx.LogE("rx", sds, err, "tx trns") + ctx.LogE("rx", les, err, func(les LEs) string { + return logMsg(les) + ": txing" + }) isBad = true goto Closing } } - ctx.LogI("rx", sds, "") + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Got transitional packet from %s to %s (%s)", + ctx.NodeName(job.PktEnc.Sender), + ctx.NodeName(&nodeId), + humanize.IBytes(uint64(pktSize)), + ) + }) if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", sds, err, "remove") + if err = os.Remove(job.Path); err != nil { + ctx.LogE("rx", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing trns %s/%s (%s): %s: removing", + ctx.NodeName(job.PktEnc.Sender), + pktName, + humanize.IBytes(uint64(pktSize)), + ctx.NodeName(&nodeId), + ) + }) isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } } + default: - ctx.LogE("rx", sds, errors.New("unknown type"), "") + ctx.LogE( + "rx-type-unknown", les, errors.New("unknown type"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s (%s)", + ctx.NodeName(job.PktEnc.Sender), + pktName, + humanize.IBytes(uint64(pktSize)), + ) + }, + ) isBad = true } Closing: @@ -445,7 +688,7 @@ func (ctx *Ctx) AutoToss( default: } time.Sleep(time.Second) - bad = !ctx.Toss(nodeId, nice, false, doSeen, noFile, noFreq, noExec, noTrns) + bad = !ctx.Toss(nodeId, nice, false, doSeen, noFile, noFreq, noExec, noTrns) || bad } }() return finish, badCode