"bytes"
"crypto/rand"
"errors"
+ "fmt"
"hash"
"io"
"io/ioutil"
"time"
xdr "github.com/davecgh/go-xdr/xdr2"
+ "github.com/dustin/go-humanize"
"github.com/klauspost/compress/zstd"
- "golang.org/x/crypto/blake2b"
"golang.org/x/crypto/chacha20poly1305"
)
}
expectedSize := size
for i := 0; i < len(hops); i++ {
- expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
+ expectedSize = PktEncOverhead +
+ PktSizeOverhead +
+ sizeWithTags(PktOverhead+expectedSize)
}
padSize := minSize - expectedSize
if padSize < 0 {
errs := make(chan error)
curSize := size
pipeR, pipeW := io.Pipe()
+ var pktEncRaw []byte
go func(size int64, src io.Reader, dst io.WriteCloser) {
ctx.LogD("tx", LEs{
{"Node", hops[0].Id},
{"Nice", int(nice)},
{"Size", size},
- }, "wrote")
- errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
+ }, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tx packet to %s (%s) nice: %s",
+ ctx.NodeName(hops[0].Id),
+ humanize.IBytes(uint64(size)),
+ NicenessFmt(nice),
+ )
+ })
+ pktEncRaw, err = PktEncWrite(
+ ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
+ )
+ errs <- err
dst.Close() // #nosec G104
}(curSize, src, pipeW)
- curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
+ curSize = PktEncOverhead +
+ PktSizeOverhead +
+ sizeWithTags(PktOverhead+curSize) +
+ padSize
var pipeRPrev io.Reader
for i := 1; i < len(hops); i++ {
{"Node", node.Id},
{"Nice", int(nice)},
{"Size", size},
- }, "trns wrote")
- errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+ }, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tx trns packet to %s (%s) nice: %s",
+ ctx.NodeName(node.Id),
+ humanize.IBytes(uint64(size)),
+ NicenessFmt(nice),
+ )
+ })
+ _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+ errs <- err
dst.Close() // #nosec G104
}(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
+ if err != nil {
+ return lastNode, err
+ }
+ if ctx.HdrUsage {
+ ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
+ }
return lastNode, err
}
{"Dst", dstPath},
{"Size", fileSize},
}
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "File %s (%s) sent to %s:%s",
+ srcPath,
+ humanize.IBytes(uint64(fileSize)),
+ ctx.NodeName(node.Id),
+ dstPath,
+ )
+ }
if err == nil {
- ctx.LogI("tx", les, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", les, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
}
return err
}
leftSize := fileSize
metaPkt := ChunkedMeta{
- Magic: MagicNNCPMv1,
+ Magic: MagicNNCPMv2,
FileSize: uint64(fileSize),
ChunkSize: uint64(chunkSize),
- Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
+ Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1),
}
for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
- hsh := new([32]byte)
+ hsh := new([MTHSize]byte)
metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
}
var sizeToSend int64
if err != nil {
return err
}
- hsh, err = blake2b.New256(nil)
- if err != nil {
- return err
- }
+ hsh = MTHNew(0, 0)
_, err = ctx.Tx(
node,
pkt,
{"Dst", path},
{"Size", sizeToSend},
}
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "File %s (%s) sent to %s:%s",
+ srcPath,
+ humanize.IBytes(uint64(sizeToSend)),
+ ctx.NodeName(node.Id),
+ path,
+ )
+ }
if err == nil {
- ctx.LogI("tx", les, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", les, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
return err
}
hsh.Sum(metaPkt.Checksums[chunkNum][:0])
{"Dst", path},
{"Size", metaPktSize},
}
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "File %s (%s) sent to %s:%s",
+ srcPath,
+ humanize.IBytes(uint64(metaPktSize)),
+ ctx.NodeName(node.Id),
+ path,
+ )
+ }
if err == nil {
- ctx.LogI("tx", les, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", les, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
}
return err
}
{"Src", srcPath},
{"Dst", dstPath},
}
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "File request from %s:%s to %s sent",
+ ctx.NodeName(node.Id), srcPath,
+ dstPath,
+ )
+ }
if err == nil {
- ctx.LogI("tx", les, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", les, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
}
return err
}
_, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
}
+ dst := strings.Join(append([]string{handle}, args...), " ")
les := LEs{
{"Type", "exec"},
{"Node", node.Id},
{"Nice", int(nice)},
{"ReplyNice", int(replyNice)},
- {"Dst", strings.Join(append([]string{handle}, args...), " ")},
+ {"Dst", dst},
{"Size", size},
}
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "Exec sent to %s@%s (%s)",
+ ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
+ )
+ }
if err == nil {
- ctx.LogI("tx", les, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", les, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
}
return err
}
{"Nice", int(nice)},
{"Size", size},
}
- ctx.LogD("tx", les, "taken")
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "Transitional packet to %s (%s) (nice %s)",
+ ctx.NodeName(node.Id),
+ humanize.IBytes(uint64(size)),
+ NicenessFmt(nice),
+ )
+ }
+ ctx.LogD("tx", les, logMsg)
if !ctx.IsEnoughSpace(size) {
err := errors.New("is not enough space")
- ctx.LogE("tx", les, err, err.Error())
+ ctx.LogE("tx", les, err, logMsg)
return err
}
tmp, err := ctx.NewTmpFileWHash()
nodePath := filepath.Join(ctx.Spool, node.Id.String())
err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
if err == nil {
- ctx.LogI("tx", les, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogI("tx", append(les, LE{"Err", err}), "sent")
+ ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
}
os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
return err