]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
15cf08a060ccb20cefcaeabc5f88e29f40580a72
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "fmt"
27         "hash"
28         "io"
29         "io/ioutil"
30         "os"
31         "path/filepath"
32         "strconv"
33         "strings"
34         "time"
35
36         xdr "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "github.com/klauspost/compress/zstd"
39         "golang.org/x/crypto/chacha20poly1305"
40 )
41
42 const (
43         MaxFileSize = 1 << 62
44
45         TarBlockSize = 512
46         TarExt       = ".tar"
47 )
48
49 func (ctx *Ctx) Tx(
50         node *Node,
51         pkt *Pkt,
52         nice uint8,
53         size, minSize int64,
54         src io.Reader,
55         pktName string,
56 ) (*Node, error) {
57         hops := make([]*Node, 0, 1+len(node.Via))
58         hops = append(hops, node)
59         lastNode := node
60         for i := len(node.Via); i > 0; i-- {
61                 lastNode = ctx.Neigh[*node.Via[i-1]]
62                 hops = append(hops, lastNode)
63         }
64         expectedSize := size
65         for i := 0; i < len(hops); i++ {
66                 expectedSize = PktEncOverhead +
67                         PktSizeOverhead +
68                         sizeWithTags(PktOverhead+expectedSize)
69         }
70         padSize := minSize - expectedSize
71         if padSize < 0 {
72                 padSize = 0
73         }
74         if !ctx.IsEnoughSpace(size + padSize) {
75                 return nil, errors.New("is not enough space")
76         }
77         tmp, err := ctx.NewTmpFileWHash()
78         if err != nil {
79                 return nil, err
80         }
81
82         errs := make(chan error)
83         pktEncRaws := make(chan []byte)
84         curSize := size
85         pipeR, pipeW := io.Pipe()
86         go func(size int64, src io.Reader, dst io.WriteCloser) {
87                 ctx.LogD("tx", LEs{
88                         {"Node", hops[0].Id},
89                         {"Nice", int(nice)},
90                         {"Size", size},
91                 }, func(les LEs) string {
92                         return fmt.Sprintf(
93                                 "Tx packet to %s (%s) nice: %s",
94                                 ctx.NodeName(hops[0].Id),
95                                 humanize.IBytes(uint64(size)),
96                                 NicenessFmt(nice),
97                         )
98                 })
99                 pktEncRaw, err := PktEncWrite(
100                         ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
101                 )
102                 pktEncRaws <- pktEncRaw
103                 errs <- err
104                 dst.Close() // #nosec G104
105         }(curSize, src, pipeW)
106         curSize = PktEncOverhead +
107                 PktSizeOverhead +
108                 sizeWithTags(PktOverhead+curSize) +
109                 padSize
110
111         var pipeRPrev io.Reader
112         for i := 1; i < len(hops); i++ {
113                 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
114                 if err != nil {
115                         panic(err)
116                 }
117                 pipeRPrev = pipeR
118                 pipeR, pipeW = io.Pipe()
119                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
120                         ctx.LogD("tx", LEs{
121                                 {"Node", node.Id},
122                                 {"Nice", int(nice)},
123                                 {"Size", size},
124                         }, func(les LEs) string {
125                                 return fmt.Sprintf(
126                                         "Tx trns packet to %s (%s) nice: %s",
127                                         ctx.NodeName(node.Id),
128                                         humanize.IBytes(uint64(size)),
129                                         NicenessFmt(nice),
130                                 )
131                         })
132                         pktEncRaw, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
133                         pktEncRaws <- pktEncRaw
134                         errs <- err
135                         dst.Close() // #nosec G104
136                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
137                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
138         }
139         go func() {
140                 _, err := CopyProgressed(
141                         tmp.W, pipeR, "Tx",
142                         LEs{{"Pkt", pktName}, {"FullSize", curSize}},
143                         ctx.ShowPrgrs,
144                 )
145                 errs <- err
146         }()
147         var pktEncRaw []byte
148         for i := 0; i < len(hops); i++ {
149                 pktEncRaw = <-pktEncRaws
150         }
151         for i := 0; i <= len(hops); i++ {
152                 err = <-errs
153                 if err != nil {
154                         tmp.Fd.Close() // #nosec G104
155                         return nil, err
156                 }
157         }
158         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
159         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
160         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
161         if err != nil {
162                 return lastNode, err
163         }
164         if ctx.HdrUsage {
165                 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
166         }
167         return lastNode, err
168 }
169
170 type DummyCloser struct{}
171
172 func (dc DummyCloser) Close() error { return nil }
173
174 func throughTmpFile(r io.Reader) (
175         reader io.Reader,
176         closer io.Closer,
177         fileSize int64,
178         rerr error,
179 ) {
180         src, err := ioutil.TempFile("", "nncp-file")
181         if err != nil {
182                 rerr = err
183                 return
184         }
185         os.Remove(src.Name()) // #nosec G104
186         tmpW := bufio.NewWriter(src)
187         tmpKey := make([]byte, chacha20poly1305.KeySize)
188         if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
189                 return
190         }
191         aead, err := chacha20poly1305.New(tmpKey)
192         if err != nil {
193                 rerr = err
194                 return
195         }
196         nonce := make([]byte, aead.NonceSize())
197         written, err := aeadProcess(aead, nonce, nil, true, r, tmpW)
198         if err != nil {
199                 rerr = err
200                 return
201         }
202         fileSize = int64(written)
203         if err = tmpW.Flush(); err != nil {
204                 rerr = err
205                 return
206         }
207         if _, err = src.Seek(0, io.SeekStart); err != nil {
208                 rerr = err
209                 return
210         }
211         r, w := io.Pipe()
212         go func() {
213                 if _, err := aeadProcess(aead, nonce, nil, false, bufio.NewReader(src), w); err != nil {
214                         w.CloseWithError(err) // #nosec G104
215                 }
216         }()
217         reader = r
218         closer = src
219         return
220 }
221
222 func prepareTxFile(srcPath string) (
223         reader io.Reader,
224         closer io.Closer,
225         fileSize int64,
226         archived bool,
227         rerr error,
228 ) {
229         if srcPath == "-" {
230                 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
231                 return
232         }
233
234         srcStat, err := os.Stat(srcPath)
235         if err != nil {
236                 rerr = err
237                 return
238         }
239         mode := srcStat.Mode()
240
241         if mode.IsRegular() {
242                 // It is regular file, just send it
243                 src, err := os.Open(srcPath)
244                 if err != nil {
245                         rerr = err
246                         return
247                 }
248                 fileSize = srcStat.Size()
249                 reader = bufio.NewReader(src)
250                 closer = src
251                 return
252         }
253
254         if !mode.IsDir() {
255                 rerr = errors.New("unsupported file type")
256                 return
257         }
258
259         // It is directory, create PAX archive with its contents
260         archived = true
261         basePath := filepath.Base(srcPath)
262         rootPath, err := filepath.Abs(srcPath)
263         if err != nil {
264                 rerr = err
265                 return
266         }
267         type einfo struct {
268                 path    string
269                 modTime time.Time
270                 size    int64
271         }
272         dirs := make([]einfo, 0, 1<<10)
273         files := make([]einfo, 0, 1<<10)
274         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
275                 if err != nil {
276                         return err
277                 }
278                 if info.IsDir() {
279                         // directory header, PAX record header+contents
280                         fileSize += TarBlockSize + 2*TarBlockSize
281                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
282                 } else {
283                         // file header, PAX record header+contents, file content
284                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
285                         if n := info.Size() % TarBlockSize; n != 0 {
286                                 fileSize += TarBlockSize - n // padding
287                         }
288                         files = append(files, einfo{
289                                 path:    path,
290                                 modTime: info.ModTime(),
291                                 size:    info.Size(),
292                         })
293                 }
294                 return nil
295         })
296         if rerr != nil {
297                 return
298         }
299
300         r, w := io.Pipe()
301         reader = r
302         closer = DummyCloser{}
303         fileSize += 2 * TarBlockSize // termination block
304
305         go func() error {
306                 tarWr := tar.NewWriter(w)
307                 hdr := tar.Header{
308                         Typeflag: tar.TypeDir,
309                         Mode:     0777,
310                         PAXRecords: map[string]string{
311                                 "comment": "Autogenerated by " + VersionGet(),
312                         },
313                         Format: tar.FormatPAX,
314                 }
315                 for _, e := range dirs {
316                         hdr.Name = basePath + e.path[len(rootPath):]
317                         hdr.ModTime = e.modTime
318                         if err = tarWr.WriteHeader(&hdr); err != nil {
319                                 return w.CloseWithError(err)
320                         }
321                 }
322                 hdr.Typeflag = tar.TypeReg
323                 hdr.Mode = 0666
324                 for _, e := range files {
325                         hdr.Name = basePath + e.path[len(rootPath):]
326                         hdr.ModTime = e.modTime
327                         hdr.Size = e.size
328                         if err = tarWr.WriteHeader(&hdr); err != nil {
329                                 return w.CloseWithError(err)
330                         }
331                         fd, err := os.Open(e.path)
332                         if err != nil {
333                                 fd.Close() // #nosec G104
334                                 return w.CloseWithError(err)
335                         }
336                         if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
337                                 fd.Close() // #nosec G104
338                                 return w.CloseWithError(err)
339                         }
340                         fd.Close() // #nosec G104
341                 }
342                 if err = tarWr.Close(); err != nil {
343                         return w.CloseWithError(err)
344                 }
345                 return w.Close()
346         }()
347         return
348 }
349
350 func (ctx *Ctx) TxFile(
351         node *Node,
352         nice uint8,
353         srcPath, dstPath string,
354         chunkSize int64,
355         minSize, maxSize int64,
356 ) error {
357         dstPathSpecified := false
358         if dstPath == "" {
359                 if srcPath == "-" {
360                         return errors.New("Must provide destination filename")
361                 }
362                 dstPath = filepath.Base(srcPath)
363         } else {
364                 dstPathSpecified = true
365         }
366         dstPath = filepath.Clean(dstPath)
367         if filepath.IsAbs(dstPath) {
368                 return errors.New("Relative destination path required")
369         }
370         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
371         if closer != nil {
372                 defer closer.Close()
373         }
374         if err != nil {
375                 return err
376         }
377         if fileSize > maxSize {
378                 return errors.New("Too big than allowed")
379         }
380         if archived && !dstPathSpecified {
381                 dstPath += TarExt
382         }
383
384         if fileSize <= chunkSize {
385                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
386                 if err != nil {
387                         return err
388                 }
389                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
390                 les := LEs{
391                         {"Type", "file"},
392                         {"Node", node.Id},
393                         {"Nice", int(nice)},
394                         {"Src", srcPath},
395                         {"Dst", dstPath},
396                         {"Size", fileSize},
397                 }
398                 logMsg := func(les LEs) string {
399                         return fmt.Sprintf(
400                                 "File %s (%s) sent to %s:%s",
401                                 srcPath,
402                                 humanize.IBytes(uint64(fileSize)),
403                                 ctx.NodeName(node.Id),
404                                 dstPath,
405                         )
406                 }
407                 if err == nil {
408                         ctx.LogI("tx", les, logMsg)
409                 } else {
410                         ctx.LogE("tx", les, err, logMsg)
411                 }
412                 return err
413         }
414
415         leftSize := fileSize
416         metaPkt := ChunkedMeta{
417                 Magic:     MagicNNCPMv2.B,
418                 FileSize:  uint64(fileSize),
419                 ChunkSize: uint64(chunkSize),
420                 Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1),
421         }
422         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
423                 hsh := new([MTHSize]byte)
424                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
425         }
426         var sizeToSend int64
427         var hsh hash.Hash
428         var pkt *Pkt
429         var chunkNum int
430         var path string
431         for {
432                 if leftSize <= chunkSize {
433                         sizeToSend = leftSize
434                 } else {
435                         sizeToSend = chunkSize
436                 }
437                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
438                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
439                 if err != nil {
440                         return err
441                 }
442                 hsh = MTHNew(0, 0)
443                 _, err = ctx.Tx(
444                         node,
445                         pkt,
446                         nice,
447                         sizeToSend,
448                         minSize,
449                         io.TeeReader(reader, hsh),
450                         path,
451                 )
452                 les := LEs{
453                         {"Type", "file"},
454                         {"Node", node.Id},
455                         {"Nice", int(nice)},
456                         {"Src", srcPath},
457                         {"Dst", path},
458                         {"Size", sizeToSend},
459                 }
460                 logMsg := func(les LEs) string {
461                         return fmt.Sprintf(
462                                 "File %s (%s) sent to %s:%s",
463                                 srcPath,
464                                 humanize.IBytes(uint64(sizeToSend)),
465                                 ctx.NodeName(node.Id),
466                                 path,
467                         )
468                 }
469                 if err == nil {
470                         ctx.LogI("tx", les, logMsg)
471                 } else {
472                         ctx.LogE("tx", les, err, logMsg)
473                         return err
474                 }
475                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
476                 leftSize -= sizeToSend
477                 chunkNum++
478                 if leftSize == 0 {
479                         break
480                 }
481         }
482         var metaBuf bytes.Buffer
483         _, err = xdr.Marshal(&metaBuf, metaPkt)
484         if err != nil {
485                 return err
486         }
487         path = dstPath + ChunkedSuffixMeta
488         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
489         if err != nil {
490                 return err
491         }
492         metaPktSize := int64(metaBuf.Len())
493         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
494         les := LEs{
495                 {"Type", "file"},
496                 {"Node", node.Id},
497                 {"Nice", int(nice)},
498                 {"Src", srcPath},
499                 {"Dst", path},
500                 {"Size", metaPktSize},
501         }
502         logMsg := func(les LEs) string {
503                 return fmt.Sprintf(
504                         "File %s (%s) sent to %s:%s",
505                         srcPath,
506                         humanize.IBytes(uint64(metaPktSize)),
507                         ctx.NodeName(node.Id),
508                         path,
509                 )
510         }
511         if err == nil {
512                 ctx.LogI("tx", les, logMsg)
513         } else {
514                 ctx.LogE("tx", les, err, logMsg)
515         }
516         return err
517 }
518
519 func (ctx *Ctx) TxFreq(
520         node *Node,
521         nice, replyNice uint8,
522         srcPath, dstPath string,
523         minSize int64) error {
524         dstPath = filepath.Clean(dstPath)
525         if filepath.IsAbs(dstPath) {
526                 return errors.New("Relative destination path required")
527         }
528         srcPath = filepath.Clean(srcPath)
529         if filepath.IsAbs(srcPath) {
530                 return errors.New("Relative source path required")
531         }
532         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
533         if err != nil {
534                 return err
535         }
536         src := strings.NewReader(dstPath)
537         size := int64(src.Len())
538         _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
539         les := LEs{
540                 {"Type", "freq"},
541                 {"Node", node.Id},
542                 {"Nice", int(nice)},
543                 {"ReplyNice", int(replyNice)},
544                 {"Src", srcPath},
545                 {"Dst", dstPath},
546         }
547         logMsg := func(les LEs) string {
548                 return fmt.Sprintf(
549                         "File request from %s:%s to %s sent",
550                         ctx.NodeName(node.Id), srcPath,
551                         dstPath,
552                 )
553         }
554         if err == nil {
555                 ctx.LogI("tx", les, logMsg)
556         } else {
557                 ctx.LogE("tx", les, err, logMsg)
558         }
559         return err
560 }
561
562 func (ctx *Ctx) TxExec(
563         node *Node,
564         nice, replyNice uint8,
565         handle string,
566         args []string,
567         in io.Reader,
568         minSize int64,
569         useTmp bool,
570         noCompress bool,
571 ) error {
572         path := make([][]byte, 0, 1+len(args))
573         path = append(path, []byte(handle))
574         for _, arg := range args {
575                 path = append(path, []byte(arg))
576         }
577         pktType := PktTypeExec
578         if noCompress {
579                 pktType = PktTypeExecFat
580         }
581         pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
582         if err != nil {
583                 return err
584         }
585         var size int64
586
587         if !noCompress && !useTmp {
588                 var compressed bytes.Buffer
589                 compressor, err := zstd.NewWriter(
590                         &compressed,
591                         zstd.WithEncoderLevel(zstd.SpeedDefault),
592                 )
593                 if err != nil {
594                         return err
595                 }
596                 if _, err = io.Copy(compressor, in); err != nil {
597                         compressor.Close() // #nosec G104
598                         return err
599                 }
600                 if err = compressor.Close(); err != nil {
601                         return err
602                 }
603                 size = int64(compressed.Len())
604                 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
605         }
606         if noCompress && !useTmp {
607                 var data bytes.Buffer
608                 if _, err = io.Copy(&data, in); err != nil {
609                         return err
610                 }
611                 size = int64(data.Len())
612                 _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle)
613         }
614         if !noCompress && useTmp {
615                 r, w := io.Pipe()
616                 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
617                 if err != nil {
618                         return err
619                 }
620                 copyErr := make(chan error)
621                 go func() {
622                         _, err := io.Copy(compressor, in)
623                         if err != nil {
624                                 compressor.Close() // #nosec G104
625                                 copyErr <- err
626                         }
627                         err = compressor.Close()
628                         w.Close()
629                         copyErr <- err
630                 }()
631                 tmpReader, closer, fileSize, err := throughTmpFile(r)
632                 if closer != nil {
633                         defer closer.Close()
634                 }
635                 if err != nil {
636                         return err
637                 }
638                 err = <-copyErr
639                 if err != nil {
640                         return err
641                 }
642                 size = fileSize
643                 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
644         }
645         if noCompress && useTmp {
646                 tmpReader, closer, fileSize, err := throughTmpFile(in)
647                 if closer != nil {
648                         defer closer.Close()
649                 }
650                 if err != nil {
651                         return err
652                 }
653                 size = fileSize
654                 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
655         }
656
657         dst := strings.Join(append([]string{handle}, args...), " ")
658         les := LEs{
659                 {"Type", "exec"},
660                 {"Node", node.Id},
661                 {"Nice", int(nice)},
662                 {"ReplyNice", int(replyNice)},
663                 {"Dst", dst},
664                 {"Size", size},
665         }
666         logMsg := func(les LEs) string {
667                 return fmt.Sprintf(
668                         "Exec sent to %s@%s (%s)",
669                         ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
670                 )
671         }
672         if err == nil {
673                 ctx.LogI("tx", les, logMsg)
674         } else {
675                 ctx.LogE("tx", les, err, logMsg)
676         }
677         return err
678 }
679
680 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
681         les := LEs{
682                 {"Type", "trns"},
683                 {"Node", node.Id},
684                 {"Nice", int(nice)},
685                 {"Size", size},
686         }
687         logMsg := func(les LEs) string {
688                 return fmt.Sprintf(
689                         "Transitional packet to %s (%s) (nice %s)",
690                         ctx.NodeName(node.Id),
691                         humanize.IBytes(uint64(size)),
692                         NicenessFmt(nice),
693                 )
694         }
695         ctx.LogD("tx", les, logMsg)
696         if !ctx.IsEnoughSpace(size) {
697                 err := errors.New("is not enough space")
698                 ctx.LogE("tx", les, err, logMsg)
699                 return err
700         }
701         tmp, err := ctx.NewTmpFileWHash()
702         if err != nil {
703                 return err
704         }
705         if _, err = CopyProgressed(
706                 tmp.W, src, "Tx trns",
707                 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
708                 ctx.ShowPrgrs,
709         ); err != nil {
710                 return err
711         }
712         nodePath := filepath.Join(ctx.Spool, node.Id.String())
713         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
714         if err == nil {
715                 ctx.LogI("tx", les, logMsg)
716         } else {
717                 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
718         }
719         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
720         return err
721 }