X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftx.go;h=20482efb234bc5a99e4b2fc5b792d7a976ff8c3a;hb=0fad171c0d79ad583c0faf5427e22d1d62a0a52d;hp=3352f861a8465d63e95cfdacdce8a724fb203db5;hpb=45ec939076353c978fe2e9cb81501bc182d71b79;p=nncp.git diff --git a/src/tx.go b/src/tx.go index 3352f86..20482ef 100644 --- a/src/tx.go +++ b/src/tx.go @@ -23,6 +23,7 @@ import ( "bytes" "crypto/rand" "errors" + "fmt" "hash" "io" "io/ioutil" @@ -33,8 +34,8 @@ import ( "time" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" "github.com/klauspost/compress/zstd" - "golang.org/x/crypto/blake2b" "golang.org/x/crypto/chacha20poly1305" ) @@ -62,7 +63,9 @@ func (ctx *Ctx) Tx( } expectedSize := size for i := 0; i < len(hops); i++ { - expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize) + expectedSize = PktEncOverhead + + PktSizeOverhead + + sizeWithTags(PktOverhead+expectedSize) } padSize := minSize - expectedSize if padSize < 0 { @@ -79,16 +82,30 @@ func (ctx *Ctx) Tx( errs := make(chan error) curSize := size pipeR, pipeW := io.Pipe() + var pktEncRaw []byte go func(size int64, src io.Reader, dst io.WriteCloser) { - ctx.LogD("tx", SDS{ - "node": hops[0].Id, - "nice": int(nice), - "size": size, - }, "wrote") - errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst) + ctx.LogD("tx", LEs{ + {"Node", hops[0].Id}, + {"Nice", int(nice)}, + {"Size", size}, + }, func(les LEs) string { + return fmt.Sprintf( + "Tx packet to %s (%s) nice: %s", + ctx.NodeName(hops[0].Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) + }) + pktEncRaw, err = PktEncWrite( + ctx.Self, hops[0], pkt, nice, size, padSize, src, dst, + ) + errs <- err dst.Close() // #nosec G104 }(curSize, src, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize + curSize = PktEncOverhead + + PktSizeOverhead + + sizeWithTags(PktOverhead+curSize) + + padSize var pipeRPrev io.Reader for i := 1; i < len(hops); i++ { @@ -96,12 +113,20 @@ func (ctx *Ctx) Tx( pipeRPrev = pipeR pipeR, pipeW = io.Pipe() go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) { - ctx.LogD("tx", SDS{ - "node": node.Id, - "nice": int(nice), - "size": size, - }, "trns wrote") - errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) + ctx.LogD("tx", LEs{ + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Size", size}, + }, func(les LEs) string { + return fmt.Sprintf( + "Tx trns packet to %s (%s) nice: %s", + ctx.NodeName(node.Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) + }) + _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) + errs <- err dst.Close() // #nosec G104 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW) curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) @@ -109,7 +134,7 @@ func (ctx *Ctx) Tx( go func() { _, err := CopyProgressed( tmp.W, pipeR, "Tx", - SDS{"pkt": pktName, "fullsize": curSize}, + LEs{{"Pkt", pktName}, {"FullSize", curSize}}, ctx.ShowPrgrs, ) errs <- err @@ -124,6 +149,12 @@ func (ctx *Ctx) Tx( nodePath := filepath.Join(ctx.Spool, lastNode.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104 + if err != nil { + return lastNode, err + } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) + } return lastNode, err } @@ -347,31 +378,40 @@ func (ctx *Ctx) TxFile( return err } _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": int(nice), - "src": srcPath, - "dst": dstPath, - "size": fileSize, + les := LEs{ + {"Type", "file"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Src", srcPath}, + {"Dst", dstPath}, + {"Size", fileSize}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) sent to %s:%s", + srcPath, + humanize.IBytes(uint64(fileSize)), + ctx.NodeName(node.Id), + dstPath, + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } leftSize := fileSize metaPkt := ChunkedMeta{ - Magic: MagicNNCPMv1, + Magic: MagicNNCPMv2, FileSize: uint64(fileSize), ChunkSize: uint64(chunkSize), - Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1), + Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1), } for i := int64(0); i < (fileSize/chunkSize)+1; i++ { - hsh := new([32]byte) + hsh := new([MTHSize]byte) metaPkt.Checksums = append(metaPkt.Checksums, *hsh) } var sizeToSend int64 @@ -390,10 +430,7 @@ func (ctx *Ctx) TxFile( if err != nil { return err } - hsh, err = blake2b.New256(nil) - if err != nil { - return err - } + hsh = MTHNew(0, 0) _, err = ctx.Tx( node, pkt, @@ -403,18 +440,27 @@ func (ctx *Ctx) TxFile( io.TeeReader(reader, hsh), path, ) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": int(nice), - "src": srcPath, - "dst": path, - "size": sizeToSend, + les := LEs{ + {"Type", "file"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Src", srcPath}, + {"Dst", path}, + {"Size", sizeToSend}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) sent to %s:%s", + srcPath, + humanize.IBytes(uint64(sizeToSend)), + ctx.NodeName(node.Id), + path, + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) return err } hsh.Sum(metaPkt.Checksums[chunkNum][:0]) @@ -436,18 +482,27 @@ func (ctx *Ctx) TxFile( } metaPktSize := int64(metaBuf.Len()) _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": int(nice), - "src": srcPath, - "dst": path, - "size": metaPktSize, + les := LEs{ + {"Type", "file"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Src", srcPath}, + {"Dst", path}, + {"Size", metaPktSize}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) sent to %s:%s", + srcPath, + humanize.IBytes(uint64(metaPktSize)), + ctx.NodeName(node.Id), + path, + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -472,18 +527,25 @@ func (ctx *Ctx) TxFreq( src := strings.NewReader(dstPath) size := int64(src.Len()) _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath) - sds := SDS{ - "type": "freq", - "node": node.Id, - "nice": int(nice), - "replynice": int(replyNice), - "src": srcPath, - "dst": dstPath, + les := LEs{ + {"Type", "freq"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"ReplyNice", int(replyNice)}, + {"Src", srcPath}, + {"Dst", dstPath}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File request from %s:%s to %s sent", + ctx.NodeName(node.Id), srcPath, + dstPath, + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -583,33 +645,48 @@ func (ctx *Ctx) TxExec( _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle) } - sds := SDS{ - "type": "exec", - "node": node.Id, - "nice": int(nice), - "replynice": int(replyNice), - "dst": strings.Join(append([]string{handle}, args...), " "), - "size": size, + dst := strings.Join(append([]string{handle}, args...), " ") + les := LEs{ + {"Type", "exec"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"ReplyNice", int(replyNice)}, + {"Dst", dst}, + {"Size", size}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Exec sent to %s@%s (%s)", + ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)), + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error { - sds := SDS{ - "type": "trns", - "node": node.Id, - "nice": int(nice), - "size": size, + les := LEs{ + {"Type", "trns"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Size", size}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Transitional packet to %s (%s) (nice %s)", + ctx.NodeName(node.Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) } - ctx.LogD("tx", sds, "taken") + ctx.LogD("tx", les, logMsg) if !ctx.IsEnoughSpace(size) { err := errors.New("is not enough space") - ctx.LogE("tx", sds, err, err.Error()) + ctx.LogE("tx", les, err, logMsg) return err } tmp, err := ctx.NewTmpFileWHash() @@ -618,7 +695,7 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error } if _, err = CopyProgressed( tmp.W, src, "Tx trns", - SDS{"pkt": node.Id.String(), "fullsize": size}, + LEs{{"Pkt", node.Id.String()}, {"FullSize", size}}, ctx.ShowPrgrs, ); err != nil { return err @@ -626,9 +703,9 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error nodePath := filepath.Join(ctx.Spool, node.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent") + ctx.LogI("tx", append(les, LE{"Err", err}), logMsg) } os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104 return err