]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
Merge branch 'develop'
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "hash"
27         "io"
28         "io/ioutil"
29         "os"
30         "path/filepath"
31         "strconv"
32         "strings"
33         "time"
34
35         xdr "github.com/davecgh/go-xdr/xdr2"
36         "github.com/klauspost/compress/zstd"
37         "golang.org/x/crypto/blake2b"
38         "golang.org/x/crypto/chacha20poly1305"
39 )
40
41 const (
42         MaxFileSize = 1 << 62
43
44         TarBlockSize = 512
45         TarExt       = ".tar"
46 )
47
48 func (ctx *Ctx) Tx(
49         node *Node,
50         pkt *Pkt,
51         nice uint8,
52         size, minSize int64,
53         src io.Reader,
54         pktName string,
55 ) (*Node, error) {
56         hops := make([]*Node, 0, 1+len(node.Via))
57         hops = append(hops, node)
58         lastNode := node
59         for i := len(node.Via); i > 0; i-- {
60                 lastNode = ctx.Neigh[*node.Via[i-1]]
61                 hops = append(hops, lastNode)
62         }
63         expectedSize := size
64         for i := 0; i < len(hops); i++ {
65                 expectedSize = PktEncOverhead +
66                         PktSizeOverhead +
67                         sizeWithTags(PktOverhead+expectedSize)
68         }
69         padSize := minSize - expectedSize
70         if padSize < 0 {
71                 padSize = 0
72         }
73         if !ctx.IsEnoughSpace(size + padSize) {
74                 return nil, errors.New("is not enough space")
75         }
76         tmp, err := ctx.NewTmpFileWHash()
77         if err != nil {
78                 return nil, err
79         }
80
81         errs := make(chan error)
82         curSize := size
83         pipeR, pipeW := io.Pipe()
84         var pktEncRaw []byte
85         go func(size int64, src io.Reader, dst io.WriteCloser) {
86                 ctx.LogD("tx", LEs{
87                         {"Node", hops[0].Id},
88                         {"Nice", int(nice)},
89                         {"Size", size},
90                 }, "wrote")
91                 pktEncRaw, err = PktEncWrite(
92                         ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
93                 )
94                 errs <- err
95                 dst.Close() // #nosec G104
96         }(curSize, src, pipeW)
97         curSize = PktEncOverhead +
98                 PktSizeOverhead +
99                 sizeWithTags(PktOverhead+curSize) +
100                 padSize
101
102         var pipeRPrev io.Reader
103         for i := 1; i < len(hops); i++ {
104                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
105                 pipeRPrev = pipeR
106                 pipeR, pipeW = io.Pipe()
107                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
108                         ctx.LogD("tx", LEs{
109                                 {"Node", node.Id},
110                                 {"Nice", int(nice)},
111                                 {"Size", size},
112                         }, "trns wrote")
113                         _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
114                         errs <- err
115                         dst.Close() // #nosec G104
116                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
117                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
118         }
119         go func() {
120                 _, err := CopyProgressed(
121                         tmp.W, pipeR, "Tx",
122                         LEs{{"Pkt", pktName}, {"FullSize", curSize}},
123                         ctx.ShowPrgrs,
124                 )
125                 errs <- err
126         }()
127         for i := 0; i <= len(hops); i++ {
128                 err = <-errs
129                 if err != nil {
130                         tmp.Fd.Close() // #nosec G104
131                         return nil, err
132                 }
133         }
134         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
135         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
136         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
137         if err != nil {
138                 return lastNode, err
139         }
140         if ctx.HdrUsage {
141                 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
142         }
143         return lastNode, err
144 }
145
146 type DummyCloser struct{}
147
148 func (dc DummyCloser) Close() error { return nil }
149
150 func throughTmpFile(r io.Reader) (
151         reader io.Reader,
152         closer io.Closer,
153         fileSize int64,
154         rerr error,
155 ) {
156         src, err := ioutil.TempFile("", "nncp-file")
157         if err != nil {
158                 rerr = err
159                 return
160         }
161         os.Remove(src.Name()) // #nosec G104
162         tmpW := bufio.NewWriter(src)
163         tmpKey := make([]byte, chacha20poly1305.KeySize)
164         if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
165                 return
166         }
167         aead, err := chacha20poly1305.New(tmpKey)
168         if err != nil {
169                 rerr = err
170                 return
171         }
172         nonce := make([]byte, aead.NonceSize())
173         written, err := aeadProcess(aead, nonce, true, r, tmpW)
174         if err != nil {
175                 rerr = err
176                 return
177         }
178         fileSize = int64(written)
179         if err = tmpW.Flush(); err != nil {
180                 rerr = err
181                 return
182         }
183         if _, err = src.Seek(0, io.SeekStart); err != nil {
184                 rerr = err
185                 return
186         }
187         r, w := io.Pipe()
188         go func() {
189                 if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
190                         w.CloseWithError(err) // #nosec G104
191                 }
192         }()
193         reader = r
194         closer = src
195         return
196 }
197
198 func prepareTxFile(srcPath string) (
199         reader io.Reader,
200         closer io.Closer,
201         fileSize int64,
202         archived bool,
203         rerr error,
204 ) {
205         if srcPath == "-" {
206                 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
207                 return
208         }
209
210         srcStat, err := os.Stat(srcPath)
211         if err != nil {
212                 rerr = err
213                 return
214         }
215         mode := srcStat.Mode()
216
217         if mode.IsRegular() {
218                 // It is regular file, just send it
219                 src, err := os.Open(srcPath)
220                 if err != nil {
221                         rerr = err
222                         return
223                 }
224                 fileSize = srcStat.Size()
225                 reader = bufio.NewReader(src)
226                 closer = src
227                 return
228         }
229
230         if !mode.IsDir() {
231                 rerr = errors.New("unsupported file type")
232                 return
233         }
234
235         // It is directory, create PAX archive with its contents
236         archived = true
237         basePath := filepath.Base(srcPath)
238         rootPath, err := filepath.Abs(srcPath)
239         if err != nil {
240                 rerr = err
241                 return
242         }
243         type einfo struct {
244                 path    string
245                 modTime time.Time
246                 size    int64
247         }
248         dirs := make([]einfo, 0, 1<<10)
249         files := make([]einfo, 0, 1<<10)
250         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
251                 if err != nil {
252                         return err
253                 }
254                 if info.IsDir() {
255                         // directory header, PAX record header+contents
256                         fileSize += TarBlockSize + 2*TarBlockSize
257                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
258                 } else {
259                         // file header, PAX record header+contents, file content
260                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
261                         if n := info.Size() % TarBlockSize; n != 0 {
262                                 fileSize += TarBlockSize - n // padding
263                         }
264                         files = append(files, einfo{
265                                 path:    path,
266                                 modTime: info.ModTime(),
267                                 size:    info.Size(),
268                         })
269                 }
270                 return nil
271         })
272         if rerr != nil {
273                 return
274         }
275
276         r, w := io.Pipe()
277         reader = r
278         closer = DummyCloser{}
279         fileSize += 2 * TarBlockSize // termination block
280
281         go func() error {
282                 tarWr := tar.NewWriter(w)
283                 hdr := tar.Header{
284                         Typeflag: tar.TypeDir,
285                         Mode:     0777,
286                         PAXRecords: map[string]string{
287                                 "comment": "Autogenerated by " + VersionGet(),
288                         },
289                         Format: tar.FormatPAX,
290                 }
291                 for _, e := range dirs {
292                         hdr.Name = basePath + e.path[len(rootPath):]
293                         hdr.ModTime = e.modTime
294                         if err = tarWr.WriteHeader(&hdr); err != nil {
295                                 return w.CloseWithError(err)
296                         }
297                 }
298                 hdr.Typeflag = tar.TypeReg
299                 hdr.Mode = 0666
300                 for _, e := range files {
301                         hdr.Name = basePath + e.path[len(rootPath):]
302                         hdr.ModTime = e.modTime
303                         hdr.Size = e.size
304                         if err = tarWr.WriteHeader(&hdr); err != nil {
305                                 return w.CloseWithError(err)
306                         }
307                         fd, err := os.Open(e.path)
308                         if err != nil {
309                                 fd.Close() // #nosec G104
310                                 return w.CloseWithError(err)
311                         }
312                         if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
313                                 fd.Close() // #nosec G104
314                                 return w.CloseWithError(err)
315                         }
316                         fd.Close() // #nosec G104
317                 }
318                 if err = tarWr.Close(); err != nil {
319                         return w.CloseWithError(err)
320                 }
321                 return w.Close()
322         }()
323         return
324 }
325
326 func (ctx *Ctx) TxFile(
327         node *Node,
328         nice uint8,
329         srcPath, dstPath string,
330         chunkSize int64,
331         minSize, maxSize int64,
332 ) error {
333         dstPathSpecified := false
334         if dstPath == "" {
335                 if srcPath == "-" {
336                         return errors.New("Must provide destination filename")
337                 }
338                 dstPath = filepath.Base(srcPath)
339         } else {
340                 dstPathSpecified = true
341         }
342         dstPath = filepath.Clean(dstPath)
343         if filepath.IsAbs(dstPath) {
344                 return errors.New("Relative destination path required")
345         }
346         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
347         if closer != nil {
348                 defer closer.Close()
349         }
350         if err != nil {
351                 return err
352         }
353         if fileSize > maxSize {
354                 return errors.New("Too big than allowed")
355         }
356         if archived && !dstPathSpecified {
357                 dstPath += TarExt
358         }
359
360         if fileSize <= chunkSize {
361                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
362                 if err != nil {
363                         return err
364                 }
365                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
366                 les := LEs{
367                         {"Type", "file"},
368                         {"Node", node.Id},
369                         {"Nice", int(nice)},
370                         {"Src", srcPath},
371                         {"Dst", dstPath},
372                         {"Size", fileSize},
373                 }
374                 if err == nil {
375                         ctx.LogI("tx", les, "sent")
376                 } else {
377                         ctx.LogE("tx", les, err, "sent")
378                 }
379                 return err
380         }
381
382         leftSize := fileSize
383         metaPkt := ChunkedMeta{
384                 Magic:     MagicNNCPMv1,
385                 FileSize:  uint64(fileSize),
386                 ChunkSize: uint64(chunkSize),
387                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
388         }
389         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
390                 hsh := new([32]byte)
391                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
392         }
393         var sizeToSend int64
394         var hsh hash.Hash
395         var pkt *Pkt
396         var chunkNum int
397         var path string
398         for {
399                 if leftSize <= chunkSize {
400                         sizeToSend = leftSize
401                 } else {
402                         sizeToSend = chunkSize
403                 }
404                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
405                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
406                 if err != nil {
407                         return err
408                 }
409                 hsh, err = blake2b.New256(nil)
410                 if err != nil {
411                         return err
412                 }
413                 _, err = ctx.Tx(
414                         node,
415                         pkt,
416                         nice,
417                         sizeToSend,
418                         minSize,
419                         io.TeeReader(reader, hsh),
420                         path,
421                 )
422                 les := LEs{
423                         {"Type", "file"},
424                         {"Node", node.Id},
425                         {"Nice", int(nice)},
426                         {"Src", srcPath},
427                         {"Dst", path},
428                         {"Size", sizeToSend},
429                 }
430                 if err == nil {
431                         ctx.LogI("tx", les, "sent")
432                 } else {
433                         ctx.LogE("tx", les, err, "sent")
434                         return err
435                 }
436                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
437                 leftSize -= sizeToSend
438                 chunkNum++
439                 if leftSize == 0 {
440                         break
441                 }
442         }
443         var metaBuf bytes.Buffer
444         _, err = xdr.Marshal(&metaBuf, metaPkt)
445         if err != nil {
446                 return err
447         }
448         path = dstPath + ChunkedSuffixMeta
449         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
450         if err != nil {
451                 return err
452         }
453         metaPktSize := int64(metaBuf.Len())
454         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
455         les := LEs{
456                 {"Type", "file"},
457                 {"Node", node.Id},
458                 {"Nice", int(nice)},
459                 {"Src", srcPath},
460                 {"Dst", path},
461                 {"Size", metaPktSize},
462         }
463         if err == nil {
464                 ctx.LogI("tx", les, "sent")
465         } else {
466                 ctx.LogE("tx", les, err, "sent")
467         }
468         return err
469 }
470
471 func (ctx *Ctx) TxFreq(
472         node *Node,
473         nice, replyNice uint8,
474         srcPath, dstPath string,
475         minSize int64) error {
476         dstPath = filepath.Clean(dstPath)
477         if filepath.IsAbs(dstPath) {
478                 return errors.New("Relative destination path required")
479         }
480         srcPath = filepath.Clean(srcPath)
481         if filepath.IsAbs(srcPath) {
482                 return errors.New("Relative source path required")
483         }
484         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
485         if err != nil {
486                 return err
487         }
488         src := strings.NewReader(dstPath)
489         size := int64(src.Len())
490         _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
491         les := LEs{
492                 {"Type", "freq"},
493                 {"Node", node.Id},
494                 {"Nice", int(nice)},
495                 {"ReplyNice", int(replyNice)},
496                 {"Src", srcPath},
497                 {"Dst", dstPath},
498         }
499         if err == nil {
500                 ctx.LogI("tx", les, "sent")
501         } else {
502                 ctx.LogE("tx", les, err, "sent")
503         }
504         return err
505 }
506
507 func (ctx *Ctx) TxExec(
508         node *Node,
509         nice, replyNice uint8,
510         handle string,
511         args []string,
512         in io.Reader,
513         minSize int64,
514         useTmp bool,
515         noCompress bool,
516 ) error {
517         path := make([][]byte, 0, 1+len(args))
518         path = append(path, []byte(handle))
519         for _, arg := range args {
520                 path = append(path, []byte(arg))
521         }
522         pktType := PktTypeExec
523         if noCompress {
524                 pktType = PktTypeExecFat
525         }
526         pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
527         if err != nil {
528                 return err
529         }
530         var size int64
531
532         if !noCompress && !useTmp {
533                 var compressed bytes.Buffer
534                 compressor, err := zstd.NewWriter(
535                         &compressed,
536                         zstd.WithEncoderLevel(zstd.SpeedDefault),
537                 )
538                 if err != nil {
539                         return err
540                 }
541                 if _, err = io.Copy(compressor, in); err != nil {
542                         compressor.Close() // #nosec G104
543                         return err
544                 }
545                 if err = compressor.Close(); err != nil {
546                         return err
547                 }
548                 size = int64(compressed.Len())
549                 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
550         }
551         if noCompress && !useTmp {
552                 var data bytes.Buffer
553                 if _, err = io.Copy(&data, in); err != nil {
554                         return err
555                 }
556                 size = int64(data.Len())
557                 _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle)
558         }
559         if !noCompress && useTmp {
560                 r, w := io.Pipe()
561                 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
562                 if err != nil {
563                         return err
564                 }
565                 copyErr := make(chan error)
566                 go func() {
567                         _, err := io.Copy(compressor, in)
568                         if err != nil {
569                                 compressor.Close() // #nosec G104
570                                 copyErr <- err
571                         }
572                         err = compressor.Close()
573                         w.Close()
574                         copyErr <- err
575                 }()
576                 tmpReader, closer, fileSize, err := throughTmpFile(r)
577                 if closer != nil {
578                         defer closer.Close()
579                 }
580                 if err != nil {
581                         return err
582                 }
583                 err = <-copyErr
584                 if err != nil {
585                         return err
586                 }
587                 size = fileSize
588                 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
589         }
590         if noCompress && useTmp {
591                 tmpReader, closer, fileSize, err := throughTmpFile(in)
592                 if closer != nil {
593                         defer closer.Close()
594                 }
595                 if err != nil {
596                         return err
597                 }
598                 size = fileSize
599                 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
600         }
601
602         les := LEs{
603                 {"Type", "exec"},
604                 {"Node", node.Id},
605                 {"Nice", int(nice)},
606                 {"ReplyNice", int(replyNice)},
607                 {"Dst", strings.Join(append([]string{handle}, args...), " ")},
608                 {"Size", size},
609         }
610         if err == nil {
611                 ctx.LogI("tx", les, "sent")
612         } else {
613                 ctx.LogE("tx", les, err, "sent")
614         }
615         return err
616 }
617
618 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
619         les := LEs{
620                 {"Type", "trns"},
621                 {"Node", node.Id},
622                 {"Nice", int(nice)},
623                 {"Size", size},
624         }
625         ctx.LogD("tx", les, "taken")
626         if !ctx.IsEnoughSpace(size) {
627                 err := errors.New("is not enough space")
628                 ctx.LogE("tx", les, err, err.Error())
629                 return err
630         }
631         tmp, err := ctx.NewTmpFileWHash()
632         if err != nil {
633                 return err
634         }
635         if _, err = CopyProgressed(
636                 tmp.W, src, "Tx trns",
637                 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
638                 ctx.ShowPrgrs,
639         ); err != nil {
640                 return err
641         }
642         nodePath := filepath.Join(ctx.Spool, node.Id.String())
643         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
644         if err == nil {
645                 ctx.LogI("tx", les, "sent")
646         } else {
647                 ctx.LogI("tx", append(les, LE{"Err", err}), "sent")
648         }
649         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
650         return err
651 }