]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
Multicast areas
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "fmt"
27         "hash"
28         "io"
29         "io/ioutil"
30         "os"
31         "path/filepath"
32         "strconv"
33         "strings"
34         "time"
35
36         xdr "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "github.com/klauspost/compress/zstd"
39         "golang.org/x/crypto/blake2b"
40         "golang.org/x/crypto/chacha20poly1305"
41 )
42
43 const (
44         MaxFileSize = 1 << 62
45
46         TarBlockSize = 512
47         TarExt       = ".tar"
48 )
49
50 func (ctx *Ctx) Tx(
51         node *Node,
52         pkt *Pkt,
53         nice uint8,
54         size, minSize int64,
55         src io.Reader,
56         pktName string,
57         areaId *AreaId,
58 ) (*Node, error) {
59         var area *Area
60         if areaId != nil {
61                 area = ctx.AreaId2Area[*areaId]
62                 if area.Prv == nil {
63                         return nil, errors.New("unknown area id")
64                 }
65         }
66         hops := make([]*Node, 0, 1+len(node.Via))
67         hops = append(hops, node)
68         lastNode := node
69         for i := len(node.Via); i > 0; i-- {
70                 lastNode = ctx.Neigh[*node.Via[i-1]]
71                 hops = append(hops, lastNode)
72         }
73         expectedSize := size
74         wrappers := len(hops)
75         if area != nil {
76                 wrappers++
77         }
78         for i := 0; i < wrappers; i++ {
79                 expectedSize = PktEncOverhead +
80                         PktSizeOverhead +
81                         sizeWithTags(PktOverhead+expectedSize)
82         }
83         padSize := minSize - expectedSize
84         if padSize < 0 {
85                 padSize = 0
86         }
87         if !ctx.IsEnoughSpace(size + padSize) {
88                 return nil, errors.New("is not enough space")
89         }
90         tmp, err := ctx.NewTmpFileWHash()
91         if err != nil {
92                 return nil, err
93         }
94
95         errs := make(chan error)
96         pktEncRaws := make(chan []byte)
97         curSize := size
98         pipeR, pipeW := io.Pipe()
99         var pipeRPrev io.Reader
100         if area == nil {
101                 go func(size int64, src io.Reader, dst io.WriteCloser) {
102                         ctx.LogD("tx", LEs{
103                                 {"Node", hops[0].Id},
104                                 {"Nice", int(nice)},
105                                 {"Size", size},
106                         }, func(les LEs) string {
107                                 return fmt.Sprintf(
108                                         "Tx packet to %s (%s) nice: %s",
109                                         ctx.NodeName(hops[0].Id),
110                                         humanize.IBytes(uint64(size)),
111                                         NicenessFmt(nice),
112                                 )
113                         })
114                         pktEncRaw, err := PktEncWrite(
115                                 ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
116                         )
117                         pktEncRaws <- pktEncRaw
118                         errs <- err
119                         dst.Close() // #nosec G104
120                 }(curSize, src, pipeW)
121                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
122                 curSize += padSize
123         } else {
124                 go func(size, padSize int64, src io.Reader, dst io.WriteCloser) {
125                         ctx.LogD("tx", LEs{
126                                 {"Area", area.Id},
127                                 {"Nice", int(nice)},
128                                 {"Size", size},
129                         }, func(les LEs) string {
130                                 return fmt.Sprintf(
131                                         "Tx area packet to %s (%s) nice: %s",
132                                         ctx.AreaName(areaId),
133                                         humanize.IBytes(uint64(size)),
134                                         NicenessFmt(nice),
135                                 )
136                         })
137                         areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
138                         copy(areaNode.Id[:], area.Id[:])
139                         copy(areaNode.ExchPub[:], area.Pub[:])
140                         pktEncRaw, err := PktEncWrite(
141                                 ctx.Self, &areaNode, pkt, nice, size, padSize, src, dst,
142                         )
143                         pktEncRaws <- pktEncRaw
144                         errs <- err
145                         dst.Close() // #nosec G104
146                 }(curSize, padSize, src, pipeW)
147                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
148                 curSize += padSize
149                 pipeRPrev = pipeR
150                 pipeR, pipeW = io.Pipe()
151                 go func(size int64, src io.Reader, dst io.WriteCloser) {
152                         pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
153                         if err != nil {
154                                 panic(err)
155                         }
156                         ctx.LogD("tx", LEs{
157                                 {"Node", hops[0].Id},
158                                 {"Nice", int(nice)},
159                                 {"Size", size},
160                         }, func(les LEs) string {
161                                 return fmt.Sprintf(
162                                         "Tx packet to %s (%s) nice: %s",
163                                         ctx.NodeName(hops[0].Id),
164                                         humanize.IBytes(uint64(size)),
165                                         NicenessFmt(nice),
166                                 )
167                         })
168                         pktEncRaw, err := PktEncWrite(
169                                 ctx.Self, hops[0], pktArea, nice, size, 0, src, dst,
170                         )
171                         pktEncRaws <- pktEncRaw
172                         errs <- err
173                         dst.Close() // #nosec G104
174                 }(curSize, pipeRPrev, pipeW)
175                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
176         }
177         for i := 1; i < len(hops); i++ {
178                 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
179                 if err != nil {
180                         panic(err)
181                 }
182                 pipeRPrev = pipeR
183                 pipeR, pipeW = io.Pipe()
184                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
185                         ctx.LogD("tx", LEs{
186                                 {"Node", node.Id},
187                                 {"Nice", int(nice)},
188                                 {"Size", size},
189                         }, func(les LEs) string {
190                                 return fmt.Sprintf(
191                                         "Tx trns packet to %s (%s) nice: %s",
192                                         ctx.NodeName(node.Id),
193                                         humanize.IBytes(uint64(size)),
194                                         NicenessFmt(nice),
195                                 )
196                         })
197                         pktEncRaw, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
198                         pktEncRaws <- pktEncRaw
199                         errs <- err
200                         dst.Close() // #nosec G104
201                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
202                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
203         }
204         go func() {
205                 _, err := CopyProgressed(
206                         tmp.W, pipeR, "Tx",
207                         LEs{{"Pkt", pktName}, {"FullSize", curSize}},
208                         ctx.ShowPrgrs,
209                 )
210                 errs <- err
211         }()
212         var pktEncRaw []byte
213         var pktEncMsg []byte
214         if area != nil {
215                 pktEncMsg = <-pktEncRaws
216         }
217         for i := 0; i < len(hops); i++ {
218                 pktEncRaw = <-pktEncRaws
219         }
220         for i := 0; i <= wrappers; i++ {
221                 err = <-errs
222                 if err != nil {
223                         tmp.Fd.Close() // #nosec G104
224                         return nil, err
225                 }
226         }
227         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
228         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
229         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
230         if err != nil {
231                 return lastNode, err
232         }
233         if ctx.HdrUsage {
234                 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
235         }
236         if area != nil {
237                 msgHashRaw := blake2b.Sum256(pktEncMsg)
238                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
239                 seenDir := filepath.Join(
240                         ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
241                 )
242                 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
243                 les := LEs{
244                         {"Node", node.Id},
245                         {"Nice", int(nice)},
246                         {"Size", size},
247                         {"Area", areaId},
248                         {"AreaMsg", msgHash},
249                 }
250                 logMsg := func(les LEs) string {
251                         return fmt.Sprintf(
252                                 "Tx area packet to %s (%s) nice: %s, area %s: %s",
253                                 ctx.NodeName(node.Id),
254                                 humanize.IBytes(uint64(size)),
255                                 NicenessFmt(nice),
256                                 area.Name,
257                                 msgHash,
258                         )
259                 }
260                 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
261                         ctx.LogE("tx-mkdir", les, err, logMsg)
262                         return lastNode, err
263                 }
264                 if fd, err := os.Create(seenPath); err == nil {
265                         fd.Close()
266                         if err = DirSync(seenDir); err != nil {
267                                 ctx.LogE("tx-dirsync", les, err, logMsg)
268                                 return lastNode, err
269                         }
270                 }
271                 ctx.LogI("tx-area", les, logMsg)
272         }
273         return lastNode, err
274 }
275
276 type DummyCloser struct{}
277
278 func (dc DummyCloser) Close() error { return nil }
279
280 func throughTmpFile(r io.Reader) (
281         reader io.Reader,
282         closer io.Closer,
283         fileSize int64,
284         rerr error,
285 ) {
286         src, err := ioutil.TempFile("", "nncp-file")
287         if err != nil {
288                 rerr = err
289                 return
290         }
291         os.Remove(src.Name()) // #nosec G104
292         tmpW := bufio.NewWriter(src)
293         tmpKey := make([]byte, chacha20poly1305.KeySize)
294         if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
295                 return
296         }
297         aead, err := chacha20poly1305.New(tmpKey)
298         if err != nil {
299                 rerr = err
300                 return
301         }
302         nonce := make([]byte, aead.NonceSize())
303         written, err := aeadProcess(aead, nonce, nil, true, r, tmpW)
304         if err != nil {
305                 rerr = err
306                 return
307         }
308         fileSize = int64(written)
309         if err = tmpW.Flush(); err != nil {
310                 rerr = err
311                 return
312         }
313         if _, err = src.Seek(0, io.SeekStart); err != nil {
314                 rerr = err
315                 return
316         }
317         r, w := io.Pipe()
318         go func() {
319                 if _, err := aeadProcess(aead, nonce, nil, false, bufio.NewReader(src), w); err != nil {
320                         w.CloseWithError(err) // #nosec G104
321                 }
322         }()
323         reader = r
324         closer = src
325         return
326 }
327
328 func prepareTxFile(srcPath string) (
329         reader io.Reader,
330         closer io.Closer,
331         fileSize int64,
332         archived bool,
333         rerr error,
334 ) {
335         if srcPath == "-" {
336                 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
337                 return
338         }
339
340         srcStat, err := os.Stat(srcPath)
341         if err != nil {
342                 rerr = err
343                 return
344         }
345         mode := srcStat.Mode()
346
347         if mode.IsRegular() {
348                 // It is regular file, just send it
349                 src, err := os.Open(srcPath)
350                 if err != nil {
351                         rerr = err
352                         return
353                 }
354                 fileSize = srcStat.Size()
355                 reader = bufio.NewReader(src)
356                 closer = src
357                 return
358         }
359
360         if !mode.IsDir() {
361                 rerr = errors.New("unsupported file type")
362                 return
363         }
364
365         // It is directory, create PAX archive with its contents
366         archived = true
367         basePath := filepath.Base(srcPath)
368         rootPath, err := filepath.Abs(srcPath)
369         if err != nil {
370                 rerr = err
371                 return
372         }
373         type einfo struct {
374                 path    string
375                 modTime time.Time
376                 size    int64
377         }
378         dirs := make([]einfo, 0, 1<<10)
379         files := make([]einfo, 0, 1<<10)
380         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
381                 if err != nil {
382                         return err
383                 }
384                 if info.IsDir() {
385                         // directory header, PAX record header+contents
386                         fileSize += TarBlockSize + 2*TarBlockSize
387                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
388                 } else {
389                         // file header, PAX record header+contents, file content
390                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
391                         if n := info.Size() % TarBlockSize; n != 0 {
392                                 fileSize += TarBlockSize - n // padding
393                         }
394                         files = append(files, einfo{
395                                 path:    path,
396                                 modTime: info.ModTime(),
397                                 size:    info.Size(),
398                         })
399                 }
400                 return nil
401         })
402         if rerr != nil {
403                 return
404         }
405
406         r, w := io.Pipe()
407         reader = r
408         closer = DummyCloser{}
409         fileSize += 2 * TarBlockSize // termination block
410
411         go func() error {
412                 tarWr := tar.NewWriter(w)
413                 hdr := tar.Header{
414                         Typeflag: tar.TypeDir,
415                         Mode:     0777,
416                         PAXRecords: map[string]string{
417                                 "comment": "Autogenerated by " + VersionGet(),
418                         },
419                         Format: tar.FormatPAX,
420                 }
421                 for _, e := range dirs {
422                         hdr.Name = basePath + e.path[len(rootPath):]
423                         hdr.ModTime = e.modTime
424                         if err = tarWr.WriteHeader(&hdr); err != nil {
425                                 return w.CloseWithError(err)
426                         }
427                 }
428                 hdr.Typeflag = tar.TypeReg
429                 hdr.Mode = 0666
430                 for _, e := range files {
431                         hdr.Name = basePath + e.path[len(rootPath):]
432                         hdr.ModTime = e.modTime
433                         hdr.Size = e.size
434                         if err = tarWr.WriteHeader(&hdr); err != nil {
435                                 return w.CloseWithError(err)
436                         }
437                         fd, err := os.Open(e.path)
438                         if err != nil {
439                                 fd.Close() // #nosec G104
440                                 return w.CloseWithError(err)
441                         }
442                         if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
443                                 fd.Close() // #nosec G104
444                                 return w.CloseWithError(err)
445                         }
446                         fd.Close() // #nosec G104
447                 }
448                 if err = tarWr.Close(); err != nil {
449                         return w.CloseWithError(err)
450                 }
451                 return w.Close()
452         }()
453         return
454 }
455
456 func (ctx *Ctx) TxFile(
457         node *Node,
458         nice uint8,
459         srcPath, dstPath string,
460         chunkSize int64,
461         minSize, maxSize int64,
462         areaId *AreaId,
463 ) error {
464         dstPathSpecified := false
465         if dstPath == "" {
466                 if srcPath == "-" {
467                         return errors.New("Must provide destination filename")
468                 }
469                 dstPath = filepath.Base(srcPath)
470         } else {
471                 dstPathSpecified = true
472         }
473         dstPath = filepath.Clean(dstPath)
474         if filepath.IsAbs(dstPath) {
475                 return errors.New("Relative destination path required")
476         }
477         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
478         if closer != nil {
479                 defer closer.Close()
480         }
481         if err != nil {
482                 return err
483         }
484         if fileSize > maxSize {
485                 return errors.New("Too big than allowed")
486         }
487         if archived && !dstPathSpecified {
488                 dstPath += TarExt
489         }
490
491         if fileSize <= chunkSize {
492                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
493                 if err != nil {
494                         return err
495                 }
496                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath, areaId)
497                 les := LEs{
498                         {"Type", "file"},
499                         {"Node", node.Id},
500                         {"Nice", int(nice)},
501                         {"Src", srcPath},
502                         {"Dst", dstPath},
503                         {"Size", fileSize},
504                 }
505                 logMsg := func(les LEs) string {
506                         return fmt.Sprintf(
507                                 "File %s (%s) sent to %s:%s",
508                                 srcPath,
509                                 humanize.IBytes(uint64(fileSize)),
510                                 ctx.NodeName(node.Id),
511                                 dstPath,
512                         )
513                 }
514                 if err == nil {
515                         ctx.LogI("tx", les, logMsg)
516                 } else {
517                         ctx.LogE("tx", les, err, logMsg)
518                 }
519                 return err
520         }
521
522         leftSize := fileSize
523         metaPkt := ChunkedMeta{
524                 Magic:     MagicNNCPMv2.B,
525                 FileSize:  uint64(fileSize),
526                 ChunkSize: uint64(chunkSize),
527                 Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1),
528         }
529         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
530                 hsh := new([MTHSize]byte)
531                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
532         }
533         var sizeToSend int64
534         var hsh hash.Hash
535         var pkt *Pkt
536         var chunkNum int
537         var path string
538         for {
539                 if leftSize <= chunkSize {
540                         sizeToSend = leftSize
541                 } else {
542                         sizeToSend = chunkSize
543                 }
544                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
545                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
546                 if err != nil {
547                         return err
548                 }
549                 hsh = MTHNew(0, 0)
550                 _, err = ctx.Tx(
551                         node,
552                         pkt,
553                         nice,
554                         sizeToSend,
555                         minSize,
556                         io.TeeReader(reader, hsh),
557                         path,
558                         areaId,
559                 )
560                 les := LEs{
561                         {"Type", "file"},
562                         {"Node", node.Id},
563                         {"Nice", int(nice)},
564                         {"Src", srcPath},
565                         {"Dst", path},
566                         {"Size", sizeToSend},
567                 }
568                 logMsg := func(les LEs) string {
569                         return fmt.Sprintf(
570                                 "File %s (%s) sent to %s:%s",
571                                 srcPath,
572                                 humanize.IBytes(uint64(sizeToSend)),
573                                 ctx.NodeName(node.Id),
574                                 path,
575                         )
576                 }
577                 if err == nil {
578                         ctx.LogI("tx", les, logMsg)
579                 } else {
580                         ctx.LogE("tx", les, err, logMsg)
581                         return err
582                 }
583                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
584                 leftSize -= sizeToSend
585                 chunkNum++
586                 if leftSize == 0 {
587                         break
588                 }
589         }
590         var metaBuf bytes.Buffer
591         _, err = xdr.Marshal(&metaBuf, metaPkt)
592         if err != nil {
593                 return err
594         }
595         path = dstPath + ChunkedSuffixMeta
596         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
597         if err != nil {
598                 return err
599         }
600         metaPktSize := int64(metaBuf.Len())
601         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path, areaId)
602         les := LEs{
603                 {"Type", "file"},
604                 {"Node", node.Id},
605                 {"Nice", int(nice)},
606                 {"Src", srcPath},
607                 {"Dst", path},
608                 {"Size", metaPktSize},
609         }
610         logMsg := func(les LEs) string {
611                 return fmt.Sprintf(
612                         "File %s (%s) sent to %s:%s",
613                         srcPath,
614                         humanize.IBytes(uint64(metaPktSize)),
615                         ctx.NodeName(node.Id),
616                         path,
617                 )
618         }
619         if err == nil {
620                 ctx.LogI("tx", les, logMsg)
621         } else {
622                 ctx.LogE("tx", les, err, logMsg)
623         }
624         return err
625 }
626
627 func (ctx *Ctx) TxFreq(
628         node *Node,
629         nice, replyNice uint8,
630         srcPath, dstPath string,
631         minSize int64) error {
632         dstPath = filepath.Clean(dstPath)
633         if filepath.IsAbs(dstPath) {
634                 return errors.New("Relative destination path required")
635         }
636         srcPath = filepath.Clean(srcPath)
637         if filepath.IsAbs(srcPath) {
638                 return errors.New("Relative source path required")
639         }
640         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
641         if err != nil {
642                 return err
643         }
644         src := strings.NewReader(dstPath)
645         size := int64(src.Len())
646         _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath, nil)
647         les := LEs{
648                 {"Type", "freq"},
649                 {"Node", node.Id},
650                 {"Nice", int(nice)},
651                 {"ReplyNice", int(replyNice)},
652                 {"Src", srcPath},
653                 {"Dst", dstPath},
654         }
655         logMsg := func(les LEs) string {
656                 return fmt.Sprintf(
657                         "File request from %s:%s to %s sent",
658                         ctx.NodeName(node.Id), srcPath,
659                         dstPath,
660                 )
661         }
662         if err == nil {
663                 ctx.LogI("tx", les, logMsg)
664         } else {
665                 ctx.LogE("tx", les, err, logMsg)
666         }
667         return err
668 }
669
670 func (ctx *Ctx) TxExec(
671         node *Node,
672         nice, replyNice uint8,
673         handle string,
674         args []string,
675         in io.Reader,
676         minSize int64,
677         useTmp bool,
678         noCompress bool,
679         areaId *AreaId,
680 ) error {
681         path := make([][]byte, 0, 1+len(args))
682         path = append(path, []byte(handle))
683         for _, arg := range args {
684                 path = append(path, []byte(arg))
685         }
686         pktType := PktTypeExec
687         if noCompress {
688                 pktType = PktTypeExecFat
689         }
690         pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
691         if err != nil {
692                 return err
693         }
694         var size int64
695
696         if !noCompress && !useTmp {
697                 var compressed bytes.Buffer
698                 compressor, err := zstd.NewWriter(
699                         &compressed,
700                         zstd.WithEncoderLevel(zstd.SpeedDefault),
701                 )
702                 if err != nil {
703                         return err
704                 }
705                 if _, err = io.Copy(compressor, in); err != nil {
706                         compressor.Close() // #nosec G104
707                         return err
708                 }
709                 if err = compressor.Close(); err != nil {
710                         return err
711                 }
712                 size = int64(compressed.Len())
713                 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle, areaId)
714         }
715         if noCompress && !useTmp {
716                 var data bytes.Buffer
717                 if _, err = io.Copy(&data, in); err != nil {
718                         return err
719                 }
720                 size = int64(data.Len())
721                 _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle, areaId)
722         }
723         if !noCompress && useTmp {
724                 r, w := io.Pipe()
725                 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
726                 if err != nil {
727                         return err
728                 }
729                 copyErr := make(chan error)
730                 go func() {
731                         _, err := io.Copy(compressor, in)
732                         if err != nil {
733                                 compressor.Close() // #nosec G104
734                                 copyErr <- err
735                         }
736                         err = compressor.Close()
737                         w.Close()
738                         copyErr <- err
739                 }()
740                 tmpReader, closer, fileSize, err := throughTmpFile(r)
741                 if closer != nil {
742                         defer closer.Close()
743                 }
744                 if err != nil {
745                         return err
746                 }
747                 err = <-copyErr
748                 if err != nil {
749                         return err
750                 }
751                 size = fileSize
752                 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId)
753         }
754         if noCompress && useTmp {
755                 tmpReader, closer, fileSize, err := throughTmpFile(in)
756                 if closer != nil {
757                         defer closer.Close()
758                 }
759                 if err != nil {
760                         return err
761                 }
762                 size = fileSize
763                 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId)
764         }
765
766         dst := strings.Join(append([]string{handle}, args...), " ")
767         les := LEs{
768                 {"Type", "exec"},
769                 {"Node", node.Id},
770                 {"Nice", int(nice)},
771                 {"ReplyNice", int(replyNice)},
772                 {"Dst", dst},
773                 {"Size", size},
774         }
775         logMsg := func(les LEs) string {
776                 return fmt.Sprintf(
777                         "Exec sent to %s@%s (%s)",
778                         ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
779                 )
780         }
781         if err == nil {
782                 ctx.LogI("tx", les, logMsg)
783         } else {
784                 ctx.LogE("tx", les, err, logMsg)
785         }
786         return err
787 }
788
789 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
790         les := LEs{
791                 {"Type", "trns"},
792                 {"Node", node.Id},
793                 {"Nice", int(nice)},
794                 {"Size", size},
795         }
796         logMsg := func(les LEs) string {
797                 return fmt.Sprintf(
798                         "Transitional packet to %s (%s) (nice %s)",
799                         ctx.NodeName(node.Id),
800                         humanize.IBytes(uint64(size)),
801                         NicenessFmt(nice),
802                 )
803         }
804         ctx.LogD("tx", les, logMsg)
805         if !ctx.IsEnoughSpace(size) {
806                 err := errors.New("is not enough space")
807                 ctx.LogE("tx", les, err, logMsg)
808                 return err
809         }
810         tmp, err := ctx.NewTmpFileWHash()
811         if err != nil {
812                 return err
813         }
814         if _, err = CopyProgressed(
815                 tmp.W, src, "Tx trns",
816                 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
817                 ctx.ShowPrgrs,
818         ); err != nil {
819                 return err
820         }
821         nodePath := filepath.Join(ctx.Spool, node.Id.String())
822         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
823         if err == nil {
824                 ctx.LogI("tx", les, logMsg)
825         } else {
826                 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
827         }
828         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
829         return err
830 }