X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftx.go;h=4edbea4e849eb183002f9f77e8d2f2c5ab626f98;hb=cf9363f956cb2d93a581c11ed65c5b02910d10d5;hp=0f7d2b8d9369627d4cbf52eb63b996b5874a22a5;hpb=b47dbfe6687569650fa544a4ecf3e4ea388390cb;p=nncp.git diff --git a/src/tx.go b/src/tx.go index 0f7d2b8..4edbea4 100644 --- a/src/tx.go +++ b/src/tx.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2021 Sergey Matveev +Copyright (C) 2016-2023 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -57,12 +57,12 @@ func (ctx *Ctx) Tx( src io.Reader, pktName string, areaId *AreaId, -) (*Node, int64, error) { +) (*Node, int64, string, error) { var area *Area if areaId != nil { area = ctx.AreaId2Area[*areaId] if area.Prv == nil { - return nil, 0, errors.New("area has no encryption keys") + return nil, 0, "", errors.New("area has no encryption keys") } } hops := make([]*Node, 0, 1+len(node.Via)) @@ -82,15 +82,15 @@ func (ctx *Ctx) Tx( expectedSize += sizePadCalc(expectedSize, minSize, wrappers) expectedSize = PktEncOverhead + sizeWithTags(expectedSize) if maxSize != 0 && expectedSize > maxSize { - return nil, 0, TooBig + return nil, 0, "", TooBig } if !ctx.IsEnoughSpace(expectedSize) { - return nil, 0, errors.New("is not enough space") + return nil, 0, "", errors.New("is not enough space") } } tmp, err := ctx.NewTmpFileWHash() if err != nil { - return nil, 0, err + return nil, 0, "", err } results := make(chan PktEncWriteResult) @@ -200,26 +200,31 @@ func (ctx *Ctx) Tx( }() var pktEncRaw []byte var pktEncMsg []byte + var payloadSize int64 if area != nil { - pktEncMsg = (<-results).pktEncRaw + r := <-results + payloadSize = r.size + pktEncMsg = r.pktEncRaw + wrappers-- } - var finalSize int64 for i := 0; i <= wrappers; i++ { r := <-results if r.err != nil { tmp.Fd.Close() - return nil, 0, err + return nil, 0, "", r.err } if r.pktEncRaw != nil { - finalSize = r.size pktEncRaw = r.pktEncRaw + if payloadSize == 0 { + payloadSize = r.size + } } } nodePath := filepath.Join(ctx.Spool, lastNode.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) if err != nil { - return lastNode, 0, err + return lastNode, 0, "", err } if ctx.HdrUsage { ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) @@ -250,18 +255,18 @@ func (ctx *Ctx) Tx( } if err = ensureDir(seenDir); err != nil { ctx.LogE("tx-mkdir", les, err, logMsg) - return lastNode, 0, err + return lastNode, 0, "", err } if fd, err := os.Create(seenPath); err == nil { fd.Close() if err = DirSync(seenDir); err != nil { ctx.LogE("tx-dirsync", les, err, logMsg) - return lastNode, 0, err + return lastNode, 0, "", err } } ctx.LogI("tx-area", les, logMsg) } - return lastNode, finalSize, err + return lastNode, payloadSize, tmp.Checksum(), err } type DummyCloser struct{} @@ -325,11 +330,11 @@ func prepareTxFile(srcPath string) ( if err != nil { return err } - if info.IsDir() { + if info.Mode().IsDir() { // directory header, PAX record header+contents srcSize += TarBlockSize + 2*TarBlockSize dirs = append(dirs, einfo{path: path, modTime: info.ModTime()}) - } else { + } else if info.Mode().IsRegular() { // file header, PAX record header+contents, file content srcSize += TarBlockSize + 2*TarBlockSize + info.Size() if n := info.Size() % TarBlockSize; n != 0 { @@ -383,7 +388,9 @@ func prepareTxFile(srcPath string) ( fd.Close() return w.CloseWithError(err) } - if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil { + if _, err = io.Copy( + tarWr, bufio.NewReaderSize(fd, MTHBlockSize), + ); err != nil { fd.Close() return w.CloseWithError(err) } @@ -433,10 +440,10 @@ func (ctx *Ctx) TxFile( if err != nil { return err } - _, finalSize, err := ctx.Tx( + _, finalSize, pktName, err := ctx.Tx( node, pkt, nice, srcSize, minSize, maxSize, - bufio.NewReader(reader), dstPath, areaId, + bufio.NewReaderSize(reader, MTHBlockSize), dstPath, areaId, ) les := LEs{ {"Type", "file"}, @@ -445,10 +452,11 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", dstPath}, {"Size", finalSize}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( - "File %s (%s) sent to %s:%s", + "File %s (%s) is sent to %s:%s", srcPath, humanize.IBytes(uint64(finalSize)), ctx.NodeName(node.Id), @@ -463,7 +471,7 @@ func (ctx *Ctx) TxFile( return err } - br := bufio.NewReader(reader) + br := bufio.NewReaderSize(reader, MTHBlockSize) var sizeFull int64 var chunkNum int checksums := [][MTHSize]byte{} @@ -475,7 +483,7 @@ func (ctx *Ctx) TxFile( return err } hsh := MTHNew(0, 0) - _, size, err := ctx.Tx( + _, size, pktName, err := ctx.Tx( node, pkt, nice, 0, minSize, maxSize, io.TeeReader(lr, hsh), @@ -489,10 +497,11 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", path}, {"Size", size}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( - "File %s (%s) sent to %s:%s", + "File %s (%s) is sent to %s:%s", srcPath, humanize.IBytes(uint64(size)), ctx.NodeName(node.Id), @@ -536,7 +545,7 @@ func (ctx *Ctx) TxFile( return err } metaPktSize := int64(buf.Len()) - _, _, err = ctx.Tx( + _, _, pktName, err := ctx.Tx( node, pkt, nice, @@ -550,10 +559,11 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", path}, {"Size", metaPktSize}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( - "File %s (%s) sent to %s:%s", + "File %s (%s) is sent to %s:%s", srcPath, humanize.IBytes(uint64(metaPktSize)), ctx.NodeName(node.Id), @@ -588,7 +598,9 @@ func (ctx *Ctx) TxFreq( } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil) + _, _, pktName, err := ctx.Tx( + node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil, + ) les := LEs{ {"Type", "freq"}, {"Node", node.Id}, @@ -596,10 +608,11 @@ func (ctx *Ctx) TxFreq( {"ReplyNice", int(replyNice)}, {"Src", srcPath}, {"Dst", dstPath}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( - "File request from %s:%s to %s sent", + "File request from %s:%s to %s is sent", ctx.NodeName(node.Id), srcPath, dstPath, ) @@ -652,7 +665,9 @@ func (ctx *Ctx) TxExec( }(in) in = pr } - _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId) + _, size, pktName, err := ctx.Tx( + node, pkt, nice, 0, minSize, maxSize, in, handle, areaId, + ) if !noCompress { e := <-compressErr if err == nil { @@ -667,10 +682,11 @@ func (ctx *Ctx) TxExec( {"ReplyNice", int(replyNice)}, {"Dst", dst}, {"Size", size}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( - "Exec sent to %s@%s (%s)", + "Exec is sent to %s@%s (%s)", ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)), ) } @@ -724,3 +740,42 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) return err } + +func (ctx *Ctx) TxACK( + node *Node, + nice uint8, + hsh string, + minSize int64, +) (pktName string, err error) { + hshRaw, err := Base32Codec.DecodeString(hsh) + if err != nil { + return "", err + } + if len(hshRaw) != MTHSize { + return "", errors.New("Invalid packet id size") + } + pkt, err := NewPkt(PktTypeACK, nice, []byte(hshRaw)) + if err != nil { + return "", err + } + src := bytes.NewReader([]byte{}) + _, _, pktName, err = ctx.Tx( + node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil, + ) + les := LEs{ + {"Type", "ack"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Pkt", hsh}, + {"NewPkt", pktName}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf("ACK to %s of %s is sent", ctx.NodeName(node.Id), hsh) + } + if err == nil { + ctx.LogI("tx", les, logMsg) + } else { + ctx.LogE("tx", les, err, logMsg) + } + return +}