Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
Streamed NNCPE format
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "errors"
25         "fmt"
26         "io"
27         "os"
28         "path/filepath"
29         "strconv"
30         "strings"
31         "time"
32
33         xdr "github.com/davecgh/go-xdr/xdr2"
34         "github.com/dustin/go-humanize"
35         "github.com/klauspost/compress/zstd"
36         "golang.org/x/crypto/blake2b"
37 )
38
// MaxFileSize is the packet size limit passed to Tx/PktEncWrite where no
// caller-supplied limit applies (file requests and transitional wrappers).
const MaxFileSize = 1 << 62

// Constants used when a directory is sent as an on-the-fly tar archive.
const (
	// TarBlockSize is the tar block size in bytes used for size estimation.
	TarBlockSize = 512
	// TarExt is appended to the destination name of an archived directory
	// when no explicit destination path was given.
	TarExt = ".tar"
)
45
// PktEncWriteResult carries the outcome of one stage of the Tx pipeline
// back to the goroutine that started it.
//
// Tx constructs values of this type with positional literals, so the
// field order must not change.
type PktEncWriteResult struct {
	// pktEncRaw is the first value returned by PktEncWrite; it is left
	// nil by the stage that only copies data into the temporary file.
	pktEncRaw []byte
	// size is the size value returned by PktEncWrite.
	size int64
	// err is the error the stage finished with, nil on success.
	err error
}
51
52 func (ctx *Ctx) Tx(
53         node *Node,
54         pkt *Pkt,
55         nice uint8,
56         srcSize, minSize, maxSize int64,
57         src io.Reader,
58         pktName string,
59         areaId *AreaId,
60 ) (*Node, int64, error) {
61         var area *Area
62         if areaId != nil {
63                 area = ctx.AreaId2Area[*areaId]
64                 if area.Prv == nil {
65                         return nil, 0, errors.New("area has no encryption keys")
66                 }
67         }
68         hops := make([]*Node, 0, 1+len(node.Via))
69         hops = append(hops, node)
70         lastNode := node
71         for i := len(node.Via); i > 0; i-- {
72                 lastNode = ctx.Neigh[*node.Via[i-1]]
73                 hops = append(hops, lastNode)
74         }
75         wrappers := len(hops)
76         if area != nil {
77                 wrappers++
78         }
79         var expectedSize int64
80         if srcSize > 0 {
81                 expectedSize = srcSize + PktOverhead
82                 expectedSize += sizePadCalc(expectedSize, minSize, wrappers)
83                 expectedSize = PktEncOverhead + sizeWithTags(expectedSize)
84                 if maxSize != 0 && expectedSize > maxSize {
85                         return nil, 0, TooBig
86                 }
87                 if !ctx.IsEnoughSpace(expectedSize) {
88                         return nil, 0, errors.New("is not enough space")
89                 }
90         }
91         tmp, err := ctx.NewTmpFileWHash()
92         if err != nil {
93                 return nil, 0, err
94         }
95
96         results := make(chan PktEncWriteResult)
97         pipeR, pipeW := io.Pipe()
98         var pipeRPrev io.Reader
99         if area == nil {
100                 go func(src io.Reader, dst io.WriteCloser) {
101                         ctx.LogD("tx", LEs{
102                                 {"Node", hops[0].Id},
103                                 {"Nice", int(nice)},
104                                 {"Size", expectedSize},
105                         }, func(les LEs) string {
106                                 return fmt.Sprintf(
107                                         "Tx packet to %s (source %s) nice: %s",
108                                         ctx.NodeName(hops[0].Id),
109                                         humanize.IBytes(uint64(expectedSize)),
110                                         NicenessFmt(nice),
111                                 )
112                         })
113                         pktEncRaw, size, err := PktEncWrite(
114                                 ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst,
115                         )
116                         results <- PktEncWriteResult{pktEncRaw, size, err}
117                         dst.Close()
118                 }(src, pipeW)
119         } else {
120                 go func(src io.Reader, dst io.WriteCloser) {
121                         ctx.LogD("tx", LEs{
122                                 {"Area", area.Id},
123                                 {"Nice", int(nice)},
124                                 {"Size", expectedSize},
125                         }, func(les LEs) string {
126                                 return fmt.Sprintf(
127                                         "Tx area packet to %s (source %s) nice: %s",
128                                         ctx.AreaName(areaId),
129                                         humanize.IBytes(uint64(expectedSize)),
130                                         NicenessFmt(nice),
131                                 )
132                         })
133                         areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
134                         copy(areaNode.Id[:], area.Id[:])
135                         copy(areaNode.ExchPub[:], area.Pub[:])
136                         pktEncRaw, size, err := PktEncWrite(
137                                 ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst,
138                         )
139                         results <- PktEncWriteResult{pktEncRaw, size, err}
140                         dst.Close()
141                 }(src, pipeW)
142                 pipeRPrev = pipeR
143                 pipeR, pipeW = io.Pipe()
144                 go func(src io.Reader, dst io.WriteCloser) {
145                         pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
146                         if err != nil {
147                                 panic(err)
148                         }
149                         ctx.LogD("tx", LEs{
150                                 {"Node", hops[0].Id},
151                                 {"Nice", int(nice)},
152                                 {"Size", expectedSize},
153                         }, func(les LEs) string {
154                                 return fmt.Sprintf(
155                                         "Tx packet to %s (source %s) nice: %s",
156                                         ctx.NodeName(hops[0].Id),
157                                         humanize.IBytes(uint64(expectedSize)),
158                                         NicenessFmt(nice),
159                                 )
160                         })
161                         pktEncRaw, size, err := PktEncWrite(
162                                 ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst,
163                         )
164                         results <- PktEncWriteResult{pktEncRaw, size, err}
165                         dst.Close()
166                 }(pipeRPrev, pipeW)
167         }
168         for i := 1; i < len(hops); i++ {
169                 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
170                 if err != nil {
171                         panic(err)
172                 }
173                 pipeRPrev = pipeR
174                 pipeR, pipeW = io.Pipe()
175                 go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) {
176                         ctx.LogD("tx", LEs{
177                                 {"Node", node.Id},
178                                 {"Nice", int(nice)},
179                         }, func(les LEs) string {
180                                 return fmt.Sprintf(
181                                         "Tx trns packet to %s nice: %s",
182                                         ctx.NodeName(node.Id),
183                                         NicenessFmt(nice),
184                                 )
185                         })
186                         pktEncRaw, size, err := PktEncWrite(
187                                 ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst,
188                         )
189                         results <- PktEncWriteResult{pktEncRaw, size, err}
190                         dst.Close()
191                 }(hops[i], pktTrns, pipeRPrev, pipeW)
192         }
193         go func() {
194                 _, err := CopyProgressed(
195                         tmp.W, pipeR, "Tx",
196                         LEs{{"Pkt", pktName}, {"FullSize", expectedSize}},
197                         ctx.ShowPrgrs,
198                 )
199                 results <- PktEncWriteResult{err: err}
200         }()
201         var pktEncRaw []byte
202         var pktEncMsg []byte
203         if area != nil {
204                 pktEncMsg = (<-results).pktEncRaw
205         }
206         var finalSize int64
207         for i := 0; i <= wrappers; i++ {
208                 r := <-results
209                 if r.err != nil {
210                         tmp.Fd.Close()
211                         return nil, 0, err
212                 }
213                 if r.pktEncRaw != nil {
214                         finalSize = r.size
215                         pktEncRaw = r.pktEncRaw
216                 }
217         }
218         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
219         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
220         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
221         if err != nil {
222                 return lastNode, 0, err
223         }
224         if ctx.HdrUsage {
225                 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
226         }
227         if area != nil {
228                 msgHashRaw := blake2b.Sum256(pktEncMsg)
229                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
230                 seenDir := filepath.Join(
231                         ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
232                 )
233                 seenPath := filepath.Join(seenDir, msgHash)
234                 les := LEs{
235                         {"Node", node.Id},
236                         {"Nice", int(nice)},
237                         {"Size", expectedSize},
238                         {"Area", areaId},
239                         {"AreaMsg", msgHash},
240                 }
241                 logMsg := func(les LEs) string {
242                         return fmt.Sprintf(
243                                 "Tx area packet to %s (source %s) nice: %s, area %s: %s",
244                                 ctx.NodeName(node.Id),
245                                 humanize.IBytes(uint64(expectedSize)),
246                                 NicenessFmt(nice),
247                                 area.Name,
248                                 msgHash,
249                         )
250                 }
251                 if err = ensureDir(seenDir); err != nil {
252                         ctx.LogE("tx-mkdir", les, err, logMsg)
253                         return lastNode, 0, err
254                 }
255                 if fd, err := os.Create(seenPath); err == nil {
256                         fd.Close()
257                         if err = DirSync(seenDir); err != nil {
258                                 ctx.LogE("tx-dirsync", les, err, logMsg)
259                                 return lastNode, 0, err
260                         }
261                 }
262                 ctx.LogI("tx-area", les, logMsg)
263         }
264         return lastNode, finalSize, err
265 }
266
// DummyCloser is a stateless type whose Close method does nothing, for
// places that require a closer but have no resource to release.
type DummyCloser struct{}

// Close performs no work and always reports success.
func (DummyCloser) Close() error {
	return nil
}
270
271 func prepareTxFile(srcPath string) (
272         reader io.Reader,
273         closer io.Closer,
274         srcSize int64,
275         archived bool,
276         rerr error,
277 ) {
278         if srcPath == "-" {
279                 reader = os.Stdin
280                 closer = os.Stdin
281                 return
282         }
283
284         srcStat, err := os.Stat(srcPath)
285         if err != nil {
286                 rerr = err
287                 return
288         }
289         mode := srcStat.Mode()
290
291         if mode.IsRegular() {
292                 // It is regular file, just send it
293                 src, err := os.Open(srcPath)
294                 if err != nil {
295                         rerr = err
296                         return
297                 }
298                 reader = src
299                 closer = src
300                 srcSize = srcStat.Size()
301                 return
302         }
303
304         if !mode.IsDir() {
305                 rerr = errors.New("unsupported file type")
306                 return
307         }
308
309         // It is directory, create PAX archive with its contents
310         archived = true
311         basePath := filepath.Base(srcPath)
312         rootPath, err := filepath.Abs(srcPath)
313         if err != nil {
314                 rerr = err
315                 return
316         }
317         type einfo struct {
318                 path    string
319                 modTime time.Time
320                 size    int64
321         }
322         dirs := make([]einfo, 0, 1<<10)
323         files := make([]einfo, 0, 1<<10)
324         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
325                 if err != nil {
326                         return err
327                 }
328                 if info.IsDir() {
329                         // directory header, PAX record header+contents
330                         srcSize += TarBlockSize + 2*TarBlockSize
331                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
332                 } else {
333                         // file header, PAX record header+contents, file content
334                         srcSize += TarBlockSize + 2*TarBlockSize + info.Size()
335                         if n := info.Size() % TarBlockSize; n != 0 {
336                                 srcSize += TarBlockSize - n // padding
337                         }
338                         files = append(files, einfo{
339                                 path:    path,
340                                 modTime: info.ModTime(),
341                                 size:    info.Size(),
342                         })
343                 }
344                 return nil
345         })
346         if rerr != nil {
347                 return
348         }
349
350         r, w := io.Pipe()
351         reader = r
352         closer = DummyCloser{}
353         srcSize += 2 * TarBlockSize // termination block
354
355         go func() error {
356                 tarWr := tar.NewWriter(w)
357                 hdr := tar.Header{
358                         Typeflag: tar.TypeDir,
359                         Mode:     0777,
360                         PAXRecords: map[string]string{
361                                 "comment": "Autogenerated by " + VersionGet(),
362                         },
363                         Format: tar.FormatPAX,
364                 }
365                 for _, e := range dirs {
366                         hdr.Name = basePath + e.path[len(rootPath):]
367                         hdr.ModTime = e.modTime
368                         if err = tarWr.WriteHeader(&hdr); err != nil {
369                                 return w.CloseWithError(err)
370                         }
371                 }
372                 hdr.Typeflag = tar.TypeReg
373                 hdr.Mode = 0666
374                 for _, e := range files {
375                         hdr.Name = basePath + e.path[len(rootPath):]
376                         hdr.ModTime = e.modTime
377                         hdr.Size = e.size
378                         if err = tarWr.WriteHeader(&hdr); err != nil {
379                                 return w.CloseWithError(err)
380                         }
381                         fd, err := os.Open(e.path)
382                         if err != nil {
383                                 fd.Close()
384                                 return w.CloseWithError(err)
385                         }
386                         if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
387                                 fd.Close()
388                                 return w.CloseWithError(err)
389                         }
390                         fd.Close()
391                 }
392                 if err = tarWr.Close(); err != nil {
393                         return w.CloseWithError(err)
394                 }
395                 return w.Close()
396         }()
397         return
398 }
399
400 func (ctx *Ctx) TxFile(
401         node *Node,
402         nice uint8,
403         srcPath, dstPath string,
404         chunkSize, minSize, maxSize int64,
405         areaId *AreaId,
406 ) error {
407         dstPathSpecified := false
408         if dstPath == "" {
409                 if srcPath == "-" {
410                         return errors.New("Must provide destination filename")
411                 }
412                 dstPath = filepath.Base(srcPath)
413         } else {
414                 dstPathSpecified = true
415         }
416         dstPath = filepath.Clean(dstPath)
417         if filepath.IsAbs(dstPath) {
418                 return errors.New("Relative destination path required")
419         }
420         reader, closer, srcSize, archived, err := prepareTxFile(srcPath)
421         if closer != nil {
422                 defer closer.Close()
423         }
424         if err != nil {
425                 return err
426         }
427         if archived && !dstPathSpecified {
428                 dstPath += TarExt
429         }
430
431         if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) {
432                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
433                 if err != nil {
434                         return err
435                 }
436                 _, finalSize, err := ctx.Tx(
437                         node, pkt, nice,
438                         srcSize, minSize, maxSize,
439                         bufio.NewReader(reader), dstPath, areaId,
440                 )
441                 les := LEs{
442                         {"Type", "file"},
443                         {"Node", node.Id},
444                         {"Nice", int(nice)},
445                         {"Src", srcPath},
446                         {"Dst", dstPath},
447                         {"Size", finalSize},
448                 }
449                 logMsg := func(les LEs) string {
450                         return fmt.Sprintf(
451                                 "File %s (%s) sent to %s:%s",
452                                 srcPath,
453                                 humanize.IBytes(uint64(finalSize)),
454                                 ctx.NodeName(node.Id),
455                                 dstPath,
456                         )
457                 }
458                 if err == nil {
459                         ctx.LogI("tx", les, logMsg)
460                 } else {
461                         ctx.LogE("tx", les, err, logMsg)
462                 }
463                 return err
464         }
465
466         br := bufio.NewReader(reader)
467         var sizeFull int64
468         var chunkNum int
469         checksums := [][MTHSize]byte{}
470         for {
471                 lr := io.LimitReader(br, chunkSize)
472                 path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
473                 pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
474                 if err != nil {
475                         return err
476                 }
477                 hsh := MTHNew(0, 0)
478                 _, size, err := ctx.Tx(
479                         node, pkt, nice,
480                         0, minSize, maxSize,
481                         io.TeeReader(lr, hsh),
482                         path, areaId,
483                 )
484
485                 les := LEs{
486                         {"Type", "file"},
487                         {"Node", node.Id},
488                         {"Nice", int(nice)},
489                         {"Src", srcPath},
490                         {"Dst", path},
491                         {"Size", size},
492                 }
493                 logMsg := func(les LEs) string {
494                         return fmt.Sprintf(
495                                 "File %s (%s) sent to %s:%s",
496                                 srcPath,
497                                 humanize.IBytes(uint64(size)),
498                                 ctx.NodeName(node.Id),
499                                 path,
500                         )
501                 }
502                 if err == nil {
503                         ctx.LogI("tx", les, logMsg)
504                 } else {
505                         ctx.LogE("tx", les, err, logMsg)
506                         return err
507                 }
508
509                 sizeFull += size - PktOverhead
510                 var checksum [MTHSize]byte
511                 hsh.Sum(checksum[:0])
512                 checksums = append(checksums, checksum)
513                 chunkNum++
514                 if size < chunkSize {
515                         break
516                 }
517                 if _, err = br.Peek(1); err != nil {
518                         break
519                 }
520         }
521
522         metaPkt := ChunkedMeta{
523                 Magic:     MagicNNCPMv2.B,
524                 FileSize:  uint64(sizeFull),
525                 ChunkSize: uint64(chunkSize),
526                 Checksums: checksums,
527         }
528         var buf bytes.Buffer
529         _, err = xdr.Marshal(&buf, metaPkt)
530         if err != nil {
531                 return err
532         }
533         path := dstPath + ChunkedSuffixMeta
534         pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
535         if err != nil {
536                 return err
537         }
538         metaPktSize := int64(buf.Len())
539         _, _, err = ctx.Tx(
540                 node,
541                 pkt,
542                 nice,
543                 metaPktSize, minSize, maxSize,
544                 &buf, path, areaId,
545         )
546         les := LEs{
547                 {"Type", "file"},
548                 {"Node", node.Id},
549                 {"Nice", int(nice)},
550                 {"Src", srcPath},
551                 {"Dst", path},
552                 {"Size", metaPktSize},
553         }
554         logMsg := func(les LEs) string {
555                 return fmt.Sprintf(
556                         "File %s (%s) sent to %s:%s",
557                         srcPath,
558                         humanize.IBytes(uint64(metaPktSize)),
559                         ctx.NodeName(node.Id),
560                         path,
561                 )
562         }
563         if err == nil {
564                 ctx.LogI("tx", les, logMsg)
565         } else {
566                 ctx.LogE("tx", les, err, logMsg)
567         }
568         return err
569 }
570
571 func (ctx *Ctx) TxFreq(
572         node *Node,
573         nice, replyNice uint8,
574         srcPath, dstPath string,
575         minSize int64,
576 ) error {
577         dstPath = filepath.Clean(dstPath)
578         if filepath.IsAbs(dstPath) {
579                 return errors.New("Relative destination path required")
580         }
581         srcPath = filepath.Clean(srcPath)
582         if filepath.IsAbs(srcPath) {
583                 return errors.New("Relative source path required")
584         }
585         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
586         if err != nil {
587                 return err
588         }
589         src := strings.NewReader(dstPath)
590         size := int64(src.Len())
591         _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil)
592         les := LEs{
593                 {"Type", "freq"},
594                 {"Node", node.Id},
595                 {"Nice", int(nice)},
596                 {"ReplyNice", int(replyNice)},
597                 {"Src", srcPath},
598                 {"Dst", dstPath},
599         }
600         logMsg := func(les LEs) string {
601                 return fmt.Sprintf(
602                         "File request from %s:%s to %s sent",
603                         ctx.NodeName(node.Id), srcPath,
604                         dstPath,
605                 )
606         }
607         if err == nil {
608                 ctx.LogI("tx", les, logMsg)
609         } else {
610                 ctx.LogE("tx", les, err, logMsg)
611         }
612         return err
613 }
614
615 func (ctx *Ctx) TxExec(
616         node *Node,
617         nice, replyNice uint8,
618         handle string,
619         args []string,
620         in io.Reader,
621         minSize int64, maxSize int64,
622         noCompress bool,
623         areaId *AreaId,
624 ) error {
625         path := make([][]byte, 0, 1+len(args))
626         path = append(path, []byte(handle))
627         for _, arg := range args {
628                 path = append(path, []byte(arg))
629         }
630         pktType := PktTypeExec
631         if noCompress {
632                 pktType = PktTypeExecFat
633         }
634         pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
635         if err != nil {
636                 return err
637         }
638         compressErr := make(chan error, 1)
639         if !noCompress {
640                 pr, pw := io.Pipe()
641                 compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault))
642                 if err != nil {
643                         return err
644                 }
645                 go func(r io.Reader) {
646                         if _, err := io.Copy(compressor, r); err != nil {
647                                 compressErr <- err
648                                 return
649                         }
650                         compressErr <- compressor.Close()
651                         pw.Close()
652                 }(in)
653                 in = pr
654         }
655         _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId)
656         if !noCompress {
657                 e := <-compressErr
658                 if err == nil {
659                         err = e
660                 }
661         }
662         dst := strings.Join(append([]string{handle}, args...), " ")
663         les := LEs{
664                 {"Type", "exec"},
665                 {"Node", node.Id},
666                 {"Nice", int(nice)},
667                 {"ReplyNice", int(replyNice)},
668                 {"Dst", dst},
669                 {"Size", size},
670         }
671         logMsg := func(les LEs) string {
672                 return fmt.Sprintf(
673                         "Exec sent to %s@%s (%s)",
674                         ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
675                 )
676         }
677         if err == nil {
678                 ctx.LogI("tx", les, logMsg)
679         } else {
680                 ctx.LogE("tx", les, err, logMsg)
681         }
682         return err
683 }
684
685 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
686         les := LEs{
687                 {"Type", "trns"},
688                 {"Node", node.Id},
689                 {"Nice", int(nice)},
690                 {"Size", size},
691         }
692         logMsg := func(les LEs) string {
693                 return fmt.Sprintf(
694                         "Transitional packet to %s (%s) (nice %s)",
695                         ctx.NodeName(node.Id),
696                         humanize.IBytes(uint64(size)),
697                         NicenessFmt(nice),
698                 )
699         }
700         ctx.LogD("tx", les, logMsg)
701         if !ctx.IsEnoughSpace(size) {
702                 err := errors.New("is not enough space")
703                 ctx.LogE("tx", les, err, logMsg)
704                 return err
705         }
706         tmp, err := ctx.NewTmpFileWHash()
707         if err != nil {
708                 return err
709         }
710         if _, err = CopyProgressed(
711                 tmp.W, src, "Tx trns",
712                 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
713                 ctx.ShowPrgrs,
714         ); err != nil {
715                 return err
716         }
717         nodePath := filepath.Join(ctx.Spool, node.Id.String())
718         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
719         if err == nil {
720                 ctx.LogI("tx", les, logMsg)
721         } else {
722                 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
723         }
724         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
725         return err
726 }