X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftx.go;h=f9da81b03093f6c82d500da82d5f492dfa056f33;hb=5d5c951d8fecf27acbe3895091637a70bf7f7f39;hp=67bb659a672426895919fcd5961633c75d455df9;hpb=1d2ce674b042d07fd9b37a46578c8b62bb0345b7;p=nncp.git diff --git a/src/tx.go b/src/tx.go index 67bb659..f9da81b 100644 --- a/src/tx.go +++ b/src/tx.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2019 Sergey Matveev +Copyright (C) 2016-2022 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -21,11 +21,9 @@ import ( "archive/tar" "bufio" "bytes" - "crypto/rand" "errors" - "hash" + "fmt" "io" - "io/ioutil" "os" "path/filepath" "strconv" @@ -33,9 +31,9 @@ import ( "time" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" "github.com/klauspost/compress/zstd" "golang.org/x/crypto/blake2b" - "golang.org/x/crypto/chacha20poly1305" ) const ( @@ -45,14 +43,28 @@ const ( TarExt = ".tar" ) +type PktEncWriteResult struct { + pktEncRaw []byte + size int64 + err error +} + func (ctx *Ctx) Tx( node *Node, pkt *Pkt, nice uint8, - size, minSize int64, + srcSize, minSize, maxSize int64, src io.Reader, pktName string, -) (*Node, error) { + areaId *AreaId, +) (*Node, int64, error) { + var area *Area + if areaId != nil { + area = ctx.AreaId2Area[*areaId] + if area.Prv == nil { + return nil, 0, errors.New("area has no encryption keys") + } + } hops := make([]*Node, 0, 1+len(node.Via)) hops = append(hops, node) lastNode := node @@ -60,116 +72,217 @@ func (ctx *Ctx) Tx( lastNode = ctx.Neigh[*node.Via[i-1]] hops = append(hops, lastNode) } - expectedSize := size - for i := 0; i < len(hops); i++ { - expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize) - } - padSize := minSize - expectedSize - if padSize < 0 { - padSize = 0 - } - if !ctx.IsEnoughSpace(size + padSize) { - return nil, errors.New("is not enough space") + wrappers := len(hops) + if area != nil { + wrappers++ + } + var expectedSize int64 + if srcSize > 0 { + expectedSize = srcSize + PktOverhead + expectedSize += sizePadCalc(expectedSize, minSize, wrappers) + expectedSize = PktEncOverhead + sizeWithTags(expectedSize) + if maxSize != 0 && expectedSize > maxSize { + return nil, 0, TooBig + } + if !ctx.IsEnoughSpace(expectedSize) { + return nil, 0, errors.New("is not enough space") + } } tmp, err := ctx.NewTmpFileWHash() if err != nil { - return nil, err + return nil, 0, err } - errs := make(chan error) - curSize := size + results := make(chan PktEncWriteResult) pipeR, pipeW := io.Pipe() - go func(size int64, src io.Reader, dst io.WriteCloser) { - ctx.LogD("tx", SDS{ - "node": hops[0].Id, - "nice": int(nice), - "size": size, - }, "wrote") - errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst) - dst.Close() - }(curSize, src, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize - var pipeRPrev io.Reader + if area == nil { + go func(src io.Reader, dst io.WriteCloser) { + ctx.LogD("tx", LEs{ + {"Node", hops[0].Id}, + {"Nice", int(nice)}, + {"Size", expectedSize}, + }, func(les LEs) string { + return fmt.Sprintf( + "Tx packet to %s (source %s) nice: %s", + ctx.NodeName(hops[0].Id), + humanize.IBytes(uint64(expectedSize)), + NicenessFmt(nice), + ) + }) + pktEncRaw, size, err := PktEncWrite( + ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst, + ) + results 
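[Editor's note] The new Tx builds its onion of encryption layers as a chain of io.Pipes: one goroutine per layer, each reporting back over an unbuffered results channel, exactly as the hunk above and below shows. A minimal standalone sketch of that pattern — wrapLayer is only a stand-in for PktEncWrite, labels replace real encryption, and all names are illustrative:

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

type result struct {
	n   int64
	err error
}

// wrapLayer imitates PktEncWrite: it "encrypts" by prefixing the stream
// with a label and piping the payload through, then reports its result.
func wrapLayer(label string, src io.Reader, dst io.WriteCloser, results chan<- result) {
	defer dst.Close()
	if _, err := io.WriteString(dst, "["+label+"]"); err != nil {
		results <- result{0, err}
		return
	}
	n, err := io.Copy(dst, src)
	results <- result{n, err}
}

func main() {
	hops := []string{"hop0", "hop1", "hop2"}
	results := make(chan result)

	var src io.Reader = strings.NewReader("payload")
	pipeR, pipeW := io.Pipe()
	go wrapLayer(hops[0], src, pipeW, results)
	for _, hop := range hops[1:] {
		prev := pipeR
		pipeR, pipeW = io.Pipe()
		go wrapLayer(hop, prev, pipeW, results) // each hop wraps the previous layer
	}

	// The final reader drains the whole chain, as CopyProgressed does into tmp.W.
	go func() {
		n, err := io.Copy(os.Stdout, pipeR)
		results <- result{n, err}
	}()

	// len(hops) layer goroutines plus one drain, mirroring "i <= wrappers".
	for i := 0; i <= len(hops); i++ {
		if r := <-results; r.err != nil {
			fmt.Fprintln(os.Stderr, "layer failed:", r.err)
		}
	}
	fmt.Println()
}
```

Running it prints "[hop2][hop1][hop0]payload": the outermost transit layer is written first, just like the nested trns packets.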
<- PktEncWriteResult{pktEncRaw, size, err} + dst.Close() + }(src, pipeW) + } else { + go func(src io.Reader, dst io.WriteCloser) { + ctx.LogD("tx", LEs{ + {"Area", area.Id}, + {"Nice", int(nice)}, + {"Size", expectedSize}, + }, func(les LEs) string { + return fmt.Sprintf( + "Tx area packet to %s (source %s) nice: %s", + ctx.AreaName(areaId), + humanize.IBytes(uint64(expectedSize)), + NicenessFmt(nice), + ) + }) + areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)} + copy(areaNode.Id[:], area.Id[:]) + copy(areaNode.ExchPub[:], area.Pub[:]) + pktEncRaw, size, err := PktEncWrite( + ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst, + ) + results <- PktEncWriteResult{pktEncRaw, size, err} + dst.Close() + }(src, pipeW) + pipeRPrev = pipeR + pipeR, pipeW = io.Pipe() + go func(src io.Reader, dst io.WriteCloser) { + pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:]) + if err != nil { + panic(err) + } + ctx.LogD("tx", LEs{ + {"Node", hops[0].Id}, + {"Nice", int(nice)}, + {"Size", expectedSize}, + }, func(les LEs) string { + return fmt.Sprintf( + "Tx packet to %s (source %s) nice: %s", + ctx.NodeName(hops[0].Id), + humanize.IBytes(uint64(expectedSize)), + NicenessFmt(nice), + ) + }) + pktEncRaw, size, err := PktEncWrite( + ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst, + ) + results <- PktEncWriteResult{pktEncRaw, size, err} + dst.Close() + }(pipeRPrev, pipeW) + } for i := 1; i < len(hops); i++ { - pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:]) + pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:]) + if err != nil { + panic(err) + } pipeRPrev = pipeR pipeR, pipeW = io.Pipe() - go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) { - ctx.LogD("tx", SDS{ - "node": node.Id, - "nice": int(nice), - "size": size, - }, "trns wrote") - errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) + go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) { + ctx.LogD("tx", LEs{ + {"Node", node.Id}, + {"Nice", int(nice)}, + }, func(les LEs) string { + return fmt.Sprintf( + "Tx trns packet to %s nice: %s", + ctx.NodeName(node.Id), + NicenessFmt(nice), + ) + }) + pktEncRaw, size, err := PktEncWrite( + ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst, + ) + results <- PktEncWriteResult{pktEncRaw, size, err} dst.Close() - }(hops[i], pktTrns, curSize, pipeRPrev, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + }(hops[i], pktTrns, pipeRPrev, pipeW) } go func() { _, err := CopyProgressed( - tmp.W, pipeR, - SDS{"xx": string(TTx), "pkt": pktName, "fullsize": curSize}, + tmp.W, pipeR, "Tx", + LEs{{"Pkt", pktName}, {"FullSize", expectedSize}}, ctx.ShowPrgrs, ) - errs <- err + results <- PktEncWriteResult{err: err} }() - for i := 0; i <= len(hops); i++ { - err = <-errs - if err != nil { + var pktEncRaw []byte + var pktEncMsg []byte + var payloadSize int64 + if area != nil { + r := <-results + payloadSize = r.size + pktEncMsg = r.pktEncRaw + wrappers-- + } + for i := 0; i <= wrappers; i++ { + r := <-results + if r.err != nil { tmp.Fd.Close() - return nil, err + return nil, 0, r.err + } + if r.pktEncRaw != nil { + pktEncRaw = r.pktEncRaw + if payloadSize == 0 { + payloadSize = r.size + } } } nodePath := filepath.Join(ctx.Spool, lastNode.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) - return lastNode, err + if err != nil { + return lastNode, 0, err + } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, 
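[Editor's note] The spool write above goes through NNCP's own NewTmpFileWHash/Commit helpers. The underlying idea is the usual write–fsync–rename–fsync dance; a minimal standard-library sketch of that commit pattern (paths, modes, and the commit helper are illustrative, not NNCP's spool layout):

```go
package main

import (
	"os"
	"path/filepath"
)

// commit atomically moves a fully written temporary file into the
// destination directory and makes the rename itself durable.
func commit(tmpPath, dstDir, name string) error {
	if err := os.MkdirAll(dstDir, 0o777); err != nil {
		return err
	}
	if err := os.Rename(tmpPath, filepath.Join(dstDir, name)); err != nil {
		return err
	}
	dir, err := os.Open(dstDir) // sync the directory, as DirSync does
	if err != nil {
		return err
	}
	defer dir.Close()
	return dir.Sync()
}

func main() {
	tmp, err := os.CreateTemp("", "spool-*")
	if err != nil {
		panic(err)
	}
	if _, err = tmp.WriteString("packet bytes"); err != nil {
		panic(err)
	}
	if err = tmp.Sync(); err != nil { // data durable before rename
		panic(err)
	}
	if err = tmp.Close(); err != nil {
		panic(err)
	}
	spoolTx := filepath.Join(os.TempDir(), "spool-demo", "tx")
	if err = commit(tmp.Name(), spoolTx, "pkt"); err != nil {
		panic(err)
	}
}
```

Readers never observe a half-written packet this way: a file appears in tx/ only after its contents are complete and synced.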
filepath.Join(nodePath, string(TTx), tmp.Checksum())) + } + if area != nil { + msgHashRaw := blake2b.Sum256(pktEncMsg) + msgHash := Base32Codec.EncodeToString(msgHashRaw[:]) + seenDir := filepath.Join( + ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(), + ) + seenPath := filepath.Join(seenDir, msgHash) + les := LEs{ + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Size", expectedSize}, + {"Area", areaId}, + {"AreaMsg", msgHash}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Tx area packet to %s (source %s) nice: %s, area %s: %s", + ctx.NodeName(node.Id), + humanize.IBytes(uint64(expectedSize)), + NicenessFmt(nice), + area.Name, + msgHash, + ) + } + if err = ensureDir(seenDir); err != nil { + ctx.LogE("tx-mkdir", les, err, logMsg) + return lastNode, 0, err + } + if fd, err := os.Create(seenPath); err == nil { + fd.Close() + if err = DirSync(seenDir); err != nil { + ctx.LogE("tx-dirsync", les, err, logMsg) + return lastNode, 0, err + } + } + ctx.LogI("tx-area", les, logMsg) + } + return lastNode, payloadSize, err } type DummyCloser struct{} func (dc DummyCloser) Close() error { return nil } -func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize int64, archived bool, rerr error) { +func prepareTxFile(srcPath string) ( + reader io.Reader, + closer io.Closer, + srcSize int64, + archived bool, + rerr error, +) { if srcPath == "-" { - // Read content from stdin, saving to temporary file, encrypting - // on the fly - src, err := ioutil.TempFile("", "nncp-file") - if err != nil { - rerr = err - return - } - os.Remove(src.Name()) - tmpW := bufio.NewWriter(src) - tmpKey := make([]byte, chacha20poly1305.KeySize) - if _, rerr = rand.Read(tmpKey[:]); rerr != nil { - return - } - aead, err := chacha20poly1305.New(tmpKey) - if err != nil { - rerr = err - return - } - nonce := make([]byte, aead.NonceSize()) - written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW) - if err != nil { - rerr = err - return - } - fileSize = int64(written) - if err = tmpW.Flush(); err != nil { - return - } - src.Seek(0, io.SeekStart) - r, w := io.Pipe() - go func() { - if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil { - w.CloseWithError(err) - } - }() - reader = r - closer = src + reader = os.Stdin + closer = os.Stdin return } @@ -187,9 +300,9 @@ func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize rerr = err return } - fileSize = srcStat.Size() - reader = bufio.NewReader(src) + reader = src closer = src + srcSize = srcStat.Size() return } @@ -217,15 +330,15 @@ func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize if err != nil { return err } - if info.IsDir() { + if info.Mode().IsDir() { // directory header, PAX record header+contents - fileSize += TarBlockSize + 2*TarBlockSize + srcSize += TarBlockSize + 2*TarBlockSize dirs = append(dirs, einfo{path: path, modTime: info.ModTime()}) - } else { + } else if info.Mode().IsRegular() { // file header, PAX record header+contents, file content - fileSize += TarBlockSize + 2*TarBlockSize + info.Size() + srcSize += TarBlockSize + 2*TarBlockSize + info.Size() if n := info.Size() % TarBlockSize; n != 0 { - fileSize += TarBlockSize - n // padding + srcSize += TarBlockSize - n // padding } files = append(files, einfo{ path: path, @@ -242,9 +355,9 @@ func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize r, w := io.Pipe() reader = r closer = DummyCloser{} - fileSize += 2 * TarBlockSize // termination 
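[Editor's note] The size accounting in this walk can be checked against archive/tar directly: one 512-byte header block per entry, two blocks for the PAX extended header (which the walk assumes for every entry), data rounded up to a whole block, and two zero blocks of termination. A self-contained sanity check of that arithmetic — file name, size, and timestamp are made up, and the nanosecond mtime is there to force archive/tar to actually emit a PAX record:

```go
package main

import (
	"archive/tar"
	"bytes"
	"fmt"
	"time"
)

const TarBlockSize = 512

func main() {
	payload := bytes.Repeat([]byte("x"), 700) // spans two data blocks

	var predicted int64
	predicted += TarBlockSize + 2*TarBlockSize // file header + PAX record header+contents
	predicted += int64(len(payload))
	if n := int64(len(payload)) % TarBlockSize; n != 0 {
		predicted += TarBlockSize - n // padding to block boundary
	}
	predicted += 2 * TarBlockSize // termination blocks

	var buf bytes.Buffer
	w := tar.NewWriter(&buf)
	hdr := tar.Header{
		Typeflag: tar.TypeReg,
		Name:     "dir/file",
		Size:     int64(len(payload)),
		ModTime:  time.Unix(1234567890, 123456789), // sub-second: needs PAX
		Format:   tar.FormatPAX,
	}
	if err := w.WriteHeader(&hdr); err != nil {
		panic(err)
	}
	if _, err := w.Write(payload); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
	fmt.Printf("predicted=%d actual=%d\n", predicted, buf.Len())
}
```

Predicting the archive size up front matters because the stream is encrypted on the fly: there is no buffered tarball whose length could be measured afterwards.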
block + srcSize += 2 * TarBlockSize // termination block - go func() { + go func() error { tarWr := tar.NewWriter(w) hdr := tar.Header{ Typeflag: tar.TypeDir, @@ -258,7 +371,7 @@ func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize hdr.Name = basePath + e.path[len(rootPath):] hdr.ModTime = e.modTime if err = tarWr.WriteHeader(&hdr); err != nil { - w.CloseWithError(err) + return w.CloseWithError(err) } } hdr.Typeflag = tar.TypeReg @@ -268,20 +381,23 @@ func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize hdr.ModTime = e.modTime hdr.Size = e.size if err = tarWr.WriteHeader(&hdr); err != nil { - w.CloseWithError(err) + return w.CloseWithError(err) } fd, err := os.Open(e.path) if err != nil { - w.CloseWithError(err) + fd.Close() + return w.CloseWithError(err) } - _, err = io.Copy(tarWr, bufio.NewReader(fd)) - if err != nil { - w.CloseWithError(err) + if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil { + fd.Close() + return w.CloseWithError(err) } fd.Close() } - tarWr.Close() - w.Close() + if err = tarWr.Close(); err != nil { + return w.CloseWithError(err) + } + return w.Close() }() return } @@ -290,8 +406,8 @@ func (ctx *Ctx) TxFile( node *Node, nice uint8, srcPath, dstPath string, - chunkSize int64, - minSize, maxSize int64, + chunkSize, minSize, maxSize int64, + areaId *AreaId, ) error { dstPathSpecified := false if dstPath == "" { @@ -306,127 +422,153 @@ func (ctx *Ctx) TxFile( if filepath.IsAbs(dstPath) { return errors.New("Relative destination path required") } - reader, closer, fileSize, archived, err := prepareTxFile(srcPath) + reader, closer, srcSize, archived, err := prepareTxFile(srcPath) if closer != nil { defer closer.Close() } if err != nil { return err } - if fileSize > maxSize { - return errors.New("Too big than allowed") - } if archived && !dstPathSpecified { dstPath += TarExt } - if fileSize <= chunkSize { + if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) { pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath)) if err != nil { return err } - _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": int(nice), - "src": srcPath, - "dst": dstPath, - "size": fileSize, + _, finalSize, err := ctx.Tx( + node, pkt, nice, + srcSize, minSize, maxSize, + bufio.NewReader(reader), dstPath, areaId, + ) + les := LEs{ + {"Type", "file"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Src", srcPath}, + {"Dst", dstPath}, + {"Size", finalSize}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) is sent to %s:%s", + srcPath, + humanize.IBytes(uint64(finalSize)), + ctx.NodeName(node.Id), + dstPath, + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } - leftSize := fileSize - metaPkt := ChunkedMeta{ - Magic: MagicNNCPMv1, - FileSize: uint64(fileSize), - ChunkSize: uint64(chunkSize), - Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1), - } - for i := int64(0); i < (fileSize/chunkSize)+1; i++ { - hsh := new([32]byte) - metaPkt.Checksums = append(metaPkt.Checksums, *hsh) - } - var sizeToSend int64 - var hsh hash.Hash - var pkt *Pkt + br := bufio.NewReader(reader) + var sizeFull int64 var chunkNum int - var path string + checksums := [][MTHSize]byte{} for { - if leftSize <= chunkSize { - sizeToSend = leftSize - } else { - sizeToSend = chunkSize - } - path = dstPath + ChunkedSuffixPart + 
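[Editor's note] The chunked transfer loop here carves the stream with io.LimitReader over a shared bufio.Reader and uses a one-byte Peek to tell "source length was an exact multiple of chunkSize" apart from "more data follows". A tiny standalone illustration of that idiom (chunk size and input are made up):

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strconv"
	"strings"
)

func main() {
	const chunkSize = 10
	br := bufio.NewReader(strings.NewReader("0123456789abcdefghijKLM"))
	for chunkNum := 0; ; chunkNum++ {
		lr := io.LimitReader(br, chunkSize) // at most one chunk from br
		var chunk strings.Builder
		n, err := io.Copy(&chunk, lr)
		if err != nil {
			panic(err)
		}
		fmt.Printf("chunk.%s = %q\n", strconv.Itoa(chunkNum), chunk.String())
		if n < chunkSize { // short read: source exhausted mid-chunk
			break
		}
		if _, err := br.Peek(1); err != nil { // exact multiple: nothing left
			break
		}
	}
}
```

This prints three chunks ("0123456789", "abcdefghij", "KLM"); with an input of exactly 20 bytes the Peek would return io.EOF after the second chunk and stop the loop without emitting an empty trailing packet.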
strconv.Itoa(chunkNum) - pkt, err = NewPkt(PktTypeFile, nice, []byte(path)) + lr := io.LimitReader(br, chunkSize) + path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum) + pkt, err := NewPkt(PktTypeFile, nice, []byte(path)) if err != nil { return err } - hsh, err = blake2b.New256(nil) - if err != nil { - return err - } - _, err = ctx.Tx( - node, - pkt, - nice, - sizeToSend, - minSize, - io.TeeReader(reader, hsh), - path, + hsh := MTHNew(0, 0) + _, size, err := ctx.Tx( + node, pkt, nice, + 0, minSize, maxSize, + io.TeeReader(lr, hsh), + path, areaId, ) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": int(nice), - "src": srcPath, - "dst": path, - "size": sizeToSend, + + les := LEs{ + {"Type", "file"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Src", srcPath}, + {"Dst", path}, + {"Size", size}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) is sent to %s:%s", + srcPath, + humanize.IBytes(uint64(size)), + ctx.NodeName(node.Id), + path, + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) return err } - hsh.Sum(metaPkt.Checksums[chunkNum][:0]) - leftSize -= sizeToSend + + sizeFull += size - PktOverhead + var checksum [MTHSize]byte + hsh.Sum(checksum[:0]) + checksums = append(checksums, checksum) chunkNum++ - if leftSize == 0 { + if size < chunkSize { + break + } + if _, err = br.Peek(1); err != nil { break } } - var metaBuf bytes.Buffer - _, err = xdr.Marshal(&metaBuf, metaPkt) + + metaPkt := ChunkedMeta{ + Magic: MagicNNCPMv2.B, + FileSize: uint64(sizeFull), + ChunkSize: uint64(chunkSize), + Checksums: checksums, + } + var buf bytes.Buffer + _, err = xdr.Marshal(&buf, metaPkt) if err != nil { return err } - path = dstPath + ChunkedSuffixMeta - pkt, err = NewPkt(PktTypeFile, nice, []byte(path)) + path := dstPath + ChunkedSuffixMeta + pkt, err := NewPkt(PktTypeFile, nice, []byte(path)) if err != nil { return err } - metaPktSize := int64(metaBuf.Len()) - _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": int(nice), - "src": srcPath, - "dst": path, - "size": metaPktSize, + metaPktSize := int64(buf.Len()) + _, _, err = ctx.Tx( + node, + pkt, + nice, + metaPktSize, minSize, maxSize, + &buf, path, areaId, + ) + les := LEs{ + {"Type", "file"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Src", srcPath}, + {"Dst", path}, + {"Size", metaPktSize}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) is sent to %s:%s", + srcPath, + humanize.IBytes(uint64(metaPktSize)), + ctx.NodeName(node.Id), + path, + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -435,7 +577,8 @@ func (ctx *Ctx) TxFreq( node *Node, nice, replyNice uint8, srcPath, dstPath string, - minSize int64) error { + minSize int64, +) error { dstPath = filepath.Clean(dstPath) if filepath.IsAbs(dstPath) { return errors.New("Relative destination path required") @@ -450,19 +593,26 @@ func (ctx *Ctx) TxFreq( } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath) - sds := SDS{ - "type": "freq", - "node": node.Id, - "nice": int(nice), - "replynice": int(replyNice), - "src": srcPath, - "dst": dstPath, + _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil) + les := 
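[Editor's note] The .meta packet assembled above is serialized with plain XDR via github.com/davecgh/go-xdr/xdr2, the same library this file imports. A sketch of that serialization — the struct imitates ChunkedMeta's shape but is illustrative, assuming an 8-byte magic and 32-byte hashes rather than the real MagicNNCPMv2/MTHSize definitions:

```go
package main

import (
	"bytes"
	"fmt"

	xdr "github.com/davecgh/go-xdr/xdr2"
)

type Meta struct {
	Magic     [8]byte    // fixed-length opaque
	FileSize  uint64     // XDR unsigned hyper
	ChunkSize uint64
	Checksums [][32]byte // variable-length array of fixed opaques
}

func main() {
	meta := Meta{
		Magic:     [8]byte{'N', 'N', 'C', 'P', 'M', 0, 0, 2},
		FileSize:  300,
		ChunkSize: 128,
		Checksums: make([][32]byte, 3), // one hash per chunk (zeroes here)
	}
	var buf bytes.Buffer
	if _, err := xdr.Marshal(&buf, meta); err != nil {
		panic(err)
	}
	// 8 (magic) + 8 + 8 + 4 (array length) + 3*32 = 124 bytes
	fmt.Println("meta payload:", buf.Len(), "bytes")
}
```

Because XDR has no self-description, the receiver can only parse this against the same struct definition, which is why the magic value is versioned (v1 vs v2) in the first field.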
LEs{ + {"Type", "freq"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"ReplyNice", int(replyNice)}, + {"Src", srcPath}, + {"Dst", dstPath}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File request from %s:%s to %s is sent", + ctx.NodeName(node.Id), srcPath, + dstPath, + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -473,78 +623,108 @@ func (ctx *Ctx) TxExec( handle string, args []string, in io.Reader, - minSize int64, + minSize int64, maxSize int64, + noCompress bool, + areaId *AreaId, ) error { path := make([][]byte, 0, 1+len(args)) path = append(path, []byte(handle)) for _, arg := range args { path = append(path, []byte(arg)) } - pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0})) - if err != nil { - return err + pktType := PktTypeExec + if noCompress { + pktType = PktTypeExecFat } - var compressed bytes.Buffer - compressor, err := zstd.NewWriter( - &compressed, - zstd.WithEncoderLevel(zstd.SpeedDefault), - ) + pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0})) if err != nil { return err } - _, err = io.Copy(compressor, in) - compressor.Close() - if err != nil { - return err + compressErr := make(chan error, 1) + if !noCompress { + pr, pw := io.Pipe() + compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault)) + if err != nil { + return err + } + go func(r io.Reader) { + if _, err := io.Copy(compressor, r); err != nil { + compressErr <- err + return + } + compressErr <- compressor.Close() + pw.Close() + }(in) + in = pr + } + _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId) + if !noCompress { + e := <-compressErr + if err == nil { + err = e + } } - size := int64(compressed.Len()) - _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle) - sds := SDS{ - "type": "exec", - "node": node.Id, - "nice": int(nice), - "replynice": int(replyNice), - "dst": strings.Join(append([]string{handle}, args...), " "), - "size": size, + dst := strings.Join(append([]string{handle}, args...), " ") + les := LEs{ + {"Type", "exec"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"ReplyNice", int(replyNice)}, + {"Dst", dst}, + {"Size", size}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Exec is sent to %s@%s (%s)", + ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)), + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error { - sds := SDS{ - "type": "trns", - "node": node.Id, - "nice": int(nice), - "size": size, + les := LEs{ + {"Type", "trns"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Size", size}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Transitional packet to %s (%s) (nice %s)", + ctx.NodeName(node.Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) } - ctx.LogD("tx", sds, "taken") + ctx.LogD("tx", les, logMsg) if !ctx.IsEnoughSpace(size) { err := errors.New("is not enough space") - ctx.LogE("tx", sds, err, err.Error()) + ctx.LogE("tx", les, err, logMsg) return err } tmp, err := ctx.NewTmpFileWHash() if err != nil { return err } - if _, err = CopyProgressed(tmp.W, src, SDS{ - "xx": string(TTx), - "pkt": node.Id.String(), - "fullsize": size, - }, ctx.ShowPrgrs); err != nil { + if _, err = 
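[Editor's note] The reworked TxExec above streams compression instead of buffering the whole payload: a zstd writer feeds one end of an io.Pipe while the consumer reads the other, and the compressor's error comes back over a buffered channel. A minimal sketch of that pattern with the same zstd library (input and sizes are illustrative):

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"

	"github.com/klauspost/compress/zstd"
)

func main() {
	var in io.Reader = strings.NewReader(strings.Repeat("payload ", 1000))

	compressErr := make(chan error, 1) // buffered: sender never blocks
	pr, pw := io.Pipe()
	compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault))
	if err != nil {
		panic(err)
	}
	go func(r io.Reader) {
		if _, err := io.Copy(compressor, r); err != nil {
			compressErr <- err
			return
		}
		compressErr <- compressor.Close() // flushes the zstd frame
		pw.Close()
	}(in)

	// The consumer side (ctx.Tx in the real code) just sees an io.Reader.
	n, err := io.Copy(io.Discard, pr)
	if err != nil {
		panic(err)
	}
	if err := <-compressErr; err != nil {
		fmt.Fprintln(os.Stderr, "compression failed:", err)
		os.Exit(1)
	}
	fmt.Println("compressed size:", n)
}
```

Checking the channel only after the copy finishes mirrors the diff: a compressor failure surfaces through the pipe as a read error first, and the channel result fills in the cause when Tx itself reported none.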
CopyProgressed( + tmp.W, src, "Tx trns", + LEs{{"Pkt", node.Id.String()}, {"FullSize", size}}, + ctx.ShowPrgrs, + ); err != nil { return err } nodePath := filepath.Join(ctx.Spool, node.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent") + ctx.LogI("tx", append(les, LE{"Err", err}), logMsg) } os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) return err
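[Editor's note] TxTrns above just spools an already-encrypted packet: no re-encryption, only a progress-reporting copy into the temporary file before commit. CopyProgressed is NNCP's own helper; the following stand-in only imitates the idea of a counting writer driving a progress callback:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// countingWriter tallies bytes written and reports the running total
// through a callback, roughly what a progress display hook needs.
type countingWriter struct {
	w      io.Writer
	n      int64
	report func(done int64)
}

func (cw *countingWriter) Write(p []byte) (int, error) {
	n, err := cw.w.Write(p)
	cw.n += int64(n)
	cw.report(cw.n)
	return n, err
}

func main() {
	src := strings.NewReader(strings.Repeat("x", 1<<16))
	fullSize := int64(src.Len()) // known up front, like "FullSize" above
	var sink strings.Builder
	cw := &countingWriter{
		w: &sink,
		report: func(done int64) {
			fmt.Printf("\r%d/%d bytes", done, fullSize)
		},
	}
	if _, err := io.Copy(cw, src); err != nil {
		panic(err)
	}
	fmt.Println()
}
```

Wrapping the writer rather than the reader keeps the pattern composable: any io.Copy-based path, including the pipe chains earlier in this file, gains progress reporting without touching the data flow.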