"bytes"
"crypto/rand"
"errors"
+ "fmt"
"hash"
"io"
"io/ioutil"
"time"
xdr "github.com/davecgh/go-xdr/xdr2"
+ "github.com/dustin/go-humanize"
"github.com/klauspost/compress/zstd"
"golang.org/x/crypto/blake2b"
"golang.org/x/crypto/chacha20poly1305"
}
expectedSize := size
for i := 0; i < len(hops); i++ {
- expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
+ expectedSize = PktEncOverhead +
+ PktSizeOverhead +
+ sizeWithTags(PktOverhead+expectedSize)
}
padSize := minSize - expectedSize
if padSize < 0 {
errs := make(chan error)
curSize := size
pipeR, pipeW := io.Pipe()
+ var pktEncRaw []byte
go func(size int64, src io.Reader, dst io.WriteCloser) {
- ctx.LogD("tx", SDS{
- "node": hops[0].Id,
- "nice": int(nice),
- "size": size,
- }, "wrote")
- errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
+ ctx.LogD("tx", LEs{
+ {"Node", hops[0].Id},
+ {"Nice", int(nice)},
+ {"Size", size},
+ }, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tx packet to %s (%s) nice: %s",
+ ctx.NodeName(hops[0].Id),
+ humanize.IBytes(uint64(size)),
+ NicenessFmt(nice),
+ )
+ })
+ pktEncRaw, err = PktEncWrite(
+ ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
+ )
+ errs <- err
dst.Close() // #nosec G104
}(curSize, src, pipeW)
- curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
+ curSize = PktEncOverhead +
+ PktSizeOverhead +
+ sizeWithTags(PktOverhead+curSize) +
+ padSize
var pipeRPrev io.Reader
for i := 1; i < len(hops); i++ {
pipeRPrev = pipeR
pipeR, pipeW = io.Pipe()
go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
- ctx.LogD("tx", SDS{
- "node": node.Id,
- "nice": int(nice),
- "size": size,
- }, "trns wrote")
- errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+ ctx.LogD("tx", LEs{
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"Size", size},
+ }, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tx trns packet to %s (%s) nice: %s",
+ ctx.NodeName(node.Id),
+ humanize.IBytes(uint64(size)),
+ NicenessFmt(nice),
+ )
+ })
+ _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+ errs <- err
dst.Close() // #nosec G104
}(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
go func() {
_, err := CopyProgressed(
tmp.W, pipeR, "Tx",
- SDS{"pkt": pktName, "fullsize": curSize},
+ LEs{{"Pkt", pktName}, {"FullSize", curSize}},
ctx.ShowPrgrs,
)
errs <- err
nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
+ if err != nil {
+ return lastNode, err
+ }
+ if ctx.HdrUsage {
+ ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
+ }
return lastNode, err
}
return err
}
_, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
- sds := SDS{
- "type": "file",
- "node": node.Id,
- "nice": int(nice),
- "src": srcPath,
- "dst": dstPath,
- "size": fileSize,
+ les := LEs{
+ {"Type", "file"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"Src", srcPath},
+ {"Dst", dstPath},
+ {"Size", fileSize},
+ }
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "File %s (%s) sent to %s:%s",
+ srcPath,
+ humanize.IBytes(uint64(fileSize)),
+ ctx.NodeName(node.Id),
+ dstPath,
+ )
}
if err == nil {
- ctx.LogI("tx", sds, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", sds, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
}
return err
}
io.TeeReader(reader, hsh),
path,
)
- sds := SDS{
- "type": "file",
- "node": node.Id,
- "nice": int(nice),
- "src": srcPath,
- "dst": path,
- "size": sizeToSend,
+ les := LEs{
+ {"Type", "file"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"Src", srcPath},
+ {"Dst", path},
+ {"Size", sizeToSend},
+ }
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "File %s (%s) sent to %s:%s",
+ srcPath,
+ humanize.IBytes(uint64(sizeToSend)),
+ ctx.NodeName(node.Id),
+ path,
+ )
}
if err == nil {
- ctx.LogI("tx", sds, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", sds, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
return err
}
hsh.Sum(metaPkt.Checksums[chunkNum][:0])
}
metaPktSize := int64(metaBuf.Len())
_, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
- sds := SDS{
- "type": "file",
- "node": node.Id,
- "nice": int(nice),
- "src": srcPath,
- "dst": path,
- "size": metaPktSize,
+ les := LEs{
+ {"Type", "file"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"Src", srcPath},
+ {"Dst", path},
+ {"Size", metaPktSize},
+ }
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "File %s (%s) sent to %s:%s",
+ srcPath,
+ humanize.IBytes(uint64(metaPktSize)),
+ ctx.NodeName(node.Id),
+ path,
+ )
}
if err == nil {
- ctx.LogI("tx", sds, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", sds, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
}
return err
}
src := strings.NewReader(dstPath)
size := int64(src.Len())
_, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
- sds := SDS{
- "type": "freq",
- "node": node.Id,
- "nice": int(nice),
- "replynice": int(replyNice),
- "src": srcPath,
- "dst": dstPath,
+ les := LEs{
+ {"Type", "freq"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"ReplyNice", int(replyNice)},
+ {"Src", srcPath},
+ {"Dst", dstPath},
+ }
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "File request from %s:%s to %s sent",
+ ctx.NodeName(node.Id), srcPath,
+ dstPath,
+ )
}
if err == nil {
- ctx.LogI("tx", sds, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", sds, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
}
return err
}
_, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
}
- sds := SDS{
- "type": "exec",
- "node": node.Id,
- "nice": int(nice),
- "replynice": int(replyNice),
- "dst": strings.Join(append([]string{handle}, args...), " "),
- "size": size,
+ dst := strings.Join(append([]string{handle}, args...), " ")
+ les := LEs{
+ {"Type", "exec"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"ReplyNice", int(replyNice)},
+ {"Dst", dst},
+ {"Size", size},
+ }
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "Exec sent to %s@%s (%s)",
+ ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
+ )
}
if err == nil {
- ctx.LogI("tx", sds, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", sds, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
}
return err
}
func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
- sds := SDS{
- "type": "trns",
- "node": node.Id,
- "nice": int(nice),
- "size": size,
+ les := LEs{
+ {"Type", "trns"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"Size", size},
+ }
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "Transitional packet to %s (%s) (nice %s)",
+ ctx.NodeName(node.Id),
+ humanize.IBytes(uint64(size)),
+ NicenessFmt(nice),
+ )
}
- ctx.LogD("tx", sds, "taken")
+ ctx.LogD("tx", les, logMsg)
if !ctx.IsEnoughSpace(size) {
err := errors.New("is not enough space")
- ctx.LogE("tx", sds, err, err.Error())
+ ctx.LogE("tx", les, err, logMsg)
return err
}
tmp, err := ctx.NewTmpFileWHash()
}
if _, err = CopyProgressed(
tmp.W, src, "Tx trns",
- SDS{"pkt": node.Id.String(), "fullsize": size},
+ LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
ctx.ShowPrgrs,
); err != nil {
return err
nodePath := filepath.Join(ctx.Spool, node.Id.String())
err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
if err == nil {
- ctx.LogI("tx", sds, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent")
+ ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
}
os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
return err