]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
Unify copyright comment format
[nncp.git] / src / tx.go
1 // NNCP -- Node to Node copy, utilities for store-and-forward data exchange
2 // Copyright (C) 2016-2024 Sergey Matveev <stargrave@stargrave.org>
3 //
4 // This program is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, version 3 of the License.
7 //
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 // GNU General Public License for more details.
12 //
13 // You should have received a copy of the GNU General Public License
14 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
16 package nncp
17
18 import (
19         "archive/tar"
20         "bufio"
21         "bytes"
22         "errors"
23         "fmt"
24         "io"
25         "os"
26         "path/filepath"
27         "strconv"
28         "strings"
29         "time"
30
31         xdr "github.com/davecgh/go-xdr/xdr2"
32         "github.com/dustin/go-humanize"
33         "github.com/klauspost/compress/zstd"
34         "golang.org/x/crypto/blake2b"
35 )
36
37 const (
38         MaxFileSize = 1 << 62
39
40         TarBlockSize = 512
41         TarExt       = ".tar"
42 )
43
44 type PktEncWriteResult struct {
45         pktEncRaw []byte
46         size      int64
47         err       error
48 }
49
50 func (ctx *Ctx) Tx(
51         node *Node,
52         pkt *Pkt,
53         nice uint8,
54         srcSize, minSize, maxSize int64,
55         src io.Reader,
56         pktName string,
57         areaId *AreaId,
58 ) (*Node, int64, string, error) {
59         var area *Area
60         if areaId != nil {
61                 area = ctx.AreaId2Area[*areaId]
62                 if area.Prv == nil {
63                         return nil, 0, "", errors.New("area has no encryption keys")
64                 }
65         }
66         hops := make([]*Node, 0, 1+len(node.Via))
67         hops = append(hops, node)
68         lastNode := node
69         for i := len(node.Via); i > 0; i-- {
70                 lastNode = ctx.Neigh[*node.Via[i-1]]
71                 hops = append(hops, lastNode)
72         }
73         wrappers := len(hops)
74         if area != nil {
75                 wrappers++
76         }
77         var expectedSize int64
78         if srcSize > 0 {
79                 expectedSize = srcSize + PktOverhead
80                 expectedSize += sizePadCalc(expectedSize, minSize, wrappers)
81                 expectedSize = PktEncOverhead + sizeWithTags(expectedSize)
82                 if maxSize != 0 && expectedSize > maxSize {
83                         return nil, 0, "", TooBig
84                 }
85                 if !ctx.IsEnoughSpace(expectedSize) {
86                         return nil, 0, "", errors.New("is not enough space")
87                 }
88         }
89         tmp, err := ctx.NewTmpFileWHash()
90         if err != nil {
91                 return nil, 0, "", err
92         }
93
94         results := make(chan PktEncWriteResult)
95         pipeR, pipeW := io.Pipe()
96         var pipeRPrev io.Reader
97         if area == nil {
98                 go func(src io.Reader, dst io.WriteCloser) {
99                         ctx.LogD("tx", LEs{
100                                 {"Node", hops[0].Id},
101                                 {"Nice", int(nice)},
102                                 {"Size", expectedSize},
103                         }, func(les LEs) string {
104                                 return fmt.Sprintf(
105                                         "Tx packet to %s (source %s) nice: %s",
106                                         ctx.NodeName(hops[0].Id),
107                                         humanize.IBytes(uint64(expectedSize)),
108                                         NicenessFmt(nice),
109                                 )
110                         })
111                         pktEncRaw, size, err := PktEncWrite(
112                                 ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst,
113                         )
114                         results <- PktEncWriteResult{pktEncRaw, size, err}
115                         dst.Close()
116                 }(src, pipeW)
117         } else {
118                 go func(src io.Reader, dst io.WriteCloser) {
119                         ctx.LogD("tx", LEs{
120                                 {"Area", area.Id},
121                                 {"Nice", int(nice)},
122                                 {"Size", expectedSize},
123                         }, func(les LEs) string {
124                                 return fmt.Sprintf(
125                                         "Tx area packet to %s (source %s) nice: %s",
126                                         ctx.AreaName(areaId),
127                                         humanize.IBytes(uint64(expectedSize)),
128                                         NicenessFmt(nice),
129                                 )
130                         })
131                         areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
132                         copy(areaNode.Id[:], area.Id[:])
133                         copy(areaNode.ExchPub[:], area.Pub[:])
134                         pktEncRaw, size, err := PktEncWrite(
135                                 ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst,
136                         )
137                         results <- PktEncWriteResult{pktEncRaw, size, err}
138                         dst.Close()
139                 }(src, pipeW)
140                 pipeRPrev = pipeR
141                 pipeR, pipeW = io.Pipe()
142                 go func(src io.Reader, dst io.WriteCloser) {
143                         pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
144                         if err != nil {
145                                 panic(err)
146                         }
147                         ctx.LogD("tx", LEs{
148                                 {"Node", hops[0].Id},
149                                 {"Nice", int(nice)},
150                                 {"Size", expectedSize},
151                         }, func(les LEs) string {
152                                 return fmt.Sprintf(
153                                         "Tx packet to %s (source %s) nice: %s",
154                                         ctx.NodeName(hops[0].Id),
155                                         humanize.IBytes(uint64(expectedSize)),
156                                         NicenessFmt(nice),
157                                 )
158                         })
159                         pktEncRaw, size, err := PktEncWrite(
160                                 ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst,
161                         )
162                         results <- PktEncWriteResult{pktEncRaw, size, err}
163                         dst.Close()
164                 }(pipeRPrev, pipeW)
165         }
166         for i := 1; i < len(hops); i++ {
167                 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
168                 if err != nil {
169                         panic(err)
170                 }
171                 pipeRPrev = pipeR
172                 pipeR, pipeW = io.Pipe()
173                 go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) {
174                         ctx.LogD("tx", LEs{
175                                 {"Node", node.Id},
176                                 {"Nice", int(nice)},
177                         }, func(les LEs) string {
178                                 return fmt.Sprintf(
179                                         "Tx trns packet to %s nice: %s",
180                                         ctx.NodeName(node.Id),
181                                         NicenessFmt(nice),
182                                 )
183                         })
184                         pktEncRaw, size, err := PktEncWrite(
185                                 ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst,
186                         )
187                         results <- PktEncWriteResult{pktEncRaw, size, err}
188                         dst.Close()
189                 }(hops[i], pktTrns, pipeRPrev, pipeW)
190         }
191         go func() {
192                 _, err := CopyProgressed(
193                         tmp.W, pipeR, "Tx",
194                         LEs{{"Pkt", pktName}, {"FullSize", expectedSize}},
195                         ctx.ShowPrgrs,
196                 )
197                 results <- PktEncWriteResult{err: err}
198         }()
199         var pktEncRaw []byte
200         var pktEncMsg []byte
201         var payloadSize int64
202         if area != nil {
203                 r := <-results
204                 payloadSize = r.size
205                 pktEncMsg = r.pktEncRaw
206                 wrappers--
207         }
208         for i := 0; i <= wrappers; i++ {
209                 r := <-results
210                 if r.err != nil {
211                         tmp.Fd.Close()
212                         return nil, 0, "", r.err
213                 }
214                 if r.pktEncRaw != nil {
215                         pktEncRaw = r.pktEncRaw
216                         if payloadSize == 0 {
217                                 payloadSize = r.size
218                         }
219                 }
220         }
221         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
222         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
223         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
224         if err != nil {
225                 return lastNode, 0, "", err
226         }
227         if ctx.HdrUsage {
228                 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
229         }
230         if area != nil {
231                 msgHashRaw := blake2b.Sum256(pktEncMsg)
232                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
233                 seenDir := filepath.Join(
234                         ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
235                 )
236                 seenPath := filepath.Join(seenDir, msgHash)
237                 les := LEs{
238                         {"Node", node.Id},
239                         {"Nice", int(nice)},
240                         {"Size", expectedSize},
241                         {"Area", areaId},
242                         {"AreaMsg", msgHash},
243                 }
244                 logMsg := func(les LEs) string {
245                         return fmt.Sprintf(
246                                 "Tx area packet to %s (source %s) nice: %s, area %s: %s",
247                                 ctx.NodeName(node.Id),
248                                 humanize.IBytes(uint64(expectedSize)),
249                                 NicenessFmt(nice),
250                                 area.Name,
251                                 msgHash,
252                         )
253                 }
254                 if err = ensureDir(seenDir); err != nil {
255                         ctx.LogE("tx-mkdir", les, err, logMsg)
256                         return lastNode, 0, "", err
257                 }
258                 if fd, err := os.Create(seenPath); err == nil {
259                         fd.Close()
260                         if err = DirSync(seenDir); err != nil {
261                                 ctx.LogE("tx-dirsync", les, err, logMsg)
262                                 return lastNode, 0, "", err
263                         }
264                 }
265                 ctx.LogI("tx-area", les, logMsg)
266         }
267         return lastNode, payloadSize, tmp.Checksum(), err
268 }
269
270 type DummyCloser struct{}
271
272 func (dc DummyCloser) Close() error { return nil }
273
274 func prepareTxFile(srcPath string) (
275         reader io.Reader,
276         closer io.Closer,
277         srcSize int64,
278         archived bool,
279         rerr error,
280 ) {
281         if srcPath == "-" {
282                 reader = os.Stdin
283                 closer = os.Stdin
284                 return
285         }
286
287         srcStat, err := os.Stat(srcPath)
288         if err != nil {
289                 rerr = err
290                 return
291         }
292         mode := srcStat.Mode()
293
294         if mode.IsRegular() {
295                 // It is regular file, just send it
296                 src, err := os.Open(srcPath)
297                 if err != nil {
298                         rerr = err
299                         return
300                 }
301                 reader = src
302                 closer = src
303                 srcSize = srcStat.Size()
304                 return
305         }
306
307         if !mode.IsDir() {
308                 rerr = errors.New("unsupported file type")
309                 return
310         }
311
312         // It is directory, create PAX archive with its contents
313         archived = true
314         basePath := filepath.Base(srcPath)
315         rootPath, err := filepath.Abs(srcPath)
316         if err != nil {
317                 rerr = err
318                 return
319         }
320         type einfo struct {
321                 path    string
322                 modTime time.Time
323                 size    int64
324         }
325         dirs := make([]einfo, 0, 1<<10)
326         files := make([]einfo, 0, 1<<10)
327         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
328                 if err != nil {
329                         return err
330                 }
331                 if info.Mode().IsDir() {
332                         // directory header, PAX record header+contents
333                         srcSize += TarBlockSize + 2*TarBlockSize
334                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
335                 } else if info.Mode().IsRegular() {
336                         // file header, PAX record header+contents, file content
337                         srcSize += TarBlockSize + 2*TarBlockSize + info.Size()
338                         if n := info.Size() % TarBlockSize; n != 0 {
339                                 srcSize += TarBlockSize - n // padding
340                         }
341                         files = append(files, einfo{
342                                 path:    path,
343                                 modTime: info.ModTime(),
344                                 size:    info.Size(),
345                         })
346                 }
347                 return nil
348         })
349         if rerr != nil {
350                 return
351         }
352
353         r, w := io.Pipe()
354         reader = r
355         closer = DummyCloser{}
356         srcSize += 2 * TarBlockSize // termination block
357
358         go func() error {
359                 tarWr := tar.NewWriter(w)
360                 hdr := tar.Header{
361                         Typeflag: tar.TypeDir,
362                         Mode:     0777,
363                         PAXRecords: map[string]string{
364                                 "comment": "Autogenerated by " + VersionGet(),
365                         },
366                         Format: tar.FormatPAX,
367                 }
368                 for _, e := range dirs {
369                         hdr.Name = basePath + e.path[len(rootPath):]
370                         hdr.ModTime = e.modTime
371                         if err = tarWr.WriteHeader(&hdr); err != nil {
372                                 return w.CloseWithError(err)
373                         }
374                 }
375                 hdr.Typeflag = tar.TypeReg
376                 hdr.Mode = 0666
377                 for _, e := range files {
378                         hdr.Name = basePath + e.path[len(rootPath):]
379                         hdr.ModTime = e.modTime
380                         hdr.Size = e.size
381                         if err = tarWr.WriteHeader(&hdr); err != nil {
382                                 return w.CloseWithError(err)
383                         }
384                         fd, err := os.Open(e.path)
385                         if err != nil {
386                                 fd.Close()
387                                 return w.CloseWithError(err)
388                         }
389                         if _, err = io.Copy(
390                                 tarWr, bufio.NewReaderSize(fd, MTHBlockSize),
391                         ); err != nil {
392                                 fd.Close()
393                                 return w.CloseWithError(err)
394                         }
395                         fd.Close()
396                 }
397                 if err = tarWr.Close(); err != nil {
398                         return w.CloseWithError(err)
399                 }
400                 return w.Close()
401         }()
402         return
403 }
404
405 func (ctx *Ctx) TxFile(
406         node *Node,
407         nice uint8,
408         srcPath, dstPath string,
409         chunkSize, minSize, maxSize int64,
410         areaId *AreaId,
411 ) error {
412         dstPathSpecified := false
413         if dstPath == "" {
414                 if srcPath == "-" {
415                         return errors.New("Must provide destination filename")
416                 }
417                 dstPath = filepath.Base(srcPath)
418         } else {
419                 dstPathSpecified = true
420         }
421         dstPath = filepath.Clean(dstPath)
422         if filepath.IsAbs(dstPath) {
423                 return errors.New("Relative destination path required")
424         }
425         reader, closer, srcSize, archived, err := prepareTxFile(srcPath)
426         if closer != nil {
427                 defer closer.Close()
428         }
429         if err != nil {
430                 return err
431         }
432         if archived && !dstPathSpecified {
433                 dstPath += TarExt
434         }
435
436         if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) {
437                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
438                 if err != nil {
439                         return err
440                 }
441                 _, finalSize, pktName, err := ctx.Tx(
442                         node, pkt, nice,
443                         srcSize, minSize, maxSize,
444                         bufio.NewReaderSize(reader, MTHBlockSize), dstPath, areaId,
445                 )
446                 les := LEs{
447                         {"Type", "file"},
448                         {"Node", node.Id},
449                         {"Nice", int(nice)},
450                         {"Src", srcPath},
451                         {"Dst", dstPath},
452                         {"Size", finalSize},
453                         {"Pkt", pktName},
454                 }
455                 logMsg := func(les LEs) string {
456                         return fmt.Sprintf(
457                                 "File %s (%s) is sent to %s:%s",
458                                 srcPath,
459                                 humanize.IBytes(uint64(finalSize)),
460                                 ctx.NodeName(node.Id),
461                                 dstPath,
462                         )
463                 }
464                 if err == nil {
465                         ctx.LogI("tx", les, logMsg)
466                 } else {
467                         ctx.LogE("tx", les, err, logMsg)
468                 }
469                 return err
470         }
471
472         br := bufio.NewReaderSize(reader, MTHBlockSize)
473         var sizeFull int64
474         var chunkNum int
475         checksums := [][MTHSize]byte{}
476         for {
477                 lr := io.LimitReader(br, chunkSize)
478                 path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
479                 pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
480                 if err != nil {
481                         return err
482                 }
483                 hsh := MTHNew(0, 0)
484                 _, size, pktName, err := ctx.Tx(
485                         node, pkt, nice,
486                         0, minSize, maxSize,
487                         io.TeeReader(lr, hsh),
488                         path, areaId,
489                 )
490
491                 les := LEs{
492                         {"Type", "file"},
493                         {"Node", node.Id},
494                         {"Nice", int(nice)},
495                         {"Src", srcPath},
496                         {"Dst", path},
497                         {"Size", size},
498                         {"Pkt", pktName},
499                 }
500                 logMsg := func(les LEs) string {
501                         return fmt.Sprintf(
502                                 "File %s (%s) is sent to %s:%s",
503                                 srcPath,
504                                 humanize.IBytes(uint64(size)),
505                                 ctx.NodeName(node.Id),
506                                 path,
507                         )
508                 }
509                 if err == nil {
510                         ctx.LogI("tx", les, logMsg)
511                 } else {
512                         ctx.LogE("tx", les, err, logMsg)
513                         return err
514                 }
515
516                 sizeFull += size - PktOverhead
517                 var checksum [MTHSize]byte
518                 hsh.Sum(checksum[:0])
519                 checksums = append(checksums, checksum)
520                 chunkNum++
521                 if size < chunkSize {
522                         break
523                 }
524                 if _, err = br.Peek(1); err != nil {
525                         break
526                 }
527         }
528
529         metaPkt := ChunkedMeta{
530                 Magic:     MagicNNCPMv2.B,
531                 FileSize:  uint64(sizeFull),
532                 ChunkSize: uint64(chunkSize),
533                 Checksums: checksums,
534         }
535         var buf bytes.Buffer
536         _, err = xdr.Marshal(&buf, metaPkt)
537         if err != nil {
538                 return err
539         }
540         path := dstPath + ChunkedSuffixMeta
541         pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
542         if err != nil {
543                 return err
544         }
545         metaPktSize := int64(buf.Len())
546         _, _, pktName, err := ctx.Tx(
547                 node,
548                 pkt,
549                 nice,
550                 metaPktSize, minSize, maxSize,
551                 &buf, path, areaId,
552         )
553         les := LEs{
554                 {"Type", "file"},
555                 {"Node", node.Id},
556                 {"Nice", int(nice)},
557                 {"Src", srcPath},
558                 {"Dst", path},
559                 {"Size", metaPktSize},
560                 {"Pkt", pktName},
561         }
562         logMsg := func(les LEs) string {
563                 return fmt.Sprintf(
564                         "File %s (%s) is sent to %s:%s",
565                         srcPath,
566                         humanize.IBytes(uint64(metaPktSize)),
567                         ctx.NodeName(node.Id),
568                         path,
569                 )
570         }
571         if err == nil {
572                 ctx.LogI("tx", les, logMsg)
573         } else {
574                 ctx.LogE("tx", les, err, logMsg)
575         }
576         return err
577 }
578
579 func (ctx *Ctx) TxFreq(
580         node *Node,
581         nice, replyNice uint8,
582         srcPath, dstPath string,
583         minSize int64,
584 ) error {
585         dstPath = filepath.Clean(dstPath)
586         if filepath.IsAbs(dstPath) {
587                 return errors.New("Relative destination path required")
588         }
589         srcPath = filepath.Clean(srcPath)
590         if filepath.IsAbs(srcPath) {
591                 return errors.New("Relative source path required")
592         }
593         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
594         if err != nil {
595                 return err
596         }
597         src := strings.NewReader(dstPath)
598         size := int64(src.Len())
599         _, _, pktName, err := ctx.Tx(
600                 node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil,
601         )
602         les := LEs{
603                 {"Type", "freq"},
604                 {"Node", node.Id},
605                 {"Nice", int(nice)},
606                 {"ReplyNice", int(replyNice)},
607                 {"Src", srcPath},
608                 {"Dst", dstPath},
609                 {"Pkt", pktName},
610         }
611         logMsg := func(les LEs) string {
612                 return fmt.Sprintf(
613                         "File request from %s:%s to %s is sent",
614                         ctx.NodeName(node.Id), srcPath,
615                         dstPath,
616                 )
617         }
618         if err == nil {
619                 ctx.LogI("tx", les, logMsg)
620         } else {
621                 ctx.LogE("tx", les, err, logMsg)
622         }
623         return err
624 }
625
626 func (ctx *Ctx) TxExec(
627         node *Node,
628         nice, replyNice uint8,
629         handle string,
630         args []string,
631         in io.Reader,
632         minSize int64, maxSize int64,
633         noCompress bool,
634         areaId *AreaId,
635 ) error {
636         path := make([][]byte, 0, 1+len(args))
637         path = append(path, []byte(handle))
638         for _, arg := range args {
639                 path = append(path, []byte(arg))
640         }
641         pktType := PktTypeExec
642         if noCompress {
643                 pktType = PktTypeExecFat
644         }
645         pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
646         if err != nil {
647                 return err
648         }
649         compressErr := make(chan error, 1)
650         if !noCompress {
651                 pr, pw := io.Pipe()
652                 compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault))
653                 if err != nil {
654                         return err
655                 }
656                 go func(r io.Reader) {
657                         if _, err := io.Copy(compressor, r); err != nil {
658                                 compressErr <- err
659                                 return
660                         }
661                         compressErr <- compressor.Close()
662                         pw.Close()
663                 }(in)
664                 in = pr
665         }
666         _, size, pktName, err := ctx.Tx(
667                 node, pkt, nice, 0, minSize, maxSize, in, handle, areaId,
668         )
669         if !noCompress {
670                 e := <-compressErr
671                 if err == nil {
672                         err = e
673                 }
674         }
675         dst := strings.Join(append([]string{handle}, args...), " ")
676         les := LEs{
677                 {"Type", "exec"},
678                 {"Node", node.Id},
679                 {"Nice", int(nice)},
680                 {"ReplyNice", int(replyNice)},
681                 {"Dst", dst},
682                 {"Size", size},
683                 {"Pkt", pktName},
684         }
685         logMsg := func(les LEs) string {
686                 return fmt.Sprintf(
687                         "Exec is sent to %s@%s (%s)",
688                         ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
689                 )
690         }
691         if err == nil {
692                 ctx.LogI("tx", les, logMsg)
693         } else {
694                 ctx.LogE("tx", les, err, logMsg)
695         }
696         return err
697 }
698
699 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
700         les := LEs{
701                 {"Type", "trns"},
702                 {"Node", node.Id},
703                 {"Nice", int(nice)},
704                 {"Size", size},
705         }
706         logMsg := func(les LEs) string {
707                 return fmt.Sprintf(
708                         "Transitional packet to %s (%s) (nice %s)",
709                         ctx.NodeName(node.Id),
710                         humanize.IBytes(uint64(size)),
711                         NicenessFmt(nice),
712                 )
713         }
714         ctx.LogD("tx", les, logMsg)
715         if !ctx.IsEnoughSpace(size) {
716                 err := errors.New("is not enough space")
717                 ctx.LogE("tx", les, err, logMsg)
718                 return err
719         }
720         tmp, err := ctx.NewTmpFileWHash()
721         if err != nil {
722                 return err
723         }
724         if _, err = CopyProgressed(
725                 tmp.W, src, "Tx trns",
726                 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
727                 ctx.ShowPrgrs,
728         ); err != nil {
729                 return err
730         }
731         nodePath := filepath.Join(ctx.Spool, node.Id.String())
732         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
733         if err == nil {
734                 ctx.LogI("tx", les, logMsg)
735         } else {
736                 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
737         }
738         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
739         return err
740 }
741
742 func (ctx *Ctx) TxACK(
743         node *Node,
744         nice uint8,
745         hsh string,
746         minSize int64,
747 ) (pktName string, err error) {
748         hshRaw, err := Base32Codec.DecodeString(hsh)
749         if err != nil {
750                 return "", err
751         }
752         if len(hshRaw) != MTHSize {
753                 return "", errors.New("Invalid packet id size")
754         }
755         pkt, err := NewPkt(PktTypeACK, nice, []byte(hshRaw))
756         if err != nil {
757                 return "", err
758         }
759         src := bytes.NewReader([]byte{})
760         _, _, pktName, err = ctx.Tx(
761                 node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil,
762         )
763         les := LEs{
764                 {"Type", "ack"},
765                 {"Node", node.Id},
766                 {"Nice", int(nice)},
767                 {"Pkt", hsh},
768                 {"NewPkt", pktName},
769         }
770         logMsg := func(les LEs) string {
771                 return fmt.Sprintf("ACK to %s of %s is sent", ctx.NodeName(node.Id), hsh)
772         }
773         if err == nil {
774                 ctx.LogI("tx", les, logMsg)
775         } else {
776                 ctx.LogE("tx", les, err, logMsg)
777         }
778         return
779 }