]> Cypherpunks.ru repositories - nncp.git/blob - src/tx.go
Ability to send directories, creating pax archive
[nncp.git] / src / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "archive/tar"
22         "bufio"
23         "bytes"
24         "crypto/rand"
25         "errors"
26         "hash"
27         "io"
28         "io/ioutil"
29         "os"
30         "path/filepath"
31         "strconv"
32         "strings"
33         "time"
34
35         xdr "github.com/davecgh/go-xdr/xdr2"
36         "github.com/klauspost/compress/zstd"
37         "golang.org/x/crypto/blake2b"
38         "golang.org/x/crypto/chacha20poly1305"
39 )
40
41 const (
42         TarBlockSize = 512
43         TarExt       = ".tar"
44 )
45
46 func (ctx *Ctx) Tx(
47         node *Node,
48         pkt *Pkt,
49         nice uint8,
50         size, minSize int64,
51         src io.Reader,
52 ) (*Node, error) {
53         tmp, err := ctx.NewTmpFileWHash()
54         if err != nil {
55                 return nil, err
56         }
57         hops := make([]*Node, 0, 1+len(node.Via))
58         hops = append(hops, node)
59         lastNode := node
60         for i := len(node.Via); i > 0; i-- {
61                 lastNode = ctx.Neigh[*node.Via[i-1]]
62                 hops = append(hops, lastNode)
63         }
64         expectedSize := size
65         for i := 0; i < len(hops); i++ {
66                 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
67         }
68         padSize := minSize - expectedSize
69         if padSize < 0 {
70                 padSize = 0
71         }
72         errs := make(chan error)
73         curSize := size
74         pipeR, pipeW := io.Pipe()
75         go func(size int64, src io.Reader, dst io.WriteCloser) {
76                 ctx.LogD("tx", SDS{
77                         "node": hops[0].Id,
78                         "nice": strconv.Itoa(int(nice)),
79                         "size": strconv.FormatInt(size, 10),
80                 }, "wrote")
81                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
82                 dst.Close()
83         }(curSize, src, pipeW)
84         curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
85
86         var pipeRPrev io.Reader
87         for i := 1; i < len(hops); i++ {
88                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
89                 pipeRPrev = pipeR
90                 pipeR, pipeW = io.Pipe()
91                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
92                         ctx.LogD("tx", SDS{
93                                 "node": node.Id,
94                                 "nice": strconv.Itoa(int(nice)),
95                                 "size": strconv.FormatInt(size, 10),
96                         }, "trns wrote")
97                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
98                         dst.Close()
99                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
100                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
101         }
102         go func() {
103                 _, err := io.Copy(tmp.W, pipeR)
104                 errs <- err
105         }()
106         for i := 0; i <= len(hops); i++ {
107                 err = <-errs
108                 if err != nil {
109                         tmp.Fd.Close()
110                         return nil, err
111                 }
112         }
113         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
114         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
115         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
116         return lastNode, err
117 }
118
119 type DummyCloser struct{}
120
121 func (dc DummyCloser) Close() error { return nil }
122
123 func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize int64, archived bool, rerr error) {
124         if srcPath == "-" {
125                 // Read content from stdin, saving to temporary file, encrypting
126                 // on the fly
127                 src, err := ioutil.TempFile("", "nncp-file")
128                 if err != nil {
129                         rerr = err
130                         return
131                 }
132                 os.Remove(src.Name())
133                 tmpW := bufio.NewWriter(src)
134                 tmpKey := make([]byte, chacha20poly1305.KeySize)
135                 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
136                         return
137                 }
138                 aead, err := chacha20poly1305.New(tmpKey)
139                 if err != nil {
140                         rerr = err
141                         return
142                 }
143                 nonce := make([]byte, aead.NonceSize())
144                 written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
145                 if err != nil {
146                         rerr = err
147                         return
148                 }
149                 fileSize = int64(written)
150                 if err = tmpW.Flush(); err != nil {
151                         return
152                 }
153                 src.Seek(0, io.SeekStart)
154                 r, w := io.Pipe()
155                 go func() {
156                         if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
157                                 w.CloseWithError(err)
158                         }
159                 }()
160                 reader = r
161                 closer = src
162                 return
163         }
164
165         srcStat, err := os.Stat(srcPath)
166         if err != nil {
167                 rerr = err
168                 return
169         }
170         mode := srcStat.Mode()
171
172         if mode.IsRegular() {
173                 // It is regular file, just send it
174                 src, err := os.Open(srcPath)
175                 if err != nil {
176                         rerr = err
177                         return
178                 }
179                 fileSize = srcStat.Size()
180                 reader = bufio.NewReader(src)
181                 closer = src
182                 return
183         }
184
185         if !mode.IsDir() {
186                 rerr = errors.New("unsupported file type")
187                 return
188         }
189
190         // It is directory, create PAX archive with its contents
191         archived = true
192         basePath := filepath.Base(srcPath)
193         rootPath, err := filepath.Abs(srcPath)
194         if err != nil {
195                 rerr = err
196                 return
197         }
198         type einfo struct {
199                 path    string
200                 modTime time.Time
201                 size    int64
202         }
203         dirs := make([]einfo, 0, 1<<10)
204         files := make([]einfo, 0, 1<<10)
205         rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
206                 if err != nil {
207                         return err
208                 }
209                 if info.IsDir() {
210                         // directory header, PAX record header+contents
211                         fileSize += TarBlockSize + 2*TarBlockSize
212                         dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
213                 } else {
214                         // file header, PAX record header+contents, file content
215                         fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
216                         if n := info.Size() % TarBlockSize; n != 0 {
217                                 fileSize += TarBlockSize - n // padding
218                         }
219                         files = append(files, einfo{
220                                 path:    path,
221                                 modTime: info.ModTime(),
222                                 size:    info.Size(),
223                         })
224                 }
225                 return nil
226         })
227         if rerr != nil {
228                 return
229         }
230
231         r, w := io.Pipe()
232         reader = r
233         closer = DummyCloser{}
234         fileSize += 2 * TarBlockSize // termination block
235
236         go func() {
237                 tarWr := tar.NewWriter(w)
238                 hdr := tar.Header{
239                         Typeflag: tar.TypeDir,
240                         Mode:     0777,
241                         PAXRecords: map[string]string{
242                                 "comment": "Autogenerated by " + VersionGet(),
243                         },
244                         Format: tar.FormatPAX,
245                 }
246                 for _, e := range dirs {
247                         hdr.Name = basePath + e.path[len(rootPath):]
248                         hdr.ModTime = e.modTime
249                         if err = tarWr.WriteHeader(&hdr); err != nil {
250                                 w.CloseWithError(err)
251                         }
252                 }
253                 hdr.Typeflag = tar.TypeReg
254                 hdr.Mode = 0666
255                 for _, e := range files {
256                         hdr.Name = basePath + e.path[len(rootPath):]
257                         hdr.ModTime = e.modTime
258                         hdr.Size = e.size
259                         if err = tarWr.WriteHeader(&hdr); err != nil {
260                                 w.CloseWithError(err)
261                         }
262                         fd, err := os.Open(e.path)
263                         if err != nil {
264                                 w.CloseWithError(err)
265                         }
266                         _, err = io.Copy(tarWr, bufio.NewReader(fd))
267                         if err != nil {
268                                 w.CloseWithError(err)
269                         }
270                         fd.Close()
271                 }
272                 tarWr.Close()
273                 w.Close()
274         }()
275         return
276 }
277
278 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
279         dstPathSpecified := false
280         if dstPath == "" {
281                 if srcPath == "-" {
282                         return errors.New("Must provide destination filename")
283                 }
284                 dstPath = filepath.Base(srcPath)
285         } else {
286                 dstPathSpecified = true
287         }
288         dstPath = filepath.Clean(dstPath)
289         if filepath.IsAbs(dstPath) {
290                 return errors.New("Relative destination path required")
291         }
292         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
293         if closer != nil {
294                 defer closer.Close()
295         }
296         if err != nil {
297                 return err
298         }
299         if archived && !dstPathSpecified {
300                 dstPath += TarExt
301         }
302         pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
303         if err != nil {
304                 return err
305         }
306         _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
307         sds := SDS{
308                 "type": "file",
309                 "node": node.Id,
310                 "nice": strconv.Itoa(int(nice)),
311                 "src":  srcPath,
312                 "dst":  dstPath,
313                 "size": strconv.FormatInt(fileSize, 10),
314         }
315         if err == nil {
316                 ctx.LogI("tx", sds, "sent")
317         } else {
318                 sds["err"] = err
319                 ctx.LogE("tx", sds, "sent")
320         }
321         return err
322 }
323
324 func (ctx *Ctx) TxFileChunked(
325         node *Node,
326         nice uint8,
327         srcPath, dstPath string,
328         minSize int64,
329         chunkSize int64,
330 ) error {
331         dstPathSpecified := false
332         if dstPath == "" {
333                 if srcPath == "-" {
334                         return errors.New("Must provide destination filename")
335                 }
336                 dstPath = filepath.Base(srcPath)
337         } else {
338                 dstPathSpecified = true
339         }
340         dstPath = filepath.Clean(dstPath)
341         if filepath.IsAbs(dstPath) {
342                 return errors.New("Relative destination path required")
343         }
344         reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
345         if closer != nil {
346                 defer closer.Close()
347         }
348         if err != nil {
349                 return err
350         }
351         if archived && !dstPathSpecified {
352                 dstPath += TarExt
353         }
354
355         if fileSize <= chunkSize {
356                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
357                 if err != nil {
358                         return err
359                 }
360                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
361                 sds := SDS{
362                         "type": "file",
363                         "node": node.Id,
364                         "nice": strconv.Itoa(int(nice)),
365                         "src":  srcPath,
366                         "dst":  dstPath,
367                         "size": strconv.FormatInt(fileSize, 10),
368                 }
369                 if err == nil {
370                         ctx.LogI("tx", sds, "sent")
371                 } else {
372                         sds["err"] = err
373                         ctx.LogE("tx", sds, "sent")
374                 }
375                 return err
376         }
377
378         leftSize := fileSize
379         metaPkt := ChunkedMeta{
380                 Magic:     MagicNNCPMv1,
381                 FileSize:  uint64(fileSize),
382                 ChunkSize: uint64(chunkSize),
383                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
384         }
385         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
386                 hsh := new([32]byte)
387                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
388         }
389         var sizeToSend int64
390         var hsh hash.Hash
391         var pkt *Pkt
392         var chunkNum int
393         var path string
394         for {
395                 if leftSize <= chunkSize {
396                         sizeToSend = leftSize
397                 } else {
398                         sizeToSend = chunkSize
399                 }
400                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
401                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
402                 if err != nil {
403                         return err
404                 }
405                 hsh, err = blake2b.New256(nil)
406                 if err != nil {
407                         return err
408                 }
409                 _, err = ctx.Tx(
410                         node,
411                         pkt,
412                         nice,
413                         sizeToSend,
414                         minSize,
415                         io.TeeReader(reader, hsh),
416                 )
417                 sds := SDS{
418                         "type": "file",
419                         "node": node.Id,
420                         "nice": strconv.Itoa(int(nice)),
421                         "src":  srcPath,
422                         "dst":  path,
423                         "size": strconv.FormatInt(sizeToSend, 10),
424                 }
425                 if err == nil {
426                         ctx.LogI("tx", sds, "sent")
427                 } else {
428                         sds["err"] = err
429                         ctx.LogE("tx", sds, "sent")
430                         return err
431                 }
432                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
433                 leftSize -= sizeToSend
434                 chunkNum++
435                 if leftSize == 0 {
436                         break
437                 }
438         }
439         var metaBuf bytes.Buffer
440         _, err = xdr.Marshal(&metaBuf, metaPkt)
441         if err != nil {
442                 return err
443         }
444         path = dstPath + ChunkedSuffixMeta
445         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
446         if err != nil {
447                 return err
448         }
449         metaPktSize := int64(metaBuf.Len())
450         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
451         sds := SDS{
452                 "type": "file",
453                 "node": node.Id,
454                 "nice": strconv.Itoa(int(nice)),
455                 "src":  srcPath,
456                 "dst":  path,
457                 "size": strconv.FormatInt(metaPktSize, 10),
458         }
459         if err == nil {
460                 ctx.LogI("tx", sds, "sent")
461         } else {
462                 sds["err"] = err
463                 ctx.LogE("tx", sds, "sent")
464         }
465         return err
466 }
467
468 func (ctx *Ctx) TxFreq(
469         node *Node,
470         nice, replyNice uint8,
471         srcPath, dstPath string,
472         minSize int64) error {
473         dstPath = filepath.Clean(dstPath)
474         if filepath.IsAbs(dstPath) {
475                 return errors.New("Relative destination path required")
476         }
477         srcPath = filepath.Clean(srcPath)
478         if filepath.IsAbs(srcPath) {
479                 return errors.New("Relative source path required")
480         }
481         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
482         if err != nil {
483                 return err
484         }
485         src := strings.NewReader(dstPath)
486         size := int64(src.Len())
487         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
488         sds := SDS{
489                 "type":      "freq",
490                 "node":      node.Id,
491                 "nice":      strconv.Itoa(int(nice)),
492                 "replynice": strconv.Itoa(int(replyNice)),
493                 "src":       srcPath,
494                 "dst":       dstPath,
495         }
496         if err == nil {
497                 ctx.LogI("tx", sds, "sent")
498         } else {
499                 sds["err"] = err
500                 ctx.LogE("tx", sds, "sent")
501         }
502         return err
503 }
504
505 func (ctx *Ctx) TxExec(
506         node *Node,
507         nice, replyNice uint8,
508         handle string,
509         args []string,
510         in io.Reader,
511         minSize int64,
512 ) error {
513         path := make([][]byte, 0, 1+len(args))
514         path = append(path, []byte(handle))
515         for _, arg := range args {
516                 path = append(path, []byte(arg))
517         }
518         pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
519         if err != nil {
520                 return err
521         }
522         var compressed bytes.Buffer
523         compressor, err := zstd.NewWriter(
524                 &compressed,
525                 zstd.WithEncoderLevel(zstd.SpeedDefault),
526         )
527         if err != nil {
528                 return err
529         }
530         _, err = io.Copy(compressor, in)
531         compressor.Close()
532         if err != nil {
533                 return err
534         }
535         size := int64(compressed.Len())
536         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
537         sds := SDS{
538                 "type":      "exec",
539                 "node":      node.Id,
540                 "nice":      strconv.Itoa(int(nice)),
541                 "replynice": strconv.Itoa(int(replyNice)),
542                 "dst":       strings.Join(append([]string{handle}, args...), " "),
543                 "size":      strconv.FormatInt(size, 10),
544         }
545         if err == nil {
546                 ctx.LogI("tx", sds, "sent")
547         } else {
548                 sds["err"] = err
549                 ctx.LogE("tx", sds, "sent")
550         }
551         return err
552 }
553
554 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
555         sds := SDS{
556                 "type": "trns",
557                 "node": node.Id,
558                 "nice": strconv.Itoa(int(nice)),
559                 "size": strconv.FormatInt(size, 10),
560         }
561         ctx.LogD("tx", sds, "taken")
562         tmp, err := ctx.NewTmpFileWHash()
563         if err != nil {
564                 return err
565         }
566         if _, err = io.Copy(tmp.W, src); err != nil {
567                 return err
568         }
569         nodePath := filepath.Join(ctx.Spool, node.Id.String())
570         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
571         if err == nil {
572                 ctx.LogI("tx", sds, "sent")
573         } else {
574                 sds["err"] = err
575                 ctx.LogI("tx", sds, "sent")
576         }
577         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
578         return err
579 }