]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
Check disk space when sending
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "hash"
27         "io"
28         "io/ioutil"
29         "os"
30         "path/filepath"
31         "strconv"
32         "strings"
33         "time"
34
35         xdr "github.com/davecgh/go-xdr/xdr2"
36         "github.com/klauspost/compress/zstd"
37         "golang.org/x/crypto/blake2b"
38         "golang.org/x/crypto/chacha20poly1305"
39 )
40
41 const (
42         TarBlockSize = 512
43         TarExt       = ".tar"
44 )
45
46 func (ctx *Ctx) Tx(
47         node *Node,
48         pkt *Pkt,
49         nice uint8,
50         size, minSize int64,
51         src io.Reader,
52 ) (*Node, error) {
53         hops := make([]*Node, 0, 1+len(node.Via))
54         hops = append(hops, node)
55         lastNode := node
56         for i := len(node.Via); i > 0; i-- {
57                 lastNode = ctx.Neigh[*node.Via[i-1]]
58                 hops = append(hops, lastNode)
59         }
60         expectedSize := size
61         for i := 0; i < len(hops); i++ {
62                 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
63         }
64         padSize := minSize - expectedSize
65         if padSize < 0 {
66                 padSize = 0
67         }
68         if !ctx.IsEnoughSpace(size + padSize) {
69                 return nil, errors.New("is not enough space")
70         }
71         tmp, err := ctx.NewTmpFileWHash()
72         if err != nil {
73                 return nil, err
74         }
75
76         errs := make(chan error)
77         curSize := size
78         pipeR, pipeW := io.Pipe()
79         go func(size int64, src io.Reader, dst io.WriteCloser) {
80                 ctx.LogD("tx", SDS{
81                         "node": hops[0].Id,
82                         "nice": strconv.Itoa(int(nice)),
83                         "size": strconv.FormatInt(size, 10),
84                 }, "wrote")
85                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
86                 dst.Close()
87         }(curSize, src, pipeW)
88         curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
89
90         var pipeRPrev io.Reader
91         for i := 1; i < len(hops); i++ {
92                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
93                 pipeRPrev = pipeR
94                 pipeR, pipeW = io.Pipe()
95                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
96                         ctx.LogD("tx", SDS{
97                                 "node": node.Id,
98                                 "nice": strconv.Itoa(int(nice)),
99                                 "size": strconv.FormatInt(size, 10),
100                         }, "trns wrote")
101                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
102                         dst.Close()
103                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
104                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
105         }
106         go func() {
107                 _, err := io.Copy(tmp.W, pipeR)
108                 errs <- err
109         }()
110         for i := 0; i <= len(hops); i++ {
111                 err = <-errs
112                 if err != nil {
113                         tmp.Fd.Close()
114                         return nil, err
115                 }
116         }
117         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
118         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
119         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
120         return lastNode, err
121 }
122
123 type DummyCloser struct{}
124
125 func (dc DummyCloser) Close() error { return nil }
126
127 func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize int64, archived bool, rerr error) {
128         if srcPath == "-" {
129                 // Read content from stdin, saving to temporary file, encrypting
130                 // on the fly
131                 src, err := ioutil.TempFile("", "nncp-file")
132                 if err != nil {
133                         rerr = err
134                         return
135                 }
136                 os.Remove(src.Name())
137                 tmpW := bufio.NewWriter(src)
138                 tmpKey := make([]byte, chacha20poly1305.KeySize)
139                 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
140                         return
141                 }
142                 aead, err := chacha20poly1305.New(tmpKey)
143                 if err != nil {
144                         rerr = err
145                         return
146                 }
147                 nonce := make([]byte, aead.NonceSize())
148                 written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
149                 if err != nil {
150                         rerr = err
151                         return
152                 }
153                 fileSize = int64(written)
154                 if err = tmpW.Flush(); err != nil {
155                         return
156                 }
157                 src.Seek(0, io.SeekStart)
158                 r, w := io.Pipe()
159                 go func() {
160                         if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
161                                 w.CloseWithError(err)
162                         }
163                 }()
164                 reader = r
165                 closer = src
166                 return
167         }
168
169         srcStat, err := os.Stat(srcPath)
170         if err != nil {
171                 rerr = err
172                 return
173         }
174         mode := srcStat.Mode()
175
176         if mode.IsRegular() {
177                 // It is regular file, just send it
178                 src, err := os.Open(srcPath)
179                 if err != nil {
180                         rerr = err
181                         return
182                 }
183                 fileSize = srcStat.Size()
184                 reader = bufio.NewReader(src)
185                 closer = src
186                 return
187         }
188
189         if !mode.IsDir() {
190                 rerr = errors.New("unsupported file type")
191                 return
192         }
193
194         // It is directory, create PAX archive with its contents
195         archived = true
196         basePath := filepath.Base(srcPath)
197         rootPath, err := filepath.Abs(srcPath)
198         if err != nil {
199                 rerr = err
200                 return
201         }
202         type einfo struct {
203                 path    string
204                 modTime time.Time
205                 size    int64
206         }
207         dirs := make([]einfo, 0, 1<<10)
208         files := make([]einfo, 0, 1<<10)
209         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
210                 if err != nil {
211                         return err
212                 }
213                 if info.IsDir() {
214                         // directory header, PAX record header+contents
215                         fileSize += TarBlockSize + 2*TarBlockSize
216                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
217                 } else {
218                         // file header, PAX record header+contents, file content
219                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
220                         if n := info.Size() % TarBlockSize; n != 0 {
221                                 fileSize += TarBlockSize - n // padding
222                         }
223                         files = append(files, einfo{
224                                 path:    path,
225                                 modTime: info.ModTime(),
226                                 size:    info.Size(),
227                         })
228                 }
229                 return nil
230         })
231         if rerr != nil {
232                 return
233         }
234
235         r, w := io.Pipe()
236         reader = r
237         closer = DummyCloser{}
238         fileSize += 2 * TarBlockSize // termination block
239
240         go func() {
241                 tarWr := tar.NewWriter(w)
242                 hdr := tar.Header{
243                         Typeflag: tar.TypeDir,
244                         Mode:     0777,
245                         PAXRecords: map[string]string{
246                                 "comment": "Autogenerated by " + VersionGet(),
247                         },
248                         Format: tar.FormatPAX,
249                 }
250                 for _, e := range dirs {
251                         hdr.Name = basePath + e.path[len(rootPath):]
252                         hdr.ModTime = e.modTime
253                         if err = tarWr.WriteHeader(&hdr); err != nil {
254                                 w.CloseWithError(err)
255                         }
256                 }
257                 hdr.Typeflag = tar.TypeReg
258                 hdr.Mode = 0666
259                 for _, e := range files {
260                         hdr.Name = basePath + e.path[len(rootPath):]
261                         hdr.ModTime = e.modTime
262                         hdr.Size = e.size
263                         if err = tarWr.WriteHeader(&hdr); err != nil {
264                                 w.CloseWithError(err)
265                         }
266                         fd, err := os.Open(e.path)
267                         if err != nil {
268                                 w.CloseWithError(err)
269                         }
270                         _, err = io.Copy(tarWr, bufio.NewReader(fd))
271                         if err != nil {
272                                 w.CloseWithError(err)
273                         }
274                         fd.Close()
275                 }
276                 tarWr.Close()
277                 w.Close()
278         }()
279         return
280 }
281
282 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
283         dstPathSpecified := false
284         if dstPath == "" {
285                 if srcPath == "-" {
286                         return errors.New("Must provide destination filename")
287                 }
288                 dstPath = filepath.Base(srcPath)
289         } else {
290                 dstPathSpecified = true
291         }
292         dstPath = filepath.Clean(dstPath)
293         if filepath.IsAbs(dstPath) {
294                 return errors.New("Relative destination path required")
295         }
296         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
297         if closer != nil {
298                 defer closer.Close()
299         }
300         if err != nil {
301                 return err
302         }
303         if archived && !dstPathSpecified {
304                 dstPath += TarExt
305         }
306         pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
307         if err != nil {
308                 return err
309         }
310         _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
311         sds := SDS{
312                 "type": "file",
313                 "node": node.Id,
314                 "nice": strconv.Itoa(int(nice)),
315                 "src":  srcPath,
316                 "dst":  dstPath,
317                 "size": strconv.FormatInt(fileSize, 10),
318         }
319         if err == nil {
320                 ctx.LogI("tx", sds, "sent")
321         } else {
322                 sds["err"] = err
323                 ctx.LogE("tx", sds, "sent")
324         }
325         return err
326 }
327
328 func (ctx *Ctx) TxFileChunked(
329         node *Node,
330         nice uint8,
331         srcPath, dstPath string,
332         minSize int64,
333         chunkSize int64,
334 ) error {
335         dstPathSpecified := false
336         if dstPath == "" {
337                 if srcPath == "-" {
338                         return errors.New("Must provide destination filename")
339                 }
340                 dstPath = filepath.Base(srcPath)
341         } else {
342                 dstPathSpecified = true
343         }
344         dstPath = filepath.Clean(dstPath)
345         if filepath.IsAbs(dstPath) {
346                 return errors.New("Relative destination path required")
347         }
348         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
349         if closer != nil {
350                 defer closer.Close()
351         }
352         if err != nil {
353                 return err
354         }
355         if archived && !dstPathSpecified {
356                 dstPath += TarExt
357         }
358
359         if fileSize <= chunkSize {
360                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
361                 if err != nil {
362                         return err
363                 }
364                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
365                 sds := SDS{
366                         "type": "file",
367                         "node": node.Id,
368                         "nice": strconv.Itoa(int(nice)),
369                         "src":  srcPath,
370                         "dst":  dstPath,
371                         "size": strconv.FormatInt(fileSize, 10),
372                 }
373                 if err == nil {
374                         ctx.LogI("tx", sds, "sent")
375                 } else {
376                         sds["err"] = err
377                         ctx.LogE("tx", sds, "sent")
378                 }
379                 return err
380         }
381
382         leftSize := fileSize
383         metaPkt := ChunkedMeta{
384                 Magic:     MagicNNCPMv1,
385                 FileSize:  uint64(fileSize),
386                 ChunkSize: uint64(chunkSize),
387                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
388         }
389         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
390                 hsh := new([32]byte)
391                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
392         }
393         var sizeToSend int64
394         var hsh hash.Hash
395         var pkt *Pkt
396         var chunkNum int
397         var path string
398         for {
399                 if leftSize <= chunkSize {
400                         sizeToSend = leftSize
401                 } else {
402                         sizeToSend = chunkSize
403                 }
404                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
405                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
406                 if err != nil {
407                         return err
408                 }
409                 hsh, err = blake2b.New256(nil)
410                 if err != nil {
411                         return err
412                 }
413                 _, err = ctx.Tx(
414                         node,
415                         pkt,
416                         nice,
417                         sizeToSend,
418                         minSize,
419                         io.TeeReader(reader, hsh),
420                 )
421                 sds := SDS{
422                         "type": "file",
423                         "node": node.Id,
424                         "nice": strconv.Itoa(int(nice)),
425                         "src":  srcPath,
426                         "dst":  path,
427                         "size": strconv.FormatInt(sizeToSend, 10),
428                 }
429                 if err == nil {
430                         ctx.LogI("tx", sds, "sent")
431                 } else {
432                         sds["err"] = err
433                         ctx.LogE("tx", sds, "sent")
434                         return err
435                 }
436                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
437                 leftSize -= sizeToSend
438                 chunkNum++
439                 if leftSize == 0 {
440                         break
441                 }
442         }
443         var metaBuf bytes.Buffer
444         _, err = xdr.Marshal(&metaBuf, metaPkt)
445         if err != nil {
446                 return err
447         }
448         path = dstPath + ChunkedSuffixMeta
449         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
450         if err != nil {
451                 return err
452         }
453         metaPktSize := int64(metaBuf.Len())
454         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
455         sds := SDS{
456                 "type": "file",
457                 "node": node.Id,
458                 "nice": strconv.Itoa(int(nice)),
459                 "src":  srcPath,
460                 "dst":  path,
461                 "size": strconv.FormatInt(metaPktSize, 10),
462         }
463         if err == nil {
464                 ctx.LogI("tx", sds, "sent")
465         } else {
466                 sds["err"] = err
467                 ctx.LogE("tx", sds, "sent")
468         }
469         return err
470 }
471
472 func (ctx *Ctx) TxFreq(
473         node *Node,
474         nice, replyNice uint8,
475         srcPath, dstPath string,
476         minSize int64) error {
477         dstPath = filepath.Clean(dstPath)
478         if filepath.IsAbs(dstPath) {
479                 return errors.New("Relative destination path required")
480         }
481         srcPath = filepath.Clean(srcPath)
482         if filepath.IsAbs(srcPath) {
483                 return errors.New("Relative source path required")
484         }
485         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
486         if err != nil {
487                 return err
488         }
489         src := strings.NewReader(dstPath)
490         size := int64(src.Len())
491         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
492         sds := SDS{
493                 "type":      "freq",
494                 "node":      node.Id,
495                 "nice":      strconv.Itoa(int(nice)),
496                 "replynice": strconv.Itoa(int(replyNice)),
497                 "src":       srcPath,
498                 "dst":       dstPath,
499         }
500         if err == nil {
501                 ctx.LogI("tx", sds, "sent")
502         } else {
503                 sds["err"] = err
504                 ctx.LogE("tx", sds, "sent")
505         }
506         return err
507 }
508
509 func (ctx *Ctx) TxExec(
510         node *Node,
511         nice, replyNice uint8,
512         handle string,
513         args []string,
514         in io.Reader,
515         minSize int64,
516 ) error {
517         path := make([][]byte, 0, 1+len(args))
518         path = append(path, []byte(handle))
519         for _, arg := range args {
520                 path = append(path, []byte(arg))
521         }
522         pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
523         if err != nil {
524                 return err
525         }
526         var compressed bytes.Buffer
527         compressor, err := zstd.NewWriter(
528                 &compressed,
529                 zstd.WithEncoderLevel(zstd.SpeedDefault),
530         )
531         if err != nil {
532                 return err
533         }
534         _, err = io.Copy(compressor, in)
535         compressor.Close()
536         if err != nil {
537                 return err
538         }
539         size := int64(compressed.Len())
540         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
541         sds := SDS{
542                 "type":      "exec",
543                 "node":      node.Id,
544                 "nice":      strconv.Itoa(int(nice)),
545                 "replynice": strconv.Itoa(int(replyNice)),
546                 "dst":       strings.Join(append([]string{handle}, args...), " "),
547                 "size":      strconv.FormatInt(size, 10),
548         }
549         if err == nil {
550                 ctx.LogI("tx", sds, "sent")
551         } else {
552                 sds["err"] = err
553                 ctx.LogE("tx", sds, "sent")
554         }
555         return err
556 }
557
558 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
559         sds := SDS{
560                 "type": "trns",
561                 "node": node.Id,
562                 "nice": strconv.Itoa(int(nice)),
563                 "size": strconv.FormatInt(size, 10),
564         }
565         ctx.LogD("tx", sds, "taken")
566         if !ctx.IsEnoughSpace(size) {
567                 err := errors.New("is not enough space")
568                 ctx.LogE("tx", sds, err.Error())
569                 return err
570         }
571         tmp, err := ctx.NewTmpFileWHash()
572         if err != nil {
573                 return err
574         }
575         if _, err = io.Copy(tmp.W, src); err != nil {
576                 return err
577         }
578         nodePath := filepath.Join(ctx.Spool, node.Id.String())
579         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
580         if err == nil {
581                 ctx.LogI("tx", sds, "sent")
582         } else {
583                 sds["err"] = err
584                 ctx.LogI("tx", sds, "sent")
585         }
586         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
587         return err
588 }