2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
33 xdr "github.com/davecgh/go-xdr/xdr2"
34 "github.com/dustin/go-humanize"
35 "github.com/klauspost/compress/zstd"
36 "golang.org/x/crypto/blake2b"
// PktEncWriteResult is sent over the results channel by each encryption
// goroutine: visible usage shows it carries the raw encrypted header
// (pktEncRaw), the written size and an error. Field list is not visible
// in this extract.
46 type PktEncWriteResult struct {
// Tx encrypts pkt with its payload read from src and commits the result
// into the spool of the first physical hop, wrapping it in transition
// (trns) layers for every node.Via hop and, when areaId is non-nil, in
// an area layer as well. Returns the node whose spool received the
// packet, the payload size and an error.
// NOTE(review): this extract is missing many original lines (each
// surviving line still carries its original number); the comments below
// describe only the visible fragments.
56 srcSize, minSize, maxSize int64,
60 ) (*Node, int64, error) {
63 area = ctx.AreaId2Area[*areaId]
65 return nil, 0, errors.New("area has no encryption keys")
// hops[0] is the final destination; via-nodes are appended in reverse
// order, so the last one appended (lastNode) is the first physical hop.
68 hops := make([]*Node, 0, 1+len(node.Via))
69 hops = append(hops, node)
71 for i := len(node.Via); i > 0; i-- {
72 lastNode = ctx.Neigh[*node.Via[i-1]]
73 hops = append(hops, lastNode)
// Pre-compute the expected on-disk size: payload plus packet overhead,
// padding up to minSize spread over the wrapping layers, then the
// encryption overhead and authentication tags.
79 var expectedSize int64
81 expectedSize = srcSize + PktOverhead
82 expectedSize += sizePadCalc(expectedSize, minSize, wrappers)
83 expectedSize = PktEncOverhead + sizeWithTags(expectedSize)
84 if maxSize != 0 && expectedSize > maxSize {
87 if !ctx.IsEnoughSpace(expectedSize) {
88 return nil, 0, errors.New("is not enough space")
// The encrypted stream is built as a pipeline: each wrapping layer runs
// in its own goroutine, connected through io.Pipe, and reports its
// outcome over the results channel.
91 tmp, err := ctx.NewTmpFileWHash()
96 results := make(chan PktEncWriteResult)
97 pipeR, pipeW := io.Pipe()
98 var pipeRPrev io.Reader
// Plain (non-area) case: encrypt directly for the final destination.
100 go func(src io.Reader, dst io.WriteCloser) {
102 {"Node", hops[0].Id},
104 {"Size", expectedSize},
105 }, func(les LEs) string {
107 "Tx packet to %s (source %s) nice: %s",
108 ctx.NodeName(hops[0].Id),
109 humanize.IBytes(uint64(expectedSize)),
113 pktEncRaw, size, err := PktEncWrite(
114 ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst,
116 results <- PktEncWriteResult{pktEncRaw, size, err}
// Area case, inner layer: encrypt for the area's own key pair, which is
// represented as a synthetic Node carrying the area id and public key.
120 go func(src io.Reader, dst io.WriteCloser) {
124 {"Size", expectedSize},
125 }, func(les LEs) string {
127 "Tx area packet to %s (source %s) nice: %s",
128 ctx.AreaName(areaId),
129 humanize.IBytes(uint64(expectedSize)),
133 areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
134 copy(areaNode.Id[:], area.Id[:])
135 copy(areaNode.ExchPub[:], area.Pub[:])
136 pktEncRaw, size, err := PktEncWrite(
137 ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst,
139 results <- PktEncWriteResult{pktEncRaw, size, err}
// Area case, outer layer: wrap the area-encrypted stream in a
// PktTypeArea packet encrypted for the destination node.
143 pipeR, pipeW = io.Pipe()
144 go func(src io.Reader, dst io.WriteCloser) {
145 pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
150 {"Node", hops[0].Id},
152 {"Size", expectedSize},
153 }, func(les LEs) string {
155 "Tx packet to %s (source %s) nice: %s",
156 ctx.NodeName(hops[0].Id),
157 humanize.IBytes(uint64(expectedSize)),
161 pktEncRaw, size, err := PktEncWrite(
162 ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst,
164 results <- PktEncWriteResult{pktEncRaw, size, err}
// One transition (trns) wrapping layer per intermediate hop; each layer
// is addressed to the previous node in the chain and capped at
// MaxFileSize with no extra padding or wrappers.
168 for i := 1; i < len(hops); i++ {
169 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
174 pipeR, pipeW = io.Pipe()
175 go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) {
179 }, func(les LEs) string {
181 "Tx trns packet to %s nice: %s",
182 ctx.NodeName(node.Id),
186 pktEncRaw, size, err := PktEncWrite(
187 ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst,
189 results <- PktEncWriteResult{pktEncRaw, size, err}
191 }(hops[i], pktTrns, pipeRPrev, pipeW)
// Drain the outermost pipe into the temporary spool file with progress
// reporting; its own outcome is funnelled into the same channel.
194 _, err := CopyProgressed(
196 LEs{{"Pkt", pktName}, {"FullSize", expectedSize}},
199 results <- PktEncWriteResult{err: err}
// Collect one result per goroutine; remember the innermost encrypted
// header (pktEncMsg) and the outermost one (pktEncRaw).
203 var payloadSize int64
207 pktEncMsg = r.pktEncRaw
209 for i := 0; i <= wrappers; i++ {
215 if r.pktEncRaw != nil {
216 pktEncRaw = r.pktEncRaw
217 if payloadSize == 0 {
// Atomically commit the temporary file into lastNode's outgoing (tx)
// spool; the symlink gives the spool directory a human-readable alias
// (its error is deliberately ignored: best effort).
222 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
223 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
224 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
226 return lastNode, 0, err
229 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
// For area packets, record the BLAKE2b hash of the inner encrypted
// message in the per-area "seen" directory — presumably so the same
// area message is not accepted twice on reception; TODO confirm
// against the rx side.
232 msgHashRaw := blake2b.Sum256(pktEncMsg)
233 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
234 seenDir := filepath.Join(
235 ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
237 seenPath := filepath.Join(seenDir, msgHash)
241 {"Size", expectedSize},
243 {"AreaMsg", msgHash},
245 logMsg := func(les LEs) string {
247 "Tx area packet to %s (source %s) nice: %s, area %s: %s",
248 ctx.NodeName(node.Id),
249 humanize.IBytes(uint64(expectedSize)),
255 if err = ensureDir(seenDir); err != nil {
256 ctx.LogE("tx-mkdir", les, err, logMsg)
257 return lastNode, 0, err
// The "seen" marker is an empty file; the directory is synced so the
// marker survives a crash.
259 if fd, err := os.Create(seenPath); err == nil {
261 if err = DirSync(seenDir); err != nil {
262 ctx.LogE("tx-dirsync", les, err, logMsg)
263 return lastNode, 0, err
266 ctx.LogI("tx-area", les, logMsg)
268 return lastNode, payloadSize, err
// DummyCloser is a no-op io.Closer. It is returned as the closer where
// the prepareTxFile API requires one but nothing actually has to be
// closed (the directory-archiving case).
type DummyCloser struct{}

// Close implements io.Closer and always succeeds.
func (dc DummyCloser) Close() error { return nil }
// prepareTxFile opens srcPath for transmission. A regular file is
// returned as-is together with its size; a directory is streamed as a
// PAX tar archive whose exact final size is pre-computed by walking the
// tree first. The full return list is not visible in this extract: the
// fragments show a reader, a closer, srcSize and an error (rerr).
// NOTE(review): many original lines are missing from this extract; each
// surviving line still carries its original number.
275 func prepareTxFile(srcPath string) (
288 srcStat, err := os.Stat(srcPath)
293 mode := srcStat.Mode()
295 if mode.IsRegular() {
296 // It is regular file, just send it
297 src, err := os.Open(srcPath)
304 srcSize = srcStat.Size()
// Anything that is neither a regular file nor a directory is rejected.
309 rerr = errors.New("unsupported file type")
313 // It is directory, create PAX archive with its contents
315 basePath := filepath.Base(srcPath)
316 rootPath, err := filepath.Abs(srcPath)
// First pass: walk the tree, collect entries and accumulate the exact
// archive size (tar headers, PAX record header+contents, file bodies,
// block padding), so the caller knows srcSize before any byte is
// produced.
326 dirs := make([]einfo, 0, 1<<10)
327 files := make([]einfo, 0, 1<<10)
328 rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
333 // directory header, PAX record header+contents
334 srcSize += TarBlockSize + 2*TarBlockSize
335 dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
337 // file header, PAX record header+contents, file content
338 srcSize += TarBlockSize + 2*TarBlockSize + info.Size()
339 if n := info.Size() % TarBlockSize; n != 0 {
340 srcSize += TarBlockSize - n // padding
342 files = append(files, einfo{
344 modTime: info.ModTime(),
// The archive is produced lazily through w (presumably a pipe writer —
// see CloseWithError below); nothing needs closing on the reading side,
// hence the no-op DummyCloser.
356 closer = DummyCloser{}
357 srcSize += 2 * TarBlockSize // termination block
// Second pass: emit the archive. A single shared header template is
// reused; Name/ModTime (and later Typeflag) are overwritten per entry.
360 tarWr := tar.NewWriter(w)
362 Typeflag: tar.TypeDir,
364 PAXRecords: map[string]string{
365 "comment": "Autogenerated by " + VersionGet(),
367 Format: tar.FormatPAX,
369 for _, e := range dirs {
// Entry names are rooted at the source directory's base name.
370 hdr.Name = basePath + e.path[len(rootPath):]
371 hdr.ModTime = e.modTime
372 if err = tarWr.WriteHeader(&hdr); err != nil {
// CloseWithError propagates the failure to the reading side.
373 return w.CloseWithError(err)
376 hdr.Typeflag = tar.TypeReg
378 for _, e := range files {
379 hdr.Name = basePath + e.path[len(rootPath):]
380 hdr.ModTime = e.modTime
382 if err = tarWr.WriteHeader(&hdr); err != nil {
383 return w.CloseWithError(err)
385 fd, err := os.Open(e.path)
388 return w.CloseWithError(err)
390 if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
392 return w.CloseWithError(err)
// Closing the tar writer flushes its two terminating zero blocks
// (accounted for above as "termination block").
396 if err = tarWr.Close(); err != nil {
397 return w.CloseWithError(err)
// TxFile sends the file (or directory, archived by prepareTxFile) at
// srcPath to node under the relative name dstPath. When chunkSize is 0
// or the whole payload fits into one chunk it is sent as a single
// PktTypeFile packet; otherwise it is split into .nncp.chunk packets
// plus a final XDR-encoded meta packet with per-chunk hashes.
// NOTE(review): many original lines are missing from this extract; each
// surviving line still carries its original number.
404 func (ctx *Ctx) TxFile(
407 srcPath, dstPath string,
408 chunkSize, minSize, maxSize int64,
// An empty dstPath defaults to the source's base name; remember whether
// the caller specified one explicitly.
411 dstPathSpecified := false
414 return errors.New("Must provide destination filename")
416 dstPath = filepath.Base(srcPath)
418 dstPathSpecified = true
// Destination must stay inside the receiver's incoming directory.
420 dstPath = filepath.Clean(dstPath)
421 if filepath.IsAbs(dstPath) {
422 return errors.New("Relative destination path required")
424 reader, closer, srcSize, archived, err := prepareTxFile(srcPath)
431 if archived && !dstPathSpecified {
// Single-packet path: payload fits in one chunk (or chunking disabled).
435 if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) {
436 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
440 _, finalSize, err := ctx.Tx(
442 srcSize, minSize, maxSize,
443 bufio.NewReader(reader), dstPath, areaId,
453 logMsg := func(les LEs) string {
455 "File %s (%s) sent to %s:%s",
457 humanize.IBytes(uint64(finalSize)),
458 ctx.NodeName(node.Id),
463 ctx.LogI("tx", les, logMsg)
465 ctx.LogE("tx", les, err, logMsg)
// Chunked path: send successive chunkSize-limited slices, hashing each
// chunk on the fly via TeeReader for the meta packet.
470 br := bufio.NewReader(reader)
473 checksums := [][MTHSize]byte{}
475 lr := io.LimitReader(br, chunkSize)
476 path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
477 pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
482 _, size, err := ctx.Tx(
485 io.TeeReader(lr, hsh),
497 logMsg := func(les LEs) string {
499 "File %s (%s) sent to %s:%s",
501 humanize.IBytes(uint64(size)),
502 ctx.NodeName(node.Id),
507 ctx.LogI("tx", les, logMsg)
509 ctx.LogE("tx", les, err, logMsg)
// Accumulate payload size (minus per-packet overhead) and this chunk's
// hash.
513 sizeFull += size - PktOverhead
514 var checksum [MTHSize]byte
515 hsh.Sum(checksum[:0])
516 checksums = append(checksums, checksum)
// A short chunk, or nothing left to Peek, means the input is exhausted.
518 if size < chunkSize {
521 if _, err = br.Peek(1); err != nil {
// Finally send the XDR-serialized meta packet describing all chunks.
526 metaPkt := ChunkedMeta{
527 Magic: MagicNNCPMv2.B,
528 FileSize: uint64(sizeFull),
529 ChunkSize: uint64(chunkSize),
530 Checksums: checksums,
533 _, err = xdr.Marshal(&buf, metaPkt)
537 path := dstPath + ChunkedSuffixMeta
538 pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
542 metaPktSize := int64(buf.Len())
547 metaPktSize, minSize, maxSize,
556 {"Size", metaPktSize},
558 logMsg := func(les LEs) string {
560 "File %s (%s) sent to %s:%s",
562 humanize.IBytes(uint64(metaPktSize)),
563 ctx.NodeName(node.Id),
568 ctx.LogI("tx", les, logMsg)
570 ctx.LogE("tx", les, err, logMsg)
// TxFreq sends a file-request (freq) packet to node, asking it to send
// back its srcPath under our dstPath. The packet's path field carries
// the remote srcPath; the payload is the local dstPath the reply should
// be stored under. replyNice is the niceness the remote should use for
// the reply.
// NOTE(review): many original lines are missing from this extract; each
// surviving line still carries its original number.
575 func (ctx *Ctx) TxFreq(
577 nice, replyNice uint8,
578 srcPath, dstPath string,
// Both paths must be relative so neither side escapes its directories.
581 dstPath = filepath.Clean(dstPath)
582 if filepath.IsAbs(dstPath) {
583 return errors.New("Relative destination path required")
585 srcPath = filepath.Clean(srcPath)
586 if filepath.IsAbs(srcPath) {
587 return errors.New("Relative source path required")
589 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
// The packet payload is just the destination path string.
593 src := strings.NewReader(dstPath)
594 size := int64(src.Len())
595 _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil)
600 {"ReplyNice", int(replyNice)},
604 logMsg := func(les LEs) string {
606 "File request from %s:%s to %s sent",
607 ctx.NodeName(node.Id), srcPath,
612 ctx.LogI("tx", les, logMsg)
614 ctx.LogE("tx", les, err, logMsg)
// TxExec sends an exec packet to node: the handle and its arguments are
// NUL-joined into the packet's path field, and the input stream is
// zstd-compressed on the fly before encryption (unless the "fat" exec
// variant is selected — fragment at original line 636 — where the
// type becomes PktTypeExecFat).
// NOTE(review): many original lines are missing from this extract; each
// surviving line still carries its original number.
619 func (ctx *Ctx) TxExec(
621 nice, replyNice uint8,
625 minSize int64, maxSize int64,
// Build the NUL-separated "handle\0arg1\0arg2..." path field.
629 path := make([][]byte, 0, 1+len(args))
630 path = append(path, []byte(handle))
631 for _, arg := range args {
632 path = append(path, []byte(arg))
634 pktType := PktTypeExec
636 pktType = PktTypeExecFat
638 pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
// Compress input through a pipe in a background goroutine; its final
// error (including the compressor's Close) is reported on compressErr.
642 compressErr := make(chan error, 1)
645 compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault))
649 go func(r io.Reader) {
650 if _, err := io.Copy(compressor, r); err != nil {
654 compressErr <- compressor.Close()
// srcSize 0: the compressed size is not known up front.
659 _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId)
// Human-readable command line for logging only.
666 dst := strings.Join(append([]string{handle}, args...), " ")
671 {"ReplyNice", int(replyNice)},
675 logMsg := func(les LEs) string {
677 "Exec sent to %s@%s (%s)",
678 ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
682 ctx.LogI("tx", les, logMsg)
684 ctx.LogE("tx", les, err, logMsg)
// TxTrns stores an already-encrypted transitional packet (read from
// src, size bytes) directly into node's outgoing spool, without any
// further encryption or wrapping: copy to a hashed temporary file, then
// atomically commit it into the tx directory.
// NOTE(review): many original lines are missing from this extract (the
// function also runs past the end of the visible chunk); each surviving
// line still carries its original number.
689 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
696 logMsg := func(les LEs) string {
698 "Transitional packet to %s (%s) (nice %s)",
699 ctx.NodeName(node.Id),
700 humanize.IBytes(uint64(size)),
704 ctx.LogD("tx", les, logMsg)
// Refuse early if the spool lacks room for the full packet.
705 if !ctx.IsEnoughSpace(size) {
706 err := errors.New("is not enough space")
707 ctx.LogE("tx", les, err, logMsg)
710 tmp, err := ctx.NewTmpFileWHash()
714 if _, err = CopyProgressed(
715 tmp.W, src, "Tx trns",
716 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
// Commit into <spool>/<node id>/tx; log success or failure accordingly.
721 nodePath := filepath.Join(ctx.Spool, node.Id.String())
722 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
724 ctx.LogI("tx", les, logMsg)
726 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
// Best-effort human-readable alias for the node's spool directory; the
// Symlink error is deliberately ignored.
728 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))