2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
36 xdr "github.com/davecgh/go-xdr/xdr2"
37 "github.com/dustin/go-humanize"
38 "github.com/klauspost/compress/zstd"
39 "golang.org/x/crypto/chacha20poly1305"
// Fragment of Ctx.Tx (the enclosing func signature is elided from this
// excerpt): onion-wraps a packet in one encryption layer per hop and
// spools the result for the first relay.
// NOTE(review): the leading numbers are original-listing line numbers;
// gaps in them mean intermediate source lines are elided here. All code
// lines below are kept byte-identical.
// Build the hop chain: destination node first, then each relay from
// node.Via in reverse order.
57 hops := make([]*Node, 0, 1+len(node.Via))
58 hops = append(hops, node)
60 for i := len(node.Via); i > 0; i-- {
61 lastNode = ctx.Neigh[*node.Via[i-1]]
62 hops = append(hops, lastNode)
// Estimate the fully-wrapped size: one PktEncOverhead-sized layer per hop.
65 for i := 0; i < len(hops); i++ {
66 expectedSize = PktEncOverhead +
68 sizeWithTags(PktOverhead+expectedSize)
// Pad the wrapped packet up to minSize (presumably to mask the true
// payload size — confirm against the full file).
70 padSize := minSize - expectedSize
74 if !ctx.IsEnoughSpace(size + padSize) {
75 return nil, errors.New("is not enough space")
77 tmp, err := ctx.NewTmpFileWHash()
// Channels collecting results from the per-layer goroutines: errors,
// and the raw encrypted-packet header bytes.
82 errs := make(chan error)
83 pktEncRaws := make(chan []byte)
// Innermost layer: encrypt for the final destination (hops[0]),
// streaming through an io.Pipe into the next stage.
85 pipeR, pipeW := io.Pipe()
86 go func(size int64, src io.Reader, dst io.WriteCloser) {
91 }, func(les LEs) string {
93 "Tx packet to %s (%s) nice: %s",
94 ctx.NodeName(hops[0].Id),
95 humanize.IBytes(uint64(size)),
99 pktEncRaw, err := PktEncWrite(
100 ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
102 pktEncRaws <- pktEncRaw
104 dst.Close() // #nosec G104
105 }(curSize, src, pipeW)
106 curSize = PktEncOverhead +
108 sizeWithTags(PktOverhead+curSize) +
// Wrap each relay hop in a transitional (PktTypeTrns) layer; each
// goroutine reads the previous pipe and writes into a fresh one.
// Note padSize is 0 for these outer layers — only the innermost one
// carries the padding.
111 var pipeRPrev io.Reader
112 for i := 1; i < len(hops); i++ {
113 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
118 pipeR, pipeW = io.Pipe()
119 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
124 }, func(les LEs) string {
126 "Tx trns packet to %s (%s) nice: %s",
127 ctx.NodeName(node.Id),
128 humanize.IBytes(uint64(size)),
132 pktEncRaw, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
133 pktEncRaws <- pktEncRaw
135 dst.Close() // #nosec G104
136 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
137 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
// Drain the outermost pipe into the spool tmp file with progress logging.
140 _, err := CopyProgressed(
142 LEs{{"Pkt", pktName}, {"FullSize", curSize}},
// Collect one raw encrypted header per hop from the workers.
148 for i := 0; i < len(hops); i++ {
149 pktEncRaw = <-pktEncRaws
// NOTE(review): "<=" iterates len(hops)+1 times — presumably one error
// per hop goroutine plus one more producer; confirm against the full
// file before assuming an off-by-one.
151 for i := 0; i <= len(hops); i++ {
154 tmp.Fd.Close() // #nosec G104
// Commit the spooled file under the outermost hop's spool directory and
// (best-effort) keep a human-readable symlink named after the node.
158 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
159 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
160 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
165 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
// DummyCloser satisfies io.Closer with a no-op Close, for code paths
// that must return a closer but hold nothing to release (see the
// directory-archive branch of prepareTxFile).
170 type DummyCloser struct{}
172 func (dc DummyCloser) Close() error { return nil }
// throughTmpFile spools r through an anonymous temporary file so the
// stream can be sized first and replayed afterwards. Contents resting
// on disk are encrypted with a one-shot random ChaCha20-Poly1305 key.
// (The named return parameter list is elided from this excerpt; gaps in
// the embedded line numbers mark other elided lines.)
174 func throughTmpFile(r io.Reader) (
180 src, err := ioutil.TempFile("", "nncp-file")
// Unlink immediately: the file stays reachable through its descriptor
// and disappears once closed.
185 os.Remove(src.Name()) // #nosec G104
186 tmpW := bufio.NewWriter(src)
// Fresh random key per invocation; never persisted anywhere.
187 tmpKey := make([]byte, chacha20poly1305.KeySize)
188 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
191 aead, err := chacha20poly1305.New(tmpKey)
// All-zero nonce; presumably acceptable because the key is used for
// exactly this one encrypt/decrypt round trip — confirm.
196 nonce := make([]byte, aead.NonceSize())
197 written, err := aeadProcess(aead, nonce, nil, true, r, tmpW)
202 fileSize = int64(written)
203 if err = tmpW.Flush(); err != nil {
207 if _, err = src.Seek(0, io.SeekStart); err != nil {
// Replay: decrypt the tmp file back out into the pipe writer w,
// propagating any failure to the reading side.
213 if _, err := aeadProcess(aead, nonce, nil, false, bufio.NewReader(src), w); err != nil {
214 w.CloseWithError(err) // #nosec G104
// prepareTxFile opens srcPath for transmission (named return list is
// elided from this excerpt). Visible branches: one reads os.Stdin
// through throughTmpFile (its selecting condition is elided —
// presumably srcPath == "-"); regular files are streamed directly;
// directories are turned into a PAX tar archive built on the fly.
// Gaps in the embedded line numbers mark elided source lines.
222 func prepareTxFile(srcPath string) (
230 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
234 srcStat, err := os.Stat(srcPath)
239 mode := srcStat.Mode()
241 if mode.IsRegular() {
242 // It is regular file, just send it
243 src, err := os.Open(srcPath)
248 fileSize = srcStat.Size()
249 reader = bufio.NewReader(src)
255 rerr = errors.New("unsupported file type")
259 // It is directory, create PAX archive with its contents
261 basePath := filepath.Base(srcPath)
262 rootPath, err := filepath.Abs(srcPath)
// First pass: walk the tree and pre-compute the exact archive size
// before writing any data (entry lists pre-sized to 1024).
272 dirs := make([]einfo, 0, 1<<10)
273 files := make([]einfo, 0, 1<<10)
274 rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
279 // directory header, PAX record header+contents
280 fileSize += TarBlockSize + 2*TarBlockSize
281 dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
283 // file header, PAX record header+contents, file content
284 fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
285 if n := info.Size() % TarBlockSize; n != 0 {
286 fileSize += TarBlockSize - n // padding
288 files = append(files, einfo{
290 modTime: info.ModTime(),
302 closer = DummyCloser{}
303 fileSize += 2 * TarBlockSize // termination block
// Second pass (writer side, pipe setup elided): emit PAX-format headers
// carrying an autogeneration comment record.
306 tarWr := tar.NewWriter(w)
308 Typeflag: tar.TypeDir,
310 PAXRecords: map[string]string{
311 "comment": "Autogenerated by " + VersionGet(),
313 Format: tar.FormatPAX,
// Directory entries first, paths rebased onto basePath.
315 for _, e := range dirs {
316 hdr.Name = basePath + e.path[len(rootPath):]
317 hdr.ModTime = e.modTime
318 if err = tarWr.WriteHeader(&hdr); err != nil {
319 return w.CloseWithError(err)
// Then regular files: header followed by content, each fd closed on
// every exit path.
322 hdr.Typeflag = tar.TypeReg
324 for _, e := range files {
325 hdr.Name = basePath + e.path[len(rootPath):]
326 hdr.ModTime = e.modTime
328 if err = tarWr.WriteHeader(&hdr); err != nil {
329 return w.CloseWithError(err)
331 fd, err := os.Open(e.path)
333 fd.Close() // #nosec G104
334 return w.CloseWithError(err)
336 if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
337 fd.Close() // #nosec G104
338 return w.CloseWithError(err)
340 fd.Close() // #nosec G104
342 if err = tarWr.Close(); err != nil {
343 return w.CloseWithError(err)
// TxFile queues a file (or directory archive, via prepareTxFile) for
// transmission to node. Payloads up to chunkSize go out as a single
// file packet; larger ones are split into numbered chunk packets plus a
// trailing XDR-encoded ChunkedMeta packet carrying per-chunk checksums.
// Gaps in the embedded line numbers mark elided source lines.
350 func (ctx *Ctx) TxFile(
353 srcPath, dstPath string,
355 minSize, maxSize int64,
357 dstPathSpecified := false
360 return errors.New("Must provide destination filename")
362 dstPath = filepath.Base(srcPath)
364 dstPathSpecified = true
// Absolute destination paths are rejected.
366 dstPath = filepath.Clean(dstPath)
367 if filepath.IsAbs(dstPath) {
368 return errors.New("Relative destination path required")
370 reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
377 if fileSize > maxSize {
378 return errors.New("Too big than allowed")
380 if archived && !dstPathSpecified {
// Single-packet case.
384 if fileSize <= chunkSize {
385 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
389 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
398 logMsg := func(les LEs) string {
400 "File %s (%s) sent to %s:%s",
402 humanize.IBytes(uint64(fileSize)),
403 ctx.NodeName(node.Id),
408 ctx.LogI("tx", les, logMsg)
410 ctx.LogE("tx", les, err, logMsg)
// Chunked case: pre-allocate one checksum slot per chunk in the meta
// packet, then stream chunks one by one.
416 metaPkt := ChunkedMeta{
417 Magic: MagicNNCPMv2.B,
418 FileSize: uint64(fileSize),
419 ChunkSize: uint64(chunkSize),
420 Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1),
422 for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
423 hsh := new([MTHSize]byte)
424 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
// The final chunk may be shorter than chunkSize.
432 if leftSize <= chunkSize {
433 sizeToSend = leftSize
435 sizeToSend = chunkSize
437 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
438 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
// Tee the chunk through the hash while it is being transmitted.
449 io.TeeReader(reader, hsh),
458 {"Size", sizeToSend},
460 logMsg := func(les LEs) string {
462 "File %s (%s) sent to %s:%s",
464 humanize.IBytes(uint64(sizeToSend)),
465 ctx.NodeName(node.Id),
470 ctx.LogI("tx", les, logMsg)
472 ctx.LogE("tx", les, err, logMsg)
// Record this chunk's digest into its pre-allocated slot.
475 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
476 leftSize -= sizeToSend
// Finally XDR-encode and send the meta packet (dstPath plus
// ChunkedSuffixMeta).
482 var metaBuf bytes.Buffer
483 _, err = xdr.Marshal(&metaBuf, metaPkt)
487 path = dstPath + ChunkedSuffixMeta
488 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
492 metaPktSize := int64(metaBuf.Len())
493 _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
500 {"Size", metaPktSize},
502 logMsg := func(les LEs) string {
504 "File %s (%s) sent to %s:%s",
506 humanize.IBytes(uint64(metaPktSize)),
507 ctx.NodeName(node.Id),
512 ctx.LogI("tx", les, logMsg)
514 ctx.LogE("tx", les, err, logMsg)
// TxFreq sends a file-request ("freq") packet to node: the remote
// srcPath travels in the packet header (PktTypeFreq, with replyNice for
// the answer), while the desired local dstPath is the packet payload.
// Both paths must be relative. Gaps in the embedded line numbers mark
// elided source lines.
519 func (ctx *Ctx) TxFreq(
521 nice, replyNice uint8,
522 srcPath, dstPath string,
523 minSize int64) error {
524 dstPath = filepath.Clean(dstPath)
525 if filepath.IsAbs(dstPath) {
526 return errors.New("Relative destination path required")
528 srcPath = filepath.Clean(srcPath)
529 if filepath.IsAbs(srcPath) {
530 return errors.New("Relative source path required")
532 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
// Payload is just the destination filename string.
536 src := strings.NewReader(dstPath)
537 size := int64(src.Len())
538 _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
543 {"ReplyNice", int(replyNice)},
547 logMsg := func(les LEs) string {
549 "File request from %s:%s to %s sent",
550 ctx.NodeName(node.Id), srcPath,
555 ctx.LogI("tx", les, logMsg)
557 ctx.LogE("tx", les, err, logMsg)
// TxExec sends an exec packet: the handle plus its args, NUL-joined,
// form the packet path; the contents of the reader `in` form the
// payload. Four mutually exclusive payload strategies are selected by
// the noCompress/useTmp flags: zstd in memory, raw in memory, zstd via
// throughTmpFile, raw via throughTmpFile. Gaps in the embedded line
// numbers mark elided source lines.
562 func (ctx *Ctx) TxExec(
564 nice, replyNice uint8,
// Join handle and args with NUL (0x00) separators into the packet path.
572 path := make([][]byte, 0, 1+len(args))
573 path = append(path, []byte(handle))
574 for _, arg := range args {
575 path = append(path, []byte(arg))
// Packet type switches to PktTypeExecFat under an elided condition —
// presumably the noCompress flag; confirm against the full file.
577 pktType := PktTypeExec
579 pktType = PktTypeExecFat
581 pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
// Strategy 1: compress fully in memory with zstd, then send the buffer.
587 if !noCompress && !useTmp {
588 var compressed bytes.Buffer
589 compressor, err := zstd.NewWriter(
591 zstd.WithEncoderLevel(zstd.SpeedDefault),
596 if _, err = io.Copy(compressor, in); err != nil {
597 compressor.Close() // #nosec G104
600 if err = compressor.Close(); err != nil {
603 size = int64(compressed.Len())
604 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
// Strategy 2: buffer raw input in memory.
606 if noCompress && !useTmp {
607 var data bytes.Buffer
608 if _, err = io.Copy(&data, in); err != nil {
611 size = int64(data.Len())
612 _, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle)
// Strategy 3: compress in a goroutine through a pipe, spooling the
// compressed stream via throughTmpFile to learn its size.
614 if !noCompress && useTmp {
616 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
620 copyErr := make(chan error)
622 _, err := io.Copy(compressor, in)
624 compressor.Close() // #nosec G104
627 err = compressor.Close()
631 tmpReader, closer, fileSize, err := throughTmpFile(r)
643 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
// Strategy 4: spool the raw input via throughTmpFile.
645 if noCompress && useTmp {
646 tmpReader, closer, fileSize, err := throughTmpFile(in)
654 _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
// Human-readable destination string for logging only.
657 dst := strings.Join(append([]string{handle}, args...), " ")
662 {"ReplyNice", int(replyNice)},
666 logMsg := func(les LEs) string {
668 "Exec sent to %s@%s (%s)",
669 ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
673 ctx.LogI("tx", les, logMsg)
675 ctx.LogE("tx", les, err, logMsg)
// TxTrns spools an already-wrapped transitional packet of known size
// into node's outgoing queue: src is copied verbatim (no further
// encryption layer is visible here). The tail of the function runs past
// the end of this excerpt; gaps in the embedded line numbers mark other
// elided lines.
680 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
687 logMsg := func(les LEs) string {
689 "Transitional packet to %s (%s) (nice %s)",
690 ctx.NodeName(node.Id),
691 humanize.IBytes(uint64(size)),
695 ctx.LogD("tx", les, logMsg)
696 if !ctx.IsEnoughSpace(size) {
697 err := errors.New("is not enough space")
698 ctx.LogE("tx", les, err, logMsg)
701 tmp, err := ctx.NewTmpFileWHash()
705 if _, err = CopyProgressed(
706 tmp.W, src, "Tx trns",
707 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
// Commit under the destination node's spool directory and (best-effort)
// refresh the human-readable symlink.
712 nodePath := filepath.Join(ctx.Spool, node.Id.String())
713 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
715 ctx.LogI("tx", les, logMsg)
// NOTE(review): line 717 logs via LogI despite appending an Err field —
// the sibling paths use LogE for errors; presumably a deliberate
// non-fatal warning, but worth confirming against upstream.
717 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
719 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104