]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
20853b0b27a3cd4a02eab6027a2081675bf6a11f
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "hash"
27         "io"
28         "io/ioutil"
29         "os"
30         "path/filepath"
31         "strconv"
32         "strings"
33         "time"
34
35         xdr "github.com/davecgh/go-xdr/xdr2"
36         "github.com/klauspost/compress/zstd"
37         "golang.org/x/crypto/blake2b"
38         "golang.org/x/crypto/chacha20poly1305"
39 )
40
41 const (
42         MaxFileSize = 1 << 62
43
44         TarBlockSize = 512
45         TarExt       = ".tar"
46 )
47
48 func (ctx *Ctx) Tx(
49         node *Node,
50         pkt *Pkt,
51         nice uint8,
52         size, minSize int64,
53         src io.Reader,
54 ) (*Node, error) {
55         hops := make([]*Node, 0, 1+len(node.Via))
56         hops = append(hops, node)
57         lastNode := node
58         for i := len(node.Via); i > 0; i-- {
59                 lastNode = ctx.Neigh[*node.Via[i-1]]
60                 hops = append(hops, lastNode)
61         }
62         expectedSize := size
63         for i := 0; i < len(hops); i++ {
64                 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
65         }
66         padSize := minSize - expectedSize
67         if padSize < 0 {
68                 padSize = 0
69         }
70         if !ctx.IsEnoughSpace(size + padSize) {
71                 return nil, errors.New("is not enough space")
72         }
73         tmp, err := ctx.NewTmpFileWHash()
74         if err != nil {
75                 return nil, err
76         }
77
78         errs := make(chan error)
79         curSize := size
80         pipeR, pipeW := io.Pipe()
81         go func(size int64, src io.Reader, dst io.WriteCloser) {
82                 ctx.LogD("tx", SDS{
83                         "node": hops[0].Id,
84                         "nice": strconv.Itoa(int(nice)),
85                         "size": strconv.FormatInt(size, 10),
86                 }, "wrote")
87                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
88                 dst.Close()
89         }(curSize, src, pipeW)
90         curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
91
92         var pipeRPrev io.Reader
93         for i := 1; i < len(hops); i++ {
94                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
95                 pipeRPrev = pipeR
96                 pipeR, pipeW = io.Pipe()
97                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
98                         ctx.LogD("tx", SDS{
99                                 "node": node.Id,
100                                 "nice": strconv.Itoa(int(nice)),
101                                 "size": strconv.FormatInt(size, 10),
102                         }, "trns wrote")
103                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
104                         dst.Close()
105                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
106                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
107         }
108         go func() {
109                 _, err := io.Copy(tmp.W, pipeR)
110                 errs <- err
111         }()
112         for i := 0; i <= len(hops); i++ {
113                 err = <-errs
114                 if err != nil {
115                         tmp.Fd.Close()
116                         return nil, err
117                 }
118         }
119         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
120         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
121         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
122         return lastNode, err
123 }
124
125 type DummyCloser struct{}
126
127 func (dc DummyCloser) Close() error { return nil }
128
129 func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize int64, archived bool, rerr error) {
130         if srcPath == "-" {
131                 // Read content from stdin, saving to temporary file, encrypting
132                 // on the fly
133                 src, err := ioutil.TempFile("", "nncp-file")
134                 if err != nil {
135                         rerr = err
136                         return
137                 }
138                 os.Remove(src.Name())
139                 tmpW := bufio.NewWriter(src)
140                 tmpKey := make([]byte, chacha20poly1305.KeySize)
141                 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
142                         return
143                 }
144                 aead, err := chacha20poly1305.New(tmpKey)
145                 if err != nil {
146                         rerr = err
147                         return
148                 }
149                 nonce := make([]byte, aead.NonceSize())
150                 written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
151                 if err != nil {
152                         rerr = err
153                         return
154                 }
155                 fileSize = int64(written)
156                 if err = tmpW.Flush(); err != nil {
157                         return
158                 }
159                 src.Seek(0, io.SeekStart)
160                 r, w := io.Pipe()
161                 go func() {
162                         if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
163                                 w.CloseWithError(err)
164                         }
165                 }()
166                 reader = r
167                 closer = src
168                 return
169         }
170
171         srcStat, err := os.Stat(srcPath)
172         if err != nil {
173                 rerr = err
174                 return
175         }
176         mode := srcStat.Mode()
177
178         if mode.IsRegular() {
179                 // It is regular file, just send it
180                 src, err := os.Open(srcPath)
181                 if err != nil {
182                         rerr = err
183                         return
184                 }
185                 fileSize = srcStat.Size()
186                 reader = bufio.NewReader(src)
187                 closer = src
188                 return
189         }
190
191         if !mode.IsDir() {
192                 rerr = errors.New("unsupported file type")
193                 return
194         }
195
196         // It is directory, create PAX archive with its contents
197         archived = true
198         basePath := filepath.Base(srcPath)
199         rootPath, err := filepath.Abs(srcPath)
200         if err != nil {
201                 rerr = err
202                 return
203         }
204         type einfo struct {
205                 path    string
206                 modTime time.Time
207                 size    int64
208         }
209         dirs := make([]einfo, 0, 1<<10)
210         files := make([]einfo, 0, 1<<10)
211         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
212                 if err != nil {
213                         return err
214                 }
215                 if info.IsDir() {
216                         // directory header, PAX record header+contents
217                         fileSize += TarBlockSize + 2*TarBlockSize
218                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
219                 } else {
220                         // file header, PAX record header+contents, file content
221                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
222                         if n := info.Size() % TarBlockSize; n != 0 {
223                                 fileSize += TarBlockSize - n // padding
224                         }
225                         files = append(files, einfo{
226                                 path:    path,
227                                 modTime: info.ModTime(),
228                                 size:    info.Size(),
229                         })
230                 }
231                 return nil
232         })
233         if rerr != nil {
234                 return
235         }
236
237         r, w := io.Pipe()
238         reader = r
239         closer = DummyCloser{}
240         fileSize += 2 * TarBlockSize // termination block
241
242         go func() {
243                 tarWr := tar.NewWriter(w)
244                 hdr := tar.Header{
245                         Typeflag: tar.TypeDir,
246                         Mode:     0777,
247                         PAXRecords: map[string]string{
248                                 "comment": "Autogenerated by " + VersionGet(),
249                         },
250                         Format: tar.FormatPAX,
251                 }
252                 for _, e := range dirs {
253                         hdr.Name = basePath + e.path[len(rootPath):]
254                         hdr.ModTime = e.modTime
255                         if err = tarWr.WriteHeader(&hdr); err != nil {
256                                 w.CloseWithError(err)
257                         }
258                 }
259                 hdr.Typeflag = tar.TypeReg
260                 hdr.Mode = 0666
261                 for _, e := range files {
262                         hdr.Name = basePath + e.path[len(rootPath):]
263                         hdr.ModTime = e.modTime
264                         hdr.Size = e.size
265                         if err = tarWr.WriteHeader(&hdr); err != nil {
266                                 w.CloseWithError(err)
267                         }
268                         fd, err := os.Open(e.path)
269                         if err != nil {
270                                 w.CloseWithError(err)
271                         }
272                         _, err = io.Copy(tarWr, bufio.NewReader(fd))
273                         if err != nil {
274                                 w.CloseWithError(err)
275                         }
276                         fd.Close()
277                 }
278                 tarWr.Close()
279                 w.Close()
280         }()
281         return
282 }
283
284 func (ctx *Ctx) TxFile(
285         node *Node,
286         nice uint8,
287         srcPath, dstPath string,
288         chunkSize int64,
289         minSize, maxSize int64,
290 ) error {
291         dstPathSpecified := false
292         if dstPath == "" {
293                 if srcPath == "-" {
294                         return errors.New("Must provide destination filename")
295                 }
296                 dstPath = filepath.Base(srcPath)
297         } else {
298                 dstPathSpecified = true
299         }
300         dstPath = filepath.Clean(dstPath)
301         if filepath.IsAbs(dstPath) {
302                 return errors.New("Relative destination path required")
303         }
304         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
305         if closer != nil {
306                 defer closer.Close()
307         }
308         if err != nil {
309                 return err
310         }
311         if fileSize > maxSize {
312                 return errors.New("Too big than allowed")
313         }
314         if archived && !dstPathSpecified {
315                 dstPath += TarExt
316         }
317
318         if fileSize <= chunkSize {
319                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
320                 if err != nil {
321                         return err
322                 }
323                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
324                 sds := SDS{
325                         "type": "file",
326                         "node": node.Id,
327                         "nice": strconv.Itoa(int(nice)),
328                         "src":  srcPath,
329                         "dst":  dstPath,
330                         "size": strconv.FormatInt(fileSize, 10),
331                 }
332                 if err == nil {
333                         ctx.LogI("tx", sds, "sent")
334                 } else {
335                         ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent")
336                 }
337                 return err
338         }
339
340         leftSize := fileSize
341         metaPkt := ChunkedMeta{
342                 Magic:     MagicNNCPMv1,
343                 FileSize:  uint64(fileSize),
344                 ChunkSize: uint64(chunkSize),
345                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
346         }
347         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
348                 hsh := new([32]byte)
349                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
350         }
351         var sizeToSend int64
352         var hsh hash.Hash
353         var pkt *Pkt
354         var chunkNum int
355         var path string
356         for {
357                 if leftSize <= chunkSize {
358                         sizeToSend = leftSize
359                 } else {
360                         sizeToSend = chunkSize
361                 }
362                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
363                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
364                 if err != nil {
365                         return err
366                 }
367                 hsh, err = blake2b.New256(nil)
368                 if err != nil {
369                         return err
370                 }
371                 _, err = ctx.Tx(
372                         node,
373                         pkt,
374                         nice,
375                         sizeToSend,
376                         minSize,
377                         io.TeeReader(reader, hsh),
378                 )
379                 sds := SDS{
380                         "type": "file",
381                         "node": node.Id,
382                         "nice": strconv.Itoa(int(nice)),
383                         "src":  srcPath,
384                         "dst":  path,
385                         "size": strconv.FormatInt(sizeToSend, 10),
386                 }
387                 if err == nil {
388                         ctx.LogI("tx", sds, "sent")
389                 } else {
390                         ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent")
391                         return err
392                 }
393                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
394                 leftSize -= sizeToSend
395                 chunkNum++
396                 if leftSize == 0 {
397                         break
398                 }
399         }
400         var metaBuf bytes.Buffer
401         _, err = xdr.Marshal(&metaBuf, metaPkt)
402         if err != nil {
403                 return err
404         }
405         path = dstPath + ChunkedSuffixMeta
406         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
407         if err != nil {
408                 return err
409         }
410         metaPktSize := int64(metaBuf.Len())
411         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
412         sds := SDS{
413                 "type": "file",
414                 "node": node.Id,
415                 "nice": strconv.Itoa(int(nice)),
416                 "src":  srcPath,
417                 "dst":  path,
418                 "size": strconv.FormatInt(metaPktSize, 10),
419         }
420         if err == nil {
421                 ctx.LogI("tx", sds, "sent")
422         } else {
423                 ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent")
424         }
425         return err
426 }
427
428 func (ctx *Ctx) TxFreq(
429         node *Node,
430         nice, replyNice uint8,
431         srcPath, dstPath string,
432         minSize int64) error {
433         dstPath = filepath.Clean(dstPath)
434         if filepath.IsAbs(dstPath) {
435                 return errors.New("Relative destination path required")
436         }
437         srcPath = filepath.Clean(srcPath)
438         if filepath.IsAbs(srcPath) {
439                 return errors.New("Relative source path required")
440         }
441         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
442         if err != nil {
443                 return err
444         }
445         src := strings.NewReader(dstPath)
446         size := int64(src.Len())
447         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
448         sds := SDS{
449                 "type":      "freq",
450                 "node":      node.Id,
451                 "nice":      strconv.Itoa(int(nice)),
452                 "replynice": strconv.Itoa(int(replyNice)),
453                 "src":       srcPath,
454                 "dst":       dstPath,
455         }
456         if err == nil {
457                 ctx.LogI("tx", sds, "sent")
458         } else {
459                 ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent")
460         }
461         return err
462 }
463
464 func (ctx *Ctx) TxExec(
465         node *Node,
466         nice, replyNice uint8,
467         handle string,
468         args []string,
469         in io.Reader,
470         minSize int64,
471 ) error {
472         path := make([][]byte, 0, 1+len(args))
473         path = append(path, []byte(handle))
474         for _, arg := range args {
475                 path = append(path, []byte(arg))
476         }
477         pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
478         if err != nil {
479                 return err
480         }
481         var compressed bytes.Buffer
482         compressor, err := zstd.NewWriter(
483                 &compressed,
484                 zstd.WithEncoderLevel(zstd.SpeedDefault),
485         )
486         if err != nil {
487                 return err
488         }
489         _, err = io.Copy(compressor, in)
490         compressor.Close()
491         if err != nil {
492                 return err
493         }
494         size := int64(compressed.Len())
495         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
496         sds := SDS{
497                 "type":      "exec",
498                 "node":      node.Id,
499                 "nice":      strconv.Itoa(int(nice)),
500                 "replynice": strconv.Itoa(int(replyNice)),
501                 "dst":       strings.Join(append([]string{handle}, args...), " "),
502                 "size":      strconv.FormatInt(size, 10),
503         }
504         if err == nil {
505                 ctx.LogI("tx", sds, "sent")
506         } else {
507                 ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent")
508         }
509         return err
510 }
511
512 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
513         sds := SDS{
514                 "type": "trns",
515                 "node": node.Id,
516                 "nice": strconv.Itoa(int(nice)),
517                 "size": strconv.FormatInt(size, 10),
518         }
519         ctx.LogD("tx", sds, "taken")
520         if !ctx.IsEnoughSpace(size) {
521                 err := errors.New("is not enough space")
522                 ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), err.Error())
523                 return err
524         }
525         tmp, err := ctx.NewTmpFileWHash()
526         if err != nil {
527                 return err
528         }
529         if _, err = io.Copy(tmp.W, src); err != nil {
530                 return err
531         }
532         nodePath := filepath.Join(ctx.Spool, node.Id.String())
533         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
534         if err == nil {
535                 ctx.LogI("tx", sds, "sent")
536         } else {
537                 ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent")
538         }
539         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
540         return err
541 }