]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
Operations progress
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "hash"
27         "io"
28         "io/ioutil"
29         "os"
30         "path/filepath"
31         "strconv"
32         "strings"
33         "time"
34
35         xdr "github.com/davecgh/go-xdr/xdr2"
36         "github.com/klauspost/compress/zstd"
37         "golang.org/x/crypto/blake2b"
38         "golang.org/x/crypto/chacha20poly1305"
39 )
40
41 const (
42         MaxFileSize = 1 << 62
43
44         TarBlockSize = 512
45         TarExt       = ".tar"
46 )
47
48 func (ctx *Ctx) Tx(
49         node *Node,
50         pkt *Pkt,
51         nice uint8,
52         size, minSize int64,
53         src io.Reader,
54         pktName string,
55 ) (*Node, error) {
56         hops := make([]*Node, 0, 1+len(node.Via))
57         hops = append(hops, node)
58         lastNode := node
59         for i := len(node.Via); i > 0; i-- {
60                 lastNode = ctx.Neigh[*node.Via[i-1]]
61                 hops = append(hops, lastNode)
62         }
63         expectedSize := size
64         for i := 0; i < len(hops); i++ {
65                 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
66         }
67         padSize := minSize - expectedSize
68         if padSize < 0 {
69                 padSize = 0
70         }
71         if !ctx.IsEnoughSpace(size + padSize) {
72                 return nil, errors.New("is not enough space")
73         }
74         tmp, err := ctx.NewTmpFileWHash()
75         if err != nil {
76                 return nil, err
77         }
78
79         errs := make(chan error)
80         curSize := size
81         pipeR, pipeW := io.Pipe()
82         go func(size int64, src io.Reader, dst io.WriteCloser) {
83                 ctx.LogD("tx", SDS{
84                         "node": hops[0].Id,
85                         "nice": int(nice),
86                         "size": size,
87                 }, "wrote")
88                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
89                 dst.Close()
90         }(curSize, src, pipeW)
91         curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
92
93         var pipeRPrev io.Reader
94         for i := 1; i < len(hops); i++ {
95                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
96                 pipeRPrev = pipeR
97                 pipeR, pipeW = io.Pipe()
98                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
99                         ctx.LogD("tx", SDS{
100                                 "node": node.Id,
101                                 "nice": int(nice),
102                                 "size": size,
103                         }, "trns wrote")
104                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
105                         dst.Close()
106                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
107                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
108         }
109         go func() {
110                 _, err := CopyProgressed(
111                         tmp.W, pipeR,
112                         SDS{"xx": string(TTx), "pkt": pktName, "fullsize": curSize},
113                         ctx.ShowPrgrs,
114                 )
115                 errs <- err
116         }()
117         for i := 0; i <= len(hops); i++ {
118                 err = <-errs
119                 if err != nil {
120                         tmp.Fd.Close()
121                         return nil, err
122                 }
123         }
124         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
125         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
126         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
127         return lastNode, err
128 }
129
130 type DummyCloser struct{}
131
132 func (dc DummyCloser) Close() error { return nil }
133
134 func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize int64, archived bool, rerr error) {
135         if srcPath == "-" {
136                 // Read content from stdin, saving to temporary file, encrypting
137                 // on the fly
138                 src, err := ioutil.TempFile("", "nncp-file")
139                 if err != nil {
140                         rerr = err
141                         return
142                 }
143                 os.Remove(src.Name())
144                 tmpW := bufio.NewWriter(src)
145                 tmpKey := make([]byte, chacha20poly1305.KeySize)
146                 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
147                         return
148                 }
149                 aead, err := chacha20poly1305.New(tmpKey)
150                 if err != nil {
151                         rerr = err
152                         return
153                 }
154                 nonce := make([]byte, aead.NonceSize())
155                 written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
156                 if err != nil {
157                         rerr = err
158                         return
159                 }
160                 fileSize = int64(written)
161                 if err = tmpW.Flush(); err != nil {
162                         return
163                 }
164                 src.Seek(0, io.SeekStart)
165                 r, w := io.Pipe()
166                 go func() {
167                         if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
168                                 w.CloseWithError(err)
169                         }
170                 }()
171                 reader = r
172                 closer = src
173                 return
174         }
175
176         srcStat, err := os.Stat(srcPath)
177         if err != nil {
178                 rerr = err
179                 return
180         }
181         mode := srcStat.Mode()
182
183         if mode.IsRegular() {
184                 // It is regular file, just send it
185                 src, err := os.Open(srcPath)
186                 if err != nil {
187                         rerr = err
188                         return
189                 }
190                 fileSize = srcStat.Size()
191                 reader = bufio.NewReader(src)
192                 closer = src
193                 return
194         }
195
196         if !mode.IsDir() {
197                 rerr = errors.New("unsupported file type")
198                 return
199         }
200
201         // It is directory, create PAX archive with its contents
202         archived = true
203         basePath := filepath.Base(srcPath)
204         rootPath, err := filepath.Abs(srcPath)
205         if err != nil {
206                 rerr = err
207                 return
208         }
209         type einfo struct {
210                 path    string
211                 modTime time.Time
212                 size    int64
213         }
214         dirs := make([]einfo, 0, 1<<10)
215         files := make([]einfo, 0, 1<<10)
216         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
217                 if err != nil {
218                         return err
219                 }
220                 if info.IsDir() {
221                         // directory header, PAX record header+contents
222                         fileSize += TarBlockSize + 2*TarBlockSize
223                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
224                 } else {
225                         // file header, PAX record header+contents, file content
226                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
227                         if n := info.Size() % TarBlockSize; n != 0 {
228                                 fileSize += TarBlockSize - n // padding
229                         }
230                         files = append(files, einfo{
231                                 path:    path,
232                                 modTime: info.ModTime(),
233                                 size:    info.Size(),
234                         })
235                 }
236                 return nil
237         })
238         if rerr != nil {
239                 return
240         }
241
242         r, w := io.Pipe()
243         reader = r
244         closer = DummyCloser{}
245         fileSize += 2 * TarBlockSize // termination block
246
247         go func() {
248                 tarWr := tar.NewWriter(w)
249                 hdr := tar.Header{
250                         Typeflag: tar.TypeDir,
251                         Mode:     0777,
252                         PAXRecords: map[string]string{
253                                 "comment": "Autogenerated by " + VersionGet(),
254                         },
255                         Format: tar.FormatPAX,
256                 }
257                 for _, e := range dirs {
258                         hdr.Name = basePath + e.path[len(rootPath):]
259                         hdr.ModTime = e.modTime
260                         if err = tarWr.WriteHeader(&hdr); err != nil {
261                                 w.CloseWithError(err)
262                         }
263                 }
264                 hdr.Typeflag = tar.TypeReg
265                 hdr.Mode = 0666
266                 for _, e := range files {
267                         hdr.Name = basePath + e.path[len(rootPath):]
268                         hdr.ModTime = e.modTime
269                         hdr.Size = e.size
270                         if err = tarWr.WriteHeader(&hdr); err != nil {
271                                 w.CloseWithError(err)
272                         }
273                         fd, err := os.Open(e.path)
274                         if err != nil {
275                                 w.CloseWithError(err)
276                         }
277                         _, err = io.Copy(tarWr, bufio.NewReader(fd))
278                         if err != nil {
279                                 w.CloseWithError(err)
280                         }
281                         fd.Close()
282                 }
283                 tarWr.Close()
284                 w.Close()
285         }()
286         return
287 }
288
289 func (ctx *Ctx) TxFile(
290         node *Node,
291         nice uint8,
292         srcPath, dstPath string,
293         chunkSize int64,
294         minSize, maxSize int64,
295 ) error {
296         dstPathSpecified := false
297         if dstPath == "" {
298                 if srcPath == "-" {
299                         return errors.New("Must provide destination filename")
300                 }
301                 dstPath = filepath.Base(srcPath)
302         } else {
303                 dstPathSpecified = true
304         }
305         dstPath = filepath.Clean(dstPath)
306         if filepath.IsAbs(dstPath) {
307                 return errors.New("Relative destination path required")
308         }
309         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
310         if closer != nil {
311                 defer closer.Close()
312         }
313         if err != nil {
314                 return err
315         }
316         if fileSize > maxSize {
317                 return errors.New("Too big than allowed")
318         }
319         if archived && !dstPathSpecified {
320                 dstPath += TarExt
321         }
322
323         if fileSize <= chunkSize {
324                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
325                 if err != nil {
326                         return err
327                 }
328                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
329                 sds := SDS{
330                         "type": "file",
331                         "node": node.Id,
332                         "nice": int(nice),
333                         "src":  srcPath,
334                         "dst":  dstPath,
335                         "size": fileSize,
336                 }
337                 if err == nil {
338                         ctx.LogI("tx", sds, "sent")
339                 } else {
340                         ctx.LogE("tx", sds, err, "sent")
341                 }
342                 return err
343         }
344
345         leftSize := fileSize
346         metaPkt := ChunkedMeta{
347                 Magic:     MagicNNCPMv1,
348                 FileSize:  uint64(fileSize),
349                 ChunkSize: uint64(chunkSize),
350                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
351         }
352         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
353                 hsh := new([32]byte)
354                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
355         }
356         var sizeToSend int64
357         var hsh hash.Hash
358         var pkt *Pkt
359         var chunkNum int
360         var path string
361         for {
362                 if leftSize <= chunkSize {
363                         sizeToSend = leftSize
364                 } else {
365                         sizeToSend = chunkSize
366                 }
367                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
368                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
369                 if err != nil {
370                         return err
371                 }
372                 hsh, err = blake2b.New256(nil)
373                 if err != nil {
374                         return err
375                 }
376                 _, err = ctx.Tx(
377                         node,
378                         pkt,
379                         nice,
380                         sizeToSend,
381                         minSize,
382                         io.TeeReader(reader, hsh),
383                         path,
384                 )
385                 sds := SDS{
386                         "type": "file",
387                         "node": node.Id,
388                         "nice": int(nice),
389                         "src":  srcPath,
390                         "dst":  path,
391                         "size": sizeToSend,
392                 }
393                 if err == nil {
394                         ctx.LogI("tx", sds, "sent")
395                 } else {
396                         ctx.LogE("tx", sds, err, "sent")
397                         return err
398                 }
399                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
400                 leftSize -= sizeToSend
401                 chunkNum++
402                 if leftSize == 0 {
403                         break
404                 }
405         }
406         var metaBuf bytes.Buffer
407         _, err = xdr.Marshal(&metaBuf, metaPkt)
408         if err != nil {
409                 return err
410         }
411         path = dstPath + ChunkedSuffixMeta
412         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
413         if err != nil {
414                 return err
415         }
416         metaPktSize := int64(metaBuf.Len())
417         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
418         sds := SDS{
419                 "type": "file",
420                 "node": node.Id,
421                 "nice": int(nice),
422                 "src":  srcPath,
423                 "dst":  path,
424                 "size": metaPktSize,
425         }
426         if err == nil {
427                 ctx.LogI("tx", sds, "sent")
428         } else {
429                 ctx.LogE("tx", sds, err, "sent")
430         }
431         return err
432 }
433
434 func (ctx *Ctx) TxFreq(
435         node *Node,
436         nice, replyNice uint8,
437         srcPath, dstPath string,
438         minSize int64) error {
439         dstPath = filepath.Clean(dstPath)
440         if filepath.IsAbs(dstPath) {
441                 return errors.New("Relative destination path required")
442         }
443         srcPath = filepath.Clean(srcPath)
444         if filepath.IsAbs(srcPath) {
445                 return errors.New("Relative source path required")
446         }
447         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
448         if err != nil {
449                 return err
450         }
451         src := strings.NewReader(dstPath)
452         size := int64(src.Len())
453         _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
454         sds := SDS{
455                 "type":      "freq",
456                 "node":      node.Id,
457                 "nice":      int(nice),
458                 "replynice": int(replyNice),
459                 "src":       srcPath,
460                 "dst":       dstPath,
461         }
462         if err == nil {
463                 ctx.LogI("tx", sds, "sent")
464         } else {
465                 ctx.LogE("tx", sds, err, "sent")
466         }
467         return err
468 }
469
470 func (ctx *Ctx) TxExec(
471         node *Node,
472         nice, replyNice uint8,
473         handle string,
474         args []string,
475         in io.Reader,
476         minSize int64,
477 ) error {
478         path := make([][]byte, 0, 1+len(args))
479         path = append(path, []byte(handle))
480         for _, arg := range args {
481                 path = append(path, []byte(arg))
482         }
483         pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
484         if err != nil {
485                 return err
486         }
487         var compressed bytes.Buffer
488         compressor, err := zstd.NewWriter(
489                 &compressed,
490                 zstd.WithEncoderLevel(zstd.SpeedDefault),
491         )
492         if err != nil {
493                 return err
494         }
495         _, err = io.Copy(compressor, in)
496         compressor.Close()
497         if err != nil {
498                 return err
499         }
500         size := int64(compressed.Len())
501         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
502         sds := SDS{
503                 "type":      "exec",
504                 "node":      node.Id,
505                 "nice":      int(nice),
506                 "replynice": int(replyNice),
507                 "dst":       strings.Join(append([]string{handle}, args...), " "),
508                 "size":      size,
509         }
510         if err == nil {
511                 ctx.LogI("tx", sds, "sent")
512         } else {
513                 ctx.LogE("tx", sds, err, "sent")
514         }
515         return err
516 }
517
518 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
519         sds := SDS{
520                 "type": "trns",
521                 "node": node.Id,
522                 "nice": int(nice),
523                 "size": size,
524         }
525         ctx.LogD("tx", sds, "taken")
526         if !ctx.IsEnoughSpace(size) {
527                 err := errors.New("is not enough space")
528                 ctx.LogE("tx", sds, err, err.Error())
529                 return err
530         }
531         tmp, err := ctx.NewTmpFileWHash()
532         if err != nil {
533                 return err
534         }
535         if _, err = CopyProgressed(tmp.W, src, SDS{
536                 "xx":       string(TTx),
537                 "pkt":      node.Id.String(),
538                 "fullsize": size,
539         }, ctx.ShowPrgrs); err != nil {
540                 return err
541         }
542         nodePath := filepath.Join(ctx.Spool, node.Id.String())
543         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
544         if err == nil {
545                 ctx.LogI("tx", sds, "sent")
546         } else {
547                 ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent")
548         }
549         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
550         return err
551 }