2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
36 xdr "github.com/davecgh/go-xdr/xdr2"
37 "github.com/dustin/go-humanize"
38 "github.com/klauspost/compress/zstd"
39 "golang.org/x/crypto/chacha20poly1305"
// NOTE(review): elided view of a method (the enclosing signature is not
// visible — presumably (ctx *Ctx) Tx given the receiver usage below), and
// several intermediate statements are missing. Comments describe only what
// the visible lines show; the leading numerals are extraction artifacts.
// Build the chain of hops: destination node first, then its Via relays
// walked in reverse declaration order.
57 hops := make([]*Node, 0, 1+len(node.Via))
58 hops = append(hops, node)
60 for i := len(node.Via); i > 0; i-- {
61 lastNode = ctx.Neigh[*node.Via[i-1]]
62 hops = append(hops, lastNode)
// Accumulate the per-layer encryption overhead over all hops to know the
// final encrypted size before anything is written.
65 for i := 0; i < len(hops); i++ {
66 expectedSize = PktEncOverhead +
68 sizeWithTags(PktOverhead+expectedSize)
// Pad up to minSize — presumably so packet sizes do not leak the payload
// length; TODO confirm padSize is clamped to >= 0 in the elided lines.
70 padSize := minSize - expectedSize
// Refuse early when the spool lacks room for payload plus padding.
74 if !ctx.IsEnoughSpace(size + padSize) {
75 return nil, errors.New("is not enough space")
77 tmp, err := ctx.NewTmpFileWHash()
// One goroutine per encryption layer, each reporting its result on errs.
82 errs := make(chan error)
84 pipeR, pipeW := io.Pipe()
// Innermost layer: encrypt the payload to the final recipient (hops[0]),
// applying the padding computed above.
86 go func(size int64, src io.Reader, dst io.WriteCloser) {
91 }, func(les LEs) string {
93 "Tx packet to %s (%s) nice: %s",
94 ctx.NodeName(hops[0].Id),
95 humanize.IBytes(uint64(size)),
99 pktEncRaw, err = PktEncWrite(
100 ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
// Closing the pipe writer unblocks the downstream reader; error ignored.
103 dst.Close() // #nosec G104
104 }(curSize, src, pipeW)
105 curSize = PktEncOverhead +
107 sizeWithTags(PktOverhead+curSize) +
// Wrap the stream in one transitional (onion) layer per intermediate hop:
// each goroutine reads the previous pipe and encrypts to the next node.
110 var pipeRPrev io.Reader
111 for i := 1; i < len(hops); i++ {
// NewPkt error deliberately discarded — presumably it cannot fail for a
// PktTypeTrns packet built from a fixed-size node id; TODO confirm.
112 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
114 pipeR, pipeW = io.Pipe()
115 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
120 }, func(les LEs) string {
122 "Tx trns packet to %s (%s) nice: %s",
123 ctx.NodeName(node.Id),
124 humanize.IBytes(uint64(size)),
// Transitional layers carry no extra padding (padSize argument is 0).
128 _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
130 dst.Close() // #nosec G104
131 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
132 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
// Drain the outermost pipe into the spool temp file, reporting progress.
135 _, err := CopyProgressed(
137 LEs{{"Pkt", pktName}, {"FullSize", curSize}},
// Collect results from every layer goroutine; the <= bound suggests one
// extra result beyond len(hops) is expected — TODO confirm the sender side.
142 for i := 0; i <= len(hops); i++ {
145 tmp.Fd.Close() // #nosec G104
// Commit the finished file into the last hop's outgoing (TTx) spool
// directory; the symlink is a human-readable alias, errors ignored.
149 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
150 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
151 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
// Cache the encrypted packet header beside the packet (keyed by checksum).
156 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
// DummyCloser is a no-op io.Closer, used where an interface demands a
// Close method but there is nothing to release.
type DummyCloser struct{}

// Close does nothing and always succeeds.
func (dc DummyCloser) Close() error {
	return nil
}
// throughTmpFile spools r through an anonymous temporary file, encrypting
// it with a one-shot ChaCha20-Poly1305 key so the plaintext never rests on
// disk, then streams the decrypted content back out through a pipe.
// NOTE(review): elided view — the return list and several statements are
// not visible; presumably returns (reader, closer, fileSize, rerr).
165 func throughTmpFile(r io.Reader) (
171 src, err := ioutil.TempFile("", "nncp-file")
// Unlink immediately: the file stays usable through the open descriptor
// but never has a reachable name (removal error deliberately ignored).
176 os.Remove(src.Name()) // #nosec G104
177 tmpW := bufio.NewWriter(src)
// Fresh random key per call; it lives only in this function's memory.
178 tmpKey := make([]byte, chacha20poly1305.KeySize)
179 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
182 aead, err := chacha20poly1305.New(tmpKey)
// Zeroed initial nonce — safe only because the key is single-use;
// presumably aeadProcess advances the nonce per chunk (TODO confirm).
187 nonce := make([]byte, aead.NonceSize())
// Sealing pass (fourth arg true): encrypt r into the buffered temp file.
188 written, err := aeadProcess(aead, nonce, nil, true, r, tmpW)
193 fileSize = int64(written)
194 if err = tmpW.Flush(); err != nil {
// Rewind and replay: opening pass (fourth arg false) decrypts back into w.
198 if _, err = src.Seek(0, io.SeekStart); err != nil {
204 if _, err := aeadProcess(aead, nonce, nil, false, bufio.NewReader(src), w); err != nil {
// Propagate the failure to the pipe's reader side.
205 w.CloseWithError(err) // #nosec G104
// prepareTxFile turns srcPath into a streamable reader plus its size:
// stdin is spooled through an encrypted temp file, a regular file is
// opened directly, and a directory is packed on the fly into a PAX tar
// archive whose exact size is precomputed.
// NOTE(review): elided view — the return list, several branches and the
// closing brace are not visible; comments cover only the lines shown.
213 func prepareTxFile(srcPath string) (
// Stdin cannot be sized or seeked, so buffer it via throughTmpFile to
// learn fileSize before transmission.
221 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
225 srcStat, err := os.Stat(srcPath)
230 mode := srcStat.Mode()
232 if mode.IsRegular() {
233 // It is regular file, just send it
234 src, err := os.Open(srcPath)
235 fileSize = srcStat.Size()
240 reader = bufio.NewReader(src)
// Anything that is neither regular file nor directory is rejected.
246 rerr = errors.New("unsupported file type")
250 // It is directory, create PAX archive with its contents
252 basePath := filepath.Base(srcPath)
253 rootPath, err := filepath.Abs(srcPath)
// First pass: walk the tree and precompute the archive's exact byte size
// from tar block accounting, so fileSize is known before streaming.
263 dirs := make([]einfo, 0, 1<<10)
264 files := make([]einfo, 0, 1<<10)
265 rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
270 // directory header, PAX record header+contents
271 fileSize += TarBlockSize + 2*TarBlockSize
272 dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
274 // file header, PAX record header+contents, file content
275 fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
// File contents are padded to a whole tar block.
276 if n := info.Size() % TarBlockSize; n != 0 {
277 fileSize += TarBlockSize - n // padding
279 files = append(files, einfo{
281 modTime: info.ModTime(),
// The tar stream needs no cleanup of its own.
293 closer = DummyCloser{}
294 fileSize += 2 * TarBlockSize // termination block
// Second pass (presumably inside a goroutine feeding a pipe — the
// surrounding lines are elided): emit headers and contents.
297 tarWr := tar.NewWriter(w)
299 Typeflag: tar.TypeDir,
301 PAXRecords: map[string]string{
302 "comment": "Autogenerated by " + VersionGet(),
304 Format: tar.FormatPAX,
// Directory entries first, names rebased from rootPath onto basePath.
306 for _, e := range dirs {
307 hdr.Name = basePath + e.path[len(rootPath):]
308 hdr.ModTime = e.modTime
309 if err = tarWr.WriteHeader(&hdr); err != nil {
// Any write failure is propagated to the pipe's reader side.
310 return w.CloseWithError(err)
// Reuse the same header struct for regular files.
313 hdr.Typeflag = tar.TypeReg
315 for _, e := range files {
316 hdr.Name = basePath + e.path[len(rootPath):]
317 hdr.ModTime = e.modTime
319 if err = tarWr.WriteHeader(&hdr); err != nil {
320 return w.CloseWithError(err)
322 fd, err := os.Open(e.path)
324 fd.Close() // #nosec G104
325 return w.CloseWithError(err)
327 if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
328 fd.Close() // #nosec G104
329 return w.CloseWithError(err)
331 fd.Close() // #nosec G104
// Close writes the two-block terminator counted into fileSize above.
333 if err = tarWr.Close(); err != nil {
334 return w.CloseWithError(err)
// TxFile queues a file (or directory, archived by prepareTxFile) for
// transmission to node: sent as a single file packet when it fits in
// chunkSize, otherwise split into numbered chunk packets plus a final
// ChunkedMeta packet carrying per-chunk checksums.
// NOTE(review): elided view — parts of the signature, error returns and
// the chunk-sending loop are not visible; comments cover only shown lines.
341 func (ctx *Ctx) TxFile(
344 srcPath, dstPath string,
346 minSize, maxSize int64,
// Track whether the caller named a destination: an auto-archived
// directory gets a suffix appended later only when it did not.
348 dstPathSpecified := false
351 return errors.New("Must provide destination filename")
// Default the remote name to the source's base name.
353 dstPath = filepath.Base(srcPath)
355 dstPathSpecified = true
// Remote paths must be relative — absolute paths are rejected outright.
357 dstPath = filepath.Clean(dstPath)
358 if filepath.IsAbs(dstPath) {
359 return errors.New("Relative destination path required")
361 reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
368 if fileSize > maxSize {
369 return errors.New("Too big than allowed")
// Presumably appends an archive suffix to dstPath here (body elided).
371 if archived && !dstPathSpecified {
// Small enough: send as one plain file packet and log the outcome.
375 if fileSize <= chunkSize {
376 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
380 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
389 logMsg := func(les LEs) string {
391 "File %s (%s) sent to %s:%s",
393 humanize.IBytes(uint64(fileSize)),
394 ctx.NodeName(node.Id),
399 ctx.LogI("tx", les, logMsg)
401 ctx.LogE("tx", les, err, logMsg)
// Chunked path: pre-build the metadata packet with one (initially zero)
// checksum slot per chunk.
407 metaPkt := ChunkedMeta{
409 FileSize: uint64(fileSize),
410 ChunkSize: uint64(chunkSize),
411 Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1),
413 for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
414 hsh := new([MTHSize]byte)
415 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
// Per-chunk loop (header elided): last chunk may be short.
423 if leftSize <= chunkSize {
424 sizeToSend = leftSize
426 sizeToSend = chunkSize
// Chunk packets are named dstPath + part-suffix + chunk number.
428 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
429 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
// Tee the chunk through the hasher while sending, to fill the checksum.
440 io.TeeReader(reader, hsh),
449 {"Size", sizeToSend},
451 logMsg := func(les LEs) string {
453 "File %s (%s) sent to %s:%s",
455 humanize.IBytes(uint64(sizeToSend)),
456 ctx.NodeName(node.Id),
461 ctx.LogI("tx", les, logMsg)
463 ctx.LogE("tx", les, err, logMsg)
// Record this chunk's digest into its pre-allocated slot.
466 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
467 leftSize -= sizeToSend
// Finally XDR-encode the metadata and send it as its own file packet.
473 var metaBuf bytes.Buffer
474 _, err = xdr.Marshal(&metaBuf, metaPkt)
478 path = dstPath + ChunkedSuffixMeta
479 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
483 metaPktSize := int64(metaBuf.Len())
484 _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
491 {"Size", metaPktSize},
493 logMsg := func(les LEs) string {
495 "File %s (%s) sent to %s:%s",
497 humanize.IBytes(uint64(metaPktSize)),
498 ctx.NodeName(node.Id),
503 ctx.LogI("tx", les, logMsg)
505 ctx.LogE("tx", les, err, logMsg)
// TxFreq queues a file-request ("freq") packet asking node to send back
// srcPath; the packet's payload carries dstPath, the local name the reply
// should arrive under.
// NOTE(review): elided view — error returns between the visible lines are
// not shown.
510 func (ctx *Ctx) TxFreq(
512 nice, replyNice uint8,
513 srcPath, dstPath string,
514 minSize int64) error {
// Both paths must be relative after cleaning.
515 dstPath = filepath.Clean(dstPath)
516 if filepath.IsAbs(dstPath) {
517 return errors.New("Relative destination path required")
519 srcPath = filepath.Clean(srcPath)
520 if filepath.IsAbs(srcPath) {
521 return errors.New("Relative source path required")
// The requested remote path rides in the packet header; replyNice is the
// niceness the remote should use for its answer.
523 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
// The payload is just the local destination name.
527 src := strings.NewReader(dstPath)
528 size := int64(src.Len())
529 _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
534 {"ReplyNice", int(replyNice)},
538 logMsg := func(les LEs) string {
540 "File request from %s:%s to %s sent",
541 ctx.NodeName(node.Id), srcPath,
546 ctx.LogI("tx", les, logMsg)
548 ctx.LogE("tx", les, err, logMsg)
// TxExec queues an exec packet: handle plus args are NUL-joined into the
// packet path, and stdin data (in) becomes the payload — optionally
// zstd-compressed, and optionally spooled through a temp file when its
// size cannot be held in memory. The four branches below cover each
// combination of noCompress x useTmp.
// NOTE(review): elided view — parts of the signature, error returns and
// the goroutine bodies are not visible; comments cover only shown lines.
553 func (ctx *Ctx) TxExec(
555 nice, replyNice uint8,
// Packet path = handle, then each argument, joined with 0x00.
563 path := make([][]byte, 0, 1+len(args))
564 path = append(path, []byte(handle))
565 for _, arg := range args {
566 path = append(path, []byte(arg))
// Fat variant presumably signals uncompressed payload — TODO confirm
// against the elided condition on the line above 570.
568 pktType := PktTypeExec
570 pktType = PktTypeExecFat
572 pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
// Compress fully in memory: size is known once compression finishes.
578 if !noCompress && !useTmp {
579 var compressed bytes.Buffer
580 compressor, err := zstd.NewWriter(
582 zstd.WithEncoderLevel(zstd.SpeedDefault),
587 if _, err = io.Copy(compressor, in); err != nil {
588 compressor.Close() // #nosec G104
// Close flushes the final zstd frame; its error matters.
591 if err = compressor.Close(); err != nil {
594 size = int64(compressed.Len())
595 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
// Uncompressed, in memory: buffer stdin wholesale to learn its size.
597 if noCompress && !useTmp {
598 var data bytes.Buffer
599 if _, err = io.Copy(&data, in); err != nil {
602 size = int64(data.Len())
603 _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle)
// Compressed via temp file: compress into a pipe in a goroutine, spool
// the pipe through throughTmpFile to obtain the size.
605 if !noCompress && useTmp {
607 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
611 copyErr := make(chan error)
613 _, err := io.Copy(compressor, in)
615 compressor.Close() // #nosec G104
618 err = compressor.Close()
622 tmpReader, closer, fileSize, err := throughTmpFile(r)
634 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
// Uncompressed via temp file: spool stdin directly.
636 if noCompress && useTmp {
637 tmpReader, closer, fileSize, err := throughTmpFile(in)
645 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
// Human-readable command line for the log record.
648 dst := strings.Join(append([]string{handle}, args...), " ")
653 {"ReplyNice", int(replyNice)},
657 logMsg := func(les LEs) string {
659 "Exec sent to %s@%s (%s)",
660 ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
664 ctx.LogI("tx", les, logMsg)
666 ctx.LogE("tx", les, err, logMsg)
// TxTrns stores an already-encrypted transitional packet of the given size
// directly into node's outgoing spool, without adding any encryption layer
// of its own.
// NOTE(review): elided view — error returns between the visible lines and
// the function's tail are not shown.
671 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
678 logMsg := func(les LEs) string {
680 "Transitional packet to %s (%s) (nice %s)",
681 ctx.NodeName(node.Id),
682 humanize.IBytes(uint64(size)),
686 ctx.LogD("tx", les, logMsg)
// Check spool capacity before writing anything.
687 if !ctx.IsEnoughSpace(size) {
688 err := errors.New("is not enough space")
689 ctx.LogE("tx", les, err, logMsg)
692 tmp, err := ctx.NewTmpFileWHash()
// Stream the packet into the hashed temp file with progress reporting.
696 if _, err = CopyProgressed(
697 tmp.W, src, "Tx trns",
698 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
// Atomically move the finished file into the node's outgoing (TTx)
// directory.
703 nodePath := filepath.Join(ctx.Spool, node.Id.String())
704 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
706 ctx.LogI("tx", les, logMsg)
// NOTE(review): the failure path also logs at Info level (LogI) with the
// error attached rather than LogE — looks inconsistent with the other Tx*
// methods; confirm whether this is intentional.
708 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
// Human-friendly alias symlink; creation errors deliberately ignored.
710 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104