1 // NNCP -- Node to Node copy, utilities for store-and-forward data exchange
2 // Copyright (C) 2016-2024 Sergey Matveev <stargrave@stargrave.org>
4 // This program is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, version 3 of the License.
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 // GNU General Public License for more details.
13 // You should have received a copy of the GNU General Public License
14 // along with this program. If not, see <http://www.gnu.org/licenses/>.
31 xdr "github.com/davecgh/go-xdr/xdr2"
32 "github.com/dustin/go-humanize"
33 "github.com/klauspost/compress/zstd"
34 "golang.org/x/crypto/blake2b"
44 type PktEncWriteResult struct {
54 srcSize, minSize, maxSize int64,
58 ) (*Node, int64, string, error) {
61 area = ctx.AreaId2Area[*areaId]
63 return nil, 0, "", errors.New("area has no encryption keys")
66 hops := make([]*Node, 0, 1+len(node.Via))
67 hops = append(hops, node)
69 for i := len(node.Via); i > 0; i-- {
70 lastNode = ctx.Neigh[*node.Via[i-1]]
71 hops = append(hops, lastNode)
77 var expectedSize int64
79 expectedSize = srcSize + PktOverhead
80 expectedSize += sizePadCalc(expectedSize, minSize, wrappers)
81 expectedSize = PktEncOverhead + sizeWithTags(expectedSize)
82 if maxSize != 0 && expectedSize > maxSize {
83 return nil, 0, "", TooBig
85 if !ctx.IsEnoughSpace(expectedSize) {
86 return nil, 0, "", errors.New("is not enough space")
89 tmp, err := ctx.NewTmpFileWHash()
91 return nil, 0, "", err
94 results := make(chan PktEncWriteResult)
95 pipeR, pipeW := io.Pipe()
96 var pipeRPrev io.Reader
98 go func(src io.Reader, dst io.WriteCloser) {
100 {"Node", hops[0].Id},
102 {"Size", expectedSize},
103 }, func(les LEs) string {
105 "Tx packet to %s (source %s) nice: %s",
106 ctx.NodeName(hops[0].Id),
107 humanize.IBytes(uint64(expectedSize)),
111 pktEncRaw, size, err := PktEncWrite(
112 ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst,
114 results <- PktEncWriteResult{pktEncRaw, size, err}
118 go func(src io.Reader, dst io.WriteCloser) {
122 {"Size", expectedSize},
123 }, func(les LEs) string {
125 "Tx area packet to %s (source %s) nice: %s",
126 ctx.AreaName(areaId),
127 humanize.IBytes(uint64(expectedSize)),
131 areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
132 copy(areaNode.Id[:], area.Id[:])
133 copy(areaNode.ExchPub[:], area.Pub[:])
134 pktEncRaw, size, err := PktEncWrite(
135 ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst,
137 results <- PktEncWriteResult{pktEncRaw, size, err}
141 pipeR, pipeW = io.Pipe()
142 go func(src io.Reader, dst io.WriteCloser) {
143 pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
148 {"Node", hops[0].Id},
150 {"Size", expectedSize},
151 }, func(les LEs) string {
153 "Tx packet to %s (source %s) nice: %s",
154 ctx.NodeName(hops[0].Id),
155 humanize.IBytes(uint64(expectedSize)),
159 pktEncRaw, size, err := PktEncWrite(
160 ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst,
162 results <- PktEncWriteResult{pktEncRaw, size, err}
166 for i := 1; i < len(hops); i++ {
167 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
172 pipeR, pipeW = io.Pipe()
173 go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) {
177 }, func(les LEs) string {
179 "Tx trns packet to %s nice: %s",
180 ctx.NodeName(node.Id),
184 pktEncRaw, size, err := PktEncWrite(
185 ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst,
187 results <- PktEncWriteResult{pktEncRaw, size, err}
189 }(hops[i], pktTrns, pipeRPrev, pipeW)
192 _, err := CopyProgressed(
194 LEs{{"Pkt", pktName}, {"FullSize", expectedSize}},
197 results <- PktEncWriteResult{err: err}
201 var payloadSize int64
205 pktEncMsg = r.pktEncRaw
208 for i := 0; i <= wrappers; i++ {
212 return nil, 0, "", r.err
214 if r.pktEncRaw != nil {
215 pktEncRaw = r.pktEncRaw
216 if payloadSize == 0 {
221 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
222 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
223 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
225 return lastNode, 0, "", err
228 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
231 msgHashRaw := blake2b.Sum256(pktEncMsg)
232 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
233 seenDir := filepath.Join(
234 ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
236 seenPath := filepath.Join(seenDir, msgHash)
240 {"Size", expectedSize},
242 {"AreaMsg", msgHash},
244 logMsg := func(les LEs) string {
246 "Tx area packet to %s (source %s) nice: %s, area %s: %s",
247 ctx.NodeName(node.Id),
248 humanize.IBytes(uint64(expectedSize)),
254 if err = ensureDir(seenDir); err != nil {
255 ctx.LogE("tx-mkdir", les, err, logMsg)
256 return lastNode, 0, "", err
258 if fd, err := os.Create(seenPath); err == nil {
260 if err = DirSync(seenDir); err != nil {
261 ctx.LogE("tx-dirsync", les, err, logMsg)
262 return lastNode, 0, "", err
265 ctx.LogI("tx-area", les, logMsg)
267 return lastNode, payloadSize, tmp.Checksum(), err
270 type DummyCloser struct{}
272 func (dc DummyCloser) Close() error { return nil }
274 func prepareTxFile(srcPath string) (
287 srcStat, err := os.Stat(srcPath)
292 mode := srcStat.Mode()
294 if mode.IsRegular() {
295 // It is regular file, just send it
296 src, err := os.Open(srcPath)
303 srcSize = srcStat.Size()
308 rerr = errors.New("unsupported file type")
312 // It is directory, create PAX archive with its contents
314 basePath := filepath.Base(srcPath)
315 rootPath, err := filepath.Abs(srcPath)
325 dirs := make([]einfo, 0, 1<<10)
326 files := make([]einfo, 0, 1<<10)
327 rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
331 if info.Mode().IsDir() {
332 // directory header, PAX record header+contents
333 srcSize += TarBlockSize + 2*TarBlockSize
334 dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
335 } else if info.Mode().IsRegular() {
336 // file header, PAX record header+contents, file content
337 srcSize += TarBlockSize + 2*TarBlockSize + info.Size()
338 if n := info.Size() % TarBlockSize; n != 0 {
339 srcSize += TarBlockSize - n // padding
341 files = append(files, einfo{
343 modTime: info.ModTime(),
355 closer = DummyCloser{}
356 srcSize += 2 * TarBlockSize // termination block
359 tarWr := tar.NewWriter(w)
361 Typeflag: tar.TypeDir,
363 PAXRecords: map[string]string{
364 "comment": "Autogenerated by " + VersionGet(),
366 Format: tar.FormatPAX,
368 for _, e := range dirs {
369 hdr.Name = basePath + e.path[len(rootPath):]
370 hdr.ModTime = e.modTime
371 if err = tarWr.WriteHeader(&hdr); err != nil {
372 return w.CloseWithError(err)
375 hdr.Typeflag = tar.TypeReg
377 for _, e := range files {
378 hdr.Name = basePath + e.path[len(rootPath):]
379 hdr.ModTime = e.modTime
381 if err = tarWr.WriteHeader(&hdr); err != nil {
382 return w.CloseWithError(err)
384 fd, err := os.Open(e.path)
387 return w.CloseWithError(err)
390 tarWr, bufio.NewReaderSize(fd, MTHBlockSize),
393 return w.CloseWithError(err)
397 if err = tarWr.Close(); err != nil {
398 return w.CloseWithError(err)
405 func (ctx *Ctx) TxFile(
408 srcPath, dstPath string,
409 chunkSize, minSize, maxSize int64,
412 dstPathSpecified := false
415 return errors.New("Must provide destination filename")
417 dstPath = filepath.Base(srcPath)
419 dstPathSpecified = true
421 dstPath = filepath.Clean(dstPath)
422 if filepath.IsAbs(dstPath) {
423 return errors.New("Relative destination path required")
425 reader, closer, srcSize, archived, err := prepareTxFile(srcPath)
432 if archived && !dstPathSpecified {
436 if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) {
437 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
441 _, finalSize, pktName, err := ctx.Tx(
443 srcSize, minSize, maxSize,
444 bufio.NewReaderSize(reader, MTHBlockSize), dstPath, areaId,
455 logMsg := func(les LEs) string {
457 "File %s (%s) is sent to %s:%s",
459 humanize.IBytes(uint64(finalSize)),
460 ctx.NodeName(node.Id),
465 ctx.LogI("tx", les, logMsg)
467 ctx.LogE("tx", les, err, logMsg)
472 br := bufio.NewReaderSize(reader, MTHBlockSize)
475 checksums := [][MTHSize]byte{}
477 lr := io.LimitReader(br, chunkSize)
478 path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
479 pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
484 _, size, pktName, err := ctx.Tx(
487 io.TeeReader(lr, hsh),
500 logMsg := func(les LEs) string {
502 "File %s (%s) is sent to %s:%s",
504 humanize.IBytes(uint64(size)),
505 ctx.NodeName(node.Id),
510 ctx.LogI("tx", les, logMsg)
512 ctx.LogE("tx", les, err, logMsg)
516 sizeFull += size - PktOverhead
517 var checksum [MTHSize]byte
518 hsh.Sum(checksum[:0])
519 checksums = append(checksums, checksum)
521 if size < chunkSize {
524 if _, err = br.Peek(1); err != nil {
529 metaPkt := ChunkedMeta{
530 Magic: MagicNNCPMv2.B,
531 FileSize: uint64(sizeFull),
532 ChunkSize: uint64(chunkSize),
533 Checksums: checksums,
536 _, err = xdr.Marshal(&buf, metaPkt)
540 path := dstPath + ChunkedSuffixMeta
541 pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
545 metaPktSize := int64(buf.Len())
546 _, _, pktName, err := ctx.Tx(
550 metaPktSize, minSize, maxSize,
559 {"Size", metaPktSize},
562 logMsg := func(les LEs) string {
564 "File %s (%s) is sent to %s:%s",
566 humanize.IBytes(uint64(metaPktSize)),
567 ctx.NodeName(node.Id),
572 ctx.LogI("tx", les, logMsg)
574 ctx.LogE("tx", les, err, logMsg)
579 func (ctx *Ctx) TxFreq(
581 nice, replyNice uint8,
582 srcPath, dstPath string,
585 dstPath = filepath.Clean(dstPath)
586 if filepath.IsAbs(dstPath) {
587 return errors.New("Relative destination path required")
589 srcPath = filepath.Clean(srcPath)
590 if filepath.IsAbs(srcPath) {
591 return errors.New("Relative source path required")
593 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
597 src := strings.NewReader(dstPath)
598 size := int64(src.Len())
599 _, _, pktName, err := ctx.Tx(
600 node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil,
606 {"ReplyNice", int(replyNice)},
611 logMsg := func(les LEs) string {
613 "File request from %s:%s to %s is sent",
614 ctx.NodeName(node.Id), srcPath,
619 ctx.LogI("tx", les, logMsg)
621 ctx.LogE("tx", les, err, logMsg)
626 func (ctx *Ctx) TxExec(
628 nice, replyNice uint8,
632 minSize int64, maxSize int64,
636 path := make([][]byte, 0, 1+len(args))
637 path = append(path, []byte(handle))
638 for _, arg := range args {
639 path = append(path, []byte(arg))
641 pktType := PktTypeExec
643 pktType = PktTypeExecFat
645 pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
649 compressErr := make(chan error, 1)
652 compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault))
656 go func(r io.Reader) {
657 if _, err := io.Copy(compressor, r); err != nil {
661 compressErr <- compressor.Close()
666 _, size, pktName, err := ctx.Tx(
667 node, pkt, nice, 0, minSize, maxSize, in, handle, areaId,
675 dst := strings.Join(append([]string{handle}, args...), " ")
680 {"ReplyNice", int(replyNice)},
685 logMsg := func(les LEs) string {
687 "Exec is sent to %s@%s (%s)",
688 ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
692 ctx.LogI("tx", les, logMsg)
694 ctx.LogE("tx", les, err, logMsg)
699 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
706 logMsg := func(les LEs) string {
708 "Transitional packet to %s (%s) (nice %s)",
709 ctx.NodeName(node.Id),
710 humanize.IBytes(uint64(size)),
714 ctx.LogD("tx", les, logMsg)
715 if !ctx.IsEnoughSpace(size) {
716 err := errors.New("is not enough space")
717 ctx.LogE("tx", les, err, logMsg)
720 tmp, err := ctx.NewTmpFileWHash()
724 if _, err = CopyProgressed(
725 tmp.W, src, "Tx trns",
726 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
731 nodePath := filepath.Join(ctx.Spool, node.Id.String())
732 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
734 ctx.LogI("tx", les, logMsg)
736 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
738 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
742 func (ctx *Ctx) TxACK(
747 ) (pktName string, err error) {
748 hshRaw, err := Base32Codec.DecodeString(hsh)
752 if len(hshRaw) != MTHSize {
753 return "", errors.New("Invalid packet id size")
755 pkt, err := NewPkt(PktTypeACK, nice, []byte(hshRaw))
759 src := bytes.NewReader([]byte{})
760 _, _, pktName, err = ctx.Tx(
761 node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil,
770 logMsg := func(les LEs) string {
771 return fmt.Sprintf("ACK to %s of %s is sent", ctx.NodeName(node.Id), hsh)
774 ctx.LogI("tx", les, logMsg)
776 ctx.LogE("tx", les, err, logMsg)