]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
Do Mkdir syscall only if directory does not exist
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "fmt"
27         "hash"
28         "io"
29         "io/ioutil"
30         "os"
31         "path/filepath"
32         "strconv"
33         "strings"
34         "time"
35
36         xdr "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "github.com/klauspost/compress/zstd"
39         "golang.org/x/crypto/blake2b"
40         "golang.org/x/crypto/chacha20poly1305"
41 )
42
43 const (
44         MaxFileSize = 1 << 62
45
46         TarBlockSize = 512
47         TarExt       = ".tar"
48 )
49
50 func (ctx *Ctx) Tx(
51         node *Node,
52         pkt *Pkt,
53         nice uint8,
54         size, minSize int64,
55         src io.Reader,
56         pktName string,
57         areaId *AreaId,
58 ) (*Node, error) {
59         var area *Area
60         if areaId != nil {
61                 area = ctx.AreaId2Area[*areaId]
62                 if area.Prv == nil {
63                         return nil, errors.New("area has no encryption keys")
64                 }
65         }
66         hops := make([]*Node, 0, 1+len(node.Via))
67         hops = append(hops, node)
68         lastNode := node
69         for i := len(node.Via); i > 0; i-- {
70                 lastNode = ctx.Neigh[*node.Via[i-1]]
71                 hops = append(hops, lastNode)
72         }
73         expectedSize := size
74         wrappers := len(hops)
75         if area != nil {
76                 wrappers++
77         }
78         for i := 0; i < wrappers; i++ {
79                 expectedSize = PktEncOverhead +
80                         PktSizeOverhead +
81                         sizeWithTags(PktOverhead+expectedSize)
82         }
83         padSize := minSize - expectedSize
84         if padSize < 0 {
85                 padSize = 0
86         }
87         if !ctx.IsEnoughSpace(size + padSize) {
88                 return nil, errors.New("is not enough space")
89         }
90         tmp, err := ctx.NewTmpFileWHash()
91         if err != nil {
92                 return nil, err
93         }
94
95         errs := make(chan error)
96         pktEncRaws := make(chan []byte)
97         curSize := size
98         pipeR, pipeW := io.Pipe()
99         var pipeRPrev io.Reader
100         if area == nil {
101                 go func(size int64, src io.Reader, dst io.WriteCloser) {
102                         ctx.LogD("tx", LEs{
103                                 {"Node", hops[0].Id},
104                                 {"Nice", int(nice)},
105                                 {"Size", size},
106                         }, func(les LEs) string {
107                                 return fmt.Sprintf(
108                                         "Tx packet to %s (%s) nice: %s",
109                                         ctx.NodeName(hops[0].Id),
110                                         humanize.IBytes(uint64(size)),
111                                         NicenessFmt(nice),
112                                 )
113                         })
114                         pktEncRaw, err := PktEncWrite(
115                                 ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
116                         )
117                         pktEncRaws <- pktEncRaw
118                         errs <- err
119                         dst.Close()
120                 }(curSize, src, pipeW)
121                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
122                 curSize += padSize
123         } else {
124                 go func(size, padSize int64, src io.Reader, dst io.WriteCloser) {
125                         ctx.LogD("tx", LEs{
126                                 {"Area", area.Id},
127                                 {"Nice", int(nice)},
128                                 {"Size", size},
129                         }, func(les LEs) string {
130                                 return fmt.Sprintf(
131                                         "Tx area packet to %s (%s) nice: %s",
132                                         ctx.AreaName(areaId),
133                                         humanize.IBytes(uint64(size)),
134                                         NicenessFmt(nice),
135                                 )
136                         })
137                         areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
138                         copy(areaNode.Id[:], area.Id[:])
139                         copy(areaNode.ExchPub[:], area.Pub[:])
140                         pktEncRaw, err := PktEncWrite(
141                                 ctx.Self, &areaNode, pkt, nice, size, padSize, src, dst,
142                         )
143                         pktEncRaws <- pktEncRaw
144                         errs <- err
145                         dst.Close()
146                 }(curSize, padSize, src, pipeW)
147                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
148                 curSize += padSize
149                 pipeRPrev = pipeR
150                 pipeR, pipeW = io.Pipe()
151                 go func(size int64, src io.Reader, dst io.WriteCloser) {
152                         pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
153                         if err != nil {
154                                 panic(err)
155                         }
156                         ctx.LogD("tx", LEs{
157                                 {"Node", hops[0].Id},
158                                 {"Nice", int(nice)},
159                                 {"Size", size},
160                         }, func(les LEs) string {
161                                 return fmt.Sprintf(
162                                         "Tx packet to %s (%s) nice: %s",
163                                         ctx.NodeName(hops[0].Id),
164                                         humanize.IBytes(uint64(size)),
165                                         NicenessFmt(nice),
166                                 )
167                         })
168                         pktEncRaw, err := PktEncWrite(
169                                 ctx.Self, hops[0], pktArea, nice, size, 0, src, dst,
170                         )
171                         pktEncRaws <- pktEncRaw
172                         errs <- err
173                         dst.Close()
174                 }(curSize, pipeRPrev, pipeW)
175                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
176         }
177         for i := 1; i < len(hops); i++ {
178                 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
179                 if err != nil {
180                         panic(err)
181                 }
182                 pipeRPrev = pipeR
183                 pipeR, pipeW = io.Pipe()
184                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
185                         ctx.LogD("tx", LEs{
186                                 {"Node", node.Id},
187                                 {"Nice", int(nice)},
188                                 {"Size", size},
189                         }, func(les LEs) string {
190                                 return fmt.Sprintf(
191                                         "Tx trns packet to %s (%s) nice: %s",
192                                         ctx.NodeName(node.Id),
193                                         humanize.IBytes(uint64(size)),
194                                         NicenessFmt(nice),
195                                 )
196                         })
197                         pktEncRaw, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
198                         pktEncRaws <- pktEncRaw
199                         errs <- err
200                         dst.Close()
201                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
202                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
203         }
204         go func() {
205                 _, err := CopyProgressed(
206                         tmp.W, pipeR, "Tx",
207                         LEs{{"Pkt", pktName}, {"FullSize", curSize}},
208                         ctx.ShowPrgrs,
209                 )
210                 errs <- err
211         }()
212         var pktEncRaw []byte
213         var pktEncMsg []byte
214         if area != nil {
215                 pktEncMsg = <-pktEncRaws
216         }
217         for i := 0; i < len(hops); i++ {
218                 pktEncRaw = <-pktEncRaws
219         }
220         for i := 0; i <= wrappers; i++ {
221                 err = <-errs
222                 if err != nil {
223                         tmp.Fd.Close()
224                         return nil, err
225                 }
226         }
227         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
228         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
229         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
230         if err != nil {
231                 return lastNode, err
232         }
233         if ctx.HdrUsage {
234                 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
235         }
236         if area != nil {
237                 msgHashRaw := blake2b.Sum256(pktEncMsg)
238                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
239                 seenDir := filepath.Join(
240                         ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
241                 )
242                 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
243                 les := LEs{
244                         {"Node", node.Id},
245                         {"Nice", int(nice)},
246                         {"Size", size},
247                         {"Area", areaId},
248                         {"AreaMsg", msgHash},
249                 }
250                 logMsg := func(les LEs) string {
251                         return fmt.Sprintf(
252                                 "Tx area packet to %s (%s) nice: %s, area %s: %s",
253                                 ctx.NodeName(node.Id),
254                                 humanize.IBytes(uint64(size)),
255                                 NicenessFmt(nice),
256                                 area.Name,
257                                 msgHash,
258                         )
259                 }
260                 if _, err = os.Stat(seenDir); err != nil {
261                         if !os.IsNotExist(err) {
262                                 ctx.LogE("tx-mkdir", les, err, logMsg)
263                                 return lastNode, err
264                         }
265                         if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
266                                 ctx.LogE("tx-mkdir", les, err, logMsg)
267                                 return lastNode, err
268                         }
269                 }
270                 if fd, err := os.Create(seenPath); err == nil {
271                         fd.Close()
272                         if err = DirSync(seenDir); err != nil {
273                                 ctx.LogE("tx-dirsync", les, err, logMsg)
274                                 return lastNode, err
275                         }
276                 }
277                 ctx.LogI("tx-area", les, logMsg)
278         }
279         return lastNode, err
280 }
281
282 type DummyCloser struct{}
283
284 func (dc DummyCloser) Close() error { return nil }
285
286 func throughTmpFile(r io.Reader) (
287         reader io.Reader,
288         closer io.Closer,
289         fileSize int64,
290         rerr error,
291 ) {
292         src, err := ioutil.TempFile("", "nncp-file")
293         if err != nil {
294                 rerr = err
295                 return
296         }
297         os.Remove(src.Name())
298         tmpW := bufio.NewWriter(src)
299         tmpKey := make([]byte, chacha20poly1305.KeySize)
300         if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
301                 return
302         }
303         aead, err := chacha20poly1305.New(tmpKey)
304         if err != nil {
305                 rerr = err
306                 return
307         }
308         nonce := make([]byte, aead.NonceSize())
309         written, err := aeadProcess(aead, nonce, nil, true, r, tmpW)
310         if err != nil {
311                 rerr = err
312                 return
313         }
314         fileSize = int64(written)
315         if err = tmpW.Flush(); err != nil {
316                 rerr = err
317                 return
318         }
319         if _, err = src.Seek(0, io.SeekStart); err != nil {
320                 rerr = err
321                 return
322         }
323         r, w := io.Pipe()
324         go func() {
325                 for i := 0; i < aead.NonceSize(); i++ {
326                         nonce[i] = 0
327                 }
328                 if _, err := aeadProcess(aead, nonce, nil, false, bufio.NewReader(src), w); err != nil {
329                         w.CloseWithError(err)
330                 }
331         }()
332         reader = r
333         closer = src
334         return
335 }
336
337 func prepareTxFile(srcPath string) (
338         reader io.Reader,
339         closer io.Closer,
340         fileSize int64,
341         archived bool,
342         rerr error,
343 ) {
344         if srcPath == "-" {
345                 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
346                 return
347         }
348
349         srcStat, err := os.Stat(srcPath)
350         if err != nil {
351                 rerr = err
352                 return
353         }
354         mode := srcStat.Mode()
355
356         if mode.IsRegular() {
357                 // It is regular file, just send it
358                 src, err := os.Open(srcPath)
359                 if err != nil {
360                         rerr = err
361                         return
362                 }
363                 fileSize = srcStat.Size()
364                 reader = bufio.NewReader(src)
365                 closer = src
366                 return
367         }
368
369         if !mode.IsDir() {
370                 rerr = errors.New("unsupported file type")
371                 return
372         }
373
374         // It is directory, create PAX archive with its contents
375         archived = true
376         basePath := filepath.Base(srcPath)
377         rootPath, err := filepath.Abs(srcPath)
378         if err != nil {
379                 rerr = err
380                 return
381         }
382         type einfo struct {
383                 path    string
384                 modTime time.Time
385                 size    int64
386         }
387         dirs := make([]einfo, 0, 1<<10)
388         files := make([]einfo, 0, 1<<10)
389         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
390                 if err != nil {
391                         return err
392                 }
393                 if info.IsDir() {
394                         // directory header, PAX record header+contents
395                         fileSize += TarBlockSize + 2*TarBlockSize
396                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
397                 } else {
398                         // file header, PAX record header+contents, file content
399                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
400                         if n := info.Size() % TarBlockSize; n != 0 {
401                                 fileSize += TarBlockSize - n // padding
402                         }
403                         files = append(files, einfo{
404                                 path:    path,
405                                 modTime: info.ModTime(),
406                                 size:    info.Size(),
407                         })
408                 }
409                 return nil
410         })
411         if rerr != nil {
412                 return
413         }
414
415         r, w := io.Pipe()
416         reader = r
417         closer = DummyCloser{}
418         fileSize += 2 * TarBlockSize // termination block
419
420         go func() error {
421                 tarWr := tar.NewWriter(w)
422                 hdr := tar.Header{
423                         Typeflag: tar.TypeDir,
424                         Mode:     0777,
425                         PAXRecords: map[string]string{
426                                 "comment": "Autogenerated by " + VersionGet(),
427                         },
428                         Format: tar.FormatPAX,
429                 }
430                 for _, e := range dirs {
431                         hdr.Name = basePath + e.path[len(rootPath):]
432                         hdr.ModTime = e.modTime
433                         if err = tarWr.WriteHeader(&hdr); err != nil {
434                                 return w.CloseWithError(err)
435                         }
436                 }
437                 hdr.Typeflag = tar.TypeReg
438                 hdr.Mode = 0666
439                 for _, e := range files {
440                         hdr.Name = basePath + e.path[len(rootPath):]
441                         hdr.ModTime = e.modTime
442                         hdr.Size = e.size
443                         if err = tarWr.WriteHeader(&hdr); err != nil {
444                                 return w.CloseWithError(err)
445                         }
446                         fd, err := os.Open(e.path)
447                         if err != nil {
448                                 fd.Close()
449                                 return w.CloseWithError(err)
450                         }
451                         if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
452                                 fd.Close()
453                                 return w.CloseWithError(err)
454                         }
455                         fd.Close()
456                 }
457                 if err = tarWr.Close(); err != nil {
458                         return w.CloseWithError(err)
459                 }
460                 return w.Close()
461         }()
462         return
463 }
464
465 func (ctx *Ctx) TxFile(
466         node *Node,
467         nice uint8,
468         srcPath, dstPath string,
469         chunkSize int64,
470         minSize, maxSize int64,
471         areaId *AreaId,
472 ) error {
473         dstPathSpecified := false
474         if dstPath == "" {
475                 if srcPath == "-" {
476                         return errors.New("Must provide destination filename")
477                 }
478                 dstPath = filepath.Base(srcPath)
479         } else {
480                 dstPathSpecified = true
481         }
482         dstPath = filepath.Clean(dstPath)
483         if filepath.IsAbs(dstPath) {
484                 return errors.New("Relative destination path required")
485         }
486         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
487         if closer != nil {
488                 defer closer.Close()
489         }
490         if err != nil {
491                 return err
492         }
493         if fileSize > maxSize {
494                 return errors.New("Too big than allowed")
495         }
496         if archived && !dstPathSpecified {
497                 dstPath += TarExt
498         }
499
500         if fileSize <= chunkSize {
501                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
502                 if err != nil {
503                         return err
504                 }
505                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath, areaId)
506                 les := LEs{
507                         {"Type", "file"},
508                         {"Node", node.Id},
509                         {"Nice", int(nice)},
510                         {"Src", srcPath},
511                         {"Dst", dstPath},
512                         {"Size", fileSize},
513                 }
514                 logMsg := func(les LEs) string {
515                         return fmt.Sprintf(
516                                 "File %s (%s) sent to %s:%s",
517                                 srcPath,
518                                 humanize.IBytes(uint64(fileSize)),
519                                 ctx.NodeName(node.Id),
520                                 dstPath,
521                         )
522                 }
523                 if err == nil {
524                         ctx.LogI("tx", les, logMsg)
525                 } else {
526                         ctx.LogE("tx", les, err, logMsg)
527                 }
528                 return err
529         }
530
531         leftSize := fileSize
532         metaPkt := ChunkedMeta{
533                 Magic:     MagicNNCPMv2.B,
534                 FileSize:  uint64(fileSize),
535                 ChunkSize: uint64(chunkSize),
536                 Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1),
537         }
538         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
539                 hsh := new([MTHSize]byte)
540                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
541         }
542         var sizeToSend int64
543         var hsh hash.Hash
544         var pkt *Pkt
545         var chunkNum int
546         var path string
547         for {
548                 if leftSize <= chunkSize {
549                         sizeToSend = leftSize
550                 } else {
551                         sizeToSend = chunkSize
552                 }
553                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
554                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
555                 if err != nil {
556                         return err
557                 }
558                 hsh = MTHNew(0, 0)
559                 _, err = ctx.Tx(
560                         node,
561                         pkt,
562                         nice,
563                         sizeToSend,
564                         minSize,
565                         io.TeeReader(reader, hsh),
566                         path,
567                         areaId,
568                 )
569                 les := LEs{
570                         {"Type", "file"},
571                         {"Node", node.Id},
572                         {"Nice", int(nice)},
573                         {"Src", srcPath},
574                         {"Dst", path},
575                         {"Size", sizeToSend},
576                 }
577                 logMsg := func(les LEs) string {
578                         return fmt.Sprintf(
579                                 "File %s (%s) sent to %s:%s",
580                                 srcPath,
581                                 humanize.IBytes(uint64(sizeToSend)),
582                                 ctx.NodeName(node.Id),
583                                 path,
584                         )
585                 }
586                 if err == nil {
587                         ctx.LogI("tx", les, logMsg)
588                 } else {
589                         ctx.LogE("tx", les, err, logMsg)
590                         return err
591                 }
592                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
593                 leftSize -= sizeToSend
594                 chunkNum++
595                 if leftSize == 0 {
596                         break
597                 }
598         }
599         var metaBuf bytes.Buffer
600         _, err = xdr.Marshal(&metaBuf, metaPkt)
601         if err != nil {
602                 return err
603         }
604         path = dstPath + ChunkedSuffixMeta
605         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
606         if err != nil {
607                 return err
608         }
609         metaPktSize := int64(metaBuf.Len())
610         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path, areaId)
611         les := LEs{
612                 {"Type", "file"},
613                 {"Node", node.Id},
614                 {"Nice", int(nice)},
615                 {"Src", srcPath},
616                 {"Dst", path},
617                 {"Size", metaPktSize},
618         }
619         logMsg := func(les LEs) string {
620                 return fmt.Sprintf(
621                         "File %s (%s) sent to %s:%s",
622                         srcPath,
623                         humanize.IBytes(uint64(metaPktSize)),
624                         ctx.NodeName(node.Id),
625                         path,
626                 )
627         }
628         if err == nil {
629                 ctx.LogI("tx", les, logMsg)
630         } else {
631                 ctx.LogE("tx", les, err, logMsg)
632         }
633         return err
634 }
635
636 func (ctx *Ctx) TxFreq(
637         node *Node,
638         nice, replyNice uint8,
639         srcPath, dstPath string,
640         minSize int64) error {
641         dstPath = filepath.Clean(dstPath)
642         if filepath.IsAbs(dstPath) {
643                 return errors.New("Relative destination path required")
644         }
645         srcPath = filepath.Clean(srcPath)
646         if filepath.IsAbs(srcPath) {
647                 return errors.New("Relative source path required")
648         }
649         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
650         if err != nil {
651                 return err
652         }
653         src := strings.NewReader(dstPath)
654         size := int64(src.Len())
655         _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath, nil)
656         les := LEs{
657                 {"Type", "freq"},
658                 {"Node", node.Id},
659                 {"Nice", int(nice)},
660                 {"ReplyNice", int(replyNice)},
661                 {"Src", srcPath},
662                 {"Dst", dstPath},
663         }
664         logMsg := func(les LEs) string {
665                 return fmt.Sprintf(
666                         "File request from %s:%s to %s sent",
667                         ctx.NodeName(node.Id), srcPath,
668                         dstPath,
669                 )
670         }
671         if err == nil {
672                 ctx.LogI("tx", les, logMsg)
673         } else {
674                 ctx.LogE("tx", les, err, logMsg)
675         }
676         return err
677 }
678
679 func (ctx *Ctx) TxExec(
680         node *Node,
681         nice, replyNice uint8,
682         handle string,
683         args []string,
684         in io.Reader,
685         minSize int64,
686         useTmp bool,
687         noCompress bool,
688         areaId *AreaId,
689 ) error {
690         path := make([][]byte, 0, 1+len(args))
691         path = append(path, []byte(handle))
692         for _, arg := range args {
693                 path = append(path, []byte(arg))
694         }
695         pktType := PktTypeExec
696         if noCompress {
697                 pktType = PktTypeExecFat
698         }
699         pkt, rerr := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
700         if rerr != nil {
701                 return rerr
702         }
703         var size int64
704
705         if !noCompress && !useTmp {
706                 var compressed bytes.Buffer
707                 compressor, err := zstd.NewWriter(
708                         &compressed,
709                         zstd.WithEncoderLevel(zstd.SpeedDefault),
710                 )
711                 if err != nil {
712                         return err
713                 }
714                 if _, err = io.Copy(compressor, in); err != nil {
715                         compressor.Close()
716                         return err
717                 }
718                 if err = compressor.Close(); err != nil {
719                         return err
720                 }
721                 size = int64(compressed.Len())
722                 _, rerr = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle, areaId)
723         }
724         if noCompress && !useTmp {
725                 var data bytes.Buffer
726                 if _, err := io.Copy(&data, in); err != nil {
727                         return err
728                 }
729                 size = int64(data.Len())
730                 _, rerr = ctx.Tx(node, pkt, nice, size, minSize, &data, handle, areaId)
731         }
732         if !noCompress && useTmp {
733                 r, w := io.Pipe()
734                 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
735                 if err != nil {
736                         return err
737                 }
738                 copyErr := make(chan error)
739                 go func() {
740                         _, err := io.Copy(compressor, in)
741                         if err != nil {
742                                 compressor.Close()
743                                 copyErr <- err
744                         }
745                         err = compressor.Close()
746                         w.Close()
747                         copyErr <- err
748                 }()
749                 tmpReader, closer, fileSize, err := throughTmpFile(r)
750                 if closer != nil {
751                         defer closer.Close()
752                 }
753                 if err != nil {
754                         return err
755                 }
756                 err = <-copyErr
757                 if err != nil {
758                         return err
759                 }
760                 size = fileSize
761                 _, rerr = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId)
762         }
763         if noCompress && useTmp {
764                 tmpReader, closer, fileSize, err := throughTmpFile(in)
765                 if closer != nil {
766                         defer closer.Close()
767                 }
768                 if err != nil {
769                         return err
770                 }
771                 size = fileSize
772                 _, rerr = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId)
773         }
774
775         dst := strings.Join(append([]string{handle}, args...), " ")
776         les := LEs{
777                 {"Type", "exec"},
778                 {"Node", node.Id},
779                 {"Nice", int(nice)},
780                 {"ReplyNice", int(replyNice)},
781                 {"Dst", dst},
782                 {"Size", size},
783         }
784         logMsg := func(les LEs) string {
785                 return fmt.Sprintf(
786                         "Exec sent to %s@%s (%s)",
787                         ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
788                 )
789         }
790         if rerr == nil {
791                 ctx.LogI("tx", les, logMsg)
792         } else {
793                 ctx.LogE("tx", les, rerr, logMsg)
794         }
795         return rerr
796 }
797
798 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
799         les := LEs{
800                 {"Type", "trns"},
801                 {"Node", node.Id},
802                 {"Nice", int(nice)},
803                 {"Size", size},
804         }
805         logMsg := func(les LEs) string {
806                 return fmt.Sprintf(
807                         "Transitional packet to %s (%s) (nice %s)",
808                         ctx.NodeName(node.Id),
809                         humanize.IBytes(uint64(size)),
810                         NicenessFmt(nice),
811                 )
812         }
813         ctx.LogD("tx", les, logMsg)
814         if !ctx.IsEnoughSpace(size) {
815                 err := errors.New("is not enough space")
816                 ctx.LogE("tx", les, err, logMsg)
817                 return err
818         }
819         tmp, err := ctx.NewTmpFileWHash()
820         if err != nil {
821                 return err
822         }
823         if _, err = CopyProgressed(
824                 tmp.W, src, "Tx trns",
825                 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
826                 ctx.ShowPrgrs,
827         ); err != nil {
828                 return err
829         }
830         nodePath := filepath.Join(ctx.Spool, node.Id.String())
831         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
832         if err == nil {
833                 ctx.LogI("tx", les, logMsg)
834         } else {
835                 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
836         }
837         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
838         return err
839 }