]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
Use explicitly larger bufio's buffer
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "errors"
25         "fmt"
26         "io"
27         "os"
28         "path/filepath"
29         "strconv"
30         "strings"
31         "time"
32
33         xdr "github.com/davecgh/go-xdr/xdr2"
34         "github.com/dustin/go-humanize"
35         "github.com/klauspost/compress/zstd"
36         "golang.org/x/crypto/blake2b"
37 )
38
39 const (
40         MaxFileSize = 1 << 62
41
42         TarBlockSize = 512
43         TarExt       = ".tar"
44 )
45
46 type PktEncWriteResult struct {
47         pktEncRaw []byte
48         size      int64
49         err       error
50 }
51
52 func (ctx *Ctx) Tx(
53         node *Node,
54         pkt *Pkt,
55         nice uint8,
56         srcSize, minSize, maxSize int64,
57         src io.Reader,
58         pktName string,
59         areaId *AreaId,
60 ) (*Node, int64, string, error) {
61         var area *Area
62         if areaId != nil {
63                 area = ctx.AreaId2Area[*areaId]
64                 if area.Prv == nil {
65                         return nil, 0, "", errors.New("area has no encryption keys")
66                 }
67         }
68         hops := make([]*Node, 0, 1+len(node.Via))
69         hops = append(hops, node)
70         lastNode := node
71         for i := len(node.Via); i > 0; i-- {
72                 lastNode = ctx.Neigh[*node.Via[i-1]]
73                 hops = append(hops, lastNode)
74         }
75         wrappers := len(hops)
76         if area != nil {
77                 wrappers++
78         }
79         var expectedSize int64
80         if srcSize > 0 {
81                 expectedSize = srcSize + PktOverhead
82                 expectedSize += sizePadCalc(expectedSize, minSize, wrappers)
83                 expectedSize = PktEncOverhead + sizeWithTags(expectedSize)
84                 if maxSize != 0 && expectedSize > maxSize {
85                         return nil, 0, "", TooBig
86                 }
87                 if !ctx.IsEnoughSpace(expectedSize) {
88                         return nil, 0, "", errors.New("is not enough space")
89                 }
90         }
91         tmp, err := ctx.NewTmpFileWHash()
92         if err != nil {
93                 return nil, 0, "", err
94         }
95
96         results := make(chan PktEncWriteResult)
97         pipeR, pipeW := io.Pipe()
98         var pipeRPrev io.Reader
99         if area == nil {
100                 go func(src io.Reader, dst io.WriteCloser) {
101                         ctx.LogD("tx", LEs{
102                                 {"Node", hops[0].Id},
103                                 {"Nice", int(nice)},
104                                 {"Size", expectedSize},
105                         }, func(les LEs) string {
106                                 return fmt.Sprintf(
107                                         "Tx packet to %s (source %s) nice: %s",
108                                         ctx.NodeName(hops[0].Id),
109                                         humanize.IBytes(uint64(expectedSize)),
110                                         NicenessFmt(nice),
111                                 )
112                         })
113                         pktEncRaw, size, err := PktEncWrite(
114                                 ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst,
115                         )
116                         results <- PktEncWriteResult{pktEncRaw, size, err}
117                         dst.Close()
118                 }(src, pipeW)
119         } else {
120                 go func(src io.Reader, dst io.WriteCloser) {
121                         ctx.LogD("tx", LEs{
122                                 {"Area", area.Id},
123                                 {"Nice", int(nice)},
124                                 {"Size", expectedSize},
125                         }, func(les LEs) string {
126                                 return fmt.Sprintf(
127                                         "Tx area packet to %s (source %s) nice: %s",
128                                         ctx.AreaName(areaId),
129                                         humanize.IBytes(uint64(expectedSize)),
130                                         NicenessFmt(nice),
131                                 )
132                         })
133                         areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
134                         copy(areaNode.Id[:], area.Id[:])
135                         copy(areaNode.ExchPub[:], area.Pub[:])
136                         pktEncRaw, size, err := PktEncWrite(
137                                 ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst,
138                         )
139                         results <- PktEncWriteResult{pktEncRaw, size, err}
140                         dst.Close()
141                 }(src, pipeW)
142                 pipeRPrev = pipeR
143                 pipeR, pipeW = io.Pipe()
144                 go func(src io.Reader, dst io.WriteCloser) {
145                         pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
146                         if err != nil {
147                                 panic(err)
148                         }
149                         ctx.LogD("tx", LEs{
150                                 {"Node", hops[0].Id},
151                                 {"Nice", int(nice)},
152                                 {"Size", expectedSize},
153                         }, func(les LEs) string {
154                                 return fmt.Sprintf(
155                                         "Tx packet to %s (source %s) nice: %s",
156                                         ctx.NodeName(hops[0].Id),
157                                         humanize.IBytes(uint64(expectedSize)),
158                                         NicenessFmt(nice),
159                                 )
160                         })
161                         pktEncRaw, size, err := PktEncWrite(
162                                 ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst,
163                         )
164                         results <- PktEncWriteResult{pktEncRaw, size, err}
165                         dst.Close()
166                 }(pipeRPrev, pipeW)
167         }
168         for i := 1; i < len(hops); i++ {
169                 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
170                 if err != nil {
171                         panic(err)
172                 }
173                 pipeRPrev = pipeR
174                 pipeR, pipeW = io.Pipe()
175                 go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) {
176                         ctx.LogD("tx", LEs{
177                                 {"Node", node.Id},
178                                 {"Nice", int(nice)},
179                         }, func(les LEs) string {
180                                 return fmt.Sprintf(
181                                         "Tx trns packet to %s nice: %s",
182                                         ctx.NodeName(node.Id),
183                                         NicenessFmt(nice),
184                                 )
185                         })
186                         pktEncRaw, size, err := PktEncWrite(
187                                 ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst,
188                         )
189                         results <- PktEncWriteResult{pktEncRaw, size, err}
190                         dst.Close()
191                 }(hops[i], pktTrns, pipeRPrev, pipeW)
192         }
193         go func() {
194                 _, err := CopyProgressed(
195                         tmp.W, pipeR, "Tx",
196                         LEs{{"Pkt", pktName}, {"FullSize", expectedSize}},
197                         ctx.ShowPrgrs,
198                 )
199                 results <- PktEncWriteResult{err: err}
200         }()
201         var pktEncRaw []byte
202         var pktEncMsg []byte
203         var payloadSize int64
204         if area != nil {
205                 r := <-results
206                 payloadSize = r.size
207                 pktEncMsg = r.pktEncRaw
208                 wrappers--
209         }
210         for i := 0; i <= wrappers; i++ {
211                 r := <-results
212                 if r.err != nil {
213                         tmp.Fd.Close()
214                         return nil, 0, "", r.err
215                 }
216                 if r.pktEncRaw != nil {
217                         pktEncRaw = r.pktEncRaw
218                         if payloadSize == 0 {
219                                 payloadSize = r.size
220                         }
221                 }
222         }
223         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
224         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
225         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
226         if err != nil {
227                 return lastNode, 0, "", err
228         }
229         if ctx.HdrUsage {
230                 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
231         }
232         if area != nil {
233                 msgHashRaw := blake2b.Sum256(pktEncMsg)
234                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
235                 seenDir := filepath.Join(
236                         ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
237                 )
238                 seenPath := filepath.Join(seenDir, msgHash)
239                 les := LEs{
240                         {"Node", node.Id},
241                         {"Nice", int(nice)},
242                         {"Size", expectedSize},
243                         {"Area", areaId},
244                         {"AreaMsg", msgHash},
245                 }
246                 logMsg := func(les LEs) string {
247                         return fmt.Sprintf(
248                                 "Tx area packet to %s (source %s) nice: %s, area %s: %s",
249                                 ctx.NodeName(node.Id),
250                                 humanize.IBytes(uint64(expectedSize)),
251                                 NicenessFmt(nice),
252                                 area.Name,
253                                 msgHash,
254                         )
255                 }
256                 if err = ensureDir(seenDir); err != nil {
257                         ctx.LogE("tx-mkdir", les, err, logMsg)
258                         return lastNode, 0, "", err
259                 }
260                 if fd, err := os.Create(seenPath); err == nil {
261                         fd.Close()
262                         if err = DirSync(seenDir); err != nil {
263                                 ctx.LogE("tx-dirsync", les, err, logMsg)
264                                 return lastNode, 0, "", err
265                         }
266                 }
267                 ctx.LogI("tx-area", les, logMsg)
268         }
269         return lastNode, payloadSize, tmp.Checksum(), err
270 }
271
272 type DummyCloser struct{}
273
274 func (dc DummyCloser) Close() error { return nil }
275
276 func prepareTxFile(srcPath string) (
277         reader io.Reader,
278         closer io.Closer,
279         srcSize int64,
280         archived bool,
281         rerr error,
282 ) {
283         if srcPath == "-" {
284                 reader = os.Stdin
285                 closer = os.Stdin
286                 return
287         }
288
289         srcStat, err := os.Stat(srcPath)
290         if err != nil {
291                 rerr = err
292                 return
293         }
294         mode := srcStat.Mode()
295
296         if mode.IsRegular() {
297                 // It is regular file, just send it
298                 src, err := os.Open(srcPath)
299                 if err != nil {
300                         rerr = err
301                         return
302                 }
303                 reader = src
304                 closer = src
305                 srcSize = srcStat.Size()
306                 return
307         }
308
309         if !mode.IsDir() {
310                 rerr = errors.New("unsupported file type")
311                 return
312         }
313
314         // It is directory, create PAX archive with its contents
315         archived = true
316         basePath := filepath.Base(srcPath)
317         rootPath, err := filepath.Abs(srcPath)
318         if err != nil {
319                 rerr = err
320                 return
321         }
322         type einfo struct {
323                 path    string
324                 modTime time.Time
325                 size    int64
326         }
327         dirs := make([]einfo, 0, 1<<10)
328         files := make([]einfo, 0, 1<<10)
329         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
330                 if err != nil {
331                         return err
332                 }
333                 if info.Mode().IsDir() {
334                         // directory header, PAX record header+contents
335                         srcSize += TarBlockSize + 2*TarBlockSize
336                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
337                 } else if info.Mode().IsRegular() {
338                         // file header, PAX record header+contents, file content
339                         srcSize += TarBlockSize + 2*TarBlockSize + info.Size()
340                         if n := info.Size() % TarBlockSize; n != 0 {
341                                 srcSize += TarBlockSize - n // padding
342                         }
343                         files = append(files, einfo{
344                                 path:    path,
345                                 modTime: info.ModTime(),
346                                 size:    info.Size(),
347                         })
348                 }
349                 return nil
350         })
351         if rerr != nil {
352                 return
353         }
354
355         r, w := io.Pipe()
356         reader = r
357         closer = DummyCloser{}
358         srcSize += 2 * TarBlockSize // termination block
359
360         go func() error {
361                 tarWr := tar.NewWriter(w)
362                 hdr := tar.Header{
363                         Typeflag: tar.TypeDir,
364                         Mode:     0777,
365                         PAXRecords: map[string]string{
366                                 "comment": "Autogenerated by " + VersionGet(),
367                         },
368                         Format: tar.FormatPAX,
369                 }
370                 for _, e := range dirs {
371                         hdr.Name = basePath + e.path[len(rootPath):]
372                         hdr.ModTime = e.modTime
373                         if err = tarWr.WriteHeader(&hdr); err != nil {
374                                 return w.CloseWithError(err)
375                         }
376                 }
377                 hdr.Typeflag = tar.TypeReg
378                 hdr.Mode = 0666
379                 for _, e := range files {
380                         hdr.Name = basePath + e.path[len(rootPath):]
381                         hdr.ModTime = e.modTime
382                         hdr.Size = e.size
383                         if err = tarWr.WriteHeader(&hdr); err != nil {
384                                 return w.CloseWithError(err)
385                         }
386                         fd, err := os.Open(e.path)
387                         if err != nil {
388                                 fd.Close()
389                                 return w.CloseWithError(err)
390                         }
391                         if _, err = io.Copy(
392                                 tarWr, bufio.NewReaderSize(fd, MTHBlockSize),
393                         ); err != nil {
394                                 fd.Close()
395                                 return w.CloseWithError(err)
396                         }
397                         fd.Close()
398                 }
399                 if err = tarWr.Close(); err != nil {
400                         return w.CloseWithError(err)
401                 }
402                 return w.Close()
403         }()
404         return
405 }
406
407 func (ctx *Ctx) TxFile(
408         node *Node,
409         nice uint8,
410         srcPath, dstPath string,
411         chunkSize, minSize, maxSize int64,
412         areaId *AreaId,
413 ) error {
414         dstPathSpecified := false
415         if dstPath == "" {
416                 if srcPath == "-" {
417                         return errors.New("Must provide destination filename")
418                 }
419                 dstPath = filepath.Base(srcPath)
420         } else {
421                 dstPathSpecified = true
422         }
423         dstPath = filepath.Clean(dstPath)
424         if filepath.IsAbs(dstPath) {
425                 return errors.New("Relative destination path required")
426         }
427         reader, closer, srcSize, archived, err := prepareTxFile(srcPath)
428         if closer != nil {
429                 defer closer.Close()
430         }
431         if err != nil {
432                 return err
433         }
434         if archived && !dstPathSpecified {
435                 dstPath += TarExt
436         }
437
438         if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) {
439                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
440                 if err != nil {
441                         return err
442                 }
443                 _, finalSize, pktName, err := ctx.Tx(
444                         node, pkt, nice,
445                         srcSize, minSize, maxSize,
446                         bufio.NewReaderSize(reader, MTHBlockSize), dstPath, areaId,
447                 )
448                 les := LEs{
449                         {"Type", "file"},
450                         {"Node", node.Id},
451                         {"Nice", int(nice)},
452                         {"Src", srcPath},
453                         {"Dst", dstPath},
454                         {"Size", finalSize},
455                         {"Pkt", pktName},
456                 }
457                 logMsg := func(les LEs) string {
458                         return fmt.Sprintf(
459                                 "File %s (%s) is sent to %s:%s",
460                                 srcPath,
461                                 humanize.IBytes(uint64(finalSize)),
462                                 ctx.NodeName(node.Id),
463                                 dstPath,
464                         )
465                 }
466                 if err == nil {
467                         ctx.LogI("tx", les, logMsg)
468                 } else {
469                         ctx.LogE("tx", les, err, logMsg)
470                 }
471                 return err
472         }
473
474         br := bufio.NewReaderSize(reader, MTHBlockSize)
475         var sizeFull int64
476         var chunkNum int
477         checksums := [][MTHSize]byte{}
478         for {
479                 lr := io.LimitReader(br, chunkSize)
480                 path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
481                 pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
482                 if err != nil {
483                         return err
484                 }
485                 hsh := MTHNew(0, 0)
486                 _, size, pktName, err := ctx.Tx(
487                         node, pkt, nice,
488                         0, minSize, maxSize,
489                         io.TeeReader(lr, hsh),
490                         path, areaId,
491                 )
492
493                 les := LEs{
494                         {"Type", "file"},
495                         {"Node", node.Id},
496                         {"Nice", int(nice)},
497                         {"Src", srcPath},
498                         {"Dst", path},
499                         {"Size", size},
500                         {"Pkt", pktName},
501                 }
502                 logMsg := func(les LEs) string {
503                         return fmt.Sprintf(
504                                 "File %s (%s) is sent to %s:%s",
505                                 srcPath,
506                                 humanize.IBytes(uint64(size)),
507                                 ctx.NodeName(node.Id),
508                                 path,
509                         )
510                 }
511                 if err == nil {
512                         ctx.LogI("tx", les, logMsg)
513                 } else {
514                         ctx.LogE("tx", les, err, logMsg)
515                         return err
516                 }
517
518                 sizeFull += size - PktOverhead
519                 var checksum [MTHSize]byte
520                 hsh.Sum(checksum[:0])
521                 checksums = append(checksums, checksum)
522                 chunkNum++
523                 if size < chunkSize {
524                         break
525                 }
526                 if _, err = br.Peek(1); err != nil {
527                         break
528                 }
529         }
530
531         metaPkt := ChunkedMeta{
532                 Magic:     MagicNNCPMv2.B,
533                 FileSize:  uint64(sizeFull),
534                 ChunkSize: uint64(chunkSize),
535                 Checksums: checksums,
536         }
537         var buf bytes.Buffer
538         _, err = xdr.Marshal(&buf, metaPkt)
539         if err != nil {
540                 return err
541         }
542         path := dstPath + ChunkedSuffixMeta
543         pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
544         if err != nil {
545                 return err
546         }
547         metaPktSize := int64(buf.Len())
548         _, _, pktName, err := ctx.Tx(
549                 node,
550                 pkt,
551                 nice,
552                 metaPktSize, minSize, maxSize,
553                 &buf, path, areaId,
554         )
555         les := LEs{
556                 {"Type", "file"},
557                 {"Node", node.Id},
558                 {"Nice", int(nice)},
559                 {"Src", srcPath},
560                 {"Dst", path},
561                 {"Size", metaPktSize},
562                 {"Pkt", pktName},
563         }
564         logMsg := func(les LEs) string {
565                 return fmt.Sprintf(
566                         "File %s (%s) is sent to %s:%s",
567                         srcPath,
568                         humanize.IBytes(uint64(metaPktSize)),
569                         ctx.NodeName(node.Id),
570                         path,
571                 )
572         }
573         if err == nil {
574                 ctx.LogI("tx", les, logMsg)
575         } else {
576                 ctx.LogE("tx", les, err, logMsg)
577         }
578         return err
579 }
580
581 func (ctx *Ctx) TxFreq(
582         node *Node,
583         nice, replyNice uint8,
584         srcPath, dstPath string,
585         minSize int64,
586 ) error {
587         dstPath = filepath.Clean(dstPath)
588         if filepath.IsAbs(dstPath) {
589                 return errors.New("Relative destination path required")
590         }
591         srcPath = filepath.Clean(srcPath)
592         if filepath.IsAbs(srcPath) {
593                 return errors.New("Relative source path required")
594         }
595         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
596         if err != nil {
597                 return err
598         }
599         src := strings.NewReader(dstPath)
600         size := int64(src.Len())
601         _, _, pktName, err := ctx.Tx(
602                 node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil,
603         )
604         les := LEs{
605                 {"Type", "freq"},
606                 {"Node", node.Id},
607                 {"Nice", int(nice)},
608                 {"ReplyNice", int(replyNice)},
609                 {"Src", srcPath},
610                 {"Dst", dstPath},
611                 {"Pkt", pktName},
612         }
613         logMsg := func(les LEs) string {
614                 return fmt.Sprintf(
615                         "File request from %s:%s to %s is sent",
616                         ctx.NodeName(node.Id), srcPath,
617                         dstPath,
618                 )
619         }
620         if err == nil {
621                 ctx.LogI("tx", les, logMsg)
622         } else {
623                 ctx.LogE("tx", les, err, logMsg)
624         }
625         return err
626 }
627
628 func (ctx *Ctx) TxExec(
629         node *Node,
630         nice, replyNice uint8,
631         handle string,
632         args []string,
633         in io.Reader,
634         minSize int64, maxSize int64,
635         noCompress bool,
636         areaId *AreaId,
637 ) error {
638         path := make([][]byte, 0, 1+len(args))
639         path = append(path, []byte(handle))
640         for _, arg := range args {
641                 path = append(path, []byte(arg))
642         }
643         pktType := PktTypeExec
644         if noCompress {
645                 pktType = PktTypeExecFat
646         }
647         pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
648         if err != nil {
649                 return err
650         }
651         compressErr := make(chan error, 1)
652         if !noCompress {
653                 pr, pw := io.Pipe()
654                 compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault))
655                 if err != nil {
656                         return err
657                 }
658                 go func(r io.Reader) {
659                         if _, err := io.Copy(compressor, r); err != nil {
660                                 compressErr <- err
661                                 return
662                         }
663                         compressErr <- compressor.Close()
664                         pw.Close()
665                 }(in)
666                 in = pr
667         }
668         _, size, pktName, err := ctx.Tx(
669                 node, pkt, nice, 0, minSize, maxSize, in, handle, areaId,
670         )
671         if !noCompress {
672                 e := <-compressErr
673                 if err == nil {
674                         err = e
675                 }
676         }
677         dst := strings.Join(append([]string{handle}, args...), " ")
678         les := LEs{
679                 {"Type", "exec"},
680                 {"Node", node.Id},
681                 {"Nice", int(nice)},
682                 {"ReplyNice", int(replyNice)},
683                 {"Dst", dst},
684                 {"Size", size},
685                 {"Pkt", pktName},
686         }
687         logMsg := func(les LEs) string {
688                 return fmt.Sprintf(
689                         "Exec is sent to %s@%s (%s)",
690                         ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
691                 )
692         }
693         if err == nil {
694                 ctx.LogI("tx", les, logMsg)
695         } else {
696                 ctx.LogE("tx", les, err, logMsg)
697         }
698         return err
699 }
700
701 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
702         les := LEs{
703                 {"Type", "trns"},
704                 {"Node", node.Id},
705                 {"Nice", int(nice)},
706                 {"Size", size},
707         }
708         logMsg := func(les LEs) string {
709                 return fmt.Sprintf(
710                         "Transitional packet to %s (%s) (nice %s)",
711                         ctx.NodeName(node.Id),
712                         humanize.IBytes(uint64(size)),
713                         NicenessFmt(nice),
714                 )
715         }
716         ctx.LogD("tx", les, logMsg)
717         if !ctx.IsEnoughSpace(size) {
718                 err := errors.New("is not enough space")
719                 ctx.LogE("tx", les, err, logMsg)
720                 return err
721         }
722         tmp, err := ctx.NewTmpFileWHash()
723         if err != nil {
724                 return err
725         }
726         if _, err = CopyProgressed(
727                 tmp.W, src, "Tx trns",
728                 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
729                 ctx.ShowPrgrs,
730         ); err != nil {
731                 return err
732         }
733         nodePath := filepath.Join(ctx.Spool, node.Id.String())
734         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
735         if err == nil {
736                 ctx.LogI("tx", les, logMsg)
737         } else {
738                 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
739         }
740         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
741         return err
742 }
743
744 func (ctx *Ctx) TxACK(
745         node *Node,
746         nice uint8,
747         hsh string,
748         minSize int64,
749 ) (pktName string, err error) {
750         hshRaw, err := Base32Codec.DecodeString(hsh)
751         if err != nil {
752                 return "", err
753         }
754         if len(hshRaw) != MTHSize {
755                 return "", errors.New("Invalid packet id size")
756         }
757         pkt, err := NewPkt(PktTypeACK, nice, []byte(hshRaw))
758         if err != nil {
759                 return "", err
760         }
761         src := bytes.NewReader([]byte{})
762         _, _, pktName, err = ctx.Tx(
763                 node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil,
764         )
765         les := LEs{
766                 {"Type", "ack"},
767                 {"Node", node.Id},
768                 {"Nice", int(nice)},
769                 {"Pkt", hsh},
770                 {"NewPkt", pktName},
771         }
772         logMsg := func(les LEs) string {
773                 return fmt.Sprintf("ACK to %s of %s is sent", ctx.NodeName(node.Id), hsh)
774         }
775         if err == nil {
776                 ctx.LogI("tx", les, logMsg)
777         } else {
778                 ctx.LogE("tx", les, err, logMsg)
779         }
780         return
781 }