X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftx.go;h=f7f099b39f3239c34979388be784393c694cfda8;hb=c2b17fd0fc439e420710f53602604e8823c2fb1d;hp=fd4e0fbdec5182a7f689731de9a3591e15eb80ae;hpb=c8efe853c5381ea453532e577ff6b3a2c0a947dc;p=nncp.git diff --git a/src/tx.go b/src/tx.go index fd4e0fb..f7f099b 100644 --- a/src/tx.go +++ b/src/tx.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2021 Sergey Matveev +Copyright (C) 2016-2022 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -57,12 +57,12 @@ func (ctx *Ctx) Tx( src io.Reader, pktName string, areaId *AreaId, -) (*Node, int64, error) { +) (*Node, int64, string, error) { var area *Area if areaId != nil { area = ctx.AreaId2Area[*areaId] if area.Prv == nil { - return nil, 0, errors.New("area has no encryption keys") + return nil, 0, "", errors.New("area has no encryption keys") } } hops := make([]*Node, 0, 1+len(node.Via)) @@ -82,15 +82,15 @@ func (ctx *Ctx) Tx( expectedSize += sizePadCalc(expectedSize, minSize, wrappers) expectedSize = PktEncOverhead + sizeWithTags(expectedSize) if maxSize != 0 && expectedSize > maxSize { - return nil, 0, TooBig + return nil, 0, "", TooBig } if !ctx.IsEnoughSpace(expectedSize) { - return nil, 0, errors.New("is not enough space") + return nil, 0, "", errors.New("is not enough space") } } tmp, err := ctx.NewTmpFileWHash() if err != nil { - return nil, 0, err + return nil, 0, "", err } results := make(chan PktEncWriteResult) @@ -211,7 +211,7 @@ func (ctx *Ctx) Tx( r := <-results if r.err != nil { tmp.Fd.Close() - return nil, 0, err + return nil, 0, "", r.err } if r.pktEncRaw != nil { pktEncRaw = r.pktEncRaw @@ -224,7 +224,7 @@ func (ctx *Ctx) Tx( err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) if err != nil { - return lastNode, 0, err + return lastNode, 0, "", err } if ctx.HdrUsage { ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) @@ -255,18 +255,18 @@ func (ctx *Ctx) Tx( } if err = ensureDir(seenDir); err != nil { ctx.LogE("tx-mkdir", les, err, logMsg) - return lastNode, 0, err + return lastNode, 0, "", err } if fd, err := os.Create(seenPath); err == nil { fd.Close() if err = DirSync(seenDir); err != nil { ctx.LogE("tx-dirsync", les, err, logMsg) - return lastNode, 0, err + return lastNode, 0, "", err } } ctx.LogI("tx-area", les, logMsg) } - return lastNode, payloadSize, err + return lastNode, payloadSize, tmp.Checksum(), err } type DummyCloser struct{} @@ -330,11 +330,11 @@ func prepareTxFile(srcPath string) ( if err != nil { return err } - if info.IsDir() { + if info.Mode().IsDir() { // directory header, PAX record header+contents srcSize += TarBlockSize + 2*TarBlockSize dirs = append(dirs, einfo{path: path, modTime: info.ModTime()}) - } else { + } else if info.Mode().IsRegular() { // file header, PAX record header+contents, file content srcSize += TarBlockSize + 2*TarBlockSize + info.Size() if n := info.Size() % TarBlockSize; n != 0 { @@ -388,7 +388,9 @@ func prepareTxFile(srcPath string) ( fd.Close() return w.CloseWithError(err) } - if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil { + if _, err = io.Copy( + tarWr, bufio.NewReaderSize(fd, MTHBlockSize), + ); err != nil { fd.Close() return w.CloseWithError(err) } @@ -438,10 +440,10 @@ func (ctx *Ctx) TxFile( if err != nil { return err } - _, finalSize, err := ctx.Tx( + _, finalSize, pktName, err := ctx.Tx( node, pkt, nice, srcSize, minSize, maxSize, - bufio.NewReader(reader), dstPath, areaId, + bufio.NewReaderSize(reader, MTHBlockSize), dstPath, areaId, ) les := LEs{ {"Type", "file"}, @@ -450,10 +452,11 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", dstPath}, {"Size", finalSize}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( - "File %s (%s) sent to %s:%s", + "File %s (%s) is sent to %s:%s", srcPath, humanize.IBytes(uint64(finalSize)), ctx.NodeName(node.Id), @@ -468,7 +471,7 @@ func (ctx *Ctx) TxFile( return err } - br := bufio.NewReader(reader) + br := bufio.NewReaderSize(reader, MTHBlockSize) var sizeFull int64 var chunkNum int checksums := [][MTHSize]byte{} @@ -480,7 +483,7 @@ func (ctx *Ctx) TxFile( return err } hsh := MTHNew(0, 0) - _, size, err := ctx.Tx( + _, size, pktName, err := ctx.Tx( node, pkt, nice, 0, minSize, maxSize, io.TeeReader(lr, hsh), @@ -494,10 +497,11 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", path}, {"Size", size}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( - "File %s (%s) sent to %s:%s", + "File %s (%s) is sent to %s:%s", srcPath, humanize.IBytes(uint64(size)), ctx.NodeName(node.Id), @@ -541,7 +545,7 @@ func (ctx *Ctx) TxFile( return err } metaPktSize := int64(buf.Len()) - _, _, err = ctx.Tx( + _, _, pktName, err := ctx.Tx( node, pkt, nice, @@ -555,10 +559,11 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", path}, {"Size", metaPktSize}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( - "File %s (%s) sent to %s:%s", + "File %s (%s) is sent to %s:%s", srcPath, humanize.IBytes(uint64(metaPktSize)), ctx.NodeName(node.Id), @@ -593,7 +598,9 @@ func (ctx *Ctx) TxFreq( } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil) + _, _, pktName, err := ctx.Tx( + node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil, + ) les := LEs{ {"Type", "freq"}, {"Node", node.Id}, @@ -601,10 +608,11 @@ func (ctx *Ctx) TxFreq( {"ReplyNice", int(replyNice)}, {"Src", srcPath}, {"Dst", dstPath}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( - "File request from %s:%s to %s sent", + "File request from %s:%s to %s is sent", ctx.NodeName(node.Id), srcPath, dstPath, ) @@ -657,7 +665,9 @@ func (ctx *Ctx) TxExec( }(in) in = pr } - _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId) + _, size, pktName, err := ctx.Tx( + node, pkt, nice, 0, minSize, maxSize, in, handle, areaId, + ) if !noCompress { e := <-compressErr if err == nil { @@ -672,10 +682,11 @@ func (ctx *Ctx) TxExec( {"ReplyNice", int(replyNice)}, {"Dst", dst}, {"Size", size}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( - "Exec sent to %s@%s (%s)", + "Exec is sent to %s@%s (%s)", ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)), ) } @@ -729,3 +740,42 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) return err } + +func (ctx *Ctx) TxACK( + node *Node, + nice uint8, + hsh string, + minSize int64, +) (pktName string, err error) { + hshRaw, err := Base32Codec.DecodeString(hsh) + if err != nil { + return "", err + } + if len(hshRaw) != MTHSize { + return "", errors.New("Invalid packet id size") + } + pkt, err := NewPkt(PktTypeACK, nice, []byte(hshRaw)) + if err != nil { + return "", err + } + src := bytes.NewReader([]byte{}) + _, _, pktName, err = ctx.Tx( + node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil, + ) + les := LEs{ + {"Type", "ack"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Pkt", hsh}, + {"NewPkt", pktName}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf("ACK to %s of %s is sent", ctx.NodeName(node.Id), hsh) + } + if err == nil { + ctx.LogI("tx", les, logMsg) + } else { + ctx.LogE("tx", les, err, logMsg) + } + return +}