]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/tx.go
MTH
[nncp.git] / src / tx.go
index c89957b333884a245793d49c6c72001b51b603aa..20482efb234bc5a99e4b2fc5b792d7a976ff8c3a 100644 (file)
--- a/src/tx.go
+++ b/src/tx.go
@@ -23,6 +23,7 @@ import (
        "bytes"
        "crypto/rand"
        "errors"
+       "fmt"
        "hash"
        "io"
        "io/ioutil"
@@ -33,8 +34,8 @@ import (
        "time"
 
        xdr "github.com/davecgh/go-xdr/xdr2"
+       "github.com/dustin/go-humanize"
        "github.com/klauspost/compress/zstd"
-       "golang.org/x/crypto/blake2b"
        "golang.org/x/crypto/chacha20poly1305"
 )
 
@@ -62,7 +63,9 @@ func (ctx *Ctx) Tx(
        }
        expectedSize := size
        for i := 0; i < len(hops); i++ {
-               expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
+               expectedSize = PktEncOverhead +
+                       PktSizeOverhead +
+                       sizeWithTags(PktOverhead+expectedSize)
        }
        padSize := minSize - expectedSize
        if padSize < 0 {
@@ -79,16 +82,30 @@ func (ctx *Ctx) Tx(
        errs := make(chan error)
        curSize := size
        pipeR, pipeW := io.Pipe()
+       var pktEncRaw []byte
        go func(size int64, src io.Reader, dst io.WriteCloser) {
                ctx.LogD("tx", LEs{
                        {"Node", hops[0].Id},
                        {"Nice", int(nice)},
                        {"Size", size},
-               }, "wrote")
-               errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
+               }, func(les LEs) string {
+                       return fmt.Sprintf(
+                               "Tx packet to %s (%s) nice: %s",
+                               ctx.NodeName(hops[0].Id),
+                               humanize.IBytes(uint64(size)),
+                               NicenessFmt(nice),
+                       )
+               })
+               pktEncRaw, err = PktEncWrite(
+                       ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
+               )
+               errs <- err
                dst.Close() // #nosec G104
        }(curSize, src, pipeW)
-       curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
+       curSize = PktEncOverhead +
+               PktSizeOverhead +
+               sizeWithTags(PktOverhead+curSize) +
+               padSize
 
        var pipeRPrev io.Reader
        for i := 1; i < len(hops); i++ {
@@ -100,8 +117,16 @@ func (ctx *Ctx) Tx(
                                {"Node", node.Id},
                                {"Nice", int(nice)},
                                {"Size", size},
-                       }, "trns wrote")
-                       errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+                       }, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tx trns packet to %s (%s) nice: %s",
+                                       ctx.NodeName(node.Id),
+                                       humanize.IBytes(uint64(size)),
+                                       NicenessFmt(nice),
+                               )
+                       })
+                       _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+                       errs <- err
                        dst.Close() // #nosec G104
                }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
                curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
@@ -124,6 +149,12 @@ func (ctx *Ctx) Tx(
        nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
        err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
        os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
+       if err != nil {
+               return lastNode, err
+       }
+       if ctx.HdrUsage {
+               ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
+       }
        return lastNode, err
 }
 
@@ -355,23 +386,32 @@ func (ctx *Ctx) TxFile(
                        {"Dst", dstPath},
                        {"Size", fileSize},
                }
+               logMsg := func(les LEs) string {
+                       return fmt.Sprintf(
+                               "File %s (%s) sent to %s:%s",
+                               srcPath,
+                               humanize.IBytes(uint64(fileSize)),
+                               ctx.NodeName(node.Id),
+                               dstPath,
+                       )
+               }
                if err == nil {
-                       ctx.LogI("tx", les, "sent")
+                       ctx.LogI("tx", les, logMsg)
                } else {
-                       ctx.LogE("tx", les, err, "sent")
+                       ctx.LogE("tx", les, err, logMsg)
                }
                return err
        }
 
        leftSize := fileSize
        metaPkt := ChunkedMeta{
-               Magic:     MagicNNCPMv1,
+               Magic:     MagicNNCPMv2,
                FileSize:  uint64(fileSize),
                ChunkSize: uint64(chunkSize),
-               Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
+               Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1),
        }
        for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
-               hsh := new([32]byte)
+               hsh := new([MTHSize]byte)
                metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
        }
        var sizeToSend int64
@@ -390,10 +430,7 @@ func (ctx *Ctx) TxFile(
                if err != nil {
                        return err
                }
-               hsh, err = blake2b.New256(nil)
-               if err != nil {
-                       return err
-               }
+               hsh = MTHNew(0, 0)
                _, err = ctx.Tx(
                        node,
                        pkt,
@@ -411,10 +448,19 @@ func (ctx *Ctx) TxFile(
                        {"Dst", path},
                        {"Size", sizeToSend},
                }
+               logMsg := func(les LEs) string {
+                       return fmt.Sprintf(
+                               "File %s (%s) sent to %s:%s",
+                               srcPath,
+                               humanize.IBytes(uint64(sizeToSend)),
+                               ctx.NodeName(node.Id),
+                               path,
+                       )
+               }
                if err == nil {
-                       ctx.LogI("tx", les, "sent")
+                       ctx.LogI("tx", les, logMsg)
                } else {
-                       ctx.LogE("tx", les, err, "sent")
+                       ctx.LogE("tx", les, err, logMsg)
                        return err
                }
                hsh.Sum(metaPkt.Checksums[chunkNum][:0])
@@ -444,10 +490,19 @@ func (ctx *Ctx) TxFile(
                {"Dst", path},
                {"Size", metaPktSize},
        }
+       logMsg := func(les LEs) string {
+               return fmt.Sprintf(
+                       "File %s (%s) sent to %s:%s",
+                       srcPath,
+                       humanize.IBytes(uint64(metaPktSize)),
+                       ctx.NodeName(node.Id),
+                       path,
+               )
+       }
        if err == nil {
-               ctx.LogI("tx", les, "sent")
+               ctx.LogI("tx", les, logMsg)
        } else {
-               ctx.LogE("tx", les, err, "sent")
+               ctx.LogE("tx", les, err, logMsg)
        }
        return err
 }
@@ -480,10 +535,17 @@ func (ctx *Ctx) TxFreq(
                {"Src", srcPath},
                {"Dst", dstPath},
        }
+       logMsg := func(les LEs) string {
+               return fmt.Sprintf(
+                       "File request from %s:%s to %s sent",
+                       ctx.NodeName(node.Id), srcPath,
+                       dstPath,
+               )
+       }
        if err == nil {
-               ctx.LogI("tx", les, "sent")
+               ctx.LogI("tx", les, logMsg)
        } else {
-               ctx.LogE("tx", les, err, "sent")
+               ctx.LogE("tx", les, err, logMsg)
        }
        return err
 }
@@ -583,18 +645,25 @@ func (ctx *Ctx) TxExec(
                _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
        }
 
+       dst := strings.Join(append([]string{handle}, args...), " ")
        les := LEs{
                {"Type", "exec"},
                {"Node", node.Id},
                {"Nice", int(nice)},
                {"ReplyNice", int(replyNice)},
-               {"Dst", strings.Join(append([]string{handle}, args...), " ")},
+               {"Dst", dst},
                {"Size", size},
        }
+       logMsg := func(les LEs) string {
+               return fmt.Sprintf(
+                       "Exec sent to %s@%s (%s)",
+                       ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
+               )
+       }
        if err == nil {
-               ctx.LogI("tx", les, "sent")
+               ctx.LogI("tx", les, logMsg)
        } else {
-               ctx.LogE("tx", les, err, "sent")
+               ctx.LogE("tx", les, err, logMsg)
        }
        return err
 }
@@ -606,10 +675,18 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error
                {"Nice", int(nice)},
                {"Size", size},
        }
-       ctx.LogD("tx", les, "taken")
+       logMsg := func(les LEs) string {
+               return fmt.Sprintf(
+                       "Transitional packet to %s (%s) (nice %s)",
+                       ctx.NodeName(node.Id),
+                       humanize.IBytes(uint64(size)),
+                       NicenessFmt(nice),
+               )
+       }
+       ctx.LogD("tx", les, logMsg)
        if !ctx.IsEnoughSpace(size) {
                err := errors.New("is not enough space")
-               ctx.LogE("tx", les, err, err.Error())
+               ctx.LogE("tx", les, err, logMsg)
                return err
        }
        tmp, err := ctx.NewTmpFileWHash()
@@ -626,9 +703,9 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error
        nodePath := filepath.Join(ctx.Spool, node.Id.String())
        err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
        if err == nil {
-               ctx.LogI("tx", les, "sent")
+               ctx.LogI("tx", les, logMsg)
        } else {
-               ctx.LogI("tx", append(les, LE{"Err", err}), "sent")
+               ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
        }
        os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
        return err