]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
More errors checking
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2020 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "hash"
27         "io"
28         "io/ioutil"
29         "os"
30         "path/filepath"
31         "strconv"
32         "strings"
33         "time"
34
35         xdr "github.com/davecgh/go-xdr/xdr2"
36         "github.com/klauspost/compress/zstd"
37         "golang.org/x/crypto/blake2b"
38         "golang.org/x/crypto/chacha20poly1305"
39 )
40
41 const (
42         MaxFileSize = 1 << 62
43
44         TarBlockSize = 512
45         TarExt       = ".tar"
46 )
47
48 func (ctx *Ctx) Tx(
49         node *Node,
50         pkt *Pkt,
51         nice uint8,
52         size, minSize int64,
53         src io.Reader,
54         pktName string,
55 ) (*Node, error) {
56         hops := make([]*Node, 0, 1+len(node.Via))
57         hops = append(hops, node)
58         lastNode := node
59         for i := len(node.Via); i > 0; i-- {
60                 lastNode = ctx.Neigh[*node.Via[i-1]]
61                 hops = append(hops, lastNode)
62         }
63         expectedSize := size
64         for i := 0; i < len(hops); i++ {
65                 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
66         }
67         padSize := minSize - expectedSize
68         if padSize < 0 {
69                 padSize = 0
70         }
71         if !ctx.IsEnoughSpace(size + padSize) {
72                 return nil, errors.New("is not enough space")
73         }
74         tmp, err := ctx.NewTmpFileWHash()
75         if err != nil {
76                 return nil, err
77         }
78
79         errs := make(chan error)
80         curSize := size
81         pipeR, pipeW := io.Pipe()
82         go func(size int64, src io.Reader, dst io.WriteCloser) {
83                 ctx.LogD("tx", SDS{
84                         "node": hops[0].Id,
85                         "nice": int(nice),
86                         "size": size,
87                 }, "wrote")
88                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
89                 dst.Close() // #nosec G104
90         }(curSize, src, pipeW)
91         curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
92
93         var pipeRPrev io.Reader
94         for i := 1; i < len(hops); i++ {
95                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
96                 pipeRPrev = pipeR
97                 pipeR, pipeW = io.Pipe()
98                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
99                         ctx.LogD("tx", SDS{
100                                 "node": node.Id,
101                                 "nice": int(nice),
102                                 "size": size,
103                         }, "trns wrote")
104                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
105                         dst.Close() // #nosec G104
106                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
107                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
108         }
109         go func() {
110                 _, err := CopyProgressed(
111                         tmp.W, pipeR, "Tx",
112                         SDS{"pkt": pktName, "fullsize": curSize},
113                         ctx.ShowPrgrs,
114                 )
115                 errs <- err
116         }()
117         for i := 0; i <= len(hops); i++ {
118                 err = <-errs
119                 if err != nil {
120                         tmp.Fd.Close() // #nosec G104
121                         return nil, err
122                 }
123         }
124         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
125         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
126         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
127         return lastNode, err
128 }
129
130 type DummyCloser struct{}
131
132 func (dc DummyCloser) Close() error { return nil }
133
134 func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize int64, archived bool, rerr error) {
135         if srcPath == "-" {
136                 // Read content from stdin, saving to temporary file, encrypting
137                 // on the fly
138                 src, err := ioutil.TempFile("", "nncp-file")
139                 if err != nil {
140                         rerr = err
141                         return
142                 }
143                 os.Remove(src.Name()) // #nosec G104
144                 tmpW := bufio.NewWriter(src)
145                 tmpKey := make([]byte, chacha20poly1305.KeySize)
146                 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
147                         return
148                 }
149                 aead, err := chacha20poly1305.New(tmpKey)
150                 if err != nil {
151                         rerr = err
152                         return
153                 }
154                 nonce := make([]byte, aead.NonceSize())
155                 written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
156                 if err != nil {
157                         rerr = err
158                         return
159                 }
160                 fileSize = int64(written)
161                 if err = tmpW.Flush(); err != nil {
162                         rerr = err
163                         return
164                 }
165                 if _, err = src.Seek(0, io.SeekStart); err != nil {
166                         rerr = err
167                         return
168                 }
169                 r, w := io.Pipe()
170                 go func() {
171                         if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
172                                 w.CloseWithError(err) // #nosec G104
173                         }
174                 }()
175                 reader = r
176                 closer = src
177                 return
178         }
179
180         srcStat, err := os.Stat(srcPath)
181         if err != nil {
182                 rerr = err
183                 return
184         }
185         mode := srcStat.Mode()
186
187         if mode.IsRegular() {
188                 // It is regular file, just send it
189                 src, err := os.Open(srcPath)
190                 if err != nil {
191                         rerr = err
192                         return
193                 }
194                 fileSize = srcStat.Size()
195                 reader = bufio.NewReader(src)
196                 closer = src
197                 return
198         }
199
200         if !mode.IsDir() {
201                 rerr = errors.New("unsupported file type")
202                 return
203         }
204
205         // It is directory, create PAX archive with its contents
206         archived = true
207         basePath := filepath.Base(srcPath)
208         rootPath, err := filepath.Abs(srcPath)
209         if err != nil {
210                 rerr = err
211                 return
212         }
213         type einfo struct {
214                 path    string
215                 modTime time.Time
216                 size    int64
217         }
218         dirs := make([]einfo, 0, 1<<10)
219         files := make([]einfo, 0, 1<<10)
220         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
221                 if err != nil {
222                         return err
223                 }
224                 if info.IsDir() {
225                         // directory header, PAX record header+contents
226                         fileSize += TarBlockSize + 2*TarBlockSize
227                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
228                 } else {
229                         // file header, PAX record header+contents, file content
230                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
231                         if n := info.Size() % TarBlockSize; n != 0 {
232                                 fileSize += TarBlockSize - n // padding
233                         }
234                         files = append(files, einfo{
235                                 path:    path,
236                                 modTime: info.ModTime(),
237                                 size:    info.Size(),
238                         })
239                 }
240                 return nil
241         })
242         if rerr != nil {
243                 return
244         }
245
246         r, w := io.Pipe()
247         reader = r
248         closer = DummyCloser{}
249         fileSize += 2 * TarBlockSize // termination block
250
251         go func() error {
252                 tarWr := tar.NewWriter(w)
253                 hdr := tar.Header{
254                         Typeflag: tar.TypeDir,
255                         Mode:     0777,
256                         PAXRecords: map[string]string{
257                                 "comment": "Autogenerated by " + VersionGet(),
258                         },
259                         Format: tar.FormatPAX,
260                 }
261                 for _, e := range dirs {
262                         hdr.Name = basePath + e.path[len(rootPath):]
263                         hdr.ModTime = e.modTime
264                         if err = tarWr.WriteHeader(&hdr); err != nil {
265                                 return w.CloseWithError(err)
266                         }
267                 }
268                 hdr.Typeflag = tar.TypeReg
269                 hdr.Mode = 0666
270                 for _, e := range files {
271                         hdr.Name = basePath + e.path[len(rootPath):]
272                         hdr.ModTime = e.modTime
273                         hdr.Size = e.size
274                         if err = tarWr.WriteHeader(&hdr); err != nil {
275                                 return w.CloseWithError(err)
276                         }
277                         fd, err := os.Open(e.path)
278                         if err != nil {
279                                 fd.Close() // #nosec G104
280                                 return w.CloseWithError(err)
281                         }
282                         if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
283                                 fd.Close() // #nosec G104
284                                 return w.CloseWithError(err)
285                         }
286                         fd.Close() // #nosec G104
287                 }
288                 if err = tarWr.Close(); err != nil {
289                         return w.CloseWithError(err)
290                 }
291                 return w.Close()
292         }()
293         return
294 }
295
296 func (ctx *Ctx) TxFile(
297         node *Node,
298         nice uint8,
299         srcPath, dstPath string,
300         chunkSize int64,
301         minSize, maxSize int64,
302 ) error {
303         dstPathSpecified := false
304         if dstPath == "" {
305                 if srcPath == "-" {
306                         return errors.New("Must provide destination filename")
307                 }
308                 dstPath = filepath.Base(srcPath)
309         } else {
310                 dstPathSpecified = true
311         }
312         dstPath = filepath.Clean(dstPath)
313         if filepath.IsAbs(dstPath) {
314                 return errors.New("Relative destination path required")
315         }
316         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
317         if closer != nil {
318                 defer closer.Close()
319         }
320         if err != nil {
321                 return err
322         }
323         if fileSize > maxSize {
324                 return errors.New("Too big than allowed")
325         }
326         if archived && !dstPathSpecified {
327                 dstPath += TarExt
328         }
329
330         if fileSize <= chunkSize {
331                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
332                 if err != nil {
333                         return err
334                 }
335                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
336                 sds := SDS{
337                         "type": "file",
338                         "node": node.Id,
339                         "nice": int(nice),
340                         "src":  srcPath,
341                         "dst":  dstPath,
342                         "size": fileSize,
343                 }
344                 if err == nil {
345                         ctx.LogI("tx", sds, "sent")
346                 } else {
347                         ctx.LogE("tx", sds, err, "sent")
348                 }
349                 return err
350         }
351
352         leftSize := fileSize
353         metaPkt := ChunkedMeta{
354                 Magic:     MagicNNCPMv1,
355                 FileSize:  uint64(fileSize),
356                 ChunkSize: uint64(chunkSize),
357                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
358         }
359         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
360                 hsh := new([32]byte)
361                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
362         }
363         var sizeToSend int64
364         var hsh hash.Hash
365         var pkt *Pkt
366         var chunkNum int
367         var path string
368         for {
369                 if leftSize <= chunkSize {
370                         sizeToSend = leftSize
371                 } else {
372                         sizeToSend = chunkSize
373                 }
374                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
375                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
376                 if err != nil {
377                         return err
378                 }
379                 hsh, err = blake2b.New256(nil)
380                 if err != nil {
381                         return err
382                 }
383                 _, err = ctx.Tx(
384                         node,
385                         pkt,
386                         nice,
387                         sizeToSend,
388                         minSize,
389                         io.TeeReader(reader, hsh),
390                         path,
391                 )
392                 sds := SDS{
393                         "type": "file",
394                         "node": node.Id,
395                         "nice": int(nice),
396                         "src":  srcPath,
397                         "dst":  path,
398                         "size": sizeToSend,
399                 }
400                 if err == nil {
401                         ctx.LogI("tx", sds, "sent")
402                 } else {
403                         ctx.LogE("tx", sds, err, "sent")
404                         return err
405                 }
406                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
407                 leftSize -= sizeToSend
408                 chunkNum++
409                 if leftSize == 0 {
410                         break
411                 }
412         }
413         var metaBuf bytes.Buffer
414         _, err = xdr.Marshal(&metaBuf, metaPkt)
415         if err != nil {
416                 return err
417         }
418         path = dstPath + ChunkedSuffixMeta
419         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
420         if err != nil {
421                 return err
422         }
423         metaPktSize := int64(metaBuf.Len())
424         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
425         sds := SDS{
426                 "type": "file",
427                 "node": node.Id,
428                 "nice": int(nice),
429                 "src":  srcPath,
430                 "dst":  path,
431                 "size": metaPktSize,
432         }
433         if err == nil {
434                 ctx.LogI("tx", sds, "sent")
435         } else {
436                 ctx.LogE("tx", sds, err, "sent")
437         }
438         return err
439 }
440
441 func (ctx *Ctx) TxFreq(
442         node *Node,
443         nice, replyNice uint8,
444         srcPath, dstPath string,
445         minSize int64) error {
446         dstPath = filepath.Clean(dstPath)
447         if filepath.IsAbs(dstPath) {
448                 return errors.New("Relative destination path required")
449         }
450         srcPath = filepath.Clean(srcPath)
451         if filepath.IsAbs(srcPath) {
452                 return errors.New("Relative source path required")
453         }
454         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
455         if err != nil {
456                 return err
457         }
458         src := strings.NewReader(dstPath)
459         size := int64(src.Len())
460         _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
461         sds := SDS{
462                 "type":      "freq",
463                 "node":      node.Id,
464                 "nice":      int(nice),
465                 "replynice": int(replyNice),
466                 "src":       srcPath,
467                 "dst":       dstPath,
468         }
469         if err == nil {
470                 ctx.LogI("tx", sds, "sent")
471         } else {
472                 ctx.LogE("tx", sds, err, "sent")
473         }
474         return err
475 }
476
477 func (ctx *Ctx) TxExec(
478         node *Node,
479         nice, replyNice uint8,
480         handle string,
481         args []string,
482         in io.Reader,
483         minSize int64,
484 ) error {
485         path := make([][]byte, 0, 1+len(args))
486         path = append(path, []byte(handle))
487         for _, arg := range args {
488                 path = append(path, []byte(arg))
489         }
490         pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
491         if err != nil {
492                 return err
493         }
494         var compressed bytes.Buffer
495         compressor, err := zstd.NewWriter(
496                 &compressed,
497                 zstd.WithEncoderLevel(zstd.SpeedDefault),
498         )
499         if err != nil {
500                 return err
501         }
502         if _, err = io.Copy(compressor, in); err != nil {
503                 compressor.Close() // #nosec G104
504                 return err
505         }
506         if err = compressor.Close(); err != nil {
507                 return err
508         }
509         size := int64(compressed.Len())
510         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
511         sds := SDS{
512                 "type":      "exec",
513                 "node":      node.Id,
514                 "nice":      int(nice),
515                 "replynice": int(replyNice),
516                 "dst":       strings.Join(append([]string{handle}, args...), " "),
517                 "size":      size,
518         }
519         if err == nil {
520                 ctx.LogI("tx", sds, "sent")
521         } else {
522                 ctx.LogE("tx", sds, err, "sent")
523         }
524         return err
525 }
526
527 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
528         sds := SDS{
529                 "type": "trns",
530                 "node": node.Id,
531                 "nice": int(nice),
532                 "size": size,
533         }
534         ctx.LogD("tx", sds, "taken")
535         if !ctx.IsEnoughSpace(size) {
536                 err := errors.New("is not enough space")
537                 ctx.LogE("tx", sds, err, err.Error())
538                 return err
539         }
540         tmp, err := ctx.NewTmpFileWHash()
541         if err != nil {
542                 return err
543         }
544         if _, err = CopyProgressed(
545                 tmp.W, src, "Tx trns",
546                 SDS{"pkt": node.Id.String(), "fullsize": size},
547                 ctx.ShowPrgrs,
548         ); err != nil {
549                 return err
550         }
551         nodePath := filepath.Join(ctx.Spool, node.Id.String())
552         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
553         if err == nil {
554                 ctx.LogI("tx", sds, "sent")
555         } else {
556                 ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent")
557         }
558         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
559         return err
560 }