X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftx.go;h=a90a49e42d05f0f96e3639e3740099f15a40c29c;hb=ab7c7eca0e53661f0ba904c2a6ba752990bea367;hp=831cb6fde721eac9eb982ab83c80a074d7ee79d7;hpb=271870ad4f56253e0918f673b90615e4749cf201;p=nncp.git diff --git a/src/tx.go b/src/tx.go index 831cb6f..a90a49e 100644 --- a/src/tx.go +++ b/src/tx.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2020 Sergey Matveev +Copyright (C) 2016-2021 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -23,6 +23,7 @@ import ( "bytes" "crypto/rand" "errors" + "fmt" "hash" "io" "io/ioutil" @@ -33,6 +34,7 @@ import ( "time" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" "github.com/klauspost/compress/zstd" "golang.org/x/crypto/blake2b" "golang.org/x/crypto/chacha20poly1305" @@ -52,7 +54,15 @@ func (ctx *Ctx) Tx( size, minSize int64, src io.Reader, pktName string, + areaId *AreaId, ) (*Node, error) { + var area *Area + if areaId != nil { + area = ctx.AreaId2Area[*areaId] + if area.Prv == nil { + return nil, errors.New("unknown area id") + } + } hops := make([]*Node, 0, 1+len(node.Via)) hops = append(hops, node) lastNode := node @@ -61,8 +71,14 @@ func (ctx *Ctx) Tx( hops = append(hops, lastNode) } expectedSize := size - for i := 0; i < len(hops); i++ { - expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize) + wrappers := len(hops) + if area != nil { + wrappers++ + } + for i := 0; i < wrappers; i++ { + expectedSize = PktEncOverhead + + PktSizeOverhead + + sizeWithTags(PktOverhead+expectedSize) } padSize := minSize - expectedSize if padSize < 0 { @@ -77,53 +93,183 @@ func (ctx *Ctx) Tx( } errs := make(chan error) + pktEncRaws := make(chan []byte) curSize := size pipeR, pipeW := io.Pipe() - go func(size int64, src io.Reader, dst io.WriteCloser) { - ctx.LogD("tx", SDS{ - "node": hops[0].Id, - "nice": int(nice), - "size": size, - }, "wrote") - errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst) - dst.Close() - }(curSize, src, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize - var pipeRPrev io.Reader + if area == nil { + go func(size int64, src io.Reader, dst io.WriteCloser) { + ctx.LogD("tx", LEs{ + {"Node", hops[0].Id}, + {"Nice", int(nice)}, + {"Size", size}, + }, func(les LEs) string { + return fmt.Sprintf( + "Tx packet to %s (%s) nice: %s", + ctx.NodeName(hops[0].Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) + }) + pktEncRaw, err := PktEncWrite( + ctx.Self, hops[0], pkt, nice, size, padSize, src, dst, + ) + pktEncRaws <- pktEncRaw + errs <- err + dst.Close() // #nosec G104 + }(curSize, src, pipeW) + curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + curSize += padSize + } else { + go func(size, padSize int64, src io.Reader, dst io.WriteCloser) { + ctx.LogD("tx", LEs{ + {"Area", area.Id}, + {"Nice", int(nice)}, + {"Size", size}, + }, func(les LEs) string { + return fmt.Sprintf( + "Tx area packet to %s (%s) nice: %s", + ctx.AreaName(areaId), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) + }) + areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)} + copy(areaNode.Id[:], area.Id[:]) + copy(areaNode.ExchPub[:], area.Pub[:]) + pktEncRaw, err := PktEncWrite( + ctx.Self, &areaNode, pkt, nice, size, padSize, src, dst, + ) + pktEncRaws <- pktEncRaw + errs <- err + dst.Close() // #nosec G104 + }(curSize, padSize, src, pipeW) + curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + curSize += padSize + pipeRPrev = pipeR + pipeR, pipeW = io.Pipe() + go func(size int64, src io.Reader, dst io.WriteCloser) { + pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:]) + if err != nil { + panic(err) + } + ctx.LogD("tx", LEs{ + {"Node", hops[0].Id}, + {"Nice", int(nice)}, + {"Size", size}, + }, func(les LEs) string { + return fmt.Sprintf( + "Tx packet to %s (%s) nice: %s", + ctx.NodeName(hops[0].Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) + }) + pktEncRaw, err := PktEncWrite( + ctx.Self, hops[0], pktArea, nice, size, 0, src, dst, + ) + pktEncRaws <- pktEncRaw + errs <- err + dst.Close() // #nosec G104 + }(curSize, pipeRPrev, pipeW) + curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + } for i := 1; i < len(hops); i++ { - pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:]) + pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:]) + if err != nil { + panic(err) + } pipeRPrev = pipeR pipeR, pipeW = io.Pipe() go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) { - ctx.LogD("tx", SDS{ - "node": node.Id, - "nice": int(nice), - "size": size, - }, "trns wrote") - errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) - dst.Close() + ctx.LogD("tx", LEs{ + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Size", size}, + }, func(les LEs) string { + return fmt.Sprintf( + "Tx trns packet to %s (%s) nice: %s", + ctx.NodeName(node.Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) + }) + pktEncRaw, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) + pktEncRaws <- pktEncRaw + errs <- err + dst.Close() // #nosec G104 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW) curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) } go func() { _, err := CopyProgressed( tmp.W, pipeR, "Tx", - SDS{"pkt": pktName, "fullsize": curSize}, + LEs{{"Pkt", pktName}, {"FullSize", curSize}}, ctx.ShowPrgrs, ) errs <- err }() - for i := 0; i <= len(hops); i++ { + var pktEncRaw []byte + var pktEncMsg []byte + if area != nil { + pktEncMsg = <-pktEncRaws + } + for i := 0; i < len(hops); i++ { + pktEncRaw = <-pktEncRaws + } + for i := 0; i <= wrappers; i++ { err = <-errs if err != nil { - tmp.Fd.Close() + tmp.Fd.Close() // #nosec G104 return nil, err } } nodePath := filepath.Join(ctx.Spool, lastNode.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) - os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) + os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104 + if err != nil { + return lastNode, err + } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) + } + if area != nil { + msgHashRaw := blake2b.Sum256(pktEncMsg) + msgHash := Base32Codec.EncodeToString(msgHashRaw[:]) + seenDir := filepath.Join( + ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(), + ) + seenPath := filepath.Join(seenDir, msgHash+SeenSuffix) + les := LEs{ + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Size", size}, + {"Area", areaId}, + {"AreaMsg", msgHash}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Tx area packet to %s (%s) nice: %s, area %s: %s", + ctx.NodeName(node.Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + area.Name, + msgHash, + ) + } + if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil { + ctx.LogE("tx-mkdir", les, err, logMsg) + return lastNode, err + } + if fd, err := os.Create(seenPath); err == nil { + fd.Close() + if err = DirSync(seenDir); err != nil { + ctx.LogE("tx-dirsync", les, err, logMsg) + return lastNode, err + } + } + ctx.LogI("tx-area", les, logMsg) + } return lastNode, err } @@ -131,45 +277,63 @@ type DummyCloser struct{} func (dc DummyCloser) Close() error { return nil } -func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize int64, archived bool, rerr error) { - if srcPath == "-" { - // Read content from stdin, saving to temporary file, encrypting - // on the fly - src, err := ioutil.TempFile("", "nncp-file") - if err != nil { - rerr = err - return - } - os.Remove(src.Name()) - tmpW := bufio.NewWriter(src) - tmpKey := make([]byte, chacha20poly1305.KeySize) - if _, rerr = rand.Read(tmpKey[:]); rerr != nil { - return - } - aead, err := chacha20poly1305.New(tmpKey) - if err != nil { - rerr = err - return - } - nonce := make([]byte, aead.NonceSize()) - written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW) - if err != nil { - rerr = err - return - } - fileSize = int64(written) - if err = tmpW.Flush(); err != nil { - return +func throughTmpFile(r io.Reader) ( + reader io.Reader, + closer io.Closer, + fileSize int64, + rerr error, +) { + src, err := ioutil.TempFile("", "nncp-file") + if err != nil { + rerr = err + return + } + os.Remove(src.Name()) // #nosec G104 + tmpW := bufio.NewWriter(src) + tmpKey := make([]byte, chacha20poly1305.KeySize) + if _, rerr = rand.Read(tmpKey[:]); rerr != nil { + return + } + aead, err := chacha20poly1305.New(tmpKey) + if err != nil { + rerr = err + return + } + nonce := make([]byte, aead.NonceSize()) + written, err := aeadProcess(aead, nonce, nil, true, r, tmpW) + if err != nil { + rerr = err + return + } + fileSize = int64(written) + if err = tmpW.Flush(); err != nil { + rerr = err + return + } + if _, err = src.Seek(0, io.SeekStart); err != nil { + rerr = err + return + } + r, w := io.Pipe() + go func() { + if _, err := aeadProcess(aead, nonce, nil, false, bufio.NewReader(src), w); err != nil { + w.CloseWithError(err) // #nosec G104 } - src.Seek(0, io.SeekStart) - r, w := io.Pipe() - go func() { - if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil { - w.CloseWithError(err) - } - }() - reader = r - closer = src + }() + reader = r + closer = src + return +} + +func prepareTxFile(srcPath string) ( + reader io.Reader, + closer io.Closer, + fileSize int64, + archived bool, + rerr error, +) { + if srcPath == "-" { + reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin)) return } @@ -244,7 +408,7 @@ func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize closer = DummyCloser{} fileSize += 2 * TarBlockSize // termination block - go func() { + go func() error { tarWr := tar.NewWriter(w) hdr := tar.Header{ Typeflag: tar.TypeDir, @@ -258,7 +422,7 @@ func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize hdr.Name = basePath + e.path[len(rootPath):] hdr.ModTime = e.modTime if err = tarWr.WriteHeader(&hdr); err != nil { - w.CloseWithError(err) + return w.CloseWithError(err) } } hdr.Typeflag = tar.TypeReg @@ -268,20 +432,23 @@ func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize hdr.ModTime = e.modTime hdr.Size = e.size if err = tarWr.WriteHeader(&hdr); err != nil { - w.CloseWithError(err) + return w.CloseWithError(err) } fd, err := os.Open(e.path) if err != nil { - w.CloseWithError(err) + fd.Close() // #nosec G104 + return w.CloseWithError(err) } - _, err = io.Copy(tarWr, bufio.NewReader(fd)) - if err != nil { - w.CloseWithError(err) + if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil { + fd.Close() // #nosec G104 + return w.CloseWithError(err) } - fd.Close() + fd.Close() // #nosec G104 + } + if err = tarWr.Close(); err != nil { + return w.CloseWithError(err) } - tarWr.Close() - w.Close() + return w.Close() }() return } @@ -292,6 +459,7 @@ func (ctx *Ctx) TxFile( srcPath, dstPath string, chunkSize int64, minSize, maxSize int64, + areaId *AreaId, ) error { dstPathSpecified := false if dstPath == "" { @@ -325,32 +493,41 @@ func (ctx *Ctx) TxFile( if err != nil { return err } - _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": int(nice), - "src": srcPath, - "dst": dstPath, - "size": fileSize, + _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath, areaId) + les := LEs{ + {"Type", "file"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Src", srcPath}, + {"Dst", dstPath}, + {"Size", fileSize}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) sent to %s:%s", + srcPath, + humanize.IBytes(uint64(fileSize)), + ctx.NodeName(node.Id), + dstPath, + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } leftSize := fileSize metaPkt := ChunkedMeta{ - Magic: MagicNNCPMv1, + Magic: MagicNNCPMv2.B, FileSize: uint64(fileSize), ChunkSize: uint64(chunkSize), - Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1), + Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1), } for i := int64(0); i < (fileSize/chunkSize)+1; i++ { - hsh := new([32]byte) + hsh := new([MTHSize]byte) metaPkt.Checksums = append(metaPkt.Checksums, *hsh) } var sizeToSend int64 @@ -369,10 +546,7 @@ func (ctx *Ctx) TxFile( if err != nil { return err } - hsh, err = blake2b.New256(nil) - if err != nil { - return err - } + hsh = MTHNew(0, 0) _, err = ctx.Tx( node, pkt, @@ -381,19 +555,29 @@ func (ctx *Ctx) TxFile( minSize, io.TeeReader(reader, hsh), path, + areaId, ) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": int(nice), - "src": srcPath, - "dst": path, - "size": sizeToSend, + les := LEs{ + {"Type", "file"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Src", srcPath}, + {"Dst", path}, + {"Size", sizeToSend}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) sent to %s:%s", + srcPath, + humanize.IBytes(uint64(sizeToSend)), + ctx.NodeName(node.Id), + path, + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) return err } hsh.Sum(metaPkt.Checksums[chunkNum][:0]) @@ -414,19 +598,28 @@ func (ctx *Ctx) TxFile( return err } metaPktSize := int64(metaBuf.Len()) - _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": int(nice), - "src": srcPath, - "dst": path, - "size": metaPktSize, + _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path, areaId) + les := LEs{ + {"Type", "file"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Src", srcPath}, + {"Dst", path}, + {"Size", metaPktSize}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) sent to %s:%s", + srcPath, + humanize.IBytes(uint64(metaPktSize)), + ctx.NodeName(node.Id), + path, + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -450,19 +643,26 @@ func (ctx *Ctx) TxFreq( } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath) - sds := SDS{ - "type": "freq", - "node": node.Id, - "nice": int(nice), - "replynice": int(replyNice), - "src": srcPath, - "dst": dstPath, + _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath, nil) + les := LEs{ + {"Type", "freq"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"ReplyNice", int(replyNice)}, + {"Src", srcPath}, + {"Dst", dstPath}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File request from %s:%s to %s sent", + ctx.NodeName(node.Id), srcPath, + dstPath, + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -474,58 +674,137 @@ func (ctx *Ctx) TxExec( args []string, in io.Reader, minSize int64, + useTmp bool, + noCompress bool, + areaId *AreaId, ) error { path := make([][]byte, 0, 1+len(args)) path = append(path, []byte(handle)) for _, arg := range args { path = append(path, []byte(arg)) } - pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0})) - if err != nil { - return err + pktType := PktTypeExec + if noCompress { + pktType = PktTypeExecFat } - var compressed bytes.Buffer - compressor, err := zstd.NewWriter( - &compressed, - zstd.WithEncoderLevel(zstd.SpeedDefault), - ) + pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0})) if err != nil { return err } - _, err = io.Copy(compressor, in) - compressor.Close() - if err != nil { - return err + var size int64 + + if !noCompress && !useTmp { + var compressed bytes.Buffer + compressor, err := zstd.NewWriter( + &compressed, + zstd.WithEncoderLevel(zstd.SpeedDefault), + ) + if err != nil { + return err + } + if _, err = io.Copy(compressor, in); err != nil { + compressor.Close() // #nosec G104 + return err + } + if err = compressor.Close(); err != nil { + return err + } + size = int64(compressed.Len()) + _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle, areaId) } - size := int64(compressed.Len()) - _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle) - sds := SDS{ - "type": "exec", - "node": node.Id, - "nice": int(nice), - "replynice": int(replyNice), - "dst": strings.Join(append([]string{handle}, args...), " "), - "size": size, + if noCompress && !useTmp { + var data bytes.Buffer + if _, err = io.Copy(&data, in); err != nil { + return err + } + size = int64(data.Len()) + _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle, areaId) + } + if !noCompress && useTmp { + r, w := io.Pipe() + compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault)) + if err != nil { + return err + } + copyErr := make(chan error) + go func() { + _, err := io.Copy(compressor, in) + if err != nil { + compressor.Close() // #nosec G104 + copyErr <- err + } + err = compressor.Close() + w.Close() + copyErr <- err + }() + tmpReader, closer, fileSize, err := throughTmpFile(r) + if closer != nil { + defer closer.Close() + } + if err != nil { + return err + } + err = <-copyErr + if err != nil { + return err + } + size = fileSize + _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId) + } + if noCompress && useTmp { + tmpReader, closer, fileSize, err := throughTmpFile(in) + if closer != nil { + defer closer.Close() + } + if err != nil { + return err + } + size = fileSize + _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId) + } + + dst := strings.Join(append([]string{handle}, args...), " ") + les := LEs{ + {"Type", "exec"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"ReplyNice", int(replyNice)}, + {"Dst", dst}, + {"Size", size}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Exec sent to %s@%s (%s)", + ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)), + ) } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error { - sds := SDS{ - "type": "trns", - "node": node.Id, - "nice": int(nice), - "size": size, + les := LEs{ + {"Type", "trns"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Size", size}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Transitional packet to %s (%s) (nice %s)", + ctx.NodeName(node.Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) } - ctx.LogD("tx", sds, "taken") + ctx.LogD("tx", les, logMsg) if !ctx.IsEnoughSpace(size) { err := errors.New("is not enough space") - ctx.LogE("tx", sds, err, err.Error()) + ctx.LogE("tx", les, err, logMsg) return err } tmp, err := ctx.NewTmpFileWHash() @@ -534,7 +813,7 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error } if _, err = CopyProgressed( tmp.W, src, "Tx trns", - SDS{"pkt": node.Id.String(), "fullsize": size}, + LEs{{"Pkt", node.Id.String()}, {"FullSize", size}}, ctx.ShowPrgrs, ); err != nil { return err @@ -542,10 +821,10 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error nodePath := filepath.Join(ctx.Spool, node.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent") + ctx.LogI("tx", append(les, LE{"Err", err}), logMsg) } - os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) + os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104 return err }