2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
36 xdr "github.com/davecgh/go-xdr/xdr2"
37 "github.com/dustin/go-humanize"
38 "github.com/klauspost/compress/zstd"
39 "golang.org/x/crypto/blake2b"
40 "golang.org/x/crypto/chacha20poly1305"
58 hops := make([]*Node, 0, 1+len(node.Via))
59 hops = append(hops, node)
61 for i := len(node.Via); i > 0; i-- {
62 lastNode = ctx.Neigh[*node.Via[i-1]]
63 hops = append(hops, lastNode)
66 for i := 0; i < len(hops); i++ {
67 expectedSize = PktEncOverhead +
69 sizeWithTags(PktOverhead+expectedSize)
71 padSize := minSize - expectedSize
75 if !ctx.IsEnoughSpace(size + padSize) {
76 return nil, errors.New("is not enough space")
78 tmp, err := ctx.NewTmpFileWHash()
83 errs := make(chan error)
85 pipeR, pipeW := io.Pipe()
87 go func(size int64, src io.Reader, dst io.WriteCloser) {
92 }, func(les LEs) string {
94 "Tx packet to %s (%s) nice: %s",
95 ctx.NodeName(hops[0].Id),
96 humanize.IBytes(uint64(size)),
100 pktEncRaw, err = PktEncWrite(
101 ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
104 dst.Close() // #nosec G104
105 }(curSize, src, pipeW)
106 curSize = PktEncOverhead +
108 sizeWithTags(PktOverhead+curSize) +
111 var pipeRPrev io.Reader
112 for i := 1; i < len(hops); i++ {
113 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
115 pipeR, pipeW = io.Pipe()
116 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
121 }, func(les LEs) string {
123 "Tx trns packet to %s (%s) nice: %s",
124 ctx.NodeName(node.Id),
125 humanize.IBytes(uint64(size)),
129 _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
131 dst.Close() // #nosec G104
132 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
133 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
136 _, err := CopyProgressed(
138 LEs{{"Pkt", pktName}, {"FullSize", curSize}},
143 for i := 0; i <= len(hops); i++ {
146 tmp.Fd.Close() // #nosec G104
150 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
151 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
152 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
157 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
162 type DummyCloser struct{}
164 func (dc DummyCloser) Close() error { return nil }
166 func throughTmpFile(r io.Reader) (
172 src, err := ioutil.TempFile("", "nncp-file")
177 os.Remove(src.Name()) // #nosec G104
178 tmpW := bufio.NewWriter(src)
179 tmpKey := make([]byte, chacha20poly1305.KeySize)
180 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
183 aead, err := chacha20poly1305.New(tmpKey)
188 nonce := make([]byte, aead.NonceSize())
189 written, err := aeadProcess(aead, nonce, true, r, tmpW)
194 fileSize = int64(written)
195 if err = tmpW.Flush(); err != nil {
199 if _, err = src.Seek(0, io.SeekStart); err != nil {
205 if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
206 w.CloseWithError(err) // #nosec G104
214 func prepareTxFile(srcPath string) (
222 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
226 srcStat, err := os.Stat(srcPath)
231 mode := srcStat.Mode()
233 if mode.IsRegular() {
234 // It is regular file, just send it
235 src, err := os.Open(srcPath)
240 fileSize = srcStat.Size()
241 reader = bufio.NewReader(src)
247 rerr = errors.New("unsupported file type")
251 // It is directory, create PAX archive with its contents
253 basePath := filepath.Base(srcPath)
254 rootPath, err := filepath.Abs(srcPath)
264 dirs := make([]einfo, 0, 1<<10)
265 files := make([]einfo, 0, 1<<10)
266 rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
271 // directory header, PAX record header+contents
272 fileSize += TarBlockSize + 2*TarBlockSize
273 dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
275 // file header, PAX record header+contents, file content
276 fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
277 if n := info.Size() % TarBlockSize; n != 0 {
278 fileSize += TarBlockSize - n // padding
280 files = append(files, einfo{
282 modTime: info.ModTime(),
294 closer = DummyCloser{}
295 fileSize += 2 * TarBlockSize // termination block
298 tarWr := tar.NewWriter(w)
300 Typeflag: tar.TypeDir,
302 PAXRecords: map[string]string{
303 "comment": "Autogenerated by " + VersionGet(),
305 Format: tar.FormatPAX,
307 for _, e := range dirs {
308 hdr.Name = basePath + e.path[len(rootPath):]
309 hdr.ModTime = e.modTime
310 if err = tarWr.WriteHeader(&hdr); err != nil {
311 return w.CloseWithError(err)
314 hdr.Typeflag = tar.TypeReg
316 for _, e := range files {
317 hdr.Name = basePath + e.path[len(rootPath):]
318 hdr.ModTime = e.modTime
320 if err = tarWr.WriteHeader(&hdr); err != nil {
321 return w.CloseWithError(err)
323 fd, err := os.Open(e.path)
325 fd.Close() // #nosec G104
326 return w.CloseWithError(err)
328 if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
329 fd.Close() // #nosec G104
330 return w.CloseWithError(err)
332 fd.Close() // #nosec G104
334 if err = tarWr.Close(); err != nil {
335 return w.CloseWithError(err)
342 func (ctx *Ctx) TxFile(
345 srcPath, dstPath string,
347 minSize, maxSize int64,
349 dstPathSpecified := false
352 return errors.New("Must provide destination filename")
354 dstPath = filepath.Base(srcPath)
356 dstPathSpecified = true
358 dstPath = filepath.Clean(dstPath)
359 if filepath.IsAbs(dstPath) {
360 return errors.New("Relative destination path required")
362 reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
369 if fileSize > maxSize {
370 return errors.New("Too big than allowed")
372 if archived && !dstPathSpecified {
376 if fileSize <= chunkSize {
377 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
381 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
390 logMsg := func(les LEs) string {
392 "File %s (%s) sent to %s:%s",
394 humanize.IBytes(uint64(fileSize)),
395 ctx.NodeName(node.Id),
400 ctx.LogI("tx", les, logMsg)
402 ctx.LogE("tx", les, err, logMsg)
408 metaPkt := ChunkedMeta{
410 FileSize: uint64(fileSize),
411 ChunkSize: uint64(chunkSize),
412 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
414 for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
416 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
424 if leftSize <= chunkSize {
425 sizeToSend = leftSize
427 sizeToSend = chunkSize
429 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
430 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
434 hsh, err = blake2b.New256(nil)
444 io.TeeReader(reader, hsh),
453 {"Size", sizeToSend},
455 logMsg := func(les LEs) string {
457 "File %s (%s) sent to %s:%s",
459 humanize.IBytes(uint64(sizeToSend)),
460 ctx.NodeName(node.Id),
465 ctx.LogI("tx", les, logMsg)
467 ctx.LogE("tx", les, err, logMsg)
470 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
471 leftSize -= sizeToSend
477 var metaBuf bytes.Buffer
478 _, err = xdr.Marshal(&metaBuf, metaPkt)
482 path = dstPath + ChunkedSuffixMeta
483 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
487 metaPktSize := int64(metaBuf.Len())
488 _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
495 {"Size", metaPktSize},
497 logMsg := func(les LEs) string {
499 "File %s (%s) sent to %s:%s",
501 humanize.IBytes(uint64(metaPktSize)),
502 ctx.NodeName(node.Id),
507 ctx.LogI("tx", les, logMsg)
509 ctx.LogE("tx", les, err, logMsg)
514 func (ctx *Ctx) TxFreq(
516 nice, replyNice uint8,
517 srcPath, dstPath string,
518 minSize int64) error {
519 dstPath = filepath.Clean(dstPath)
520 if filepath.IsAbs(dstPath) {
521 return errors.New("Relative destination path required")
523 srcPath = filepath.Clean(srcPath)
524 if filepath.IsAbs(srcPath) {
525 return errors.New("Relative source path required")
527 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
531 src := strings.NewReader(dstPath)
532 size := int64(src.Len())
533 _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
538 {"ReplyNice", int(replyNice)},
542 logMsg := func(les LEs) string {
544 "File request from %s:%s to %s sent",
545 ctx.NodeName(node.Id), srcPath,
550 ctx.LogI("tx", les, logMsg)
552 ctx.LogE("tx", les, err, logMsg)
557 func (ctx *Ctx) TxExec(
559 nice, replyNice uint8,
567 path := make([][]byte, 0, 1+len(args))
568 path = append(path, []byte(handle))
569 for _, arg := range args {
570 path = append(path, []byte(arg))
572 pktType := PktTypeExec
574 pktType = PktTypeExecFat
576 pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
582 if !noCompress && !useTmp {
583 var compressed bytes.Buffer
584 compressor, err := zstd.NewWriter(
586 zstd.WithEncoderLevel(zstd.SpeedDefault),
591 if _, err = io.Copy(compressor, in); err != nil {
592 compressor.Close() // #nosec G104
595 if err = compressor.Close(); err != nil {
598 size = int64(compressed.Len())
599 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
601 if noCompress && !useTmp {
602 var data bytes.Buffer
603 if _, err = io.Copy(&data, in); err != nil {
606 size = int64(data.Len())
607 _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle)
609 if !noCompress && useTmp {
611 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
615 copyErr := make(chan error)
617 _, err := io.Copy(compressor, in)
619 compressor.Close() // #nosec G104
622 err = compressor.Close()
626 tmpReader, closer, fileSize, err := throughTmpFile(r)
638 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
640 if noCompress && useTmp {
641 tmpReader, closer, fileSize, err := throughTmpFile(in)
649 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
652 dst := strings.Join(append([]string{handle}, args...), " ")
657 {"ReplyNice", int(replyNice)},
661 logMsg := func(les LEs) string {
663 "Exec sent to %s@%s (%s)",
664 ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
668 ctx.LogI("tx", les, logMsg)
670 ctx.LogE("tx", les, err, logMsg)
675 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
682 logMsg := func(les LEs) string {
684 "Transitional packet to %s (%s) (nice %s)",
685 ctx.NodeName(node.Id),
686 humanize.IBytes(uint64(size)),
690 ctx.LogD("tx", les, logMsg)
691 if !ctx.IsEnoughSpace(size) {
692 err := errors.New("is not enough space")
693 ctx.LogE("tx", les, err, logMsg)
696 tmp, err := ctx.NewTmpFileWHash()
700 if _, err = CopyProgressed(
701 tmp.W, src, "Tx trns",
702 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
707 nodePath := filepath.Join(ctx.Spool, node.Id.String())
708 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
710 ctx.LogI("tx", les, logMsg)
712 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
714 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104