]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
Merge branch 'develop'
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "hash"
27         "io"
28         "io/ioutil"
29         "os"
30         "path/filepath"
31         "strconv"
32         "strings"
33         "time"
34
35         xdr "github.com/davecgh/go-xdr/xdr2"
36         "github.com/klauspost/compress/zstd"
37         "golang.org/x/crypto/blake2b"
38         "golang.org/x/crypto/chacha20poly1305"
39 )
40
41 const (
42         MaxFileSize = 1 << 62
43
44         TarBlockSize = 512
45         TarExt       = ".tar"
46 )
47
48 func (ctx *Ctx) Tx(
49         node *Node,
50         pkt *Pkt,
51         nice uint8,
52         size, minSize int64,
53         src io.Reader,
54         pktName string,
55 ) (*Node, error) {
56         hops := make([]*Node, 0, 1+len(node.Via))
57         hops = append(hops, node)
58         lastNode := node
59         for i := len(node.Via); i > 0; i-- {
60                 lastNode = ctx.Neigh[*node.Via[i-1]]
61                 hops = append(hops, lastNode)
62         }
63         expectedSize := size
64         for i := 0; i < len(hops); i++ {
65                 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
66         }
67         padSize := minSize - expectedSize
68         if padSize < 0 {
69                 padSize = 0
70         }
71         if !ctx.IsEnoughSpace(size + padSize) {
72                 return nil, errors.New("is not enough space")
73         }
74         tmp, err := ctx.NewTmpFileWHash()
75         if err != nil {
76                 return nil, err
77         }
78
79         errs := make(chan error)
80         curSize := size
81         pipeR, pipeW := io.Pipe()
82         go func(size int64, src io.Reader, dst io.WriteCloser) {
83                 ctx.LogD("tx", LEs{
84                         {"Node", hops[0].Id},
85                         {"Nice", int(nice)},
86                         {"Size", size},
87                 }, "wrote")
88                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
89                 dst.Close() // #nosec G104
90         }(curSize, src, pipeW)
91         curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
92
93         var pipeRPrev io.Reader
94         for i := 1; i < len(hops); i++ {
95                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
96                 pipeRPrev = pipeR
97                 pipeR, pipeW = io.Pipe()
98                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
99                         ctx.LogD("tx", LEs{
100                                 {"Node", node.Id},
101                                 {"Nice", int(nice)},
102                                 {"Size", size},
103                         }, "trns wrote")
104                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
105                         dst.Close() // #nosec G104
106                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
107                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
108         }
109         go func() {
110                 _, err := CopyProgressed(
111                         tmp.W, pipeR, "Tx",
112                         LEs{{"Pkt", pktName}, {"FullSize", curSize}},
113                         ctx.ShowPrgrs,
114                 )
115                 errs <- err
116         }()
117         for i := 0; i <= len(hops); i++ {
118                 err = <-errs
119                 if err != nil {
120                         tmp.Fd.Close() // #nosec G104
121                         return nil, err
122                 }
123         }
124         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
125         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
126         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
127         return lastNode, err
128 }
129
130 type DummyCloser struct{}
131
132 func (dc DummyCloser) Close() error { return nil }
133
134 func throughTmpFile(r io.Reader) (
135         reader io.Reader,
136         closer io.Closer,
137         fileSize int64,
138         rerr error,
139 ) {
140         src, err := ioutil.TempFile("", "nncp-file")
141         if err != nil {
142                 rerr = err
143                 return
144         }
145         os.Remove(src.Name()) // #nosec G104
146         tmpW := bufio.NewWriter(src)
147         tmpKey := make([]byte, chacha20poly1305.KeySize)
148         if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
149                 return
150         }
151         aead, err := chacha20poly1305.New(tmpKey)
152         if err != nil {
153                 rerr = err
154                 return
155         }
156         nonce := make([]byte, aead.NonceSize())
157         written, err := aeadProcess(aead, nonce, true, r, tmpW)
158         if err != nil {
159                 rerr = err
160                 return
161         }
162         fileSize = int64(written)
163         if err = tmpW.Flush(); err != nil {
164                 rerr = err
165                 return
166         }
167         if _, err = src.Seek(0, io.SeekStart); err != nil {
168                 rerr = err
169                 return
170         }
171         r, w := io.Pipe()
172         go func() {
173                 if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
174                         w.CloseWithError(err) // #nosec G104
175                 }
176         }()
177         reader = r
178         closer = src
179         return
180 }
181
182 func prepareTxFile(srcPath string) (
183         reader io.Reader,
184         closer io.Closer,
185         fileSize int64,
186         archived bool,
187         rerr error,
188 ) {
189         if srcPath == "-" {
190                 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
191                 return
192         }
193
194         srcStat, err := os.Stat(srcPath)
195         if err != nil {
196                 rerr = err
197                 return
198         }
199         mode := srcStat.Mode()
200
201         if mode.IsRegular() {
202                 // It is regular file, just send it
203                 src, err := os.Open(srcPath)
204                 if err != nil {
205                         rerr = err
206                         return
207                 }
208                 fileSize = srcStat.Size()
209                 reader = bufio.NewReader(src)
210                 closer = src
211                 return
212         }
213
214         if !mode.IsDir() {
215                 rerr = errors.New("unsupported file type")
216                 return
217         }
218
219         // It is directory, create PAX archive with its contents
220         archived = true
221         basePath := filepath.Base(srcPath)
222         rootPath, err := filepath.Abs(srcPath)
223         if err != nil {
224                 rerr = err
225                 return
226         }
227         type einfo struct {
228                 path    string
229                 modTime time.Time
230                 size    int64
231         }
232         dirs := make([]einfo, 0, 1<<10)
233         files := make([]einfo, 0, 1<<10)
234         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
235                 if err != nil {
236                         return err
237                 }
238                 if info.IsDir() {
239                         // directory header, PAX record header+contents
240                         fileSize += TarBlockSize + 2*TarBlockSize
241                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
242                 } else {
243                         // file header, PAX record header+contents, file content
244                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
245                         if n := info.Size() % TarBlockSize; n != 0 {
246                                 fileSize += TarBlockSize - n // padding
247                         }
248                         files = append(files, einfo{
249                                 path:    path,
250                                 modTime: info.ModTime(),
251                                 size:    info.Size(),
252                         })
253                 }
254                 return nil
255         })
256         if rerr != nil {
257                 return
258         }
259
260         r, w := io.Pipe()
261         reader = r
262         closer = DummyCloser{}
263         fileSize += 2 * TarBlockSize // termination block
264
265         go func() error {
266                 tarWr := tar.NewWriter(w)
267                 hdr := tar.Header{
268                         Typeflag: tar.TypeDir,
269                         Mode:     0777,
270                         PAXRecords: map[string]string{
271                                 "comment": "Autogenerated by " + VersionGet(),
272                         },
273                         Format: tar.FormatPAX,
274                 }
275                 for _, e := range dirs {
276                         hdr.Name = basePath + e.path[len(rootPath):]
277                         hdr.ModTime = e.modTime
278                         if err = tarWr.WriteHeader(&hdr); err != nil {
279                                 return w.CloseWithError(err)
280                         }
281                 }
282                 hdr.Typeflag = tar.TypeReg
283                 hdr.Mode = 0666
284                 for _, e := range files {
285                         hdr.Name = basePath + e.path[len(rootPath):]
286                         hdr.ModTime = e.modTime
287                         hdr.Size = e.size
288                         if err = tarWr.WriteHeader(&hdr); err != nil {
289                                 return w.CloseWithError(err)
290                         }
291                         fd, err := os.Open(e.path)
292                         if err != nil {
293                                 fd.Close() // #nosec G104
294                                 return w.CloseWithError(err)
295                         }
296                         if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
297                                 fd.Close() // #nosec G104
298                                 return w.CloseWithError(err)
299                         }
300                         fd.Close() // #nosec G104
301                 }
302                 if err = tarWr.Close(); err != nil {
303                         return w.CloseWithError(err)
304                 }
305                 return w.Close()
306         }()
307         return
308 }
309
310 func (ctx *Ctx) TxFile(
311         node *Node,
312         nice uint8,
313         srcPath, dstPath string,
314         chunkSize int64,
315         minSize, maxSize int64,
316 ) error {
317         dstPathSpecified := false
318         if dstPath == "" {
319                 if srcPath == "-" {
320                         return errors.New("Must provide destination filename")
321                 }
322                 dstPath = filepath.Base(srcPath)
323         } else {
324                 dstPathSpecified = true
325         }
326         dstPath = filepath.Clean(dstPath)
327         if filepath.IsAbs(dstPath) {
328                 return errors.New("Relative destination path required")
329         }
330         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
331         if closer != nil {
332                 defer closer.Close()
333         }
334         if err != nil {
335                 return err
336         }
337         if fileSize > maxSize {
338                 return errors.New("Too big than allowed")
339         }
340         if archived && !dstPathSpecified {
341                 dstPath += TarExt
342         }
343
344         if fileSize <= chunkSize {
345                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
346                 if err != nil {
347                         return err
348                 }
349                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
350                 les := LEs{
351                         {"Type", "file"},
352                         {"Node", node.Id},
353                         {"Nice", int(nice)},
354                         {"Src", srcPath},
355                         {"Dst", dstPath},
356                         {"Size", fileSize},
357                 }
358                 if err == nil {
359                         ctx.LogI("tx", les, "sent")
360                 } else {
361                         ctx.LogE("tx", les, err, "sent")
362                 }
363                 return err
364         }
365
366         leftSize := fileSize
367         metaPkt := ChunkedMeta{
368                 Magic:     MagicNNCPMv1,
369                 FileSize:  uint64(fileSize),
370                 ChunkSize: uint64(chunkSize),
371                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
372         }
373         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
374                 hsh := new([32]byte)
375                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
376         }
377         var sizeToSend int64
378         var hsh hash.Hash
379         var pkt *Pkt
380         var chunkNum int
381         var path string
382         for {
383                 if leftSize <= chunkSize {
384                         sizeToSend = leftSize
385                 } else {
386                         sizeToSend = chunkSize
387                 }
388                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
389                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
390                 if err != nil {
391                         return err
392                 }
393                 hsh, err = blake2b.New256(nil)
394                 if err != nil {
395                         return err
396                 }
397                 _, err = ctx.Tx(
398                         node,
399                         pkt,
400                         nice,
401                         sizeToSend,
402                         minSize,
403                         io.TeeReader(reader, hsh),
404                         path,
405                 )
406                 les := LEs{
407                         {"Type", "file"},
408                         {"Node", node.Id},
409                         {"Nice", int(nice)},
410                         {"Src", srcPath},
411                         {"Dst", path},
412                         {"Size", sizeToSend},
413                 }
414                 if err == nil {
415                         ctx.LogI("tx", les, "sent")
416                 } else {
417                         ctx.LogE("tx", les, err, "sent")
418                         return err
419                 }
420                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
421                 leftSize -= sizeToSend
422                 chunkNum++
423                 if leftSize == 0 {
424                         break
425                 }
426         }
427         var metaBuf bytes.Buffer
428         _, err = xdr.Marshal(&metaBuf, metaPkt)
429         if err != nil {
430                 return err
431         }
432         path = dstPath + ChunkedSuffixMeta
433         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
434         if err != nil {
435                 return err
436         }
437         metaPktSize := int64(metaBuf.Len())
438         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
439         les := LEs{
440                 {"Type", "file"},
441                 {"Node", node.Id},
442                 {"Nice", int(nice)},
443                 {"Src", srcPath},
444                 {"Dst", path},
445                 {"Size", metaPktSize},
446         }
447         if err == nil {
448                 ctx.LogI("tx", les, "sent")
449         } else {
450                 ctx.LogE("tx", les, err, "sent")
451         }
452         return err
453 }
454
455 func (ctx *Ctx) TxFreq(
456         node *Node,
457         nice, replyNice uint8,
458         srcPath, dstPath string,
459         minSize int64) error {
460         dstPath = filepath.Clean(dstPath)
461         if filepath.IsAbs(dstPath) {
462                 return errors.New("Relative destination path required")
463         }
464         srcPath = filepath.Clean(srcPath)
465         if filepath.IsAbs(srcPath) {
466                 return errors.New("Relative source path required")
467         }
468         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
469         if err != nil {
470                 return err
471         }
472         src := strings.NewReader(dstPath)
473         size := int64(src.Len())
474         _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
475         les := LEs{
476                 {"Type", "freq"},
477                 {"Node", node.Id},
478                 {"Nice", int(nice)},
479                 {"ReplyNice", int(replyNice)},
480                 {"Src", srcPath},
481                 {"Dst", dstPath},
482         }
483         if err == nil {
484                 ctx.LogI("tx", les, "sent")
485         } else {
486                 ctx.LogE("tx", les, err, "sent")
487         }
488         return err
489 }
490
491 func (ctx *Ctx) TxExec(
492         node *Node,
493         nice, replyNice uint8,
494         handle string,
495         args []string,
496         in io.Reader,
497         minSize int64,
498         useTmp bool,
499         noCompress bool,
500 ) error {
501         path := make([][]byte, 0, 1+len(args))
502         path = append(path, []byte(handle))
503         for _, arg := range args {
504                 path = append(path, []byte(arg))
505         }
506         pktType := PktTypeExec
507         if noCompress {
508                 pktType = PktTypeExecFat
509         }
510         pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
511         if err != nil {
512                 return err
513         }
514         var size int64
515
516         if !noCompress && !useTmp {
517                 var compressed bytes.Buffer
518                 compressor, err := zstd.NewWriter(
519                         &compressed,
520                         zstd.WithEncoderLevel(zstd.SpeedDefault),
521                 )
522                 if err != nil {
523                         return err
524                 }
525                 if _, err = io.Copy(compressor, in); err != nil {
526                         compressor.Close() // #nosec G104
527                         return err
528                 }
529                 if err = compressor.Close(); err != nil {
530                         return err
531                 }
532                 size = int64(compressed.Len())
533                 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
534         }
535         if noCompress && !useTmp {
536                 var data bytes.Buffer
537                 if _, err = io.Copy(&data, in); err != nil {
538                         return err
539                 }
540                 size = int64(data.Len())
541                 _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle)
542         }
543         if !noCompress && useTmp {
544                 r, w := io.Pipe()
545                 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
546                 if err != nil {
547                         return err
548                 }
549                 copyErr := make(chan error)
550                 go func() {
551                         _, err := io.Copy(compressor, in)
552                         if err != nil {
553                                 compressor.Close() // #nosec G104
554                                 copyErr <- err
555                         }
556                         err = compressor.Close()
557                         w.Close()
558                         copyErr <- err
559                 }()
560                 tmpReader, closer, fileSize, err := throughTmpFile(r)
561                 if closer != nil {
562                         defer closer.Close()
563                 }
564                 if err != nil {
565                         return err
566                 }
567                 err = <-copyErr
568                 if err != nil {
569                         return err
570                 }
571                 size = fileSize
572                 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
573         }
574         if noCompress && useTmp {
575                 tmpReader, closer, fileSize, err := throughTmpFile(in)
576                 if closer != nil {
577                         defer closer.Close()
578                 }
579                 if err != nil {
580                         return err
581                 }
582                 size = fileSize
583                 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
584         }
585
586         les := LEs{
587                 {"Type", "exec"},
588                 {"Node", node.Id},
589                 {"Nice", int(nice)},
590                 {"ReplyNice", int(replyNice)},
591                 {"Dst", strings.Join(append([]string{handle}, args...), " ")},
592                 {"Size", size},
593         }
594         if err == nil {
595                 ctx.LogI("tx", les, "sent")
596         } else {
597                 ctx.LogE("tx", les, err, "sent")
598         }
599         return err
600 }
601
602 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
603         les := LEs{
604                 {"Type", "trns"},
605                 {"Node", node.Id},
606                 {"Nice", int(nice)},
607                 {"Size", size},
608         }
609         ctx.LogD("tx", les, "taken")
610         if !ctx.IsEnoughSpace(size) {
611                 err := errors.New("is not enough space")
612                 ctx.LogE("tx", les, err, err.Error())
613                 return err
614         }
615         tmp, err := ctx.NewTmpFileWHash()
616         if err != nil {
617                 return err
618         }
619         if _, err = CopyProgressed(
620                 tmp.W, src, "Tx trns",
621                 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
622                 ctx.ShowPrgrs,
623         ); err != nil {
624                 return err
625         }
626         nodePath := filepath.Join(ctx.Spool, node.Id.String())
627         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
628         if err == nil {
629                 ctx.LogI("tx", les, "sent")
630         } else {
631                 ctx.LogI("tx", append(les, LE{"Err", err}), "sent")
632         }
633         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
634         return err
635 }