]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
MTH
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "fmt"
27         "hash"
28         "io"
29         "io/ioutil"
30         "os"
31         "path/filepath"
32         "strconv"
33         "strings"
34         "time"
35
36         xdr "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "github.com/klauspost/compress/zstd"
39         "golang.org/x/crypto/chacha20poly1305"
40 )
41
42 const (
43         MaxFileSize = 1 << 62
44
45         TarBlockSize = 512
46         TarExt       = ".tar"
47 )
48
49 func (ctx *Ctx) Tx(
50         node *Node,
51         pkt *Pkt,
52         nice uint8,
53         size, minSize int64,
54         src io.Reader,
55         pktName string,
56 ) (*Node, error) {
57         hops := make([]*Node, 0, 1+len(node.Via))
58         hops = append(hops, node)
59         lastNode := node
60         for i := len(node.Via); i > 0; i-- {
61                 lastNode = ctx.Neigh[*node.Via[i-1]]
62                 hops = append(hops, lastNode)
63         }
64         expectedSize := size
65         for i := 0; i < len(hops); i++ {
66                 expectedSize = PktEncOverhead +
67                         PktSizeOverhead +
68                         sizeWithTags(PktOverhead+expectedSize)
69         }
70         padSize := minSize - expectedSize
71         if padSize < 0 {
72                 padSize = 0
73         }
74         if !ctx.IsEnoughSpace(size + padSize) {
75                 return nil, errors.New("is not enough space")
76         }
77         tmp, err := ctx.NewTmpFileWHash()
78         if err != nil {
79                 return nil, err
80         }
81
82         errs := make(chan error)
83         curSize := size
84         pipeR, pipeW := io.Pipe()
85         var pktEncRaw []byte
86         go func(size int64, src io.Reader, dst io.WriteCloser) {
87                 ctx.LogD("tx", LEs{
88                         {"Node", hops[0].Id},
89                         {"Nice", int(nice)},
90                         {"Size", size},
91                 }, func(les LEs) string {
92                         return fmt.Sprintf(
93                                 "Tx packet to %s (%s) nice: %s",
94                                 ctx.NodeName(hops[0].Id),
95                                 humanize.IBytes(uint64(size)),
96                                 NicenessFmt(nice),
97                         )
98                 })
99                 pktEncRaw, err = PktEncWrite(
100                         ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
101                 )
102                 errs <- err
103                 dst.Close() // #nosec G104
104         }(curSize, src, pipeW)
105         curSize = PktEncOverhead +
106                 PktSizeOverhead +
107                 sizeWithTags(PktOverhead+curSize) +
108                 padSize
109
110         var pipeRPrev io.Reader
111         for i := 1; i < len(hops); i++ {
112                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
113                 pipeRPrev = pipeR
114                 pipeR, pipeW = io.Pipe()
115                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
116                         ctx.LogD("tx", LEs{
117                                 {"Node", node.Id},
118                                 {"Nice", int(nice)},
119                                 {"Size", size},
120                         }, func(les LEs) string {
121                                 return fmt.Sprintf(
122                                         "Tx trns packet to %s (%s) nice: %s",
123                                         ctx.NodeName(node.Id),
124                                         humanize.IBytes(uint64(size)),
125                                         NicenessFmt(nice),
126                                 )
127                         })
128                         _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
129                         errs <- err
130                         dst.Close() // #nosec G104
131                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
132                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
133         }
134         go func() {
135                 _, err := CopyProgressed(
136                         tmp.W, pipeR, "Tx",
137                         LEs{{"Pkt", pktName}, {"FullSize", curSize}},
138                         ctx.ShowPrgrs,
139                 )
140                 errs <- err
141         }()
142         for i := 0; i <= len(hops); i++ {
143                 err = <-errs
144                 if err != nil {
145                         tmp.Fd.Close() // #nosec G104
146                         return nil, err
147                 }
148         }
149         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
150         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
151         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
152         if err != nil {
153                 return lastNode, err
154         }
155         if ctx.HdrUsage {
156                 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
157         }
158         return lastNode, err
159 }
160
161 type DummyCloser struct{}
162
163 func (dc DummyCloser) Close() error { return nil }
164
165 func throughTmpFile(r io.Reader) (
166         reader io.Reader,
167         closer io.Closer,
168         fileSize int64,
169         rerr error,
170 ) {
171         src, err := ioutil.TempFile("", "nncp-file")
172         if err != nil {
173                 rerr = err
174                 return
175         }
176         os.Remove(src.Name()) // #nosec G104
177         tmpW := bufio.NewWriter(src)
178         tmpKey := make([]byte, chacha20poly1305.KeySize)
179         if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
180                 return
181         }
182         aead, err := chacha20poly1305.New(tmpKey)
183         if err != nil {
184                 rerr = err
185                 return
186         }
187         nonce := make([]byte, aead.NonceSize())
188         written, err := aeadProcess(aead, nonce, true, r, tmpW)
189         if err != nil {
190                 rerr = err
191                 return
192         }
193         fileSize = int64(written)
194         if err = tmpW.Flush(); err != nil {
195                 rerr = err
196                 return
197         }
198         if _, err = src.Seek(0, io.SeekStart); err != nil {
199                 rerr = err
200                 return
201         }
202         r, w := io.Pipe()
203         go func() {
204                 if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
205                         w.CloseWithError(err) // #nosec G104
206                 }
207         }()
208         reader = r
209         closer = src
210         return
211 }
212
213 func prepareTxFile(srcPath string) (
214         reader io.Reader,
215         closer io.Closer,
216         fileSize int64,
217         archived bool,
218         rerr error,
219 ) {
220         if srcPath == "-" {
221                 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
222                 return
223         }
224
225         srcStat, err := os.Stat(srcPath)
226         if err != nil {
227                 rerr = err
228                 return
229         }
230         mode := srcStat.Mode()
231
232         if mode.IsRegular() {
233                 // It is regular file, just send it
234                 src, err := os.Open(srcPath)
235                 if err != nil {
236                         rerr = err
237                         return
238                 }
239                 fileSize = srcStat.Size()
240                 reader = bufio.NewReader(src)
241                 closer = src
242                 return
243         }
244
245         if !mode.IsDir() {
246                 rerr = errors.New("unsupported file type")
247                 return
248         }
249
250         // It is directory, create PAX archive with its contents
251         archived = true
252         basePath := filepath.Base(srcPath)
253         rootPath, err := filepath.Abs(srcPath)
254         if err != nil {
255                 rerr = err
256                 return
257         }
258         type einfo struct {
259                 path    string
260                 modTime time.Time
261                 size    int64
262         }
263         dirs := make([]einfo, 0, 1<<10)
264         files := make([]einfo, 0, 1<<10)
265         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
266                 if err != nil {
267                         return err
268                 }
269                 if info.IsDir() {
270                         // directory header, PAX record header+contents
271                         fileSize += TarBlockSize + 2*TarBlockSize
272                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
273                 } else {
274                         // file header, PAX record header+contents, file content
275                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
276                         if n := info.Size() % TarBlockSize; n != 0 {
277                                 fileSize += TarBlockSize - n // padding
278                         }
279                         files = append(files, einfo{
280                                 path:    path,
281                                 modTime: info.ModTime(),
282                                 size:    info.Size(),
283                         })
284                 }
285                 return nil
286         })
287         if rerr != nil {
288                 return
289         }
290
291         r, w := io.Pipe()
292         reader = r
293         closer = DummyCloser{}
294         fileSize += 2 * TarBlockSize // termination block
295
296         go func() error {
297                 tarWr := tar.NewWriter(w)
298                 hdr := tar.Header{
299                         Typeflag: tar.TypeDir,
300                         Mode:     0777,
301                         PAXRecords: map[string]string{
302                                 "comment": "Autogenerated by " + VersionGet(),
303                         },
304                         Format: tar.FormatPAX,
305                 }
306                 for _, e := range dirs {
307                         hdr.Name = basePath + e.path[len(rootPath):]
308                         hdr.ModTime = e.modTime
309                         if err = tarWr.WriteHeader(&hdr); err != nil {
310                                 return w.CloseWithError(err)
311                         }
312                 }
313                 hdr.Typeflag = tar.TypeReg
314                 hdr.Mode = 0666
315                 for _, e := range files {
316                         hdr.Name = basePath + e.path[len(rootPath):]
317                         hdr.ModTime = e.modTime
318                         hdr.Size = e.size
319                         if err = tarWr.WriteHeader(&hdr); err != nil {
320                                 return w.CloseWithError(err)
321                         }
322                         fd, err := os.Open(e.path)
323                         if err != nil {
324                                 fd.Close() // #nosec G104
325                                 return w.CloseWithError(err)
326                         }
327                         if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
328                                 fd.Close() // #nosec G104
329                                 return w.CloseWithError(err)
330                         }
331                         fd.Close() // #nosec G104
332                 }
333                 if err = tarWr.Close(); err != nil {
334                         return w.CloseWithError(err)
335                 }
336                 return w.Close()
337         }()
338         return
339 }
340
341 func (ctx *Ctx) TxFile(
342         node *Node,
343         nice uint8,
344         srcPath, dstPath string,
345         chunkSize int64,
346         minSize, maxSize int64,
347 ) error {
348         dstPathSpecified := false
349         if dstPath == "" {
350                 if srcPath == "-" {
351                         return errors.New("Must provide destination filename")
352                 }
353                 dstPath = filepath.Base(srcPath)
354         } else {
355                 dstPathSpecified = true
356         }
357         dstPath = filepath.Clean(dstPath)
358         if filepath.IsAbs(dstPath) {
359                 return errors.New("Relative destination path required")
360         }
361         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
362         if closer != nil {
363                 defer closer.Close()
364         }
365         if err != nil {
366                 return err
367         }
368         if fileSize > maxSize {
369                 return errors.New("Too big than allowed")
370         }
371         if archived && !dstPathSpecified {
372                 dstPath += TarExt
373         }
374
375         if fileSize <= chunkSize {
376                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
377                 if err != nil {
378                         return err
379                 }
380                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
381                 les := LEs{
382                         {"Type", "file"},
383                         {"Node", node.Id},
384                         {"Nice", int(nice)},
385                         {"Src", srcPath},
386                         {"Dst", dstPath},
387                         {"Size", fileSize},
388                 }
389                 logMsg := func(les LEs) string {
390                         return fmt.Sprintf(
391                                 "File %s (%s) sent to %s:%s",
392                                 srcPath,
393                                 humanize.IBytes(uint64(fileSize)),
394                                 ctx.NodeName(node.Id),
395                                 dstPath,
396                         )
397                 }
398                 if err == nil {
399                         ctx.LogI("tx", les, logMsg)
400                 } else {
401                         ctx.LogE("tx", les, err, logMsg)
402                 }
403                 return err
404         }
405
406         leftSize := fileSize
407         metaPkt := ChunkedMeta{
408                 Magic:     MagicNNCPMv2,
409                 FileSize:  uint64(fileSize),
410                 ChunkSize: uint64(chunkSize),
411                 Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1),
412         }
413         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
414                 hsh := new([MTHSize]byte)
415                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
416         }
417         var sizeToSend int64
418         var hsh hash.Hash
419         var pkt *Pkt
420         var chunkNum int
421         var path string
422         for {
423                 if leftSize <= chunkSize {
424                         sizeToSend = leftSize
425                 } else {
426                         sizeToSend = chunkSize
427                 }
428                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
429                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
430                 if err != nil {
431                         return err
432                 }
433                 hsh = MTHNew(0, 0)
434                 _, err = ctx.Tx(
435                         node,
436                         pkt,
437                         nice,
438                         sizeToSend,
439                         minSize,
440                         io.TeeReader(reader, hsh),
441                         path,
442                 )
443                 les := LEs{
444                         {"Type", "file"},
445                         {"Node", node.Id},
446                         {"Nice", int(nice)},
447                         {"Src", srcPath},
448                         {"Dst", path},
449                         {"Size", sizeToSend},
450                 }
451                 logMsg := func(les LEs) string {
452                         return fmt.Sprintf(
453                                 "File %s (%s) sent to %s:%s",
454                                 srcPath,
455                                 humanize.IBytes(uint64(sizeToSend)),
456                                 ctx.NodeName(node.Id),
457                                 path,
458                         )
459                 }
460                 if err == nil {
461                         ctx.LogI("tx", les, logMsg)
462                 } else {
463                         ctx.LogE("tx", les, err, logMsg)
464                         return err
465                 }
466                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
467                 leftSize -= sizeToSend
468                 chunkNum++
469                 if leftSize == 0 {
470                         break
471                 }
472         }
473         var metaBuf bytes.Buffer
474         _, err = xdr.Marshal(&metaBuf, metaPkt)
475         if err != nil {
476                 return err
477         }
478         path = dstPath + ChunkedSuffixMeta
479         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
480         if err != nil {
481                 return err
482         }
483         metaPktSize := int64(metaBuf.Len())
484         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
485         les := LEs{
486                 {"Type", "file"},
487                 {"Node", node.Id},
488                 {"Nice", int(nice)},
489                 {"Src", srcPath},
490                 {"Dst", path},
491                 {"Size", metaPktSize},
492         }
493         logMsg := func(les LEs) string {
494                 return fmt.Sprintf(
495                         "File %s (%s) sent to %s:%s",
496                         srcPath,
497                         humanize.IBytes(uint64(metaPktSize)),
498                         ctx.NodeName(node.Id),
499                         path,
500                 )
501         }
502         if err == nil {
503                 ctx.LogI("tx", les, logMsg)
504         } else {
505                 ctx.LogE("tx", les, err, logMsg)
506         }
507         return err
508 }
509
510 func (ctx *Ctx) TxFreq(
511         node *Node,
512         nice, replyNice uint8,
513         srcPath, dstPath string,
514         minSize int64) error {
515         dstPath = filepath.Clean(dstPath)
516         if filepath.IsAbs(dstPath) {
517                 return errors.New("Relative destination path required")
518         }
519         srcPath = filepath.Clean(srcPath)
520         if filepath.IsAbs(srcPath) {
521                 return errors.New("Relative source path required")
522         }
523         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
524         if err != nil {
525                 return err
526         }
527         src := strings.NewReader(dstPath)
528         size := int64(src.Len())
529         _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
530         les := LEs{
531                 {"Type", "freq"},
532                 {"Node", node.Id},
533                 {"Nice", int(nice)},
534                 {"ReplyNice", int(replyNice)},
535                 {"Src", srcPath},
536                 {"Dst", dstPath},
537         }
538         logMsg := func(les LEs) string {
539                 return fmt.Sprintf(
540                         "File request from %s:%s to %s sent",
541                         ctx.NodeName(node.Id), srcPath,
542                         dstPath,
543                 )
544         }
545         if err == nil {
546                 ctx.LogI("tx", les, logMsg)
547         } else {
548                 ctx.LogE("tx", les, err, logMsg)
549         }
550         return err
551 }
552
553 func (ctx *Ctx) TxExec(
554         node *Node,
555         nice, replyNice uint8,
556         handle string,
557         args []string,
558         in io.Reader,
559         minSize int64,
560         useTmp bool,
561         noCompress bool,
562 ) error {
563         path := make([][]byte, 0, 1+len(args))
564         path = append(path, []byte(handle))
565         for _, arg := range args {
566                 path = append(path, []byte(arg))
567         }
568         pktType := PktTypeExec
569         if noCompress {
570                 pktType = PktTypeExecFat
571         }
572         pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
573         if err != nil {
574                 return err
575         }
576         var size int64
577
578         if !noCompress && !useTmp {
579                 var compressed bytes.Buffer
580                 compressor, err := zstd.NewWriter(
581                         &compressed,
582                         zstd.WithEncoderLevel(zstd.SpeedDefault),
583                 )
584                 if err != nil {
585                         return err
586                 }
587                 if _, err = io.Copy(compressor, in); err != nil {
588                         compressor.Close() // #nosec G104
589                         return err
590                 }
591                 if err = compressor.Close(); err != nil {
592                         return err
593                 }
594                 size = int64(compressed.Len())
595                 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
596         }
597         if noCompress && !useTmp {
598                 var data bytes.Buffer
599                 if _, err = io.Copy(&data, in); err != nil {
600                         return err
601                 }
602                 size = int64(data.Len())
603                 _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle)
604         }
605         if !noCompress && useTmp {
606                 r, w := io.Pipe()
607                 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
608                 if err != nil {
609                         return err
610                 }
611                 copyErr := make(chan error)
612                 go func() {
613                         _, err := io.Copy(compressor, in)
614                         if err != nil {
615                                 compressor.Close() // #nosec G104
616                                 copyErr <- err
617                         }
618                         err = compressor.Close()
619                         w.Close()
620                         copyErr <- err
621                 }()
622                 tmpReader, closer, fileSize, err := throughTmpFile(r)
623                 if closer != nil {
624                         defer closer.Close()
625                 }
626                 if err != nil {
627                         return err
628                 }
629                 err = <-copyErr
630                 if err != nil {
631                         return err
632                 }
633                 size = fileSize
634                 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
635         }
636         if noCompress && useTmp {
637                 tmpReader, closer, fileSize, err := throughTmpFile(in)
638                 if closer != nil {
639                         defer closer.Close()
640                 }
641                 if err != nil {
642                         return err
643                 }
644                 size = fileSize
645                 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
646         }
647
648         dst := strings.Join(append([]string{handle}, args...), " ")
649         les := LEs{
650                 {"Type", "exec"},
651                 {"Node", node.Id},
652                 {"Nice", int(nice)},
653                 {"ReplyNice", int(replyNice)},
654                 {"Dst", dst},
655                 {"Size", size},
656         }
657         logMsg := func(les LEs) string {
658                 return fmt.Sprintf(
659                         "Exec sent to %s@%s (%s)",
660                         ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
661                 )
662         }
663         if err == nil {
664                 ctx.LogI("tx", les, logMsg)
665         } else {
666                 ctx.LogE("tx", les, err, logMsg)
667         }
668         return err
669 }
670
671 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
672         les := LEs{
673                 {"Type", "trns"},
674                 {"Node", node.Id},
675                 {"Nice", int(nice)},
676                 {"Size", size},
677         }
678         logMsg := func(les LEs) string {
679                 return fmt.Sprintf(
680                         "Transitional packet to %s (%s) (nice %s)",
681                         ctx.NodeName(node.Id),
682                         humanize.IBytes(uint64(size)),
683                         NicenessFmt(nice),
684                 )
685         }
686         ctx.LogD("tx", les, logMsg)
687         if !ctx.IsEnoughSpace(size) {
688                 err := errors.New("is not enough space")
689                 ctx.LogE("tx", les, err, logMsg)
690                 return err
691         }
692         tmp, err := ctx.NewTmpFileWHash()
693         if err != nil {
694                 return err
695         }
696         if _, err = CopyProgressed(
697                 tmp.W, src, "Tx trns",
698                 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
699                 ctx.ShowPrgrs,
700         ); err != nil {
701                 return err
702         }
703         nodePath := filepath.Join(ctx.Spool, node.Id.String())
704         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
705         if err == nil {
706                 ctx.LogI("tx", les, logMsg)
707         } else {
708                 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
709         }
710         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
711         return err
712 }