]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/tx.go
Use explicitly larger bufio's buffer
[nncp.git] / src / tx.go
index b8d47aa6148ae87ad542b20236c67a106130f404..f7f099b39f3239c34979388be784393c694cfda8 100644 (file)
--- a/src/tx.go
+++ b/src/tx.go
@@ -1,6 +1,6 @@
 /*
 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
 
 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
@@ -57,12 +57,12 @@ func (ctx *Ctx) Tx(
        src io.Reader,
        pktName string,
        areaId *AreaId,
-) (*Node, int64, error) {
+) (*Node, int64, string, error) {
        var area *Area
        if areaId != nil {
                area = ctx.AreaId2Area[*areaId]
                if area.Prv == nil {
-                       return nil, 0, errors.New("area has no encryption keys")
+                       return nil, 0, "", errors.New("area has no encryption keys")
                }
        }
        hops := make([]*Node, 0, 1+len(node.Via))
@@ -82,15 +82,15 @@ func (ctx *Ctx) Tx(
                expectedSize += sizePadCalc(expectedSize, minSize, wrappers)
                expectedSize = PktEncOverhead + sizeWithTags(expectedSize)
                if maxSize != 0 && expectedSize > maxSize {
-                       return nil, 0, TooBig
+                       return nil, 0, "", TooBig
                }
                if !ctx.IsEnoughSpace(expectedSize) {
-                       return nil, 0, errors.New("is not enough space")
+                       return nil, 0, "", errors.New("is not enough space")
                }
        }
        tmp, err := ctx.NewTmpFileWHash()
        if err != nil {
-               return nil, 0, err
+               return nil, 0, "", err
        }
 
        results := make(chan PktEncWriteResult)
@@ -205,12 +205,13 @@ func (ctx *Ctx) Tx(
                r := <-results
                payloadSize = r.size
                pktEncMsg = r.pktEncRaw
+               wrappers--
        }
        for i := 0; i <= wrappers; i++ {
                r := <-results
                if r.err != nil {
                        tmp.Fd.Close()
-                       return nil, 0, err
+                       return nil, 0, "", r.err
                }
                if r.pktEncRaw != nil {
                        pktEncRaw = r.pktEncRaw
@@ -223,7 +224,7 @@ func (ctx *Ctx) Tx(
        err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
        os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
        if err != nil {
-               return lastNode, 0, err
+               return lastNode, 0, "", err
        }
        if ctx.HdrUsage {
                ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
@@ -254,18 +255,18 @@ func (ctx *Ctx) Tx(
                }
                if err = ensureDir(seenDir); err != nil {
                        ctx.LogE("tx-mkdir", les, err, logMsg)
-                       return lastNode, 0, err
+                       return lastNode, 0, "", err
                }
                if fd, err := os.Create(seenPath); err == nil {
                        fd.Close()
                        if err = DirSync(seenDir); err != nil {
                                ctx.LogE("tx-dirsync", les, err, logMsg)
-                               return lastNode, 0, err
+                               return lastNode, 0, "", err
                        }
                }
                ctx.LogI("tx-area", les, logMsg)
        }
-       return lastNode, payloadSize, err
+       return lastNode, payloadSize, tmp.Checksum(), err
 }
 
 type DummyCloser struct{}
@@ -329,11 +330,11 @@ func prepareTxFile(srcPath string) (
                if err != nil {
                        return err
                }
-               if info.IsDir() {
+               if info.Mode().IsDir() {
                        // directory header, PAX record header+contents
                        srcSize += TarBlockSize + 2*TarBlockSize
                        dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
-               } else {
+               } else if info.Mode().IsRegular() {
                        // file header, PAX record header+contents, file content
                        srcSize += TarBlockSize + 2*TarBlockSize + info.Size()
                        if n := info.Size() % TarBlockSize; n != 0 {
@@ -387,7 +388,9 @@ func prepareTxFile(srcPath string) (
                                fd.Close()
                                return w.CloseWithError(err)
                        }
-                       if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
+                       if _, err = io.Copy(
+                               tarWr, bufio.NewReaderSize(fd, MTHBlockSize),
+                       ); err != nil {
                                fd.Close()
                                return w.CloseWithError(err)
                        }
@@ -437,10 +440,10 @@ func (ctx *Ctx) TxFile(
                if err != nil {
                        return err
                }
-               _, finalSize, err := ctx.Tx(
+               _, finalSize, pktName, err := ctx.Tx(
                        node, pkt, nice,
                        srcSize, minSize, maxSize,
-                       bufio.NewReader(reader), dstPath, areaId,
+                       bufio.NewReaderSize(reader, MTHBlockSize), dstPath, areaId,
                )
                les := LEs{
                        {"Type", "file"},
@@ -449,10 +452,11 @@ func (ctx *Ctx) TxFile(
                        {"Src", srcPath},
                        {"Dst", dstPath},
                        {"Size", finalSize},
+                       {"Pkt", pktName},
                }
                logMsg := func(les LEs) string {
                        return fmt.Sprintf(
-                               "File %s (%s) sent to %s:%s",
+                               "File %s (%s) is sent to %s:%s",
                                srcPath,
                                humanize.IBytes(uint64(finalSize)),
                                ctx.NodeName(node.Id),
@@ -467,7 +471,7 @@ func (ctx *Ctx) TxFile(
                return err
        }
 
-       br := bufio.NewReader(reader)
+       br := bufio.NewReaderSize(reader, MTHBlockSize)
        var sizeFull int64
        var chunkNum int
        checksums := [][MTHSize]byte{}
@@ -479,7 +483,7 @@ func (ctx *Ctx) TxFile(
                        return err
                }
                hsh := MTHNew(0, 0)
-               _, size, err := ctx.Tx(
+               _, size, pktName, err := ctx.Tx(
                        node, pkt, nice,
                        0, minSize, maxSize,
                        io.TeeReader(lr, hsh),
@@ -493,10 +497,11 @@ func (ctx *Ctx) TxFile(
                        {"Src", srcPath},
                        {"Dst", path},
                        {"Size", size},
+                       {"Pkt", pktName},
                }
                logMsg := func(les LEs) string {
                        return fmt.Sprintf(
-                               "File %s (%s) sent to %s:%s",
+                               "File %s (%s) is sent to %s:%s",
                                srcPath,
                                humanize.IBytes(uint64(size)),
                                ctx.NodeName(node.Id),
@@ -540,7 +545,7 @@ func (ctx *Ctx) TxFile(
                return err
        }
        metaPktSize := int64(buf.Len())
-       _, _, err = ctx.Tx(
+       _, _, pktName, err := ctx.Tx(
                node,
                pkt,
                nice,
@@ -554,10 +559,11 @@ func (ctx *Ctx) TxFile(
                {"Src", srcPath},
                {"Dst", path},
                {"Size", metaPktSize},
+               {"Pkt", pktName},
        }
        logMsg := func(les LEs) string {
                return fmt.Sprintf(
-                       "File %s (%s) sent to %s:%s",
+                       "File %s (%s) is sent to %s:%s",
                        srcPath,
                        humanize.IBytes(uint64(metaPktSize)),
                        ctx.NodeName(node.Id),
@@ -592,7 +598,9 @@ func (ctx *Ctx) TxFreq(
        }
        src := strings.NewReader(dstPath)
        size := int64(src.Len())
-       _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil)
+       _, _, pktName, err := ctx.Tx(
+               node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil,
+       )
        les := LEs{
                {"Type", "freq"},
                {"Node", node.Id},
@@ -600,10 +608,11 @@ func (ctx *Ctx) TxFreq(
                {"ReplyNice", int(replyNice)},
                {"Src", srcPath},
                {"Dst", dstPath},
+               {"Pkt", pktName},
        }
        logMsg := func(les LEs) string {
                return fmt.Sprintf(
-                       "File request from %s:%s to %s sent",
+                       "File request from %s:%s to %s is sent",
                        ctx.NodeName(node.Id), srcPath,
                        dstPath,
                )
@@ -656,7 +665,9 @@ func (ctx *Ctx) TxExec(
                }(in)
                in = pr
        }
-       _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId)
+       _, size, pktName, err := ctx.Tx(
+               node, pkt, nice, 0, minSize, maxSize, in, handle, areaId,
+       )
        if !noCompress {
                e := <-compressErr
                if err == nil {
@@ -671,10 +682,11 @@ func (ctx *Ctx) TxExec(
                {"ReplyNice", int(replyNice)},
                {"Dst", dst},
                {"Size", size},
+               {"Pkt", pktName},
        }
        logMsg := func(les LEs) string {
                return fmt.Sprintf(
-                       "Exec sent to %s@%s (%s)",
+                       "Exec is sent to %s@%s (%s)",
                        ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
                )
        }
@@ -728,3 +740,42 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error
        os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
        return err
 }
+
+func (ctx *Ctx) TxACK(
+       node *Node,
+       nice uint8,
+       hsh string,
+       minSize int64,
+) (pktName string, err error) {
+       hshRaw, err := Base32Codec.DecodeString(hsh)
+       if err != nil {
+               return "", err
+       }
+       if len(hshRaw) != MTHSize {
+               return "", errors.New("Invalid packet id size")
+       }
+       pkt, err := NewPkt(PktTypeACK, nice, []byte(hshRaw))
+       if err != nil {
+               return "", err
+       }
+       src := bytes.NewReader([]byte{})
+       _, _, pktName, err = ctx.Tx(
+               node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil,
+       )
+       les := LEs{
+               {"Type", "ack"},
+               {"Node", node.Id},
+               {"Nice", int(nice)},
+               {"Pkt", hsh},
+               {"NewPkt", pktName},
+       }
+       logMsg := func(les LEs) string {
+               return fmt.Sprintf("ACK to %s of %s is sent", ctx.NodeName(node.Id), hsh)
+       }
+       if err == nil {
+               ctx.LogI("tx", les, logMsg)
+       } else {
+               ctx.LogE("tx", les, err, logMsg)
+       }
+       return
+}