2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
33 xdr "github.com/davecgh/go-xdr/xdr2"
34 "github.com/dustin/go-humanize"
35 "github.com/klauspost/compress/zstd"
36 "golang.org/x/crypto/blake2b"
// PktEncWriteResult carries the outcome of one asynchronous PktEncWrite
// stage back over a channel; the call sites below show it holds the raw
// encrypted header bytes (pktEncRaw), the written size, and an error.
// NOTE(review): the field list is outside this excerpt.
46 type PktEncWriteResult struct {
// NOTE(review): non-contiguous excerpt of a (ctx *Ctx) method (its
// opening "func" line is not visible here); comments below only state
// what the visible lines establish, hedged otherwise.
56 srcSize, minSize, maxSize int64,
// Returns the first physical hop node, the payload size, and an error.
60 ) (*Node, int64, error) {
// Area transmission: areaId (a pointer, dereferenced here) selects the area.
63 area = ctx.AreaId2Area[*areaId]
// An area without encryption keys cannot receive encrypted packets.
65 return nil, 0, errors.New("area has no encryption keys")
// Build the hop chain: the destination node first, then each entry of
// node.Via in reverse order, so lastNode ends up being the node the
// packet is physically handed to first.
68 hops := make([]*Node, 0, 1+len(node.Via))
69 hops = append(hops, node)
71 for i := len(node.Via); i > 0; i-- {
72 lastNode = ctx.Neigh[*node.Via[i-1]]
73 hops = append(hops, lastNode)
// Predict the final on-disk size: payload plus packet overhead, padding
// (taking minSize and wrappers into account), then encryption overhead.
79 var expectedSize int64
81 expectedSize = srcSize + PktOverhead
82 expectedSize += sizePadCalc(expectedSize, minSize, wrappers)
83 expectedSize = PktEncOverhead + sizeWithTags(expectedSize)
// maxSize == 0 means "no size limit".
84 if maxSize != 0 && expectedSize > maxSize {
87 if !ctx.IsEnoughSpace(expectedSize) {
// NOTE(review): error text reads awkwardly ("is not enough space");
// left as-is here since it is a runtime string.
88 return nil, 0, errors.New("is not enough space")
// Spool the result through a temporary file whose hash is computed on
// the fly (tmp.Checksum() is used below for the committed file name).
91 tmp, err := ctx.NewTmpFileWHash()
// Each encryption stage runs in its own goroutine connected by
// io.Pipe()s; every stage reports its outcome on this channel.
96 results := make(chan PktEncWriteResult)
97 pipeR, pipeW := io.Pipe()
98 var pipeRPrev io.Reader
// Direct (non-area, presumably) path: encrypt the user packet for the
// destination node with the caller's padding and wrappers settings.
100 go func(src io.Reader, dst io.WriteCloser) {
102 {"Node", hops[0].Id},
104 {"Size", expectedSize},
105 }, func(les LEs) string {
107 "Tx packet to %s (source %s) nice: %s",
108 ctx.NodeName(hops[0].Id),
109 humanize.IBytes(uint64(expectedSize)),
113 pktEncRaw, size, err := PktEncWrite(
114 ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst,
116 results <- PktEncWriteResult{pktEncRaw, size, err}
// Area path, inner layer: encrypt for the area's own key pair using a
// synthetic Node built from the area id and public key; no padding
// (minSize 0) and no wrappers at this innermost layer.
120 go func(src io.Reader, dst io.WriteCloser) {
124 {"Size", expectedSize},
125 }, func(les LEs) string {
127 "Tx area packet to %s (source %s) nice: %s",
128 ctx.AreaName(areaId),
129 humanize.IBytes(uint64(expectedSize)),
133 areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
134 copy(areaNode.Id[:], area.Id[:])
135 copy(areaNode.ExchPub[:], area.Pub[:])
136 pktEncRaw, size, err := PktEncWrite(
137 ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst,
139 results <- PktEncWriteResult{pktEncRaw, size, err}
// Area path, outer layer: wrap the area-encrypted stream in a
// PktTypeArea packet and encrypt that for the destination node.
143 pipeR, pipeW = io.Pipe()
144 go func(src io.Reader, dst io.WriteCloser) {
145 pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
150 {"Node", hops[0].Id},
152 {"Size", expectedSize},
153 }, func(les LEs) string {
155 "Tx packet to %s (source %s) nice: %s",
156 ctx.NodeName(hops[0].Id),
157 humanize.IBytes(uint64(expectedSize)),
161 pktEncRaw, size, err := PktEncWrite(
162 ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst,
164 results <- PktEncWriteResult{pktEncRaw, size, err}
// Onion-wrap once per remaining hop: each layer is a transit
// (PktTypeTrns) packet naming the previous hop, encrypted for hops[i];
// layers are chained through fresh pipes (pipeRPrev feeds each stage).
168 for i := 1; i < len(hops); i++ {
169 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
174 pipeR, pipeW = io.Pipe()
175 go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) {
179 }, func(les LEs) string {
181 "Tx trns packet to %s nice: %s",
182 ctx.NodeName(node.Id),
186 pktEncRaw, size, err := PktEncWrite(
// Transit layers use no padding (0) and the global MaxFileSize cap.
187 ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst,
189 results <- PktEncWriteResult{pktEncRaw, size, err}
191 }(hops[i], pktTrns, pipeRPrev, pipeW)
// Pump the outermost stream into the spool temp file with progress
// reporting; its outcome goes through the same results channel.
194 _, err := CopyProgressed(
196 LEs{{"Pkt", pktName}, {"FullSize", expectedSize}},
199 results <- PktEncWriteResult{err: err}
// Collect the per-stage results: pktEncMsg appears to keep the inner
// (area) encrypted message for the seen-hash below, pktEncRaw the
// outermost header, payloadSize the first non-zero reported size.
203 var payloadSize int64
207 pktEncMsg = r.pktEncRaw
210 for i := 0; i <= wrappers; i++ {
216 if r.pktEncRaw != nil {
217 pktEncRaw = r.pktEncRaw
218 if payloadSize == 0 {
// Commit the temp file into the first hop's outgoing (TTx) directory.
223 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
224 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
// NOTE(review): Symlink error is ignored — presumably a best-effort
// human-readable alias (node name -> node id directory); confirm intent.
225 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
227 return lastNode, 0, err
// Cache the encrypted header next to the committed packet.
230 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
// Area bookkeeping: record the BLAKE2b-256 hash of the inner encrypted
// message in this area's seen-directory, apparently so the same message
// is not accepted again later — confirm against the rx side.
233 msgHashRaw := blake2b.Sum256(pktEncMsg)
234 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
235 seenDir := filepath.Join(
236 ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
238 seenPath := filepath.Join(seenDir, msgHash)
242 {"Size", expectedSize},
244 {"AreaMsg", msgHash},
246 logMsg := func(les LEs) string {
248 "Tx area packet to %s (source %s) nice: %s, area %s: %s",
249 ctx.NodeName(node.Id),
250 humanize.IBytes(uint64(expectedSize)),
256 if err = ensureDir(seenDir); err != nil {
257 ctx.LogE("tx-mkdir", les, err, logMsg)
258 return lastNode, 0, err
// The seen entry is created as a file; its contents are not written
// in the visible lines, so it looks like an existence marker.
260 if fd, err := os.Create(seenPath); err == nil {
// Sync the directory so the seen marker survives a crash.
262 if err = DirSync(seenDir); err != nil {
263 ctx.LogE("tx-dirsync", les, err, logMsg)
264 return lastNode, 0, err
267 ctx.LogI("tx-area", les, logMsg)
269 return lastNode, payloadSize, err
// DummyCloser is a no-op io.Closer: Close always succeeds.  It stands
// in where an io.Closer is required but nothing has to be released
// (e.g. the directory-archiving path of prepareTxFile).
type DummyCloser struct{}

// Close implements io.Closer; it does nothing and never fails.
func (dc DummyCloser) Close() error {
	return nil
}
// prepareTxFile opens srcPath for transmission.
// NOTE(review): non-contiguous excerpt — the full result list (at least
// a reader, a closer, srcSize, and an "archived" flag, judging by the
// TxFile call site) and several branches are not visible here.
276 func prepareTxFile(srcPath string) (
289 srcStat, err := os.Stat(srcPath)
294 mode := srcStat.Mode()
296 if mode.IsRegular() {
297 // It is regular file, just send it
298 src, err := os.Open(srcPath)
305 srcSize = srcStat.Size()
// Anything that is neither a regular file nor a directory is rejected.
310 rerr = errors.New("unsupported file type")
314 // It is directory, create PAX archive with its contents
316 basePath := filepath.Base(srcPath)
317 rootPath, err := filepath.Abs(srcPath)
// Pre-scan the tree to compute the archive's exact size up front,
// since the caller needs srcSize before the stream is produced.
327 dirs := make([]einfo, 0, 1<<10)
328 files := make([]einfo, 0, 1<<10)
329 rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
333 if info.Mode().IsDir() {
334 // directory header, PAX record header+contents
335 srcSize += TarBlockSize + 2*TarBlockSize
336 dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
337 } else if info.Mode().IsRegular() {
338 // file header, PAX record header+contents, file content
339 srcSize += TarBlockSize + 2*TarBlockSize + info.Size()
// Tar file bodies are padded up to a whole number of TarBlockSize blocks.
340 if n := info.Size() % TarBlockSize; n != 0 {
341 srcSize += TarBlockSize - n // padding
343 files = append(files, einfo{
345 modTime: info.ModTime(),
// Nothing to release for the archive stream itself; DummyCloser
// satisfies the closer result.
357 closer = DummyCloser{}
358 srcSize += 2 * TarBlockSize // termination block
// Stream the archive.  One shared header struct is reused for every
// entry, with Name/ModTime overwritten per entry.
361 tarWr := tar.NewWriter(w)
363 Typeflag: tar.TypeDir,
365 PAXRecords: map[string]string{
366 "comment": "Autogenerated by " + VersionGet(),
368 Format: tar.FormatPAX,
// Emit all directory entries first, then the regular files.
370 for _, e := range dirs {
// Entry names are re-rooted at the archive's base directory.
371 hdr.Name = basePath + e.path[len(rootPath):]
372 hdr.ModTime = e.modTime
373 if err = tarWr.WriteHeader(&hdr); err != nil {
// Failures are propagated to the pipe's reader via CloseWithError.
374 return w.CloseWithError(err)
377 hdr.Typeflag = tar.TypeReg
379 for _, e := range files {
380 hdr.Name = basePath + e.path[len(rootPath):]
381 hdr.ModTime = e.modTime
383 if err = tarWr.WriteHeader(&hdr); err != nil {
384 return w.CloseWithError(err)
386 fd, err := os.Open(e.path)
389 return w.CloseWithError(err)
391 if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
// NOTE(review): fd's Close on this path is not visible in the excerpt —
// confirm the missing lines close it on both success and failure.
393 return w.CloseWithError(err)
397 if err = tarWr.Close(); err != nil {
398 return w.CloseWithError(err)
// TxFile queues a file (or a directory rendered as a PAX archive) for
// transmission to node: either as a single PktTypeFile packet, or —
// when chunkSize > 0 and the source exceeds it — as numbered chunk
// packets followed by a ChunkedMeta packet.
// NOTE(review): non-contiguous excerpt; parts of the signature and
// several statements are outside this view.
405 func (ctx *Ctx) TxFile(
408 srcPath, dstPath string,
409 chunkSize, minSize, maxSize int64,
// Track whether the caller supplied an explicit destination name.
412 dstPathSpecified := false
// NOTE(review): capitalized error strings below deviate from Go
// convention (lowercase); runtime strings left untouched here.
415 return errors.New("Must provide destination filename")
// Default destination name: the source's base name.
417 dstPath = filepath.Base(srcPath)
419 dstPathSpecified = true
421 dstPath = filepath.Clean(dstPath)
422 if filepath.IsAbs(dstPath) {
423 return errors.New("Relative destination path required")
425 reader, closer, srcSize, archived, err := prepareTxFile(srcPath)
// Directory was archived on the fly; presumably a suffix is appended to
// the default name in the missing lines when none was specified.
432 if archived && !dstPathSpecified {
// Single-packet path: chunking disabled, or the whole source fits.
436 if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) {
437 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
441 _, finalSize, err := ctx.Tx(
443 srcSize, minSize, maxSize,
444 bufio.NewReader(reader), dstPath, areaId,
454 logMsg := func(les LEs) string {
456 "File %s (%s) sent to %s:%s",
458 humanize.IBytes(uint64(finalSize)),
459 ctx.NodeName(node.Id),
464 ctx.LogI("tx", les, logMsg)
466 ctx.LogE("tx", les, err, logMsg)
// Chunked path: cut the stream into chunkSize pieces, hashing each one
// so the receiver can verify reassembly.
471 br := bufio.NewReader(reader)
474 checksums := [][MTHSize]byte{}
476 lr := io.LimitReader(br, chunkSize)
// Chunk packets are named dstPath + ChunkedSuffixPart + chunk number.
477 path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
478 pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
483 _, size, err := ctx.Tx(
// Tee the chunk through the hasher while it is being transmitted.
486 io.TeeReader(lr, hsh),
498 logMsg := func(les LEs) string {
500 "File %s (%s) sent to %s:%s",
502 humanize.IBytes(uint64(size)),
503 ctx.NodeName(node.Id),
508 ctx.LogI("tx", les, logMsg)
510 ctx.LogE("tx", les, err, logMsg)
// Accumulate the payload total net of per-packet overhead.
514 sizeFull += size - PktOverhead
515 var checksum [MTHSize]byte
516 hsh.Sum(checksum[:0])
517 checksums = append(checksums, checksum)
// A short chunk, or no further buffered data, ends the loop.
519 if size < chunkSize {
522 if _, err = br.Peek(1); err != nil {
// Finally send the metadata packet describing all chunks.
527 metaPkt := ChunkedMeta{
528 Magic: MagicNNCPMv2.B,
529 FileSize: uint64(sizeFull),
530 ChunkSize: uint64(chunkSize),
531 Checksums: checksums,
// XDR-encode the meta packet into an in-memory buffer.
534 _, err = xdr.Marshal(&buf, metaPkt)
538 path := dstPath + ChunkedSuffixMeta
539 pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
543 metaPktSize := int64(buf.Len())
548 metaPktSize, minSize, maxSize,
557 {"Size", metaPktSize},
559 logMsg := func(les LEs) string {
561 "File %s (%s) sent to %s:%s",
563 humanize.IBytes(uint64(metaPktSize)),
564 ctx.NodeName(node.Id),
569 ctx.LogI("tx", les, logMsg)
571 ctx.LogE("tx", les, err, logMsg)
// TxFreq sends a file-request (freq) packet to node.  Visible payload
// layout: the remote path (srcPath) travels in the packet header built
// by NewPkt, while the local destination name (dstPath) forms the body.
// NOTE(review): non-contiguous excerpt; error-check lines are missing.
576 func (ctx *Ctx) TxFreq(
578 nice, replyNice uint8,
579 srcPath, dstPath string,
// Both paths must be relative after cleaning.
582 dstPath = filepath.Clean(dstPath)
583 if filepath.IsAbs(dstPath) {
584 return errors.New("Relative destination path required")
586 srcPath = filepath.Clean(srcPath)
587 if filepath.IsAbs(srcPath) {
588 return errors.New("Relative source path required")
// replyNice is embedded in the packet — presumably the niceness the
// remote side should use for its reply; confirm against the rx code.
590 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
594 src := strings.NewReader(dstPath)
595 size := int64(src.Len())
596 _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil)
601 {"ReplyNice", int(replyNice)},
605 logMsg := func(les LEs) string {
607 "File request from %s:%s to %s sent",
608 ctx.NodeName(node.Id), srcPath,
613 ctx.LogI("tx", les, logMsg)
615 ctx.LogE("tx", les, err, logMsg)
// TxExec sends an exec packet: handle plus args joined with NUL bytes
// form the packet header; the body is read from `in`, with a zstd
// compression pipeline set up for one of the two packet variants.
// NOTE(review): non-contiguous excerpt — the condition selecting
// PktTypeExecFat and most of the compression wiring are not visible.
620 func (ctx *Ctx) TxExec(
622 nice, replyNice uint8,
626 minSize int64, maxSize int64,
// Header layout: handle\0arg1\0arg2...
630 path := make([][]byte, 0, 1+len(args))
631 path = append(path, []byte(handle))
632 for _, arg := range args {
633 path = append(path, []byte(arg))
635 pktType := PktTypeExec
637 pktType = PktTypeExecFat
639 pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
// Compression runs in its own goroutine; buffered channel (cap 1) so
// the goroutine can report without blocking.
643 compressErr := make(chan error, 1)
646 compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault))
650 go func(r io.Reader) {
651 if _, err := io.Copy(compressor, r); err != nil {
655 compressErr <- compressor.Close()
// srcSize 0: size is unknown ahead of time for a streamed body.
// NOTE(review): this visible call passes `in` directly; the compressed
// pipe's read end is presumably used on another (missing) branch —
// confirm which variant this call belongs to.
660 _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId)
// Human-readable command line for logging only.
667 dst := strings.Join(append([]string{handle}, args...), " ")
672 {"ReplyNice", int(replyNice)},
676 logMsg := func(les LEs) string {
678 "Exec sent to %s@%s (%s)",
679 ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
683 ctx.LogI("tx", les, logMsg)
685 ctx.LogE("tx", les, err, logMsg)
// TxTrns spools an already-encrypted transitional packet of known size,
// read from src, into our outgoing spool directory for node.
// NOTE(review): non-contiguous excerpt — the function continues past
// the last visible line.
690 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
697 logMsg := func(les LEs) string {
699 "Transitional packet to %s (%s) (nice %s)",
700 ctx.NodeName(node.Id),
701 humanize.IBytes(uint64(size)),
705 ctx.LogD("tx", les, logMsg)
// Refuse when the spool lacks room for the full packet.
706 if !ctx.IsEnoughSpace(size) {
707 err := errors.New("is not enough space")
708 ctx.LogE("tx", les, err, logMsg)
// Write through a hashed temp file, reporting copy progress.
711 tmp, err := ctx.NewTmpFileWHash()
715 if _, err = CopyProgressed(
716 tmp.W, src, "Tx trns",
717 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
// Commit into the node's outgoing (TTx) spool directory.
722 nodePath := filepath.Join(ctx.Spool, node.Id.String())
723 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
725 ctx.LogI("tx", les, logMsg)
// NOTE(review): the failure branch logs at Info level with the error
// attached — LogE would match the rest of this file; confirm intent.
727 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
// Best-effort name->id-directory symlink; error ignored — presumably
// deliberate (alias may already exist), but worth confirming.
729 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))