]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
Reduce code with SdsAdd
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "hash"
27         "io"
28         "io/ioutil"
29         "os"
30         "path/filepath"
31         "strconv"
32         "strings"
33         "time"
34
35         xdr "github.com/davecgh/go-xdr/xdr2"
36         "github.com/klauspost/compress/zstd"
37         "golang.org/x/crypto/blake2b"
38         "golang.org/x/crypto/chacha20poly1305"
39 )
40
41 const (
42         MaxFileSize = 1 << 62
43
44         TarBlockSize = 512
45         TarExt       = ".tar"
46 )
47
48 func (ctx *Ctx) Tx(
49         node *Node,
50         pkt *Pkt,
51         nice uint8,
52         size, minSize int64,
53         src io.Reader,
54 ) (*Node, error) {
55         hops := make([]*Node, 0, 1+len(node.Via))
56         hops = append(hops, node)
57         lastNode := node
58         for i := len(node.Via); i > 0; i-- {
59                 lastNode = ctx.Neigh[*node.Via[i-1]]
60                 hops = append(hops, lastNode)
61         }
62         expectedSize := size
63         for i := 0; i < len(hops); i++ {
64                 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
65         }
66         padSize := minSize - expectedSize
67         if padSize < 0 {
68                 padSize = 0
69         }
70         if !ctx.IsEnoughSpace(size + padSize) {
71                 return nil, errors.New("is not enough space")
72         }
73         tmp, err := ctx.NewTmpFileWHash()
74         if err != nil {
75                 return nil, err
76         }
77
78         errs := make(chan error)
79         curSize := size
80         pipeR, pipeW := io.Pipe()
81         go func(size int64, src io.Reader, dst io.WriteCloser) {
82                 ctx.LogD("tx", SDS{
83                         "node": hops[0].Id,
84                         "nice": strconv.Itoa(int(nice)),
85                         "size": strconv.FormatInt(size, 10),
86                 }, "wrote")
87                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
88                 dst.Close()
89         }(curSize, src, pipeW)
90         curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
91
92         var pipeRPrev io.Reader
93         for i := 1; i < len(hops); i++ {
94                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
95                 pipeRPrev = pipeR
96                 pipeR, pipeW = io.Pipe()
97                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
98                         ctx.LogD("tx", SDS{
99                                 "node": node.Id,
100                                 "nice": strconv.Itoa(int(nice)),
101                                 "size": strconv.FormatInt(size, 10),
102                         }, "trns wrote")
103                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
104                         dst.Close()
105                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
106                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
107         }
108         go func() {
109                 _, err := io.Copy(tmp.W, pipeR)
110                 errs <- err
111         }()
112         for i := 0; i <= len(hops); i++ {
113                 err = <-errs
114                 if err != nil {
115                         tmp.Fd.Close()
116                         return nil, err
117                 }
118         }
119         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
120         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
121         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
122         return lastNode, err
123 }
124
125 type DummyCloser struct{}
126
127 func (dc DummyCloser) Close() error { return nil }
128
129 func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize int64, archived bool, rerr error) {
130         if srcPath == "-" {
131                 // Read content from stdin, saving to temporary file, encrypting
132                 // on the fly
133                 src, err := ioutil.TempFile("", "nncp-file")
134                 if err != nil {
135                         rerr = err
136                         return
137                 }
138                 os.Remove(src.Name())
139                 tmpW := bufio.NewWriter(src)
140                 tmpKey := make([]byte, chacha20poly1305.KeySize)
141                 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
142                         return
143                 }
144                 aead, err := chacha20poly1305.New(tmpKey)
145                 if err != nil {
146                         rerr = err
147                         return
148                 }
149                 nonce := make([]byte, aead.NonceSize())
150                 written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
151                 if err != nil {
152                         rerr = err
153                         return
154                 }
155                 fileSize = int64(written)
156                 if err = tmpW.Flush(); err != nil {
157                         return
158                 }
159                 src.Seek(0, io.SeekStart)
160                 r, w := io.Pipe()
161                 go func() {
162                         if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
163                                 w.CloseWithError(err)
164                         }
165                 }()
166                 reader = r
167                 closer = src
168                 return
169         }
170
171         srcStat, err := os.Stat(srcPath)
172         if err != nil {
173                 rerr = err
174                 return
175         }
176         mode := srcStat.Mode()
177
178         if mode.IsRegular() {
179                 // It is regular file, just send it
180                 src, err := os.Open(srcPath)
181                 if err != nil {
182                         rerr = err
183                         return
184                 }
185                 fileSize = srcStat.Size()
186                 reader = bufio.NewReader(src)
187                 closer = src
188                 return
189         }
190
191         if !mode.IsDir() {
192                 rerr = errors.New("unsupported file type")
193                 return
194         }
195
196         // It is directory, create PAX archive with its contents
197         archived = true
198         basePath := filepath.Base(srcPath)
199         rootPath, err := filepath.Abs(srcPath)
200         if err != nil {
201                 rerr = err
202                 return
203         }
204         type einfo struct {
205                 path    string
206                 modTime time.Time
207                 size    int64
208         }
209         dirs := make([]einfo, 0, 1<<10)
210         files := make([]einfo, 0, 1<<10)
211         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
212                 if err != nil {
213                         return err
214                 }
215                 if info.IsDir() {
216                         // directory header, PAX record header+contents
217                         fileSize += TarBlockSize + 2*TarBlockSize
218                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
219                 } else {
220                         // file header, PAX record header+contents, file content
221                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
222                         if n := info.Size() % TarBlockSize; n != 0 {
223                                 fileSize += TarBlockSize - n // padding
224                         }
225                         files = append(files, einfo{
226                                 path:    path,
227                                 modTime: info.ModTime(),
228                                 size:    info.Size(),
229                         })
230                 }
231                 return nil
232         })
233         if rerr != nil {
234                 return
235         }
236
237         r, w := io.Pipe()
238         reader = r
239         closer = DummyCloser{}
240         fileSize += 2 * TarBlockSize // termination block
241
242         go func() {
243                 tarWr := tar.NewWriter(w)
244                 hdr := tar.Header{
245                         Typeflag: tar.TypeDir,
246                         Mode:     0777,
247                         PAXRecords: map[string]string{
248                                 "comment": "Autogenerated by " + VersionGet(),
249                         },
250                         Format: tar.FormatPAX,
251                 }
252                 for _, e := range dirs {
253                         hdr.Name = basePath + e.path[len(rootPath):]
254                         hdr.ModTime = e.modTime
255                         if err = tarWr.WriteHeader(&hdr); err != nil {
256                                 w.CloseWithError(err)
257                         }
258                 }
259                 hdr.Typeflag = tar.TypeReg
260                 hdr.Mode = 0666
261                 for _, e := range files {
262                         hdr.Name = basePath + e.path[len(rootPath):]
263                         hdr.ModTime = e.modTime
264                         hdr.Size = e.size
265                         if err = tarWr.WriteHeader(&hdr); err != nil {
266                                 w.CloseWithError(err)
267                         }
268                         fd, err := os.Open(e.path)
269                         if err != nil {
270                                 w.CloseWithError(err)
271                         }
272                         _, err = io.Copy(tarWr, bufio.NewReader(fd))
273                         if err != nil {
274                                 w.CloseWithError(err)
275                         }
276                         fd.Close()
277                 }
278                 tarWr.Close()
279                 w.Close()
280         }()
281         return
282 }
283
284 func (ctx *Ctx) TxFile(
285         node *Node,
286         nice uint8,
287         srcPath, dstPath string,
288         chunkSize int64,
289         minSize int64,
290 ) error {
291         dstPathSpecified := false
292         if dstPath == "" {
293                 if srcPath == "-" {
294                         return errors.New("Must provide destination filename")
295                 }
296                 dstPath = filepath.Base(srcPath)
297         } else {
298                 dstPathSpecified = true
299         }
300         dstPath = filepath.Clean(dstPath)
301         if filepath.IsAbs(dstPath) {
302                 return errors.New("Relative destination path required")
303         }
304         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
305         if closer != nil {
306                 defer closer.Close()
307         }
308         if err != nil {
309                 return err
310         }
311         if archived && !dstPathSpecified {
312                 dstPath += TarExt
313         }
314
315         if fileSize <= chunkSize {
316                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
317                 if err != nil {
318                         return err
319                 }
320                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
321                 sds := SDS{
322                         "type": "file",
323                         "node": node.Id,
324                         "nice": strconv.Itoa(int(nice)),
325                         "src":  srcPath,
326                         "dst":  dstPath,
327                         "size": strconv.FormatInt(fileSize, 10),
328                 }
329                 if err == nil {
330                         ctx.LogI("tx", sds, "sent")
331                 } else {
332                         ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent")
333                 }
334                 return err
335         }
336
337         leftSize := fileSize
338         metaPkt := ChunkedMeta{
339                 Magic:     MagicNNCPMv1,
340                 FileSize:  uint64(fileSize),
341                 ChunkSize: uint64(chunkSize),
342                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
343         }
344         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
345                 hsh := new([32]byte)
346                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
347         }
348         var sizeToSend int64
349         var hsh hash.Hash
350         var pkt *Pkt
351         var chunkNum int
352         var path string
353         for {
354                 if leftSize <= chunkSize {
355                         sizeToSend = leftSize
356                 } else {
357                         sizeToSend = chunkSize
358                 }
359                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
360                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
361                 if err != nil {
362                         return err
363                 }
364                 hsh, err = blake2b.New256(nil)
365                 if err != nil {
366                         return err
367                 }
368                 _, err = ctx.Tx(
369                         node,
370                         pkt,
371                         nice,
372                         sizeToSend,
373                         minSize,
374                         io.TeeReader(reader, hsh),
375                 )
376                 sds := SDS{
377                         "type": "file",
378                         "node": node.Id,
379                         "nice": strconv.Itoa(int(nice)),
380                         "src":  srcPath,
381                         "dst":  path,
382                         "size": strconv.FormatInt(sizeToSend, 10),
383                 }
384                 if err == nil {
385                         ctx.LogI("tx", sds, "sent")
386                 } else {
387                         ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent")
388                         return err
389                 }
390                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
391                 leftSize -= sizeToSend
392                 chunkNum++
393                 if leftSize == 0 {
394                         break
395                 }
396         }
397         var metaBuf bytes.Buffer
398         _, err = xdr.Marshal(&metaBuf, metaPkt)
399         if err != nil {
400                 return err
401         }
402         path = dstPath + ChunkedSuffixMeta
403         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
404         if err != nil {
405                 return err
406         }
407         metaPktSize := int64(metaBuf.Len())
408         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
409         sds := SDS{
410                 "type": "file",
411                 "node": node.Id,
412                 "nice": strconv.Itoa(int(nice)),
413                 "src":  srcPath,
414                 "dst":  path,
415                 "size": strconv.FormatInt(metaPktSize, 10),
416         }
417         if err == nil {
418                 ctx.LogI("tx", sds, "sent")
419         } else {
420                 ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent")
421         }
422         return err
423 }
424
425 func (ctx *Ctx) TxFreq(
426         node *Node,
427         nice, replyNice uint8,
428         srcPath, dstPath string,
429         minSize int64) error {
430         dstPath = filepath.Clean(dstPath)
431         if filepath.IsAbs(dstPath) {
432                 return errors.New("Relative destination path required")
433         }
434         srcPath = filepath.Clean(srcPath)
435         if filepath.IsAbs(srcPath) {
436                 return errors.New("Relative source path required")
437         }
438         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
439         if err != nil {
440                 return err
441         }
442         src := strings.NewReader(dstPath)
443         size := int64(src.Len())
444         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
445         sds := SDS{
446                 "type":      "freq",
447                 "node":      node.Id,
448                 "nice":      strconv.Itoa(int(nice)),
449                 "replynice": strconv.Itoa(int(replyNice)),
450                 "src":       srcPath,
451                 "dst":       dstPath,
452         }
453         if err == nil {
454                 ctx.LogI("tx", sds, "sent")
455         } else {
456                 ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent")
457         }
458         return err
459 }
460
461 func (ctx *Ctx) TxExec(
462         node *Node,
463         nice, replyNice uint8,
464         handle string,
465         args []string,
466         in io.Reader,
467         minSize int64,
468 ) error {
469         path := make([][]byte, 0, 1+len(args))
470         path = append(path, []byte(handle))
471         for _, arg := range args {
472                 path = append(path, []byte(arg))
473         }
474         pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
475         if err != nil {
476                 return err
477         }
478         var compressed bytes.Buffer
479         compressor, err := zstd.NewWriter(
480                 &compressed,
481                 zstd.WithEncoderLevel(zstd.SpeedDefault),
482         )
483         if err != nil {
484                 return err
485         }
486         _, err = io.Copy(compressor, in)
487         compressor.Close()
488         if err != nil {
489                 return err
490         }
491         size := int64(compressed.Len())
492         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
493         sds := SDS{
494                 "type":      "exec",
495                 "node":      node.Id,
496                 "nice":      strconv.Itoa(int(nice)),
497                 "replynice": strconv.Itoa(int(replyNice)),
498                 "dst":       strings.Join(append([]string{handle}, args...), " "),
499                 "size":      strconv.FormatInt(size, 10),
500         }
501         if err == nil {
502                 ctx.LogI("tx", sds, "sent")
503         } else {
504                 ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent")
505         }
506         return err
507 }
508
509 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
510         sds := SDS{
511                 "type": "trns",
512                 "node": node.Id,
513                 "nice": strconv.Itoa(int(nice)),
514                 "size": strconv.FormatInt(size, 10),
515         }
516         ctx.LogD("tx", sds, "taken")
517         if !ctx.IsEnoughSpace(size) {
518                 err := errors.New("is not enough space")
519                 ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), err.Error())
520                 return err
521         }
522         tmp, err := ctx.NewTmpFileWHash()
523         if err != nil {
524                 return err
525         }
526         if _, err = io.Copy(tmp.W, src); err != nil {
527                 return err
528         }
529         nodePath := filepath.Join(ctx.Spool, node.Id.String())
530         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
531         if err == nil {
532                 ctx.LogI("tx", sds, "sent")
533         } else {
534                 ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent")
535         }
536         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
537         return err
538 }