]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
73fbe4ec12bfa118d7d7ff1ab15c6f1ed2667fa1
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "fmt"
27         "hash"
28         "io"
29         "io/ioutil"
30         "os"
31         "path/filepath"
32         "strconv"
33         "strings"
34         "time"
35
36         xdr "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "github.com/klauspost/compress/zstd"
39         "golang.org/x/crypto/blake2b"
40         "golang.org/x/crypto/chacha20poly1305"
41 )
42
43 const (
44         MaxFileSize = 1 << 62
45
46         TarBlockSize = 512
47         TarExt       = ".tar"
48 )
49
50 func (ctx *Ctx) Tx(
51         node *Node,
52         pkt *Pkt,
53         nice uint8,
54         size, minSize int64,
55         src io.Reader,
56         pktName string,
57 ) (*Node, error) {
58         hops := make([]*Node, 0, 1+len(node.Via))
59         hops = append(hops, node)
60         lastNode := node
61         for i := len(node.Via); i > 0; i-- {
62                 lastNode = ctx.Neigh[*node.Via[i-1]]
63                 hops = append(hops, lastNode)
64         }
65         expectedSize := size
66         for i := 0; i < len(hops); i++ {
67                 expectedSize = PktEncOverhead +
68                         PktSizeOverhead +
69                         sizeWithTags(PktOverhead+expectedSize)
70         }
71         padSize := minSize - expectedSize
72         if padSize < 0 {
73                 padSize = 0
74         }
75         if !ctx.IsEnoughSpace(size + padSize) {
76                 return nil, errors.New("is not enough space")
77         }
78         tmp, err := ctx.NewTmpFileWHash()
79         if err != nil {
80                 return nil, err
81         }
82
83         errs := make(chan error)
84         curSize := size
85         pipeR, pipeW := io.Pipe()
86         var pktEncRaw []byte
87         go func(size int64, src io.Reader, dst io.WriteCloser) {
88                 ctx.LogD("tx", LEs{
89                         {"Node", hops[0].Id},
90                         {"Nice", int(nice)},
91                         {"Size", size},
92                 }, func(les LEs) string {
93                         return fmt.Sprintf(
94                                 "Tx packet to %s (%s) nice: %s",
95                                 ctx.NodeName(hops[0].Id),
96                                 humanize.IBytes(uint64(size)),
97                                 NicenessFmt(nice),
98                         )
99                 })
100                 pktEncRaw, err = PktEncWrite(
101                         ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
102                 )
103                 errs <- err
104                 dst.Close() // #nosec G104
105         }(curSize, src, pipeW)
106         curSize = PktEncOverhead +
107                 PktSizeOverhead +
108                 sizeWithTags(PktOverhead+curSize) +
109                 padSize
110
111         var pipeRPrev io.Reader
112         for i := 1; i < len(hops); i++ {
113                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
114                 pipeRPrev = pipeR
115                 pipeR, pipeW = io.Pipe()
116                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
117                         ctx.LogD("tx", LEs{
118                                 {"Node", node.Id},
119                                 {"Nice", int(nice)},
120                                 {"Size", size},
121                         }, func(les LEs) string {
122                                 return fmt.Sprintf(
123                                         "Tx trns packet to %s (%s) nice: %s",
124                                         ctx.NodeName(node.Id),
125                                         humanize.IBytes(uint64(size)),
126                                         NicenessFmt(nice),
127                                 )
128                         })
129                         _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
130                         errs <- err
131                         dst.Close() // #nosec G104
132                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
133                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
134         }
135         go func() {
136                 _, err := CopyProgressed(
137                         tmp.W, pipeR, "Tx",
138                         LEs{{"Pkt", pktName}, {"FullSize", curSize}},
139                         ctx.ShowPrgrs,
140                 )
141                 errs <- err
142         }()
143         for i := 0; i <= len(hops); i++ {
144                 err = <-errs
145                 if err != nil {
146                         tmp.Fd.Close() // #nosec G104
147                         return nil, err
148                 }
149         }
150         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
151         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
152         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
153         if err != nil {
154                 return lastNode, err
155         }
156         if ctx.HdrUsage {
157                 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
158         }
159         return lastNode, err
160 }
161
162 type DummyCloser struct{}
163
164 func (dc DummyCloser) Close() error { return nil }
165
166 func throughTmpFile(r io.Reader) (
167         reader io.Reader,
168         closer io.Closer,
169         fileSize int64,
170         rerr error,
171 ) {
172         src, err := ioutil.TempFile("", "nncp-file")
173         if err != nil {
174                 rerr = err
175                 return
176         }
177         os.Remove(src.Name()) // #nosec G104
178         tmpW := bufio.NewWriter(src)
179         tmpKey := make([]byte, chacha20poly1305.KeySize)
180         if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
181                 return
182         }
183         aead, err := chacha20poly1305.New(tmpKey)
184         if err != nil {
185                 rerr = err
186                 return
187         }
188         nonce := make([]byte, aead.NonceSize())
189         written, err := aeadProcess(aead, nonce, true, r, tmpW)
190         if err != nil {
191                 rerr = err
192                 return
193         }
194         fileSize = int64(written)
195         if err = tmpW.Flush(); err != nil {
196                 rerr = err
197                 return
198         }
199         if _, err = src.Seek(0, io.SeekStart); err != nil {
200                 rerr = err
201                 return
202         }
203         r, w := io.Pipe()
204         go func() {
205                 if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
206                         w.CloseWithError(err) // #nosec G104
207                 }
208         }()
209         reader = r
210         closer = src
211         return
212 }
213
214 func prepareTxFile(srcPath string) (
215         reader io.Reader,
216         closer io.Closer,
217         fileSize int64,
218         archived bool,
219         rerr error,
220 ) {
221         if srcPath == "-" {
222                 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
223                 return
224         }
225
226         srcStat, err := os.Stat(srcPath)
227         if err != nil {
228                 rerr = err
229                 return
230         }
231         mode := srcStat.Mode()
232
233         if mode.IsRegular() {
234                 // It is regular file, just send it
235                 src, err := os.Open(srcPath)
236                 if err != nil {
237                         rerr = err
238                         return
239                 }
240                 fileSize = srcStat.Size()
241                 reader = bufio.NewReader(src)
242                 closer = src
243                 return
244         }
245
246         if !mode.IsDir() {
247                 rerr = errors.New("unsupported file type")
248                 return
249         }
250
251         // It is directory, create PAX archive with its contents
252         archived = true
253         basePath := filepath.Base(srcPath)
254         rootPath, err := filepath.Abs(srcPath)
255         if err != nil {
256                 rerr = err
257                 return
258         }
259         type einfo struct {
260                 path    string
261                 modTime time.Time
262                 size    int64
263         }
264         dirs := make([]einfo, 0, 1<<10)
265         files := make([]einfo, 0, 1<<10)
266         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
267                 if err != nil {
268                         return err
269                 }
270                 if info.IsDir() {
271                         // directory header, PAX record header+contents
272                         fileSize += TarBlockSize + 2*TarBlockSize
273                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
274                 } else {
275                         // file header, PAX record header+contents, file content
276                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
277                         if n := info.Size() % TarBlockSize; n != 0 {
278                                 fileSize += TarBlockSize - n // padding
279                         }
280                         files = append(files, einfo{
281                                 path:    path,
282                                 modTime: info.ModTime(),
283                                 size:    info.Size(),
284                         })
285                 }
286                 return nil
287         })
288         if rerr != nil {
289                 return
290         }
291
292         r, w := io.Pipe()
293         reader = r
294         closer = DummyCloser{}
295         fileSize += 2 * TarBlockSize // termination block
296
297         go func() error {
298                 tarWr := tar.NewWriter(w)
299                 hdr := tar.Header{
300                         Typeflag: tar.TypeDir,
301                         Mode:     0777,
302                         PAXRecords: map[string]string{
303                                 "comment": "Autogenerated by " + VersionGet(),
304                         },
305                         Format: tar.FormatPAX,
306                 }
307                 for _, e := range dirs {
308                         hdr.Name = basePath + e.path[len(rootPath):]
309                         hdr.ModTime = e.modTime
310                         if err = tarWr.WriteHeader(&hdr); err != nil {
311                                 return w.CloseWithError(err)
312                         }
313                 }
314                 hdr.Typeflag = tar.TypeReg
315                 hdr.Mode = 0666
316                 for _, e := range files {
317                         hdr.Name = basePath + e.path[len(rootPath):]
318                         hdr.ModTime = e.modTime
319                         hdr.Size = e.size
320                         if err = tarWr.WriteHeader(&hdr); err != nil {
321                                 return w.CloseWithError(err)
322                         }
323                         fd, err := os.Open(e.path)
324                         if err != nil {
325                                 fd.Close() // #nosec G104
326                                 return w.CloseWithError(err)
327                         }
328                         if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
329                                 fd.Close() // #nosec G104
330                                 return w.CloseWithError(err)
331                         }
332                         fd.Close() // #nosec G104
333                 }
334                 if err = tarWr.Close(); err != nil {
335                         return w.CloseWithError(err)
336                 }
337                 return w.Close()
338         }()
339         return
340 }
341
342 func (ctx *Ctx) TxFile(
343         node *Node,
344         nice uint8,
345         srcPath, dstPath string,
346         chunkSize int64,
347         minSize, maxSize int64,
348 ) error {
349         dstPathSpecified := false
350         if dstPath == "" {
351                 if srcPath == "-" {
352                         return errors.New("Must provide destination filename")
353                 }
354                 dstPath = filepath.Base(srcPath)
355         } else {
356                 dstPathSpecified = true
357         }
358         dstPath = filepath.Clean(dstPath)
359         if filepath.IsAbs(dstPath) {
360                 return errors.New("Relative destination path required")
361         }
362         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
363         if closer != nil {
364                 defer closer.Close()
365         }
366         if err != nil {
367                 return err
368         }
369         if fileSize > maxSize {
370                 return errors.New("Too big than allowed")
371         }
372         if archived && !dstPathSpecified {
373                 dstPath += TarExt
374         }
375
376         if fileSize <= chunkSize {
377                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
378                 if err != nil {
379                         return err
380                 }
381                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
382                 les := LEs{
383                         {"Type", "file"},
384                         {"Node", node.Id},
385                         {"Nice", int(nice)},
386                         {"Src", srcPath},
387                         {"Dst", dstPath},
388                         {"Size", fileSize},
389                 }
390                 logMsg := func(les LEs) string {
391                         return fmt.Sprintf(
392                                 "File %s (%s) sent to %s:%s",
393                                 srcPath,
394                                 humanize.IBytes(uint64(fileSize)),
395                                 ctx.NodeName(node.Id),
396                                 dstPath,
397                         )
398                 }
399                 if err == nil {
400                         ctx.LogI("tx", les, logMsg)
401                 } else {
402                         ctx.LogE("tx", les, err, logMsg)
403                 }
404                 return err
405         }
406
407         leftSize := fileSize
408         metaPkt := ChunkedMeta{
409                 Magic:     MagicNNCPMv1,
410                 FileSize:  uint64(fileSize),
411                 ChunkSize: uint64(chunkSize),
412                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
413         }
414         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
415                 hsh := new([32]byte)
416                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
417         }
418         var sizeToSend int64
419         var hsh hash.Hash
420         var pkt *Pkt
421         var chunkNum int
422         var path string
423         for {
424                 if leftSize <= chunkSize {
425                         sizeToSend = leftSize
426                 } else {
427                         sizeToSend = chunkSize
428                 }
429                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
430                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
431                 if err != nil {
432                         return err
433                 }
434                 hsh, err = blake2b.New256(nil)
435                 if err != nil {
436                         return err
437                 }
438                 _, err = ctx.Tx(
439                         node,
440                         pkt,
441                         nice,
442                         sizeToSend,
443                         minSize,
444                         io.TeeReader(reader, hsh),
445                         path,
446                 )
447                 les := LEs{
448                         {"Type", "file"},
449                         {"Node", node.Id},
450                         {"Nice", int(nice)},
451                         {"Src", srcPath},
452                         {"Dst", path},
453                         {"Size", sizeToSend},
454                 }
455                 logMsg := func(les LEs) string {
456                         return fmt.Sprintf(
457                                 "File %s (%s) sent to %s:%s",
458                                 srcPath,
459                                 humanize.IBytes(uint64(sizeToSend)),
460                                 ctx.NodeName(node.Id),
461                                 path,
462                         )
463                 }
464                 if err == nil {
465                         ctx.LogI("tx", les, logMsg)
466                 } else {
467                         ctx.LogE("tx", les, err, logMsg)
468                         return err
469                 }
470                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
471                 leftSize -= sizeToSend
472                 chunkNum++
473                 if leftSize == 0 {
474                         break
475                 }
476         }
477         var metaBuf bytes.Buffer
478         _, err = xdr.Marshal(&metaBuf, metaPkt)
479         if err != nil {
480                 return err
481         }
482         path = dstPath + ChunkedSuffixMeta
483         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
484         if err != nil {
485                 return err
486         }
487         metaPktSize := int64(metaBuf.Len())
488         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
489         les := LEs{
490                 {"Type", "file"},
491                 {"Node", node.Id},
492                 {"Nice", int(nice)},
493                 {"Src", srcPath},
494                 {"Dst", path},
495                 {"Size", metaPktSize},
496         }
497         logMsg := func(les LEs) string {
498                 return fmt.Sprintf(
499                         "File %s (%s) sent to %s:%s",
500                         srcPath,
501                         humanize.IBytes(uint64(metaPktSize)),
502                         ctx.NodeName(node.Id),
503                         path,
504                 )
505         }
506         if err == nil {
507                 ctx.LogI("tx", les, logMsg)
508         } else {
509                 ctx.LogE("tx", les, err, logMsg)
510         }
511         return err
512 }
513
514 func (ctx *Ctx) TxFreq(
515         node *Node,
516         nice, replyNice uint8,
517         srcPath, dstPath string,
518         minSize int64) error {
519         dstPath = filepath.Clean(dstPath)
520         if filepath.IsAbs(dstPath) {
521                 return errors.New("Relative destination path required")
522         }
523         srcPath = filepath.Clean(srcPath)
524         if filepath.IsAbs(srcPath) {
525                 return errors.New("Relative source path required")
526         }
527         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
528         if err != nil {
529                 return err
530         }
531         src := strings.NewReader(dstPath)
532         size := int64(src.Len())
533         _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
534         les := LEs{
535                 {"Type", "freq"},
536                 {"Node", node.Id},
537                 {"Nice", int(nice)},
538                 {"ReplyNice", int(replyNice)},
539                 {"Src", srcPath},
540                 {"Dst", dstPath},
541         }
542         logMsg := func(les LEs) string {
543                 return fmt.Sprintf(
544                         "File request from %s:%s to %s sent",
545                         ctx.NodeName(node.Id), srcPath,
546                         dstPath,
547                 )
548         }
549         if err == nil {
550                 ctx.LogI("tx", les, logMsg)
551         } else {
552                 ctx.LogE("tx", les, err, logMsg)
553         }
554         return err
555 }
556
557 func (ctx *Ctx) TxExec(
558         node *Node,
559         nice, replyNice uint8,
560         handle string,
561         args []string,
562         in io.Reader,
563         minSize int64,
564         useTmp bool,
565         noCompress bool,
566 ) error {
567         path := make([][]byte, 0, 1+len(args))
568         path = append(path, []byte(handle))
569         for _, arg := range args {
570                 path = append(path, []byte(arg))
571         }
572         pktType := PktTypeExec
573         if noCompress {
574                 pktType = PktTypeExecFat
575         }
576         pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
577         if err != nil {
578                 return err
579         }
580         var size int64
581
582         if !noCompress && !useTmp {
583                 var compressed bytes.Buffer
584                 compressor, err := zstd.NewWriter(
585                         &compressed,
586                         zstd.WithEncoderLevel(zstd.SpeedDefault),
587                 )
588                 if err != nil {
589                         return err
590                 }
591                 if _, err = io.Copy(compressor, in); err != nil {
592                         compressor.Close() // #nosec G104
593                         return err
594                 }
595                 if err = compressor.Close(); err != nil {
596                         return err
597                 }
598                 size = int64(compressed.Len())
599                 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
600         }
601         if noCompress && !useTmp {
602                 var data bytes.Buffer
603                 if _, err = io.Copy(&data, in); err != nil {
604                         return err
605                 }
606                 size = int64(data.Len())
607                 _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle)
608         }
609         if !noCompress && useTmp {
610                 r, w := io.Pipe()
611                 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
612                 if err != nil {
613                         return err
614                 }
615                 copyErr := make(chan error)
616                 go func() {
617                         _, err := io.Copy(compressor, in)
618                         if err != nil {
619                                 compressor.Close() // #nosec G104
620                                 copyErr <- err
621                         }
622                         err = compressor.Close()
623                         w.Close()
624                         copyErr <- err
625                 }()
626                 tmpReader, closer, fileSize, err := throughTmpFile(r)
627                 if closer != nil {
628                         defer closer.Close()
629                 }
630                 if err != nil {
631                         return err
632                 }
633                 err = <-copyErr
634                 if err != nil {
635                         return err
636                 }
637                 size = fileSize
638                 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
639         }
640         if noCompress && useTmp {
641                 tmpReader, closer, fileSize, err := throughTmpFile(in)
642                 if closer != nil {
643                         defer closer.Close()
644                 }
645                 if err != nil {
646                         return err
647                 }
648                 size = fileSize
649                 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
650         }
651
652         dst := strings.Join(append([]string{handle}, args...), " ")
653         les := LEs{
654                 {"Type", "exec"},
655                 {"Node", node.Id},
656                 {"Nice", int(nice)},
657                 {"ReplyNice", int(replyNice)},
658                 {"Dst", dst},
659                 {"Size", size},
660         }
661         logMsg := func(les LEs) string {
662                 return fmt.Sprintf(
663                         "Exec sent to %s@%s (%s)",
664                         ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
665                 )
666         }
667         if err == nil {
668                 ctx.LogI("tx", les, logMsg)
669         } else {
670                 ctx.LogE("tx", les, err, logMsg)
671         }
672         return err
673 }
674
675 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
676         les := LEs{
677                 {"Type", "trns"},
678                 {"Node", node.Id},
679                 {"Nice", int(nice)},
680                 {"Size", size},
681         }
682         logMsg := func(les LEs) string {
683                 return fmt.Sprintf(
684                         "Transitional packet to %s (%s) (nice %s)",
685                         ctx.NodeName(node.Id),
686                         humanize.IBytes(uint64(size)),
687                         NicenessFmt(nice),
688                 )
689         }
690         ctx.LogD("tx", les, logMsg)
691         if !ctx.IsEnoughSpace(size) {
692                 err := errors.New("is not enough space")
693                 ctx.LogE("tx", les, err, logMsg)
694                 return err
695         }
696         tmp, err := ctx.NewTmpFileWHash()
697         if err != nil {
698                 return err
699         }
700         if _, err = CopyProgressed(
701                 tmp.W, src, "Tx trns",
702                 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
703                 ctx.ShowPrgrs,
704         ); err != nil {
705                 return err
706         }
707         nodePath := filepath.Join(ctx.Spool, node.Id.String())
708         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
709         if err == nil {
710                 ctx.LogI("tx", les, logMsg)
711         } else {
712                 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
713         }
714         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
715         return err
716 }