From: Sergey Matveev Date: Fri, 4 Mar 2022 13:51:18 +0000 (+0300) Subject: Merge branch 'develop' X-Git-Tag: v8.7.0^0 X-Git-Url: http://www.git.cypherpunks.ru/?p=nncp.git;a=commitdiff_plain;h=e523d3daf5cfcda8f0dc51e7003987c7fe693195;hp=835c153195c63c59df9a40a4de1e16fff09c003f Merge branch 'develop' --- diff --git a/doc/cmd/nncp-ack.texi b/doc/cmd/nncp-ack.texi index 5635cc1..a3b374d 100644 --- a/doc/cmd/nncp-ack.texi +++ b/doc/cmd/nncp-ack.texi @@ -7,6 +7,8 @@ $ nncp-ack [options] -all $ nncp-ack [options] -node NODE[,@dots{}] $ nncp-ack [options] -node NODE -pkt PKT + +$ nncp-ack [@dots{}] 4>&1 >&2 | nncp-rm [@dots{}] -pkt @end example Send acknowledgement of successful @option{PKT} (Base32-encoded hash) @@ -14,6 +16,11 @@ packet receipt from @option{NODE} node. If no @option{-pkt} is specified, then acknowledge all packet in node's @code{rx} outbound spool. If @option{-all} is specified, then do that for all nodes. +That commands outputs list of created encrypted ACK packets +(@code{NODE/PKT}) to @strong{4}th file descriptor. That output can be +passed for example to @command{@ref{nncp-rm}} to remove them after +transmission to not wait for acknowledgement and retransmission. + General workflow with acknowledgement is following, assuming that Alice has some outbound packets for Bob: @@ -33,16 +40,24 @@ bob$ nncp-xfer -rx /mnt/shared That will also check if copied packets checksum is not mismatched. -@item Create ACK packets of received ones: +@item Create ACK packets of received ones, saving the list of encrypted + ACK packets: @example -bob$ nncp-ack -node alice +bob$ nncp-ack -node alice 4>acks @end example @item Send those newly created packets back to Alice: @example -bob$ nncp-xfer -tx /mnt/shared +bob$ nncp-xfer [-keep] -tx /mnt/shared +@end example + +@item Remove them from outbound spool, because we expect no + acknowledgement for them: + +@example +bob$ nncp-rm -node alice -pkt cfg.hjson @option{-dump} option dumps current configuration file to the @ref{Configuration directory, directory layout} at @file{/path/to/dir}. @option{-load} loads it and parses, outputing the resulting Hjson to -stdout. +@code{stdout}. diff --git a/doc/cmd/nncp-hash.texi b/doc/cmd/nncp-hash.texi index afed745..17987e0 100644 --- a/doc/cmd/nncp-hash.texi +++ b/doc/cmd/nncp-hash.texi @@ -6,7 +6,7 @@ $ nncp-hash [-file @dots{}] [-seek X] [-debug] [-progress] @end example -Calculate @ref{MTH} hash of either stdin, or @option{-file} if +Calculate @ref{MTH} hash of either @code{stdin}, or @option{-file} if specified. You can optionally force seeking the file first, reading only part of diff --git a/doc/cmd/nncp-rm.texi b/doc/cmd/nncp-rm.texi index 4c74f0a..0267673 100644 --- a/doc/cmd/nncp-rm.texi +++ b/doc/cmd/nncp-rm.texi @@ -11,7 +11,12 @@ $ nncp-rm [options] @{-all|-node NODE@} -nock $ nncp-rm [options] @{-all|-node NODE@} -hdr $ nncp-rm [options] @{-all|-node NODE@} -area $ nncp-rm [options] @{-all|-node NODE@} [-rx] [-tx] -$ nncp-rm [options] @{-all|-node NODE@} -pkt PKT +$ nncp-rm [options] @{-all|-node NODE@} -pkt <. package main import ( + "bufio" + "errors" "flag" "fmt" + "io" "log" "os" "path/filepath" "strings" + xdr "github.com/davecgh/go-xdr/xdr2" "go.cypherpunks.ru/nncp/v8" ) @@ -33,8 +37,8 @@ func usage() { fmt.Fprintf(os.Stderr, nncp.UsageHeader()) fmt.Fprintf(os.Stderr, "nncp-ack -- send packet receipt acknowledgement\n\n") fmt.Fprintf(os.Stderr, "Usage: %s [options] -all\n", os.Args[0]) - fmt.Fprintf(os.Stderr, "Usage: %s -node NODE[,...]\n", os.Args[0]) - fmt.Fprintf(os.Stderr, "Usage: %s -node NODE -pkt PKT\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Usage: %s [options] -node NODE[,...]\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Usage: %s [options] -node NODE -pkt PKT\n", os.Args[0]) fmt.Fprintln(os.Stderr, "Options:") flag.PrintDefaults() } @@ -115,24 +119,119 @@ func main() { os.Exit(1) } + acksCreated := os.NewFile(uintptr(4), "ACKsCreated") + if acksCreated == nil { + log.Fatalln("can not open FD:4") + } + if *pktRaw != "" { if len(nodes) != 1 { usage() os.Exit(1) } nncp.ViaOverride(*viaOverride, ctx, nodes[0]) - if err = ctx.TxACK(nodes[0], nice, *pktRaw, minSize); err != nil { + pktName, err := ctx.TxACK(nodes[0], nice, *pktRaw, minSize) + if err != nil { log.Fatalln(err) } + acksCreated.WriteString(nodes[0].Id.String() + "/" + pktName + "\n") return } + isBad := false for _, node := range nodes { for job := range ctx.Jobs(node.Id, nncp.TRx) { pktName := filepath.Base(job.Path) - if err = ctx.TxACK(node, nice, pktName, minSize); err != nil { + sender := ctx.Neigh[*job.PktEnc.Sender] + les := nncp.LEs{ + {K: "Node", V: job.PktEnc.Sender}, + {K: "Pkt", V: pktName}, + } + logMsg := func(les nncp.LEs) string { + return fmt.Sprintf( + "ACKing %s/%s", + ctx.NodeName(job.PktEnc.Sender), pktName, + ) + } + if sender == nil { + err := errors.New("unknown node") + ctx.LogE("ack-read", les, err, logMsg) + isBad = true + continue + } + fd, err := os.Open(job.Path) + if err != nil { + ctx.LogE("ack-read-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + job.Path + }) + isBad = true + continue + } + pktEnc, _, err := ctx.HdrRead(fd) + if err != nil { + fd.Close() + ctx.LogE("ack-read-read", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": reading" + job.Path + }) + isBad = true + continue + } + switch pktEnc.Magic { + case nncp.MagicNNCPEv1.B: + err = nncp.MagicNNCPEv1.TooOld() + case nncp.MagicNNCPEv2.B: + err = nncp.MagicNNCPEv2.TooOld() + case nncp.MagicNNCPEv3.B: + err = nncp.MagicNNCPEv3.TooOld() + case nncp.MagicNNCPEv4.B: + err = nncp.MagicNNCPEv4.TooOld() + case nncp.MagicNNCPEv5.B: + err = nncp.MagicNNCPEv5.TooOld() + case nncp.MagicNNCPEv6.B: + default: + err = errors.New("is not an encrypted packet") + } + if err != nil { + fd.Close() + ctx.LogE("ack-read-magic", les, err, logMsg) + isBad = true + continue + } + if _, err = fd.Seek(0, io.SeekStart); err != nil { + fd.Close() + ctx.LogE("ack-read-seek", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": seeking" + }) + isBad = true + continue + } + pipeR, pipeW := io.Pipe() + go nncp.PktEncRead(ctx.Self, ctx.Neigh, bufio.NewReader(fd), pipeW, true, nil) + var pkt nncp.Pkt + _, err = xdr.Unmarshal(pipeR, &pkt) + fd.Close() + pipeW.Close() + if err != nil { + ctx.LogE("ack-read-unmarshal", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": unmarshal" + }) + isBad = true + continue + } + if pkt.Type == nncp.PktTypeACK { + ctx.LogI("ack-read-if-ack", les, func(les nncp.LEs) string { + return logMsg(les) + ": it is ACK, skipping" + }) + continue + } + newPktName, err := ctx.TxACK(node, nice, pktName, minSize) + if err != nil { log.Fatalln(err) } + acksCreated.WriteString(node.Id.String() + "/" + newPktName + "\n") } } + if isBad { + os.Exit(1) + } } diff --git a/src/cmd/nncp-rm/main.go b/src/cmd/nncp-rm/main.go index 67f6657..814028c 100644 --- a/src/cmd/nncp-rm/main.go +++ b/src/cmd/nncp-rm/main.go @@ -21,6 +21,7 @@ package main import ( "flag" "fmt" + "io" "log" "os" "path/filepath" @@ -43,7 +44,7 @@ func usage() { fmt.Fprintf(os.Stderr, " %s [options] {-all|-node NODE} -hdr\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] {-all|-node NODE} -area\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] {-all|-node NODE} {-rx|-tx}\n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s [options] {-all|-node NODE} -pkt PKT\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s [options] {-all|-node NODE} -pkt < ...\n", os.Args[0]) fmt.Fprintln(os.Stderr, "-older option's time units are: (s)econds, (m)inutes, (h)ours, (d)ays") fmt.Fprintln(os.Stderr, "Options:") flag.PrintDefaults() @@ -65,7 +66,7 @@ func main() { doArea = flag.Bool("area", false, "Remove only area/* seen files") older = flag.String("older", "", "XXX{smhd}: only older than XXX number of time units") dryRun = flag.Bool("dryrun", false, "Do not actually remove files") - pktRaw = flag.String("pkt", "", "Packet to remove") + doPkt = flag.Bool("pkt", false, "Packet to remove TODO") spoolPath = flag.String("spool", "", "Override path to spool") quiet = flag.Bool("quiet", false, "Print only errors") debug = flag.Bool("debug", false, "Print debug messages") @@ -114,6 +115,21 @@ func main() { } oldBoundary := time.Second * time.Duration(oldBoundaryRaw) + pkts := make(map[string]struct{}) + if *doPkt { + raw, err := io.ReadAll(os.Stdin) + if err != nil { + log.Fatalln("can not read -pkt from stdin:", err) + } + for _, line := range strings.Split(string(raw), "\n") { + if len(line) == 0 { + continue + } + cols := strings.Split(line, "/") + pkts[cols[len(cols)-1]] = struct{}{} + } + } + now := time.Now() if *doTmp { err = filepath.Walk( @@ -215,12 +231,14 @@ func main() { } return os.Remove(path) } - if *pktRaw != "" && filepath.Base(info.Name()) == *pktRaw { - ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, logMsg) - if *dryRun { - return nil + if len(pkts) > 0 { + if _, exists := pkts[filepath.Base(info.Name())]; exists { + ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, logMsg) + if *dryRun { + return nil + } + return os.Remove(path) } - return os.Remove(path) } if !*doSeen && !*doNoCK && !*doHdr && !*doPart && (*doRx || *doTx) && @@ -234,12 +252,12 @@ func main() { return nil }) } - if *pktRaw != "" || *doRx || *doNoCK || *doPart { + if len(pkts) > 0 || *doRx || *doNoCK || *doPart { if err = remove(nncp.TRx); err != nil { log.Fatalln("Can not remove:", err) } } - if *pktRaw != "" || *doTx || *doHdr { + if len(pkts) > 0 || *doTx || *doHdr { if err = remove(nncp.TTx); err != nil { log.Fatalln("Can not remove:", err) } @@ -315,9 +333,12 @@ func main() { return nil } if now.Sub(info.ModTime()) < oldBoundary { - ctx.LogD("rm-skip", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string { - return fmt.Sprintf("File %s: too fresh, skipping", path) - }) + ctx.LogD( + "rm-skip", nncp.LEs{{K: "File", V: path}}, + func(les nncp.LEs) string { + return fmt.Sprintf("File %s: too fresh, skipping", path) + }, + ) return nil } ctx.LogI( diff --git a/src/cmd/nncp-trns/main.go b/src/cmd/nncp-trns/main.go index 27d9d6b..bd822d1 100644 --- a/src/cmd/nncp-trns/main.go +++ b/src/cmd/nncp-trns/main.go @@ -137,7 +137,7 @@ func main() { if err != nil { panic(err) } - if _, _, err = ctx.Tx( + if _, _, _, err = ctx.Tx( node, pktTrns, nice, diff --git a/src/nncp.go b/src/nncp.go index 0ae3470..ff817da 100644 --- a/src/nncp.go +++ b/src/nncp.go @@ -40,7 +40,7 @@ along with this program. If not, see .` const Base32Encoded32Len = 52 var ( - Version string = "8.6.0" + Version string = "8.7.0" Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding) ) diff --git a/src/toss.go b/src/toss.go index ebe20fb..79cc50f 100644 --- a/src/toss.go +++ b/src/toss.go @@ -621,7 +621,7 @@ func jobProcess( if err != nil { panic(err) } - if _, _, err = ctx.Tx( + if _, _, _, err = ctx.Tx( node, pktTrns, nice, @@ -752,7 +752,7 @@ func jobProcess( } if nodeId != sender.Id && nodeId != pktEnc.Sender { ctx.LogI("rx-area-echo", lesEcho, logMsgNode) - if _, _, err = ctx.Tx( + if _, _, _, err = ctx.Tx( node, &pkt, nice, @@ -926,6 +926,8 @@ func jobProcess( return logMsg(les) + ": removing packet" }) return err + } else if ctx.HdrUsage { + os.Remove(JobPath2Hdr(pktPath)) } } } else { diff --git a/src/tx.go b/src/tx.go index 6e13f2e..e38084e 100644 --- a/src/tx.go +++ b/src/tx.go @@ -57,12 +57,12 @@ func (ctx *Ctx) Tx( src io.Reader, pktName string, areaId *AreaId, -) (*Node, int64, error) { +) (*Node, int64, string, error) { var area *Area if areaId != nil { area = ctx.AreaId2Area[*areaId] if area.Prv == nil { - return nil, 0, errors.New("area has no encryption keys") + return nil, 0, "", errors.New("area has no encryption keys") } } hops := make([]*Node, 0, 1+len(node.Via)) @@ -82,15 +82,15 @@ func (ctx *Ctx) Tx( expectedSize += sizePadCalc(expectedSize, minSize, wrappers) expectedSize = PktEncOverhead + sizeWithTags(expectedSize) if maxSize != 0 && expectedSize > maxSize { - return nil, 0, TooBig + return nil, 0, "", TooBig } if !ctx.IsEnoughSpace(expectedSize) { - return nil, 0, errors.New("is not enough space") + return nil, 0, "", errors.New("is not enough space") } } tmp, err := ctx.NewTmpFileWHash() if err != nil { - return nil, 0, err + return nil, 0, "", err } results := make(chan PktEncWriteResult) @@ -211,7 +211,7 @@ func (ctx *Ctx) Tx( r := <-results if r.err != nil { tmp.Fd.Close() - return nil, 0, r.err + return nil, 0, "", r.err } if r.pktEncRaw != nil { pktEncRaw = r.pktEncRaw @@ -224,7 +224,7 @@ func (ctx *Ctx) Tx( err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) if err != nil { - return lastNode, 0, err + return lastNode, 0, "", err } if ctx.HdrUsage { ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) @@ -255,18 +255,18 @@ func (ctx *Ctx) Tx( } if err = ensureDir(seenDir); err != nil { ctx.LogE("tx-mkdir", les, err, logMsg) - return lastNode, 0, err + return lastNode, 0, "", err } if fd, err := os.Create(seenPath); err == nil { fd.Close() if err = DirSync(seenDir); err != nil { ctx.LogE("tx-dirsync", les, err, logMsg) - return lastNode, 0, err + return lastNode, 0, "", err } } ctx.LogI("tx-area", les, logMsg) } - return lastNode, payloadSize, err + return lastNode, payloadSize, tmp.Checksum(), err } type DummyCloser struct{} @@ -438,7 +438,7 @@ func (ctx *Ctx) TxFile( if err != nil { return err } - _, finalSize, err := ctx.Tx( + _, finalSize, pktName, err := ctx.Tx( node, pkt, nice, srcSize, minSize, maxSize, bufio.NewReader(reader), dstPath, areaId, @@ -450,6 +450,7 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", dstPath}, {"Size", finalSize}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -480,7 +481,7 @@ func (ctx *Ctx) TxFile( return err } hsh := MTHNew(0, 0) - _, size, err := ctx.Tx( + _, size, pktName, err := ctx.Tx( node, pkt, nice, 0, minSize, maxSize, io.TeeReader(lr, hsh), @@ -494,6 +495,7 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", path}, {"Size", size}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -541,7 +543,7 @@ func (ctx *Ctx) TxFile( return err } metaPktSize := int64(buf.Len()) - _, _, err = ctx.Tx( + _, _, pktName, err := ctx.Tx( node, pkt, nice, @@ -555,6 +557,7 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", path}, {"Size", metaPktSize}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -593,7 +596,9 @@ func (ctx *Ctx) TxFreq( } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil) + _, _, pktName, err := ctx.Tx( + node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil, + ) les := LEs{ {"Type", "freq"}, {"Node", node.Id}, @@ -601,6 +606,7 @@ func (ctx *Ctx) TxFreq( {"ReplyNice", int(replyNice)}, {"Src", srcPath}, {"Dst", dstPath}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -657,7 +663,9 @@ func (ctx *Ctx) TxExec( }(in) in = pr } - _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId) + _, size, pktName, err := ctx.Tx( + node, pkt, nice, 0, minSize, maxSize, in, handle, areaId, + ) if !noCompress { e := <-compressErr if err == nil { @@ -672,6 +680,7 @@ func (ctx *Ctx) TxExec( {"ReplyNice", int(replyNice)}, {"Dst", dst}, {"Size", size}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -735,25 +744,28 @@ func (ctx *Ctx) TxACK( nice uint8, hsh string, minSize int64, -) error { +) (pktName string, err error) { hshRaw, err := Base32Codec.DecodeString(hsh) if err != nil { - return err + return "", err } if len(hshRaw) != MTHSize { - return errors.New("Invalid packet id size") + return "", errors.New("Invalid packet id size") } pkt, err := NewPkt(PktTypeACK, nice, []byte(hshRaw)) if err != nil { - return err + return "", err } src := bytes.NewReader([]byte{}) - _, _, err = ctx.Tx(node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil) + _, _, pktName, err = ctx.Tx( + node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil, + ) les := LEs{ {"Type", "ack"}, {"Node", node.Id}, {"Nice", int(nice)}, {"Pkt", hsh}, + {"NewPkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf("ACK to %s of %s is sent", ctx.NodeName(node.Id), hsh) @@ -763,5 +775,5 @@ func (ctx *Ctx) TxACK( } else { ctx.LogE("tx", les, err, logMsg) } - return err + return } diff --git a/src/tx_test.go b/src/tx_test.go index 507bc8b..9f8d750 100644 --- a/src/tx_test.go +++ b/src/tx_test.go @@ -87,7 +87,7 @@ func TestTx(t *testing.T) { } pkt, err := NewPkt(PktTypeExec, replyNice, []byte(pathSrc)) src := bytes.NewReader(data) - dstNode, _, err := ctx.Tx( + dstNode, _, _, err := ctx.Tx( nodeTgt, pkt, 123,