]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/tx.go
Updated github.com/klauspost/compress without snappy dependency
[nncp.git] / src / tx.go
index 3e57f9b865b1a56d99a0623edd602942f17a1dcf..e096ed65f873ddeb48f9ac635a01bbeea8849ba2 100644 (file)
--- a/src/tx.go
+++ b/src/tx.go
@@ -23,6 +23,7 @@ import (
        "bytes"
        "crypto/rand"
        "errors"
+       "fmt"
        "hash"
        "io"
        "io/ioutil"
@@ -33,6 +34,7 @@ import (
        "time"
 
        xdr "github.com/davecgh/go-xdr/xdr2"
+       "github.com/dustin/go-humanize"
        "github.com/klauspost/compress/zstd"
        "golang.org/x/crypto/blake2b"
        "golang.org/x/crypto/chacha20poly1305"
@@ -52,7 +54,15 @@ func (ctx *Ctx) Tx(
        size, minSize int64,
        src io.Reader,
        pktName string,
+       areaId *AreaId,
 ) (*Node, error) {
+       var area *Area
+       if areaId != nil {
+               area = ctx.AreaId2Area[*areaId]
+               if area.Prv == nil {
+                       return nil, errors.New("area has no encryption keys")
+               }
+       }
        hops := make([]*Node, 0, 1+len(node.Via))
        hops = append(hops, node)
        lastNode := node
@@ -61,7 +71,11 @@ func (ctx *Ctx) Tx(
                hops = append(hops, lastNode)
        }
        expectedSize := size
-       for i := 0; i < len(hops); i++ {
+       wrappers := len(hops)
+       if area != nil {
+               wrappers++
+       }
+       for i := 0; i < wrappers; i++ {
                expectedSize = PktEncOverhead +
                        PktSizeOverhead +
                        sizeWithTags(PktOverhead+expectedSize)
@@ -79,29 +93,92 @@ func (ctx *Ctx) Tx(
        }
 
        errs := make(chan error)
+       pktEncRaws := make(chan []byte)
        curSize := size
        pipeR, pipeW := io.Pipe()
-       var pktEncRaw []byte
-       go func(size int64, src io.Reader, dst io.WriteCloser) {
-               ctx.LogD("tx", LEs{
-                       {"Node", hops[0].Id},
-                       {"Nice", int(nice)},
-                       {"Size", size},
-               }, "wrote")
-               pktEncRaw, err = PktEncWrite(
-                       ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
-               )
-               errs <- err
-               dst.Close() // #nosec G104
-       }(curSize, src, pipeW)
-       curSize = PktEncOverhead +
-               PktSizeOverhead +
-               sizeWithTags(PktOverhead+curSize) +
-               padSize
-
        var pipeRPrev io.Reader
+       if area == nil {
+               go func(size int64, src io.Reader, dst io.WriteCloser) {
+                       ctx.LogD("tx", LEs{
+                               {"Node", hops[0].Id},
+                               {"Nice", int(nice)},
+                               {"Size", size},
+                       }, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tx packet to %s (%s) nice: %s",
+                                       ctx.NodeName(hops[0].Id),
+                                       humanize.IBytes(uint64(size)),
+                                       NicenessFmt(nice),
+                               )
+                       })
+                       pktEncRaw, err := PktEncWrite(
+                               ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
+                       )
+                       pktEncRaws <- pktEncRaw
+                       errs <- err
+                       dst.Close()
+               }(curSize, src, pipeW)
+               curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
+               curSize += padSize
+       } else {
+               go func(size, padSize int64, src io.Reader, dst io.WriteCloser) {
+                       ctx.LogD("tx", LEs{
+                               {"Area", area.Id},
+                               {"Nice", int(nice)},
+                               {"Size", size},
+                       }, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tx area packet to %s (%s) nice: %s",
+                                       ctx.AreaName(areaId),
+                                       humanize.IBytes(uint64(size)),
+                                       NicenessFmt(nice),
+                               )
+                       })
+                       areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
+                       copy(areaNode.Id[:], area.Id[:])
+                       copy(areaNode.ExchPub[:], area.Pub[:])
+                       pktEncRaw, err := PktEncWrite(
+                               ctx.Self, &areaNode, pkt, nice, size, padSize, src, dst,
+                       )
+                       pktEncRaws <- pktEncRaw
+                       errs <- err
+                       dst.Close()
+               }(curSize, padSize, src, pipeW)
+               curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
+               curSize += padSize
+               pipeRPrev = pipeR
+               pipeR, pipeW = io.Pipe()
+               go func(size int64, src io.Reader, dst io.WriteCloser) {
+                       pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
+                       if err != nil {
+                               panic(err)
+                       }
+                       ctx.LogD("tx", LEs{
+                               {"Node", hops[0].Id},
+                               {"Nice", int(nice)},
+                               {"Size", size},
+                       }, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tx packet to %s (%s) nice: %s",
+                                       ctx.NodeName(hops[0].Id),
+                                       humanize.IBytes(uint64(size)),
+                                       NicenessFmt(nice),
+                               )
+                       })
+                       pktEncRaw, err := PktEncWrite(
+                               ctx.Self, hops[0], pktArea, nice, size, 0, src, dst,
+                       )
+                       pktEncRaws <- pktEncRaw
+                       errs <- err
+                       dst.Close()
+               }(curSize, pipeRPrev, pipeW)
+               curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
+       }
        for i := 1; i < len(hops); i++ {
-               pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
+               pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
+               if err != nil {
+                       panic(err)
+               }
                pipeRPrev = pipeR
                pipeR, pipeW = io.Pipe()
                go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
@@ -109,10 +186,18 @@ func (ctx *Ctx) Tx(
                                {"Node", node.Id},
                                {"Nice", int(nice)},
                                {"Size", size},
-                       }, "trns wrote")
-                       _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+                       }, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tx trns packet to %s (%s) nice: %s",
+                                       ctx.NodeName(node.Id),
+                                       humanize.IBytes(uint64(size)),
+                                       NicenessFmt(nice),
+                               )
+                       })
+                       pktEncRaw, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+                       pktEncRaws <- pktEncRaw
                        errs <- err
-                       dst.Close() // #nosec G104
+                       dst.Close()
                }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
                curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
        }
@@ -124,22 +209,67 @@ func (ctx *Ctx) Tx(
                )
                errs <- err
        }()
-       for i := 0; i <= len(hops); i++ {
+       var pktEncRaw []byte
+       var pktEncMsg []byte
+       if area != nil {
+               pktEncMsg = <-pktEncRaws
+       }
+       for i := 0; i < len(hops); i++ {
+               pktEncRaw = <-pktEncRaws
+       }
+       for i := 0; i <= wrappers; i++ {
                err = <-errs
                if err != nil {
-                       tmp.Fd.Close() // #nosec G104
+                       tmp.Fd.Close()
                        return nil, err
                }
        }
        nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
        err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
-       os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
+       os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
        if err != nil {
                return lastNode, err
        }
        if ctx.HdrUsage {
                ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
        }
+       if area != nil {
+               msgHashRaw := blake2b.Sum256(pktEncMsg)
+               msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
+               seenDir := filepath.Join(
+                       ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
+               )
+               seenPath := filepath.Join(seenDir, msgHash)
+               les := LEs{
+                       {"Node", node.Id},
+                       {"Nice", int(nice)},
+                       {"Size", size},
+                       {"Area", areaId},
+                       {"AreaMsg", msgHash},
+               }
+               logMsg := func(les LEs) string {
+                       return fmt.Sprintf(
+                               "Tx area packet to %s (%s) nice: %s, area %s: %s",
+                               ctx.NodeName(node.Id),
+                               humanize.IBytes(uint64(size)),
+                               NicenessFmt(nice),
+                               area.Name,
+                               msgHash,
+                       )
+               }
+               if err = ensureDir(seenDir); err != nil {
+                       ctx.LogE("tx-mkdir", les, err, logMsg)
+                       return lastNode, err
+               }
+               if fd, err := os.Create(seenPath); err == nil {
+                       fd.Close()
+                       if err = DirSync(seenDir); err != nil {
+                               ctx.LogE("tx-dirsync", les, err, logMsg)
+                               return lastNode, err
+                       }
+               }
+               ctx.LogI("tx-area", les, logMsg)
+       }
        return lastNode, err
 }
 
@@ -158,7 +288,7 @@ func throughTmpFile(r io.Reader) (
                rerr = err
                return
        }
-       os.Remove(src.Name()) // #nosec G104
+       os.Remove(src.Name())
        tmpW := bufio.NewWriter(src)
        tmpKey := make([]byte, chacha20poly1305.KeySize)
        if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
@@ -170,7 +300,7 @@ func throughTmpFile(r io.Reader) (
                return
        }
        nonce := make([]byte, aead.NonceSize())
-       written, err := aeadProcess(aead, nonce, true, r, tmpW)
+       written, err := aeadProcess(aead, nonce, nil, true, r, tmpW)
        if err != nil {
                rerr = err
                return
@@ -186,8 +316,11 @@ func throughTmpFile(r io.Reader) (
        }
        r, w := io.Pipe()
        go func() {
-               if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
-                       w.CloseWithError(err) // #nosec G104
+               for i := 0; i < aead.NonceSize(); i++ {
+                       nonce[i] = 0
+               }
+               if _, err := aeadProcess(aead, nonce, nil, false, bufio.NewReader(src), w); err != nil {
+                       w.CloseWithError(err)
                }
        }()
        reader = r
@@ -306,14 +439,14 @@ func prepareTxFile(srcPath string) (
                        }
                        fd, err := os.Open(e.path)
                        if err != nil {
-                               fd.Close() // #nosec G104
+                               fd.Close()
                                return w.CloseWithError(err)
                        }
                        if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
-                               fd.Close() // #nosec G104
+                               fd.Close()
                                return w.CloseWithError(err)
                        }
-                       fd.Close() // #nosec G104
+                       fd.Close()
                }
                if err = tarWr.Close(); err != nil {
                        return w.CloseWithError(err)
@@ -329,6 +462,7 @@ func (ctx *Ctx) TxFile(
        srcPath, dstPath string,
        chunkSize int64,
        minSize, maxSize int64,
+       areaId *AreaId,
 ) error {
        dstPathSpecified := false
        if dstPath == "" {
@@ -362,7 +496,7 @@ func (ctx *Ctx) TxFile(
                if err != nil {
                        return err
                }
-               _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
+               _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath, areaId)
                les := LEs{
                        {"Type", "file"},
                        {"Node", node.Id},
@@ -371,23 +505,32 @@ func (ctx *Ctx) TxFile(
                        {"Dst", dstPath},
                        {"Size", fileSize},
                }
+               logMsg := func(les LEs) string {
+                       return fmt.Sprintf(
+                               "File %s (%s) sent to %s:%s",
+                               srcPath,
+                               humanize.IBytes(uint64(fileSize)),
+                               ctx.NodeName(node.Id),
+                               dstPath,
+                       )
+               }
                if err == nil {
-                       ctx.LogI("tx", les, "sent")
+                       ctx.LogI("tx", les, logMsg)
                } else {
-                       ctx.LogE("tx", les, err, "sent")
+                       ctx.LogE("tx", les, err, logMsg)
                }
                return err
        }
 
        leftSize := fileSize
        metaPkt := ChunkedMeta{
-               Magic:     MagicNNCPMv1,
+               Magic:     MagicNNCPMv2.B,
                FileSize:  uint64(fileSize),
                ChunkSize: uint64(chunkSize),
-               Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
+               Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1),
        }
        for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
-               hsh := new([32]byte)
+               hsh := new([MTHSize]byte)
                metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
        }
        var sizeToSend int64
@@ -406,10 +549,7 @@ func (ctx *Ctx) TxFile(
                if err != nil {
                        return err
                }
-               hsh, err = blake2b.New256(nil)
-               if err != nil {
-                       return err
-               }
+               hsh = MTHNew(0, 0)
                _, err = ctx.Tx(
                        node,
                        pkt,
@@ -418,6 +558,7 @@ func (ctx *Ctx) TxFile(
                        minSize,
                        io.TeeReader(reader, hsh),
                        path,
+                       areaId,
                )
                les := LEs{
                        {"Type", "file"},
@@ -427,10 +568,19 @@ func (ctx *Ctx) TxFile(
                        {"Dst", path},
                        {"Size", sizeToSend},
                }
+               logMsg := func(les LEs) string {
+                       return fmt.Sprintf(
+                               "File %s (%s) sent to %s:%s",
+                               srcPath,
+                               humanize.IBytes(uint64(sizeToSend)),
+                               ctx.NodeName(node.Id),
+                               path,
+                       )
+               }
                if err == nil {
-                       ctx.LogI("tx", les, "sent")
+                       ctx.LogI("tx", les, logMsg)
                } else {
-                       ctx.LogE("tx", les, err, "sent")
+                       ctx.LogE("tx", les, err, logMsg)
                        return err
                }
                hsh.Sum(metaPkt.Checksums[chunkNum][:0])
@@ -451,7 +601,7 @@ func (ctx *Ctx) TxFile(
                return err
        }
        metaPktSize := int64(metaBuf.Len())
-       _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
+       _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path, areaId)
        les := LEs{
                {"Type", "file"},
                {"Node", node.Id},
@@ -460,10 +610,19 @@ func (ctx *Ctx) TxFile(
                {"Dst", path},
                {"Size", metaPktSize},
        }
+       logMsg := func(les LEs) string {
+               return fmt.Sprintf(
+                       "File %s (%s) sent to %s:%s",
+                       srcPath,
+                       humanize.IBytes(uint64(metaPktSize)),
+                       ctx.NodeName(node.Id),
+                       path,
+               )
+       }
        if err == nil {
-               ctx.LogI("tx", les, "sent")
+               ctx.LogI("tx", les, logMsg)
        } else {
-               ctx.LogE("tx", les, err, "sent")
+               ctx.LogE("tx", les, err, logMsg)
        }
        return err
 }
@@ -487,7 +646,7 @@ func (ctx *Ctx) TxFreq(
        }
        src := strings.NewReader(dstPath)
        size := int64(src.Len())
-       _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
+       _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath, nil)
        les := LEs{
                {"Type", "freq"},
                {"Node", node.Id},
@@ -496,10 +655,17 @@ func (ctx *Ctx) TxFreq(
                {"Src", srcPath},
                {"Dst", dstPath},
        }
+       logMsg := func(les LEs) string {
+               return fmt.Sprintf(
+                       "File request from %s:%s to %s sent",
+                       ctx.NodeName(node.Id), srcPath,
+                       dstPath,
+               )
+       }
        if err == nil {
-               ctx.LogI("tx", les, "sent")
+               ctx.LogI("tx", les, logMsg)
        } else {
-               ctx.LogE("tx", les, err, "sent")
+               ctx.LogE("tx", les, err, logMsg)
        }
        return err
 }
@@ -513,6 +679,7 @@ func (ctx *Ctx) TxExec(
        minSize int64,
        useTmp bool,
        noCompress bool,
+       areaId *AreaId,
 ) error {
        path := make([][]byte, 0, 1+len(args))
        path = append(path, []byte(handle))
@@ -523,9 +690,9 @@ func (ctx *Ctx) TxExec(
        if noCompress {
                pktType = PktTypeExecFat
        }
-       pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
-       if err != nil {
-               return err
+       pkt, rerr := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
+       if rerr != nil {
+               return rerr
        }
        var size int64
 
@@ -539,22 +706,22 @@ func (ctx *Ctx) TxExec(
                        return err
                }
                if _, err = io.Copy(compressor, in); err != nil {
-                       compressor.Close() // #nosec G104
+                       compressor.Close()
                        return err
                }
                if err = compressor.Close(); err != nil {
                        return err
                }
                size = int64(compressed.Len())
-               _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
+               _, rerr = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle, areaId)
        }
        if noCompress && !useTmp {
                var data bytes.Buffer
-               if _, err = io.Copy(&data, in); err != nil {
+               if _, err := io.Copy(&data, in); err != nil {
                        return err
                }
                size = int64(data.Len())
-               _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle)
+               _, rerr = ctx.Tx(node, pkt, nice, size, minSize, &data, handle, areaId)
        }
        if !noCompress && useTmp {
                r, w := io.Pipe()
@@ -566,7 +733,7 @@ func (ctx *Ctx) TxExec(
                go func() {
                        _, err := io.Copy(compressor, in)
                        if err != nil {
-                               compressor.Close() // #nosec G104
+                               compressor.Close()
                                copyErr <- err
                        }
                        err = compressor.Close()
@@ -585,7 +752,7 @@ func (ctx *Ctx) TxExec(
                        return err
                }
                size = fileSize
-               _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
+               _, rerr = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId)
        }
        if noCompress && useTmp {
                tmpReader, closer, fileSize, err := throughTmpFile(in)
@@ -596,23 +763,30 @@ func (ctx *Ctx) TxExec(
                        return err
                }
                size = fileSize
-               _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
+               _, rerr = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId)
        }
 
+       dst := strings.Join(append([]string{handle}, args...), " ")
        les := LEs{
                {"Type", "exec"},
                {"Node", node.Id},
                {"Nice", int(nice)},
                {"ReplyNice", int(replyNice)},
-               {"Dst", strings.Join(append([]string{handle}, args...), " ")},
+               {"Dst", dst},
                {"Size", size},
        }
-       if err == nil {
-               ctx.LogI("tx", les, "sent")
+       logMsg := func(les LEs) string {
+               return fmt.Sprintf(
+                       "Exec sent to %s@%s (%s)",
+                       ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
+               )
+       }
+       if rerr == nil {
+               ctx.LogI("tx", les, logMsg)
        } else {
-               ctx.LogE("tx", les, err, "sent")
+               ctx.LogE("tx", les, rerr, logMsg)
        }
-       return err
+       return rerr
 }
 
 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
@@ -622,10 +796,18 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error
                {"Nice", int(nice)},
                {"Size", size},
        }
-       ctx.LogD("tx", les, "taken")
+       logMsg := func(les LEs) string {
+               return fmt.Sprintf(
+                       "Transitional packet to %s (%s) (nice %s)",
+                       ctx.NodeName(node.Id),
+                       humanize.IBytes(uint64(size)),
+                       NicenessFmt(nice),
+               )
+       }
+       ctx.LogD("tx", les, logMsg)
        if !ctx.IsEnoughSpace(size) {
                err := errors.New("is not enough space")
-               ctx.LogE("tx", les, err, err.Error())
+               ctx.LogE("tx", les, err, logMsg)
                return err
        }
        tmp, err := ctx.NewTmpFileWHash()
@@ -642,10 +824,10 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error
        nodePath := filepath.Join(ctx.Spool, node.Id.String())
        err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
        if err == nil {
-               ctx.LogI("tx", les, "sent")
+               ctx.LogI("tx", les, logMsg)
        } else {
-               ctx.LogI("tx", append(les, LE{"Err", err}), "sent")
+               ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
        }
-       os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
+       os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
        return err
 }