]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
e096ed65f873ddeb48f9ac635a01bbeea8849ba2
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "fmt"
27         "hash"
28         "io"
29         "io/ioutil"
30         "os"
31         "path/filepath"
32         "strconv"
33         "strings"
34         "time"
35
36         xdr "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "github.com/klauspost/compress/zstd"
39         "golang.org/x/crypto/blake2b"
40         "golang.org/x/crypto/chacha20poly1305"
41 )
42
43 const (
44         MaxFileSize = 1 << 62
45
46         TarBlockSize = 512
47         TarExt       = ".tar"
48 )
49
50 func (ctx *Ctx) Tx(
51         node *Node,
52         pkt *Pkt,
53         nice uint8,
54         size, minSize int64,
55         src io.Reader,
56         pktName string,
57         areaId *AreaId,
58 ) (*Node, error) {
59         var area *Area
60         if areaId != nil {
61                 area = ctx.AreaId2Area[*areaId]
62                 if area.Prv == nil {
63                         return nil, errors.New("area has no encryption keys")
64                 }
65         }
66         hops := make([]*Node, 0, 1+len(node.Via))
67         hops = append(hops, node)
68         lastNode := node
69         for i := len(node.Via); i > 0; i-- {
70                 lastNode = ctx.Neigh[*node.Via[i-1]]
71                 hops = append(hops, lastNode)
72         }
73         expectedSize := size
74         wrappers := len(hops)
75         if area != nil {
76                 wrappers++
77         }
78         for i := 0; i < wrappers; i++ {
79                 expectedSize = PktEncOverhead +
80                         PktSizeOverhead +
81                         sizeWithTags(PktOverhead+expectedSize)
82         }
83         padSize := minSize - expectedSize
84         if padSize < 0 {
85                 padSize = 0
86         }
87         if !ctx.IsEnoughSpace(size + padSize) {
88                 return nil, errors.New("is not enough space")
89         }
90         tmp, err := ctx.NewTmpFileWHash()
91         if err != nil {
92                 return nil, err
93         }
94
95         errs := make(chan error)
96         pktEncRaws := make(chan []byte)
97         curSize := size
98         pipeR, pipeW := io.Pipe()
99         var pipeRPrev io.Reader
100         if area == nil {
101                 go func(size int64, src io.Reader, dst io.WriteCloser) {
102                         ctx.LogD("tx", LEs{
103                                 {"Node", hops[0].Id},
104                                 {"Nice", int(nice)},
105                                 {"Size", size},
106                         }, func(les LEs) string {
107                                 return fmt.Sprintf(
108                                         "Tx packet to %s (%s) nice: %s",
109                                         ctx.NodeName(hops[0].Id),
110                                         humanize.IBytes(uint64(size)),
111                                         NicenessFmt(nice),
112                                 )
113                         })
114                         pktEncRaw, err := PktEncWrite(
115                                 ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
116                         )
117                         pktEncRaws <- pktEncRaw
118                         errs <- err
119                         dst.Close()
120                 }(curSize, src, pipeW)
121                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
122                 curSize += padSize
123         } else {
124                 go func(size, padSize int64, src io.Reader, dst io.WriteCloser) {
125                         ctx.LogD("tx", LEs{
126                                 {"Area", area.Id},
127                                 {"Nice", int(nice)},
128                                 {"Size", size},
129                         }, func(les LEs) string {
130                                 return fmt.Sprintf(
131                                         "Tx area packet to %s (%s) nice: %s",
132                                         ctx.AreaName(areaId),
133                                         humanize.IBytes(uint64(size)),
134                                         NicenessFmt(nice),
135                                 )
136                         })
137                         areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
138                         copy(areaNode.Id[:], area.Id[:])
139                         copy(areaNode.ExchPub[:], area.Pub[:])
140                         pktEncRaw, err := PktEncWrite(
141                                 ctx.Self, &areaNode, pkt, nice, size, padSize, src, dst,
142                         )
143                         pktEncRaws <- pktEncRaw
144                         errs <- err
145                         dst.Close()
146                 }(curSize, padSize, src, pipeW)
147                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
148                 curSize += padSize
149                 pipeRPrev = pipeR
150                 pipeR, pipeW = io.Pipe()
151                 go func(size int64, src io.Reader, dst io.WriteCloser) {
152                         pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
153                         if err != nil {
154                                 panic(err)
155                         }
156                         ctx.LogD("tx", LEs{
157                                 {"Node", hops[0].Id},
158                                 {"Nice", int(nice)},
159                                 {"Size", size},
160                         }, func(les LEs) string {
161                                 return fmt.Sprintf(
162                                         "Tx packet to %s (%s) nice: %s",
163                                         ctx.NodeName(hops[0].Id),
164                                         humanize.IBytes(uint64(size)),
165                                         NicenessFmt(nice),
166                                 )
167                         })
168                         pktEncRaw, err := PktEncWrite(
169                                 ctx.Self, hops[0], pktArea, nice, size, 0, src, dst,
170                         )
171                         pktEncRaws <- pktEncRaw
172                         errs <- err
173                         dst.Close()
174                 }(curSize, pipeRPrev, pipeW)
175                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
176         }
177         for i := 1; i < len(hops); i++ {
178                 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
179                 if err != nil {
180                         panic(err)
181                 }
182                 pipeRPrev = pipeR
183                 pipeR, pipeW = io.Pipe()
184                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
185                         ctx.LogD("tx", LEs{
186                                 {"Node", node.Id},
187                                 {"Nice", int(nice)},
188                                 {"Size", size},
189                         }, func(les LEs) string {
190                                 return fmt.Sprintf(
191                                         "Tx trns packet to %s (%s) nice: %s",
192                                         ctx.NodeName(node.Id),
193                                         humanize.IBytes(uint64(size)),
194                                         NicenessFmt(nice),
195                                 )
196                         })
197                         pktEncRaw, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
198                         pktEncRaws <- pktEncRaw
199                         errs <- err
200                         dst.Close()
201                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
202                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
203         }
204         go func() {
205                 _, err := CopyProgressed(
206                         tmp.W, pipeR, "Tx",
207                         LEs{{"Pkt", pktName}, {"FullSize", curSize}},
208                         ctx.ShowPrgrs,
209                 )
210                 errs <- err
211         }()
212         var pktEncRaw []byte
213         var pktEncMsg []byte
214         if area != nil {
215                 pktEncMsg = <-pktEncRaws
216         }
217         for i := 0; i < len(hops); i++ {
218                 pktEncRaw = <-pktEncRaws
219         }
220         for i := 0; i <= wrappers; i++ {
221                 err = <-errs
222                 if err != nil {
223                         tmp.Fd.Close()
224                         return nil, err
225                 }
226         }
227         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
228         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
229         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
230         if err != nil {
231                 return lastNode, err
232         }
233         if ctx.HdrUsage {
234                 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
235         }
236         if area != nil {
237                 msgHashRaw := blake2b.Sum256(pktEncMsg)
238                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
239                 seenDir := filepath.Join(
240                         ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
241                 )
242                 seenPath := filepath.Join(seenDir, msgHash)
243                 les := LEs{
244                         {"Node", node.Id},
245                         {"Nice", int(nice)},
246                         {"Size", size},
247                         {"Area", areaId},
248                         {"AreaMsg", msgHash},
249                 }
250                 logMsg := func(les LEs) string {
251                         return fmt.Sprintf(
252                                 "Tx area packet to %s (%s) nice: %s, area %s: %s",
253                                 ctx.NodeName(node.Id),
254                                 humanize.IBytes(uint64(size)),
255                                 NicenessFmt(nice),
256                                 area.Name,
257                                 msgHash,
258                         )
259                 }
260                 if err = ensureDir(seenDir); err != nil {
261                         ctx.LogE("tx-mkdir", les, err, logMsg)
262                         return lastNode, err
263                 }
264                 if fd, err := os.Create(seenPath); err == nil {
265                         fd.Close()
266                         if err = DirSync(seenDir); err != nil {
267                                 ctx.LogE("tx-dirsync", les, err, logMsg)
268                                 return lastNode, err
269                         }
270                 }
271                 ctx.LogI("tx-area", les, logMsg)
272         }
273         return lastNode, err
274 }
275
276 type DummyCloser struct{}
277
278 func (dc DummyCloser) Close() error { return nil }
279
280 func throughTmpFile(r io.Reader) (
281         reader io.Reader,
282         closer io.Closer,
283         fileSize int64,
284         rerr error,
285 ) {
286         src, err := ioutil.TempFile("", "nncp-file")
287         if err != nil {
288                 rerr = err
289                 return
290         }
291         os.Remove(src.Name())
292         tmpW := bufio.NewWriter(src)
293         tmpKey := make([]byte, chacha20poly1305.KeySize)
294         if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
295                 return
296         }
297         aead, err := chacha20poly1305.New(tmpKey)
298         if err != nil {
299                 rerr = err
300                 return
301         }
302         nonce := make([]byte, aead.NonceSize())
303         written, err := aeadProcess(aead, nonce, nil, true, r, tmpW)
304         if err != nil {
305                 rerr = err
306                 return
307         }
308         fileSize = int64(written)
309         if err = tmpW.Flush(); err != nil {
310                 rerr = err
311                 return
312         }
313         if _, err = src.Seek(0, io.SeekStart); err != nil {
314                 rerr = err
315                 return
316         }
317         r, w := io.Pipe()
318         go func() {
319                 for i := 0; i < aead.NonceSize(); i++ {
320                         nonce[i] = 0
321                 }
322                 if _, err := aeadProcess(aead, nonce, nil, false, bufio.NewReader(src), w); err != nil {
323                         w.CloseWithError(err)
324                 }
325         }()
326         reader = r
327         closer = src
328         return
329 }
330
331 func prepareTxFile(srcPath string) (
332         reader io.Reader,
333         closer io.Closer,
334         fileSize int64,
335         archived bool,
336         rerr error,
337 ) {
338         if srcPath == "-" {
339                 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
340                 return
341         }
342
343         srcStat, err := os.Stat(srcPath)
344         if err != nil {
345                 rerr = err
346                 return
347         }
348         mode := srcStat.Mode()
349
350         if mode.IsRegular() {
351                 // It is regular file, just send it
352                 src, err := os.Open(srcPath)
353                 if err != nil {
354                         rerr = err
355                         return
356                 }
357                 fileSize = srcStat.Size()
358                 reader = bufio.NewReader(src)
359                 closer = src
360                 return
361         }
362
363         if !mode.IsDir() {
364                 rerr = errors.New("unsupported file type")
365                 return
366         }
367
368         // It is directory, create PAX archive with its contents
369         archived = true
370         basePath := filepath.Base(srcPath)
371         rootPath, err := filepath.Abs(srcPath)
372         if err != nil {
373                 rerr = err
374                 return
375         }
376         type einfo struct {
377                 path    string
378                 modTime time.Time
379                 size    int64
380         }
381         dirs := make([]einfo, 0, 1<<10)
382         files := make([]einfo, 0, 1<<10)
383         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
384                 if err != nil {
385                         return err
386                 }
387                 if info.IsDir() {
388                         // directory header, PAX record header+contents
389                         fileSize += TarBlockSize + 2*TarBlockSize
390                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
391                 } else {
392                         // file header, PAX record header+contents, file content
393                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
394                         if n := info.Size() % TarBlockSize; n != 0 {
395                                 fileSize += TarBlockSize - n // padding
396                         }
397                         files = append(files, einfo{
398                                 path:    path,
399                                 modTime: info.ModTime(),
400                                 size:    info.Size(),
401                         })
402                 }
403                 return nil
404         })
405         if rerr != nil {
406                 return
407         }
408
409         r, w := io.Pipe()
410         reader = r
411         closer = DummyCloser{}
412         fileSize += 2 * TarBlockSize // termination block
413
414         go func() error {
415                 tarWr := tar.NewWriter(w)
416                 hdr := tar.Header{
417                         Typeflag: tar.TypeDir,
418                         Mode:     0777,
419                         PAXRecords: map[string]string{
420                                 "comment": "Autogenerated by " + VersionGet(),
421                         },
422                         Format: tar.FormatPAX,
423                 }
424                 for _, e := range dirs {
425                         hdr.Name = basePath + e.path[len(rootPath):]
426                         hdr.ModTime = e.modTime
427                         if err = tarWr.WriteHeader(&hdr); err != nil {
428                                 return w.CloseWithError(err)
429                         }
430                 }
431                 hdr.Typeflag = tar.TypeReg
432                 hdr.Mode = 0666
433                 for _, e := range files {
434                         hdr.Name = basePath + e.path[len(rootPath):]
435                         hdr.ModTime = e.modTime
436                         hdr.Size = e.size
437                         if err = tarWr.WriteHeader(&hdr); err != nil {
438                                 return w.CloseWithError(err)
439                         }
440                         fd, err := os.Open(e.path)
441                         if err != nil {
442                                 fd.Close()
443                                 return w.CloseWithError(err)
444                         }
445                         if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
446                                 fd.Close()
447                                 return w.CloseWithError(err)
448                         }
449                         fd.Close()
450                 }
451                 if err = tarWr.Close(); err != nil {
452                         return w.CloseWithError(err)
453                 }
454                 return w.Close()
455         }()
456         return
457 }
458
459 func (ctx *Ctx) TxFile(
460         node *Node,
461         nice uint8,
462         srcPath, dstPath string,
463         chunkSize int64,
464         minSize, maxSize int64,
465         areaId *AreaId,
466 ) error {
467         dstPathSpecified := false
468         if dstPath == "" {
469                 if srcPath == "-" {
470                         return errors.New("Must provide destination filename")
471                 }
472                 dstPath = filepath.Base(srcPath)
473         } else {
474                 dstPathSpecified = true
475         }
476         dstPath = filepath.Clean(dstPath)
477         if filepath.IsAbs(dstPath) {
478                 return errors.New("Relative destination path required")
479         }
480         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
481         if closer != nil {
482                 defer closer.Close()
483         }
484         if err != nil {
485                 return err
486         }
487         if fileSize > maxSize {
488                 return errors.New("Too big than allowed")
489         }
490         if archived && !dstPathSpecified {
491                 dstPath += TarExt
492         }
493
494         if fileSize <= chunkSize {
495                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
496                 if err != nil {
497                         return err
498                 }
499                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath, areaId)
500                 les := LEs{
501                         {"Type", "file"},
502                         {"Node", node.Id},
503                         {"Nice", int(nice)},
504                         {"Src", srcPath},
505                         {"Dst", dstPath},
506                         {"Size", fileSize},
507                 }
508                 logMsg := func(les LEs) string {
509                         return fmt.Sprintf(
510                                 "File %s (%s) sent to %s:%s",
511                                 srcPath,
512                                 humanize.IBytes(uint64(fileSize)),
513                                 ctx.NodeName(node.Id),
514                                 dstPath,
515                         )
516                 }
517                 if err == nil {
518                         ctx.LogI("tx", les, logMsg)
519                 } else {
520                         ctx.LogE("tx", les, err, logMsg)
521                 }
522                 return err
523         }
524
525         leftSize := fileSize
526         metaPkt := ChunkedMeta{
527                 Magic:     MagicNNCPMv2.B,
528                 FileSize:  uint64(fileSize),
529                 ChunkSize: uint64(chunkSize),
530                 Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1),
531         }
532         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
533                 hsh := new([MTHSize]byte)
534                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
535         }
536         var sizeToSend int64
537         var hsh hash.Hash
538         var pkt *Pkt
539         var chunkNum int
540         var path string
541         for {
542                 if leftSize <= chunkSize {
543                         sizeToSend = leftSize
544                 } else {
545                         sizeToSend = chunkSize
546                 }
547                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
548                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
549                 if err != nil {
550                         return err
551                 }
552                 hsh = MTHNew(0, 0)
553                 _, err = ctx.Tx(
554                         node,
555                         pkt,
556                         nice,
557                         sizeToSend,
558                         minSize,
559                         io.TeeReader(reader, hsh),
560                         path,
561                         areaId,
562                 )
563                 les := LEs{
564                         {"Type", "file"},
565                         {"Node", node.Id},
566                         {"Nice", int(nice)},
567                         {"Src", srcPath},
568                         {"Dst", path},
569                         {"Size", sizeToSend},
570                 }
571                 logMsg := func(les LEs) string {
572                         return fmt.Sprintf(
573                                 "File %s (%s) sent to %s:%s",
574                                 srcPath,
575                                 humanize.IBytes(uint64(sizeToSend)),
576                                 ctx.NodeName(node.Id),
577                                 path,
578                         )
579                 }
580                 if err == nil {
581                         ctx.LogI("tx", les, logMsg)
582                 } else {
583                         ctx.LogE("tx", les, err, logMsg)
584                         return err
585                 }
586                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
587                 leftSize -= sizeToSend
588                 chunkNum++
589                 if leftSize == 0 {
590                         break
591                 }
592         }
593         var metaBuf bytes.Buffer
594         _, err = xdr.Marshal(&metaBuf, metaPkt)
595         if err != nil {
596                 return err
597         }
598         path = dstPath + ChunkedSuffixMeta
599         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
600         if err != nil {
601                 return err
602         }
603         metaPktSize := int64(metaBuf.Len())
604         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path, areaId)
605         les := LEs{
606                 {"Type", "file"},
607                 {"Node", node.Id},
608                 {"Nice", int(nice)},
609                 {"Src", srcPath},
610                 {"Dst", path},
611                 {"Size", metaPktSize},
612         }
613         logMsg := func(les LEs) string {
614                 return fmt.Sprintf(
615                         "File %s (%s) sent to %s:%s",
616                         srcPath,
617                         humanize.IBytes(uint64(metaPktSize)),
618                         ctx.NodeName(node.Id),
619                         path,
620                 )
621         }
622         if err == nil {
623                 ctx.LogI("tx", les, logMsg)
624         } else {
625                 ctx.LogE("tx", les, err, logMsg)
626         }
627         return err
628 }
629
630 func (ctx *Ctx) TxFreq(
631         node *Node,
632         nice, replyNice uint8,
633         srcPath, dstPath string,
634         minSize int64) error {
635         dstPath = filepath.Clean(dstPath)
636         if filepath.IsAbs(dstPath) {
637                 return errors.New("Relative destination path required")
638         }
639         srcPath = filepath.Clean(srcPath)
640         if filepath.IsAbs(srcPath) {
641                 return errors.New("Relative source path required")
642         }
643         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
644         if err != nil {
645                 return err
646         }
647         src := strings.NewReader(dstPath)
648         size := int64(src.Len())
649         _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath, nil)
650         les := LEs{
651                 {"Type", "freq"},
652                 {"Node", node.Id},
653                 {"Nice", int(nice)},
654                 {"ReplyNice", int(replyNice)},
655                 {"Src", srcPath},
656                 {"Dst", dstPath},
657         }
658         logMsg := func(les LEs) string {
659                 return fmt.Sprintf(
660                         "File request from %s:%s to %s sent",
661                         ctx.NodeName(node.Id), srcPath,
662                         dstPath,
663                 )
664         }
665         if err == nil {
666                 ctx.LogI("tx", les, logMsg)
667         } else {
668                 ctx.LogE("tx", les, err, logMsg)
669         }
670         return err
671 }
672
673 func (ctx *Ctx) TxExec(
674         node *Node,
675         nice, replyNice uint8,
676         handle string,
677         args []string,
678         in io.Reader,
679         minSize int64,
680         useTmp bool,
681         noCompress bool,
682         areaId *AreaId,
683 ) error {
684         path := make([][]byte, 0, 1+len(args))
685         path = append(path, []byte(handle))
686         for _, arg := range args {
687                 path = append(path, []byte(arg))
688         }
689         pktType := PktTypeExec
690         if noCompress {
691                 pktType = PktTypeExecFat
692         }
693         pkt, rerr := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
694         if rerr != nil {
695                 return rerr
696         }
697         var size int64
698
699         if !noCompress && !useTmp {
700                 var compressed bytes.Buffer
701                 compressor, err := zstd.NewWriter(
702                         &compressed,
703                         zstd.WithEncoderLevel(zstd.SpeedDefault),
704                 )
705                 if err != nil {
706                         return err
707                 }
708                 if _, err = io.Copy(compressor, in); err != nil {
709                         compressor.Close()
710                         return err
711                 }
712                 if err = compressor.Close(); err != nil {
713                         return err
714                 }
715                 size = int64(compressed.Len())
716                 _, rerr = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle, areaId)
717         }
718         if noCompress && !useTmp {
719                 var data bytes.Buffer
720                 if _, err := io.Copy(&data, in); err != nil {
721                         return err
722                 }
723                 size = int64(data.Len())
724                 _, rerr = ctx.Tx(node, pkt, nice, size, minSize, &data, handle, areaId)
725         }
726         if !noCompress && useTmp {
727                 r, w := io.Pipe()
728                 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
729                 if err != nil {
730                         return err
731                 }
732                 copyErr := make(chan error)
733                 go func() {
734                         _, err := io.Copy(compressor, in)
735                         if err != nil {
736                                 compressor.Close()
737                                 copyErr <- err
738                         }
739                         err = compressor.Close()
740                         w.Close()
741                         copyErr <- err
742                 }()
743                 tmpReader, closer, fileSize, err := throughTmpFile(r)
744                 if closer != nil {
745                         defer closer.Close()
746                 }
747                 if err != nil {
748                         return err
749                 }
750                 err = <-copyErr
751                 if err != nil {
752                         return err
753                 }
754                 size = fileSize
755                 _, rerr = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId)
756         }
757         if noCompress && useTmp {
758                 tmpReader, closer, fileSize, err := throughTmpFile(in)
759                 if closer != nil {
760                         defer closer.Close()
761                 }
762                 if err != nil {
763                         return err
764                 }
765                 size = fileSize
766                 _, rerr = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId)
767         }
768
769         dst := strings.Join(append([]string{handle}, args...), " ")
770         les := LEs{
771                 {"Type", "exec"},
772                 {"Node", node.Id},
773                 {"Nice", int(nice)},
774                 {"ReplyNice", int(replyNice)},
775                 {"Dst", dst},
776                 {"Size", size},
777         }
778         logMsg := func(les LEs) string {
779                 return fmt.Sprintf(
780                         "Exec sent to %s@%s (%s)",
781                         ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
782                 )
783         }
784         if rerr == nil {
785                 ctx.LogI("tx", les, logMsg)
786         } else {
787                 ctx.LogE("tx", les, rerr, logMsg)
788         }
789         return rerr
790 }
791
792 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
793         les := LEs{
794                 {"Type", "trns"},
795                 {"Node", node.Id},
796                 {"Nice", int(nice)},
797                 {"Size", size},
798         }
799         logMsg := func(les LEs) string {
800                 return fmt.Sprintf(
801                         "Transitional packet to %s (%s) (nice %s)",
802                         ctx.NodeName(node.Id),
803                         humanize.IBytes(uint64(size)),
804                         NicenessFmt(nice),
805                 )
806         }
807         ctx.LogD("tx", les, logMsg)
808         if !ctx.IsEnoughSpace(size) {
809                 err := errors.New("is not enough space")
810                 ctx.LogE("tx", les, err, logMsg)
811                 return err
812         }
813         tmp, err := ctx.NewTmpFileWHash()
814         if err != nil {
815                 return err
816         }
817         if _, err = CopyProgressed(
818                 tmp.W, src, "Tx trns",
819                 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
820                 ctx.ShowPrgrs,
821         ); err != nil {
822                 return err
823         }
824         nodePath := filepath.Join(ctx.Spool, node.Id.String())
825         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
826         if err == nil {
827                 ctx.LogI("tx", les, logMsg)
828         } else {
829                 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
830         }
831         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
832         return err
833 }