X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fcmd%2Fnncp-xfer%2Fmain.go;h=a1ef5fa5d51c438179b33450f7876352d62d5242;hb=2cad23b498bbb9fc1e929b4900b40e520017c614;hp=fde11174e9c19297aa573a8af86cd4c64d8848c3;hpb=dd887c15fa21071a2f4931f7248e10c4ab1029d2;p=nncp.git diff --git a/src/cmd/nncp-xfer/main.go b/src/cmd/nncp-xfer/main.go index fde1117..a1ef5fa 100644 --- a/src/cmd/nncp-xfer/main.go +++ b/src/cmd/nncp-xfer/main.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2019 Sergey Matveev +Copyright (C) 2016-2022 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -20,16 +20,16 @@ package main import ( "bufio" + "errors" "flag" "fmt" "io" "log" "os" "path/filepath" - "strconv" - "github.com/davecgh/go-xdr/xdr2" - "go.cypherpunks.ru/nncp/v5" + "github.com/dustin/go-humanize" + "go.cypherpunks.ru/nncp/v8" ) func usage() { @@ -51,10 +51,13 @@ func main() { spoolPath = flag.String("spool", "", "Override path to spool") logPath = flag.String("log", "", "Override path to logfile") quiet = flag.Bool("quiet", false, "Print only errors") + showPrgrs = flag.Bool("progress", false, "Force progress showing") + omitPrgrs = flag.Bool("noprogress", false, "Omit progress showing") debug = flag.Bool("debug", false, "Print debug messages") version = flag.Bool("version", false, "Print version information") warranty = flag.Bool("warranty", false, "Print warranty information") ) + log.SetFlags(log.Lshortfile) flag.Usage = usage flag.Parse() if *warranty { @@ -77,7 +80,15 @@ func main() { log.Fatalln("-rx and -tx can not be set simultaneously") } - ctx, err := nncp.CtxFromCmdline(*cfgPath, *spoolPath, *logPath, *quiet, *debug) + ctx, err := nncp.CtxFromCmdline( + *cfgPath, + *spoolPath, + *logPath, + *quiet, + *showPrgrs, + *omitPrgrs, + *debug, + ) if err != nil { log.Fatalln("Error during initialization:", err) } @@ -95,32 +106,46 @@ func main() { isBad := false var dir *os.File var fis []os.FileInfo - sds := nncp.SDS{} + var les nncp.LEs + var logMsg func(les nncp.LEs) string if *txOnly { goto Tx } - sds["xx"] = string(nncp.TRx) - sds["dir"] = selfPath - ctx.LogD("nncp-xfer", sds, "self") + les = nncp.LEs{ + {K: "XX", V: string(nncp.TRx)}, + {K: "Dir", V: selfPath}, + } + logMsg = func(les nncp.LEs) string { + return "Packet transfer, received from self" + } + ctx.LogD("xfer-self", les, logMsg) if _, err = os.Stat(selfPath); err != nil { if os.IsNotExist(err) { - ctx.LogD("nncp-xfer", sds, "no dir") + ctx.LogD("xfer-self-no-dir", les, func(les nncp.LEs) string { + return logMsg(les) + ": no directory" + }) goto Tx } - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "stat") + ctx.LogE("xfer-self-stat", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": stating" + }) isBad = true goto Tx } dir, err = os.Open(selfPath) if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "open") + ctx.LogE("xfer-self-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + }) isBad = true goto Tx } fis, err = dir.Readdir(0) dir.Close() if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "read") + ctx.LogE("xfer-self-read", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": reading" + }) isBad = true goto Tx } @@ -129,29 +154,42 @@ func main() { continue } nodeId, err := nncp.NodeIdFromString(fi.Name()) - sds["node"] = fi.Name() + les := append(les, nncp.LE{K: "Node", V: fi.Name()}) + logMsg := func(les nncp.LEs) string { + return "Packet transfer, received from " + ctx.NodeName(nodeId) + } if err != nil { - ctx.LogD("nncp-xfer", sds, "is not NodeId") + ctx.LogD("xfer-rx-not-node", les, func(les nncp.LEs) string { + return logMsg(les) + ": is not NodeId" + }) continue } if nodeOnly != nil && *nodeId != *nodeOnly.Id { - ctx.LogD("nncp-xfer", sds, "skip") + ctx.LogD("xfer-rx-skip", les, func(les nncp.LEs) string { + return logMsg(les) + ": skipping" + }) continue } if _, known := ctx.Neigh[*nodeId]; !known { - ctx.LogD("nncp-xfer", sds, "unknown") + ctx.LogD("xfer-rx-unknown", les, func(les nncp.LEs) string { + return logMsg(les) + ": unknown" + }) continue } dir, err = os.Open(filepath.Join(selfPath, fi.Name())) if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "open") + ctx.LogE("xfer-rx-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + }) isBad = true continue } fisInt, err := dir.Readdir(0) dir.Close() if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "read") + ctx.LogE("xfer-rx-read", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": reading" + }) isBad = true continue } @@ -159,46 +197,121 @@ func main() { if !fi.IsDir() { continue } + // Check that it is valid Base32 encoding + if _, err = nncp.NodeIdFromString(fiInt.Name()); err != nil { + continue + } filename := filepath.Join(dir.Name(), fiInt.Name()) - sds["file"] = filename - delete(sds, "size") + les := append(les, nncp.LE{K: "File", V: filename}) + logMsg := func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, received from %s: %s", + ctx.NodeName(nodeId), filename, + ) + } fd, err := os.Open(filename) if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "open") + ctx.LogE("xfer-rx-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + }) isBad = true continue } - var pktEnc nncp.PktEnc - _, err = xdr.Unmarshal(fd, &pktEnc) - if err != nil || pktEnc.Magic != nncp.MagicNNCPEv4 { - ctx.LogD("nncp-xfer", sds, "is not a packet") + pktEnc, pktEncRaw, err := ctx.HdrRead(fd) + if err == nil { + switch pktEnc.Magic { + case nncp.MagicNNCPEv1.B: + err = nncp.MagicNNCPEv1.TooOld() + case nncp.MagicNNCPEv2.B: + err = nncp.MagicNNCPEv2.TooOld() + case nncp.MagicNNCPEv3.B: + err = nncp.MagicNNCPEv3.TooOld() + case nncp.MagicNNCPEv4.B: + err = nncp.MagicNNCPEv4.TooOld() + case nncp.MagicNNCPEv5.B: + err = nncp.MagicNNCPEv5.TooOld() + case nncp.MagicNNCPEv6.B: + default: + err = errors.New("is not an encrypted packet") + } + } + if err != nil { + ctx.LogD( + "xfer-rx-not-packet", + append(les, nncp.LE{K: "Err", V: err}), + func(les nncp.LEs) string { + return logMsg(les) + ": not valid packet: " + err.Error() + }, + ) fd.Close() continue } if pktEnc.Nice > nice { - ctx.LogD("nncp-xfer", sds, "too nice") + ctx.LogD("xfer-rx-too-nice", les, func(les nncp.LEs) string { + return logMsg(les) + ": too nice" + }) fd.Close() continue } - sds["size"] = strconv.FormatInt(fiInt.Size(), 10) + les = append(les, nncp.LE{K: "Size", V: fiInt.Size()}) + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, received from %s: %s (%s)", + ctx.NodeName(nodeId), filename, + humanize.IBytes(uint64(fiInt.Size())), + ) + } if !ctx.IsEnoughSpace(fiInt.Size()) { - ctx.LogE("nncp-xfer", sds, "is not enough space") + ctx.LogE("xfer-rx", les, errors.New("is not enough space"), logMsg) fd.Close() continue } - fd.Seek(0, 0) + if _, err = fd.Seek(0, io.SeekStart); err != nil { + log.Fatalln(err) + } tmp, err := ctx.NewTmpFileWHash() if err != nil { log.Fatalln(err) } - if _, err = io.CopyN(tmp.W, bufio.NewReader(fd), fiInt.Size()); err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "copy") + r, w := io.Pipe() + go func() { + _, err := io.CopyN(w, bufio.NewReader(fd), fiInt.Size()) + if err == nil { + err = w.Close() + } + if err != nil { + ctx.LogE("xfer-rx", les, err, logMsg) + w.CloseWithError(err) + } + }() + _, err = nncp.CopyProgressed( + tmp.W, r, "Rx", + append( + les, + nncp.LE{K: "Pkt", V: filename}, + nncp.LE{K: "FullSize", V: fiInt.Size()}, + ), + ctx.ShowPrgrs, + ) + fd.Close() + if err != nil { + ctx.LogE("xfer-rx", les, err, logMsg) + tmp.Cancel() isBad = true - fd.Close() + continue + } + if err = tmp.W.Flush(); err != nil { + ctx.LogE("xfer-rx", les, err, logMsg) tmp.Cancel() + isBad = true + continue + } + if tmp.Checksum() != fiInt.Name() { + ctx.LogE("xfer-rx", les, errors.New("checksum mismatch"), logMsg) + tmp.Cancel() + isBad = true continue } - fd.Close() if err = tmp.Commit(filepath.Join( ctx.Spool, nodeId.String(), @@ -206,13 +319,21 @@ func main() { )); err != nil { log.Fatalln(err) } - ctx.LogI("nncp-xfer", sds, "") + ctx.LogI("xfer-rx", les, logMsg) if !*keep { if err = os.Remove(filename); err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "remove") + ctx.LogE("xfer-rx-remove", les, err, logMsg) isBad = true } } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, filepath.Join( + ctx.Spool, + nodeId.String(), + string(nncp.TRx), + tmp.Checksum(), + )) + } } } @@ -223,122 +344,198 @@ Tx: } return } - sds["xx"] = string(nncp.TTx) - for nodeId, _ := range ctx.Neigh { - sds["node"] = nodeId + for nodeId := range ctx.Neigh { + les := nncp.LEs{{K: "XX", V: string(nncp.TTx)}, {K: "Node", V: nodeId}} + logMsg := func(les nncp.LEs) string { + return "Packet transfer, sent to " + ctx.NodeName(&nodeId) + } if nodeOnly != nil && nodeId != *nodeOnly.Id { - ctx.LogD("nncp-xfer", sds, "skip") + ctx.LogD("xfer-tx-skip", les, func(les nncp.LEs) string { + return logMsg(les) + ": skipping" + }) continue } - dirLock, err := ctx.LockDir(&nodeId, nncp.TTx) + dirLock, err := ctx.LockDir(&nodeId, string(nncp.TTx)) if err != nil { continue } nodePath := filepath.Join(flag.Arg(0), nodeId.String()) - sds["dir"] = nodePath + les = append(les, nncp.LE{K: "Dir", V: nodePath}) + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, sent to %s: directory %s", + ctx.NodeName(&nodeId), nodePath, + ) + } _, err = os.Stat(nodePath) if err != nil { if os.IsNotExist(err) { - ctx.LogD("nncp-xfer", sds, "does not exist") + ctx.LogD("xfer-tx-not-exist", les, func(les nncp.LEs) string { + return logMsg(les) + ": does not exist" + }) if !*mkdir { ctx.UnlockDir(dirLock) continue } if err = os.Mkdir(nodePath, os.FileMode(0777)); err != nil { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "mkdir") + ctx.LogE("xfer-tx-mkdir", les, err, logMsg) isBad = true continue } } else { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "stat") + ctx.LogE("xfer-tx", les, err, logMsg) isBad = true continue } } dstPath := filepath.Join(nodePath, ctx.SelfId.String()) - sds["dir"] = dstPath + les[len(les)-1].V = dstPath + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, sent to %s: directory %s", + ctx.NodeName(&nodeId), dstPath, + ) + } _, err = os.Stat(dstPath) if err != nil { if os.IsNotExist(err) { if err = os.Mkdir(dstPath, os.FileMode(0777)); err != nil { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "mkdir") + ctx.LogE("xfer-tx-mkdir", les, err, logMsg) isBad = true continue } } else { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "stat") + ctx.LogE("xfer-tx", les, err, logMsg) isBad = true continue } } - delete(sds, "dir") + les = les[:len(les)-1] for job := range ctx.Jobs(&nodeId, nncp.TTx) { - pktName := filepath.Base(job.Fd.Name()) - sds["pkt"] = pktName + pktName := filepath.Base(job.Path) + les := append(les, nncp.LE{K: "Pkt", V: pktName}) + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, sent to %s: %s", + ctx.NodeName(&nodeId), pktName, + ) + } if job.PktEnc.Nice > nice { - ctx.LogD("nncp-xfer", sds, "too nice") - job.Fd.Close() + ctx.LogD("xfer-tx-too-nice", les, func(les nncp.LEs) string { + return logMsg(les) + ": too nice" + }) continue } if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-xfer", sds, "already exists") - job.Fd.Close() + ctx.LogD("xfer-tx-exists", les, func(les nncp.LEs) string { + return logMsg(les) + ": already exists" + }) continue } - if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenSuffix)); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-xfer", sds, "already exists") - job.Fd.Close() + if _, err = os.Stat(filepath.Join( + dstPath, nncp.SeenDir, pktName, + )); err == nil || !os.IsNotExist(err) { + ctx.LogD("xfer-tx-seen", les, func(les nncp.LEs) string { + return logMsg(les) + ": already seen" + }) continue } tmp, err := nncp.TempFile(dstPath, "xfer") if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "mktemp") - job.Fd.Close() + ctx.LogE("xfer-tx-mktemp", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": mktemp" + }) isBad = true break } - sds["tmp"] = tmp.Name() - ctx.LogD("nncp-xfer", sds, "created") - bufW := bufio.NewWriter(tmp) - copied, err := io.Copy(bufW, bufio.NewReader(job.Fd)) - job.Fd.Close() + les = append(les, nncp.LE{K: "Tmp", V: tmp.Name()}) + ctx.LogD("xfer-tx-tmp-create", les, func(les nncp.LEs) string { + return fmt.Sprintf("%s: temporary %s created", logMsg(les), tmp.Name()) + }) + fd, err := os.Open(job.Path) if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "copy") + ctx.LogE("xfer-tx-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + }) tmp.Close() isBad = true continue } - if err = bufW.Flush(); err != nil { + bufW := bufio.NewWriter(tmp) + copied, err := nncp.CopyProgressed( + bufW, bufio.NewReader(fd), "Tx", + append(les, nncp.LE{K: "FullSize", V: job.Size}), + ctx.ShowPrgrs, + ) + fd.Close() + if err != nil { + ctx.LogE("xfer-tx-copy", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": copying" + }) tmp.Close() - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "flush") isBad = true continue } - if err = tmp.Sync(); err != nil { + if err = bufW.Flush(); err != nil { tmp.Close() - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "sync") + ctx.LogE("xfer-tx-flush", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": flushing" + }) isBad = true continue } - tmp.Close() + if !nncp.NoSync { + if err = tmp.Sync(); err != nil { + tmp.Close() + ctx.LogE("xfer-tx-sync", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": syncing" + }) + isBad = true + continue + } + } + if err = tmp.Close(); err != nil { + ctx.LogE("xfer-tx-close", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": closing" + }) + } if err = os.Rename(tmp.Name(), filepath.Join(dstPath, pktName)); err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "rename") + ctx.LogE("xfer-tx-rename", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": renaming" + }) + isBad = true + continue + } + if err = nncp.DirSync(dstPath); err != nil { + ctx.LogE("xfer-tx-dirsync", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": dirsyncing" + }) isBad = true continue } os.Remove(filepath.Join(dstPath, pktName+".part")) - delete(sds, "tmp") - ctx.LogI("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{ - "size": strconv.FormatInt(copied, 10), - }), "") + les = les[:len(les)-1] + ctx.LogI( + "xfer-tx", + append(les, nncp.LE{K: "Size", V: copied}), + func(les nncp.LEs) string { + return fmt.Sprintf( + "%s (%s)", logMsg(les), humanize.IBytes(uint64(copied)), + ) + }, + ) if !*keep { - if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "remove") + if err = os.Remove(job.Path); err != nil { + ctx.LogE("xfer-tx-remove", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": removing" + }) isBad = true + } else if ctx.HdrUsage { + os.Remove(nncp.JobPath2Hdr(job.Path)) } } }