X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fcmd%2Fnncp-bundle%2Fmain.go;h=42a9417be18d53f94c2e24d8605ef264763d3ede;hb=0367cce2741e1ce6a89a49fd5c4e9df6005c9744;hp=f577a1c602069817d25b0ea22adc2c6c9cacf4de;hpb=fa24572f1280b56977c6dcf6969a736403d1280e;p=nncp.git diff --git a/src/cmd/nncp-bundle/main.go b/src/cmd/nncp-bundle/main.go index f577a1c..42a9417 100644 --- a/src/cmd/nncp-bundle/main.go +++ b/src/cmd/nncp-bundle/main.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2021 Sergey Matveev +Copyright (C) 2016-2022 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -33,8 +33,8 @@ import ( "strings" xdr "github.com/davecgh/go-xdr/xdr2" - "go.cypherpunks.ru/nncp/v5" - "golang.org/x/crypto/blake2b" + "github.com/dustin/go-humanize" + "go.cypherpunks.ru/nncp/v8" ) const ( @@ -69,6 +69,7 @@ func main() { version = flag.Bool("version", false, "Print version information") warranty = flag.Bool("warranty", false, "Print warranty information") ) + log.SetFlags(log.Lshortfile) flag.Usage = usage flag.Parse() if *warranty { @@ -107,29 +108,40 @@ func main() { for i := 0; i < flag.NArg(); i++ { node, err := ctx.FindNode(flag.Arg(i)) if err != nil { - log.Fatalln("Invalid specified:", err) + log.Fatalln("Invalid node specified:", err) } nodeIds[*node.Id] = struct{}{} } ctx.Umask() - sds := nncp.SDS{} if *doTx { - sds["xx"] = string(nncp.TTx) var pktName string bufStdout := bufio.NewWriter(os.Stdout) tarWr := tar.NewWriter(bufStdout) - for nodeId, _ := range nodeIds { - sds["node"] = nodeId.String() + for nodeId := range nodeIds { for job := range ctx.Jobs(&nodeId, nncp.TTx) { - pktName = filepath.Base(job.Fd.Name()) - sds["pkt"] = pktName + pktName = filepath.Base(job.Path) + les := nncp.LEs{ + {K: "XX", V: string(nncp.TTx)}, + {K: "Node", V: nodeId.String()}, + {K: "Pkt", V: pktName}, + } if job.PktEnc.Nice > nice { - ctx.LogD("nncp-bundle", sds, "too nice") - job.Fd.Close() // #nosec G104 + ctx.LogD("bundle-tx-too-nice", les, func(les nncp.LEs) string { + return fmt.Sprintf( + "Bundle transfer %s/tx/%s: too nice %s", + ctx.NodeName(&nodeId), + pktName, + nncp.NicenessFmt(job.PktEnc.Nice), + ) + }) continue } + fd, err := os.Open(job.Path) + if err != nil { + log.Fatalln("Error during opening:", err) + } if err = tarWr.WriteHeader(&tar.Header{ Format: tar.FormatUSTAR, Name: nncp.NNCPBundlePrefix, @@ -153,16 +165,18 @@ func main() { log.Fatalln("Error writing tar header:", err) } if _, err = nncp.CopyProgressed( - tarWr, job.Fd, "Tx", - nncp.SdsAdd(sds, nncp.SDS{ - "pkt": nncp.Base32Codec.EncodeToString(job.HshValue[:]), - "fullsize": job.Size, - }), + tarWr, bufio.NewReader(fd), "Tx", + append(les, nncp.LEs{ + {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])}, + {K: "FullSize", V: job.Size}, + }...), ctx.ShowPrgrs, ); err != nil { log.Fatalln("Error during copying to tar:", err) } - job.Fd.Close() // #nosec G104 + if err = fd.Close(); err != nil { + log.Fatalln("Error during closing:", err) + } if err = tarWr.Flush(); err != nil { log.Fatalln("Error during tar flushing:", err) } @@ -170,11 +184,24 @@ func main() { log.Fatalln("Error during stdout flushing:", err) } if *doDelete { - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { log.Fatalln("Error during deletion:", err) + } else if ctx.HdrUsage { + os.Remove(nncp.JobPath2Hdr(job.Path)) } } - ctx.LogI("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{"size": job.Size}), "") + ctx.LogI( + "bundle-tx", + append(les, nncp.LE{K: "Size", V: job.Size}), + func(les nncp.LEs) string { + return fmt.Sprintf( + "Bundle transfer, sent to node %s %s (%s)", + ctx.NodeName(&nodeId), + pktName, + humanize.IBytes(uint64(job.Size)), + ) + }, + ) } } if err = tarWr.Close(); err != nil { @@ -194,152 +221,220 @@ func main() { if err == io.EOF { break } - bufStdin.Discard(bufStdin.Buffered() - (len(nncp.NNCPBundlePrefix) - 1)) // #nosec G104 + bufStdin.Discard(bufStdin.Buffered() - (len(nncp.NNCPBundlePrefix) - 1)) continue } if _, err = bufStdin.Discard(prefixIdx); err != nil { panic(err) } tarR := tar.NewReader(bufStdin) - sds["xx"] = string(nncp.TRx) entry, err := tarR.Next() if err != nil { if err != io.EOF { ctx.LogD( - "nncp-bundle", - nncp.SdsAdd(sds, nncp.SDS{"err": err}), - "error reading tar", + "bundle-rx-read-tar", + nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}}, + func(les nncp.LEs) string { + return "Bundle transfer rx: reading tar" + }, ) } continue } if entry.Typeflag != tar.TypeDir { - ctx.LogD("nncp-bundle", sds, "Expected NNCP/") + ctx.LogD( + "bundle-rx-read-tar", + nncp.LEs{ + {K: "XX", V: string(nncp.TRx)}, + {K: "Err", V: errors.New("expected NNCP/")}, + }, + func(les nncp.LEs) string { + return "Bundle transfer rx: reading tar" + }, + ) continue } entry, err = tarR.Next() if err != nil { if err != io.EOF { ctx.LogD( - "nncp-bundle", - nncp.SdsAdd(sds, nncp.SDS{"err": err}), - "error reading tar", + "bundle-rx-read-tar", + nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}}, + func(les nncp.LEs) string { + return "Bundle transfer rx: reading tar" + }, ) } continue } - sds["pkt"] = entry.Name + les := nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Pkt", V: entry.Name}} + logMsg := func(les nncp.LEs) string { + return "Bundle transfer rx/" + entry.Name + } if entry.Size < nncp.PktEncOverhead { - ctx.LogD("nncp-bundle", sds, "Too small packet") + ctx.LogD("bundle-rx-too-small", les, func(les nncp.LEs) string { + return logMsg(les) + ": too small packet" + }) continue } if !ctx.IsEnoughSpace(entry.Size) { - ctx.LogE("nncp-bundle", sds, errors.New("not enough spool space"), "") + ctx.LogE("bundle-rx", les, errors.New("not enough spool space"), logMsg) continue } pktName := filepath.Base(entry.Name) if _, err = nncp.Base32Codec.DecodeString(pktName); err != nil { - ctx.LogD("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{"err": "bad packet name"}), "") + ctx.LogD( + "bundle-rx", + append(les, nncp.LE{K: "Err", V: "bad packet name"}), + logMsg, + ) continue } if _, err = io.ReadFull(tarR, pktEncBuf); err != nil { - ctx.LogD("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "read") + ctx.LogD( + "bundle-rx", + append(les, nncp.LE{K: "Err", V: err}), + logMsg, + ) continue } if _, err = xdr.Unmarshal(bytes.NewReader(pktEncBuf), &pktEnc); err != nil { - ctx.LogD("nncp-bundle", sds, "Bad packet structure") + ctx.LogD( + "bundle-rx", + append(les, nncp.LE{K: "Err", V: "Bad packet structure"}), + logMsg, + ) continue } - if pktEnc.Magic != nncp.MagicNNCPEv4 { - ctx.LogD("nncp-bundle", sds, "Bad packet magic number") + switch pktEnc.Magic { + case nncp.MagicNNCPEv1.B: + err = nncp.MagicNNCPEv1.TooOld() + case nncp.MagicNNCPEv2.B: + err = nncp.MagicNNCPEv2.TooOld() + case nncp.MagicNNCPEv3.B: + err = nncp.MagicNNCPEv3.TooOld() + case nncp.MagicNNCPEv4.B: + err = nncp.MagicNNCPEv4.TooOld() + case nncp.MagicNNCPEv5.B: + default: + err = errors.New("Bad packet magic number") + } + if err != nil { + ctx.LogD( + "bundle-rx", + append(les, nncp.LE{K: "Err", V: err.Error()}), + logMsg, + ) continue } if pktEnc.Nice > nice { - ctx.LogD("nncp-bundle", sds, "too nice") + ctx.LogD("bundle-rx-too-nice", les, func(les nncp.LEs) string { + return logMsg(les) + ": too nice" + }) continue } if *pktEnc.Sender == *ctx.SelfId && *doDelete { if len(nodeIds) > 0 { if _, exists := nodeIds[*pktEnc.Recipient]; !exists { - ctx.LogD("nncp-bundle", sds, "Recipient is not requested") + ctx.LogD("bundle-tx-skip", les, func(les nncp.LEs) string { + return logMsg(les) + ": recipient is not requested" + }) continue } } nodeId32 := nncp.Base32Codec.EncodeToString(pktEnc.Recipient[:]) - sds["xx"] = string(nncp.TTx) - sds["node"] = nodeId32 - sds["pkt"] = pktName - dstPath := filepath.Join( - ctx.Spool, - nodeId32, - string(nncp.TTx), - pktName, - ) + les := nncp.LEs{ + {K: "XX", V: string(nncp.TTx)}, + {K: "Node", V: nodeId32}, + {K: "Pkt", V: pktName}, + } + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf("Bundle transfer %s/tx/%s", nodeId32, pktName) + } + dstPath := filepath.Join(ctx.Spool, nodeId32, string(nncp.TTx), pktName) if _, err = os.Stat(dstPath); err != nil { - ctx.LogD("nncp-bundle", sds, "Packet is already missing") + ctx.LogD("bundle-tx-missing", les, func(les nncp.LEs) string { + return logMsg(les) + ": packet is already missing" + }) continue } - hsh, err := blake2b.New256(nil) - if err != nil { - log.Fatalln("Error during hasher creation:", err) - } + hsh := nncp.MTHNew(entry.Size, 0) if _, err = hsh.Write(pktEncBuf); err != nil { log.Fatalln("Error during writing:", err) } if _, err = nncp.CopyProgressed( hsh, tarR, "Rx", - nncp.SdsAdd(sds, nncp.SDS{"fullsize": entry.Size}), + append(les, nncp.LE{K: "FullSize", V: entry.Size}), ctx.ShowPrgrs, ); err != nil { log.Fatalln("Error during copying:", err) } if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName { - ctx.LogI("nncp-bundle", sds, "removed") + ctx.LogI("bundle-tx-removed", les, func(les nncp.LEs) string { + return logMsg(les) + ": removed" + }) if !*dryRun { - os.Remove(dstPath) // #nosec G104 + os.Remove(dstPath) + if ctx.HdrUsage { + os.Remove(nncp.JobPath2Hdr(dstPath)) + } } } else { - ctx.LogE("nncp-bundle", sds, errors.New("bad checksum"), "") + ctx.LogE("bundle-tx", les, errors.New("bad checksum"), logMsg) } continue } if *pktEnc.Recipient != *ctx.SelfId { - ctx.LogD("nncp-bundle", sds, "Unknown recipient") + ctx.LogD("nncp-bundle", les, func(les nncp.LEs) string { + return logMsg(les) + ": unknown recipient" + }) continue } if len(nodeIds) > 0 { if _, exists := nodeIds[*pktEnc.Sender]; !exists { - ctx.LogD("nncp-bundle", sds, "Sender is not requested") + ctx.LogD("bundle-rx-skip", les, func(les nncp.LEs) string { + return logMsg(les) + ": sender is not requested" + }) continue } } - sds["node"] = nncp.Base32Codec.EncodeToString(pktEnc.Recipient[:]) - sds["pkt"] = pktName - sds["fullsize"] = entry.Size - selfPath := filepath.Join(ctx.Spool, ctx.SelfId.String(), string(nncp.TRx)) - dstPath := filepath.Join(selfPath, pktName) + sender := nncp.Base32Codec.EncodeToString(pktEnc.Sender[:]) + les = nncp.LEs{ + {K: "XX", V: string(nncp.TRx)}, + {K: "Node", V: sender}, + {K: "Pkt", V: pktName}, + {K: "FullSize", V: entry.Size}, + } + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf("Bundle transfer %s/rx/%s", sender, pktName) + } + dstDirPath := filepath.Join(ctx.Spool, sender, string(nncp.TRx)) + dstPath := filepath.Join(dstDirPath, pktName) if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-bundle", sds, "Packet already exists") + ctx.LogD("bundle-rx-exists", les, func(les nncp.LEs) string { + return logMsg(les) + ": packet already exists" + }) continue } - if _, err = os.Stat(dstPath + nncp.SeenSuffix); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-bundle", sds, "Packet already exists") + if _, err = os.Stat(filepath.Join( + dstDirPath, nncp.SeenDir, pktName, + )); err == nil || !os.IsNotExist(err) { + ctx.LogD("bundle-rx-seen", les, func(les nncp.LEs) string { + return logMsg(les) + ": packet already seen" + }) continue } if *doCheck { if *dryRun { - hsh, err := blake2b.New256(nil) - if err != nil { - log.Fatalln("Error during hasher creation:", err) - } + hsh := nncp.MTHNew(entry.Size, 0) if _, err = hsh.Write(pktEncBuf); err != nil { log.Fatalln("Error during writing:", err) } - if _, err = nncp.CopyProgressed(hsh, tarR, "check", sds, ctx.ShowPrgrs); err != nil { + if _, err = nncp.CopyProgressed(hsh, tarR, "check", les, ctx.ShowPrgrs); err != nil { log.Fatalln("Error during copying:", err) } if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) != pktName { - ctx.LogE("nncp-bundle", sds, errors.New("bad checksum"), "") + ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg) continue } } else { @@ -350,25 +445,25 @@ func main() { if _, err = tmp.W.Write(pktEncBuf); err != nil { log.Fatalln("Error during writing:", err) } - if _, err = nncp.CopyProgressed(tmp.W, tarR, "check", sds, ctx.ShowPrgrs); err != nil { + if _, err = nncp.CopyProgressed(tmp.W, tarR, "check", les, ctx.ShowPrgrs); err != nil { log.Fatalln("Error during copying:", err) } if err = tmp.W.Flush(); err != nil { log.Fatalln("Error during flusing:", err) } if nncp.Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) == pktName { - if err = tmp.Commit(selfPath); err != nil { + if err = tmp.Commit(dstDirPath); err != nil { log.Fatalln("Error during commiting:", err) } } else { - ctx.LogE("nncp-bundle", sds, errors.New("bad checksum"), "") + ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg) tmp.Cancel() continue } } } else { if *dryRun { - if _, err = nncp.CopyProgressed(ioutil.Discard, tarR, "Rx", sds, ctx.ShowPrgrs); err != nil { + if _, err = nncp.CopyProgressed(ioutil.Discard, tarR, "Rx", les, ctx.ShowPrgrs); err != nil { log.Fatalln("Error during copying:", err) } } else { @@ -380,7 +475,7 @@ func main() { if _, err = bufTmp.Write(pktEncBuf); err != nil { log.Fatalln("Error during writing:", err) } - if _, err = nncp.CopyProgressed(bufTmp, tarR, "Rx", sds, ctx.ShowPrgrs); err != nil { + if _, err = nncp.CopyProgressed(bufTmp, tarR, "Rx", les, ctx.ShowPrgrs); err != nil { log.Fatalln("Error during copying:", err) } if err = bufTmp.Flush(); err != nil { @@ -392,20 +487,32 @@ func main() { if err = tmp.Close(); err != nil { log.Fatalln("Error during closing:", err) } - if err = os.MkdirAll(selfPath, os.FileMode(0777)); err != nil { + if err = os.MkdirAll(dstDirPath, os.FileMode(0777)); err != nil { log.Fatalln("Error during mkdir:", err) } if err = os.Rename(tmp.Name(), dstPath); err != nil { log.Fatalln("Error during renaming:", err) } - if err = nncp.DirSync(selfPath); err != nil { + if err = nncp.DirSync(dstDirPath); err != nil { log.Fatalln("Error during syncing:", err) } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncBuf, dstPath) + } } } - ctx.LogI("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{ - "size": sds["fullsize"], - }), "") + for _, le := range les { + if le.K == "FullSize" { + les = append(les, nncp.LE{K: "Size", V: le.V}) + break + } + } + ctx.LogI("bundle-rx", les, func(les nncp.LEs) string { + return fmt.Sprintf( + "Bundle transfer, received from %s %s (%s)", + sender, pktName, humanize.IBytes(uint64(entry.Size)), + ) + }) } } }