From: Sergey Matveev Date: Fri, 4 Mar 2022 13:34:28 +0000 (+0300) Subject: Generate list of created ACKs X-Git-Tag: v8.7.0^2 X-Git-Url: http://www.git.cypherpunks.ru/?p=nncp.git;a=commitdiff_plain;h=9eee562a50358d2f52449e80c8c47116d34f9415 Generate list of created ACKs --- diff --git a/doc/cmd/nncp-ack.texi b/doc/cmd/nncp-ack.texi index 5635cc1..a3b374d 100644 --- a/doc/cmd/nncp-ack.texi +++ b/doc/cmd/nncp-ack.texi @@ -7,6 +7,8 @@ $ nncp-ack [options] -all $ nncp-ack [options] -node NODE[,@dots{}] $ nncp-ack [options] -node NODE -pkt PKT + +$ nncp-ack [@dots{}] 4>&1 >&2 | nncp-rm [@dots{}] -pkt @end example Send acknowledgement of successful @option{PKT} (Base32-encoded hash) @@ -14,6 +16,11 @@ packet receipt from @option{NODE} node. If no @option{-pkt} is specified, then acknowledge all packet in node's @code{rx} outbound spool. If @option{-all} is specified, then do that for all nodes. +That commands outputs list of created encrypted ACK packets +(@code{NODE/PKT}) to @strong{4}th file descriptor. That output can be +passed for example to @command{@ref{nncp-rm}} to remove them after +transmission to not wait for acknowledgement and retransmission. + General workflow with acknowledgement is following, assuming that Alice has some outbound packets for Bob: @@ -33,16 +40,24 @@ bob$ nncp-xfer -rx /mnt/shared That will also check if copied packets checksum is not mismatched. -@item Create ACK packets of received ones: +@item Create ACK packets of received ones, saving the list of encrypted + ACK packets: @example -bob$ nncp-ack -node alice +bob$ nncp-ack -node alice 4>acks @end example @item Send those newly created packets back to Alice: @example -bob$ nncp-xfer -tx /mnt/shared +bob$ nncp-xfer [-keep] -tx /mnt/shared +@end example + +@item Remove them from outbound spool, because we expect no + acknowledgement for them: + +@example +bob$ nncp-rm -node alice -pkt maxSize { - return nil, 0, TooBig + return nil, 0, "", TooBig } if !ctx.IsEnoughSpace(expectedSize) { - return nil, 0, errors.New("is not enough space") + return nil, 0, "", errors.New("is not enough space") } } tmp, err := ctx.NewTmpFileWHash() if err != nil { - return nil, 0, err + return nil, 0, "", err } results := make(chan PktEncWriteResult) @@ -211,7 +211,7 @@ func (ctx *Ctx) Tx( r := <-results if r.err != nil { tmp.Fd.Close() - return nil, 0, r.err + return nil, 0, "", r.err } if r.pktEncRaw != nil { pktEncRaw = r.pktEncRaw @@ -224,7 +224,7 @@ func (ctx *Ctx) Tx( err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) if err != nil { - return lastNode, 0, err + return lastNode, 0, "", err } if ctx.HdrUsage { ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) @@ -255,18 +255,18 @@ func (ctx *Ctx) Tx( } if err = ensureDir(seenDir); err != nil { ctx.LogE("tx-mkdir", les, err, logMsg) - return lastNode, 0, err + return lastNode, 0, "", err } if fd, err := os.Create(seenPath); err == nil { fd.Close() if err = DirSync(seenDir); err != nil { ctx.LogE("tx-dirsync", les, err, logMsg) - return lastNode, 0, err + return lastNode, 0, "", err } } ctx.LogI("tx-area", les, logMsg) } - return lastNode, payloadSize, err + return lastNode, payloadSize, tmp.Checksum(), err } type DummyCloser struct{} @@ -438,7 +438,7 @@ func (ctx *Ctx) TxFile( if err != nil { return err } - _, finalSize, err := ctx.Tx( + _, finalSize, pktName, err := ctx.Tx( node, pkt, nice, srcSize, minSize, maxSize, bufio.NewReader(reader), dstPath, areaId, @@ -450,6 +450,7 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", dstPath}, {"Size", finalSize}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -480,7 +481,7 @@ func (ctx *Ctx) TxFile( return err } hsh := MTHNew(0, 0) - _, size, err := ctx.Tx( + _, size, pktName, err := ctx.Tx( node, pkt, nice, 0, minSize, maxSize, io.TeeReader(lr, hsh), @@ -494,6 +495,7 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", path}, {"Size", size}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -541,7 +543,7 @@ func (ctx *Ctx) TxFile( return err } metaPktSize := int64(buf.Len()) - _, _, err = ctx.Tx( + _, _, pktName, err := ctx.Tx( node, pkt, nice, @@ -555,6 +557,7 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", path}, {"Size", metaPktSize}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -593,7 +596,9 @@ func (ctx *Ctx) TxFreq( } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil) + _, _, pktName, err := ctx.Tx( + node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil, + ) les := LEs{ {"Type", "freq"}, {"Node", node.Id}, @@ -601,6 +606,7 @@ func (ctx *Ctx) TxFreq( {"ReplyNice", int(replyNice)}, {"Src", srcPath}, {"Dst", dstPath}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -657,7 +663,9 @@ func (ctx *Ctx) TxExec( }(in) in = pr } - _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId) + _, size, pktName, err := ctx.Tx( + node, pkt, nice, 0, minSize, maxSize, in, handle, areaId, + ) if !noCompress { e := <-compressErr if err == nil { @@ -672,6 +680,7 @@ func (ctx *Ctx) TxExec( {"ReplyNice", int(replyNice)}, {"Dst", dst}, {"Size", size}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -735,25 +744,28 @@ func (ctx *Ctx) TxACK( nice uint8, hsh string, minSize int64, -) error { +) (pktName string, err error) { hshRaw, err := Base32Codec.DecodeString(hsh) if err != nil { - return err + return "", err } if len(hshRaw) != MTHSize { - return errors.New("Invalid packet id size") + return "", errors.New("Invalid packet id size") } pkt, err := NewPkt(PktTypeACK, nice, []byte(hshRaw)) if err != nil { - return err + return "", err } src := bytes.NewReader([]byte{}) - _, _, err = ctx.Tx(node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil) + _, _, pktName, err = ctx.Tx( + node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil, + ) les := LEs{ {"Type", "ack"}, {"Node", node.Id}, {"Nice", int(nice)}, {"Pkt", hsh}, + {"NewPkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf("ACK to %s of %s is sent", ctx.NodeName(node.Id), hsh) @@ -763,5 +775,5 @@ func (ctx *Ctx) TxACK( } else { ctx.LogE("tx", les, err, logMsg) } - return err + return } diff --git a/src/tx_test.go b/src/tx_test.go index 507bc8b..9f8d750 100644 --- a/src/tx_test.go +++ b/src/tx_test.go @@ -87,7 +87,7 @@ func TestTx(t *testing.T) { } pkt, err := NewPkt(PktTypeExec, replyNice, []byte(pathSrc)) src := bytes.NewReader(data) - dstNode, _, err := ctx.Tx( + dstNode, _, _, err := ctx.Tx( nodeTgt, pkt, 123,