2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
35 xdr "github.com/davecgh/go-xdr/xdr2"
36 "github.com/klauspost/compress/zstd"
37 "golang.org/x/crypto/blake2b"
38 "golang.org/x/crypto/chacha20poly1305"
56 hops := make([]*Node, 0, 1+len(node.Via))
57 hops = append(hops, node)
59 for i := len(node.Via); i > 0; i-- {
60 lastNode = ctx.Neigh[*node.Via[i-1]]
61 hops = append(hops, lastNode)
64 for i := 0; i < len(hops); i++ {
65 expectedSize = PktEncOverhead +
67 sizeWithTags(PktOverhead+expectedSize)
69 padSize := minSize - expectedSize
73 if !ctx.IsEnoughSpace(size + padSize) {
74 return nil, errors.New("is not enough space")
76 tmp, err := ctx.NewTmpFileWHash()
81 errs := make(chan error)
83 pipeR, pipeW := io.Pipe()
85 go func(size int64, src io.Reader, dst io.WriteCloser) {
91 pktEncRaw, err = PktEncWrite(
92 ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
95 dst.Close() // #nosec G104
96 }(curSize, src, pipeW)
97 curSize = PktEncOverhead +
99 sizeWithTags(PktOverhead+curSize) +
102 var pipeRPrev io.Reader
103 for i := 1; i < len(hops); i++ {
104 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
106 pipeR, pipeW = io.Pipe()
107 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
113 _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
115 dst.Close() // #nosec G104
116 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
117 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
120 _, err := CopyProgressed(
122 LEs{{"Pkt", pktName}, {"FullSize", curSize}},
127 for i := 0; i <= len(hops); i++ {
130 tmp.Fd.Close() // #nosec G104
134 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
135 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
136 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
141 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
// DummyCloser is a no-op io.Closer: it owns no underlying resource,
// so Close always succeeds. It is handed out where the API requires
// an io.Closer but there is nothing to release.
type DummyCloser struct{}

// Close implements io.Closer; it does nothing and never fails.
func (dc DummyCloser) Close() error {
	return nil
}
150 func throughTmpFile(r io.Reader) (
156 src, err := ioutil.TempFile("", "nncp-file")
161 os.Remove(src.Name()) // #nosec G104
162 tmpW := bufio.NewWriter(src)
163 tmpKey := make([]byte, chacha20poly1305.KeySize)
164 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
167 aead, err := chacha20poly1305.New(tmpKey)
172 nonce := make([]byte, aead.NonceSize())
173 written, err := aeadProcess(aead, nonce, true, r, tmpW)
178 fileSize = int64(written)
179 if err = tmpW.Flush(); err != nil {
183 if _, err = src.Seek(0, io.SeekStart); err != nil {
189 if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
190 w.CloseWithError(err) // #nosec G104
198 func prepareTxFile(srcPath string) (
206 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
210 srcStat, err := os.Stat(srcPath)
215 mode := srcStat.Mode()
217 if mode.IsRegular() {
218 // It is regular file, just send it
219 src, err := os.Open(srcPath)
224 fileSize = srcStat.Size()
225 reader = bufio.NewReader(src)
231 rerr = errors.New("unsupported file type")
235 // It is directory, create PAX archive with its contents
237 basePath := filepath.Base(srcPath)
238 rootPath, err := filepath.Abs(srcPath)
248 dirs := make([]einfo, 0, 1<<10)
249 files := make([]einfo, 0, 1<<10)
250 rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
255 // directory header, PAX record header+contents
256 fileSize += TarBlockSize + 2*TarBlockSize
257 dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
259 // file header, PAX record header+contents, file content
260 fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
261 if n := info.Size() % TarBlockSize; n != 0 {
262 fileSize += TarBlockSize - n // padding
264 files = append(files, einfo{
266 modTime: info.ModTime(),
278 closer = DummyCloser{}
279 fileSize += 2 * TarBlockSize // termination block
282 tarWr := tar.NewWriter(w)
284 Typeflag: tar.TypeDir,
286 PAXRecords: map[string]string{
287 "comment": "Autogenerated by " + VersionGet(),
289 Format: tar.FormatPAX,
291 for _, e := range dirs {
292 hdr.Name = basePath + e.path[len(rootPath):]
293 hdr.ModTime = e.modTime
294 if err = tarWr.WriteHeader(&hdr); err != nil {
295 return w.CloseWithError(err)
298 hdr.Typeflag = tar.TypeReg
300 for _, e := range files {
301 hdr.Name = basePath + e.path[len(rootPath):]
302 hdr.ModTime = e.modTime
304 if err = tarWr.WriteHeader(&hdr); err != nil {
305 return w.CloseWithError(err)
307 fd, err := os.Open(e.path)
309 fd.Close() // #nosec G104
310 return w.CloseWithError(err)
312 if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
313 fd.Close() // #nosec G104
314 return w.CloseWithError(err)
316 fd.Close() // #nosec G104
318 if err = tarWr.Close(); err != nil {
319 return w.CloseWithError(err)
326 func (ctx *Ctx) TxFile(
329 srcPath, dstPath string,
331 minSize, maxSize int64,
333 dstPathSpecified := false
336 return errors.New("Must provide destination filename")
338 dstPath = filepath.Base(srcPath)
340 dstPathSpecified = true
342 dstPath = filepath.Clean(dstPath)
343 if filepath.IsAbs(dstPath) {
344 return errors.New("Relative destination path required")
346 reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
353 if fileSize > maxSize {
354 return errors.New("Too big than allowed")
356 if archived && !dstPathSpecified {
360 if fileSize <= chunkSize {
361 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
365 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
375 ctx.LogI("tx", les, "sent")
377 ctx.LogE("tx", les, err, "sent")
383 metaPkt := ChunkedMeta{
385 FileSize: uint64(fileSize),
386 ChunkSize: uint64(chunkSize),
387 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
389 for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
391 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
399 if leftSize <= chunkSize {
400 sizeToSend = leftSize
402 sizeToSend = chunkSize
404 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
405 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
409 hsh, err = blake2b.New256(nil)
419 io.TeeReader(reader, hsh),
428 {"Size", sizeToSend},
431 ctx.LogI("tx", les, "sent")
433 ctx.LogE("tx", les, err, "sent")
436 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
437 leftSize -= sizeToSend
443 var metaBuf bytes.Buffer
444 _, err = xdr.Marshal(&metaBuf, metaPkt)
448 path = dstPath + ChunkedSuffixMeta
449 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
453 metaPktSize := int64(metaBuf.Len())
454 _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
461 {"Size", metaPktSize},
464 ctx.LogI("tx", les, "sent")
466 ctx.LogE("tx", les, err, "sent")
471 func (ctx *Ctx) TxFreq(
473 nice, replyNice uint8,
474 srcPath, dstPath string,
475 minSize int64) error {
476 dstPath = filepath.Clean(dstPath)
477 if filepath.IsAbs(dstPath) {
478 return errors.New("Relative destination path required")
480 srcPath = filepath.Clean(srcPath)
481 if filepath.IsAbs(srcPath) {
482 return errors.New("Relative source path required")
484 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
488 src := strings.NewReader(dstPath)
489 size := int64(src.Len())
490 _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
495 {"ReplyNice", int(replyNice)},
500 ctx.LogI("tx", les, "sent")
502 ctx.LogE("tx", les, err, "sent")
507 func (ctx *Ctx) TxExec(
509 nice, replyNice uint8,
517 path := make([][]byte, 0, 1+len(args))
518 path = append(path, []byte(handle))
519 for _, arg := range args {
520 path = append(path, []byte(arg))
522 pktType := PktTypeExec
524 pktType = PktTypeExecFat
526 pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
532 if !noCompress && !useTmp {
533 var compressed bytes.Buffer
534 compressor, err := zstd.NewWriter(
536 zstd.WithEncoderLevel(zstd.SpeedDefault),
541 if _, err = io.Copy(compressor, in); err != nil {
542 compressor.Close() // #nosec G104
545 if err = compressor.Close(); err != nil {
548 size = int64(compressed.Len())
549 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
551 if noCompress && !useTmp {
552 var data bytes.Buffer
553 if _, err = io.Copy(&data, in); err != nil {
556 size = int64(data.Len())
557 _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle)
559 if !noCompress && useTmp {
561 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
565 copyErr := make(chan error)
567 _, err := io.Copy(compressor, in)
569 compressor.Close() // #nosec G104
572 err = compressor.Close()
576 tmpReader, closer, fileSize, err := throughTmpFile(r)
588 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
590 if noCompress && useTmp {
591 tmpReader, closer, fileSize, err := throughTmpFile(in)
599 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
606 {"ReplyNice", int(replyNice)},
607 {"Dst", strings.Join(append([]string{handle}, args...), " ")},
611 ctx.LogI("tx", les, "sent")
613 ctx.LogE("tx", les, err, "sent")
618 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
625 ctx.LogD("tx", les, "taken")
626 if !ctx.IsEnoughSpace(size) {
627 err := errors.New("is not enough space")
628 ctx.LogE("tx", les, err, err.Error())
631 tmp, err := ctx.NewTmpFileWHash()
635 if _, err = CopyProgressed(
636 tmp.W, src, "Tx trns",
637 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
642 nodePath := filepath.Join(ctx.Spool, node.Id.String())
643 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
645 ctx.LogI("tx", les, "sent")
647 ctx.LogI("tx", append(les, LE{"Err", err}), "sent")
649 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104