- size := int64(compressed.Len())
- _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
- sds := SDS{
- "type": "exec",
- "node": node.Id,
- "nice": strconv.Itoa(int(nice)),
- "replynice": strconv.Itoa(int(replyNice)),
- "dst": strings.Join(append([]string{handle}, args...), " "),
- "size": strconv.FormatInt(size, 10),
+ if noCompress && !useTmp {
+ var data bytes.Buffer
+ if _, err = io.Copy(&data, in); err != nil {
+ return err
+ }
+ size = int64(data.Len())
+ _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle)
+ }
+ if !noCompress && useTmp {
+ r, w := io.Pipe()
+ compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
+ if err != nil {
+ return err
+ }
+ copyErr := make(chan error)
+ go func() {
+ _, err := io.Copy(compressor, in)
+ if err != nil {
+ compressor.Close() // #nosec G104
+ copyErr <- err
+ }
+ err = compressor.Close()
+ w.Close()
+ copyErr <- err
+ }()
+ tmpReader, closer, fileSize, err := throughTmpFile(r)
+ if closer != nil {
+ defer closer.Close()
+ }
+ if err != nil {
+ return err
+ }
+ err = <-copyErr
+ if err != nil {
+ return err
+ }
+ size = fileSize
+ _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
+ }
+ if noCompress && useTmp {
+ tmpReader, closer, fileSize, err := throughTmpFile(in)
+ if closer != nil {
+ defer closer.Close()
+ }
+ if err != nil {
+ return err
+ }
+ size = fileSize
+ _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
+ }
+
+ dst := strings.Join(append([]string{handle}, args...), " ")
+ les := LEs{
+ {"Type", "exec"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"ReplyNice", int(replyNice)},
+ {"Dst", dst},
+ {"Size", size},
+ }
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "Exec sent to %s@%s (%s)",
+ ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
+ )