2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
33 xdr "github.com/davecgh/go-xdr/xdr2"
34 "github.com/dustin/go-humanize"
35 "github.com/klauspost/compress/zstd"
36 "golang.org/x/crypto/blake2b"
// PktEncWriteResult is sent over a channel by each encryption goroutine:
// the raw encrypted packet header bytes produced by PktEncWrite, the
// number of bytes written, and any error encountered.
type PktEncWriteResult struct {
srcSize, minSize, maxSize int64,
) (*Node, int64, error) {
// Resolve the destination area, if an area identifier was given.
area = ctx.AreaId2Area[*areaId]
return nil, 0, errors.New("area has no encryption keys")
// Build the chain of hops: the final destination node first, then the
// configured Via intermediaries in reverse order.
hops := make([]*Node, 0, 1+len(node.Via))
hops = append(hops, node)
for i := len(node.Via); i > 0; i-- {
// lastNode ends up being the first physical hop (Via[0]) the
// packet is actually handed to — the spool directory owner below.
lastNode = ctx.Neigh[*node.Via[i-1]]
hops = append(hops, lastNode)
// Estimate the final on-disk size: payload plus packet overhead,
// padding to minSize across the wrappers, plus encryption overhead.
var expectedSize int64
expectedSize = srcSize + PktOverhead
expectedSize += sizePadCalc(expectedSize, minSize, wrappers)
expectedSize = PktEncOverhead + sizeWithTags(expectedSize)
if maxSize != 0 && expectedSize > maxSize {
// Refuse to start if the spool does not have room for the estimate.
if !ctx.IsEnoughSpace(expectedSize) {
return nil, 0, errors.New("is not enough space")
// Temporary spool file whose checksum is computed while writing.
tmp, err := ctx.NewTmpFileWHash()
// Each encryption layer runs in its own goroutine, chained together
// with io.Pipe; results (header, size, error) come back on this channel.
results := make(chan PktEncWriteResult)
pipeR, pipeW := io.Pipe()
var pipeRPrev io.Reader
// First layer: encrypt the user packet directly to the destination node.
go func(src io.Reader, dst io.WriteCloser) {
{"Node", hops[0].Id},
{"Size", expectedSize},
}, func(les LEs) string {
"Tx packet to %s (source %s) nice: %s",
ctx.NodeName(hops[0].Id),
humanize.IBytes(uint64(expectedSize)),
pktEncRaw, size, err := PktEncWrite(
ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst,
results <- PktEncWriteResult{pktEncRaw, size, err}
// Area path: encrypt the packet to the area's own key first
// (a synthetic Node is built from the area's id and public key),
// then wrap that in a PktTypeArea packet to the first hop.
go func(src io.Reader, dst io.WriteCloser) {
{"Size", expectedSize},
}, func(les LEs) string {
"Tx area packet to %s (source %s) nice: %s",
ctx.AreaName(areaId),
humanize.IBytes(uint64(expectedSize)),
areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
copy(areaNode.Id[:], area.Id[:])
copy(areaNode.ExchPub[:], area.Pub[:])
// Note: area-layer encryption uses no padding and no wrappers.
pktEncRaw, size, err := PktEncWrite(
ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst,
results <- PktEncWriteResult{pktEncRaw, size, err}
pipeR, pipeW = io.Pipe()
go func(src io.Reader, dst io.WriteCloser) {
pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
{"Node", hops[0].Id},
{"Size", expectedSize},
}, func(les LEs) string {
"Tx packet to %s (source %s) nice: %s",
ctx.NodeName(hops[0].Id),
humanize.IBytes(uint64(expectedSize)),
pktEncRaw, size, err := PktEncWrite(
ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst,
results <- PktEncWriteResult{pktEncRaw, size, err}
// Onion-wrap one PktTypeTrns layer per intermediate hop, each layer
// addressed to the previous hop in the chain.
for i := 1; i < len(hops); i++ {
pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
pipeR, pipeW = io.Pipe()
// Parameters are passed explicitly so each goroutine captures its
// own hop/pipe values rather than the loop variables.
go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) {
}, func(les LEs) string {
"Tx trns packet to %s nice: %s",
ctx.NodeName(node.Id),
// Transit layers are capped by MaxFileSize with no extra padding.
pktEncRaw, size, err := PktEncWrite(
ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst,
results <- PktEncWriteResult{pktEncRaw, size, err}
}(hops[i], pktTrns, pipeRPrev, pipeW)
// Drain the outermost pipe into the temporary spool file with
// progress reporting.
_, err := CopyProgressed(
LEs{{"Pkt", pktName}, {"FullSize", expectedSize}},
results <- PktEncWriteResult{err: err}
// The innermost (area) layer's header is kept for deduplication below.
pktEncMsg = (<-results).pktEncRaw
// Collect the result of every layer goroutine; remember the outermost
// header so it can be cached next to the committed packet.
for i := 0; i <= wrappers; i++ {
if r.pktEncRaw != nil {
pktEncRaw = r.pktEncRaw
// Commit the finished packet into the first hop's outgoing spool.
nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
// Best-effort human-readable symlink; NOTE(review): its error is
// deliberately ignored here — the link may already exist.
os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
return lastNode, 0, err
// Cache the encrypted header beside the committed packet.
ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
// Area deduplication: record the BLAKE2b hash of the area-layer
// message in our own seen/ directory so it is not re-sent.
msgHashRaw := blake2b.Sum256(pktEncMsg)
msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
seenDir := filepath.Join(
ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
seenPath := filepath.Join(seenDir, msgHash)
{"Size", expectedSize},
{"AreaMsg", msgHash},
logMsg := func(les LEs) string {
"Tx area packet to %s (source %s) nice: %s, area %s: %s",
ctx.NodeName(node.Id),
humanize.IBytes(uint64(expectedSize)),
if err = ensureDir(seenDir); err != nil {
ctx.LogE("tx-mkdir", les, err, logMsg)
return lastNode, 0, err
// An empty marker file is enough; its mere existence marks the
// message as seen.
if fd, err := os.Create(seenPath); err == nil {
if err = DirSync(seenDir); err != nil {
ctx.LogE("tx-dirsync", les, err, logMsg)
return lastNode, 0, err
ctx.LogI("tx-area", les, logMsg)
return lastNode, finalSize, err
// DummyCloser is a stateless io.Closer for sources that need no
// cleanup (e.g. when the reader is not backed by a real file).
type DummyCloser struct{}

// Close satisfies io.Closer. There is nothing to release, so it
// always reports success.
func (dc DummyCloser) Close() error {
	return nil
}
// prepareTxFile turns srcPath into a stream ready for transmission.
// A regular file is opened and sent as-is; a directory is streamed as a
// PAX tar archive whose exact size is pre-computed so the caller knows
// srcSize up front. Other file types are rejected.
func prepareTxFile(srcPath string) (
srcStat, err := os.Stat(srcPath)
mode := srcStat.Mode()
if mode.IsRegular() {
// It is regular file, just send it
src, err := os.Open(srcPath)
srcSize = srcStat.Size()
rerr = errors.New("unsupported file type")
// It is directory, create PAX archive with its contents
basePath := filepath.Base(srcPath)
rootPath, err := filepath.Abs(srcPath)
// Collect entries first so the archive size can be computed exactly
// before any data is written.
dirs := make([]einfo, 0, 1<<10)
files := make([]einfo, 0, 1<<10)
rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
// directory header, PAX record header+contents
srcSize += TarBlockSize + 2*TarBlockSize
dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
// file header, PAX record header+contents, file content
srcSize += TarBlockSize + 2*TarBlockSize + info.Size()
if n := info.Size() % TarBlockSize; n != 0 {
srcSize += TarBlockSize - n // padding
files = append(files, einfo{
modTime: info.ModTime(),
// The pipe reader needs no cleanup of its own.
closer = DummyCloser{}
srcSize += 2 * TarBlockSize // termination block
// Write the archive into the pipe; the single Header value is
// reused across entries with per-entry fields overwritten.
tarWr := tar.NewWriter(w)
Typeflag: tar.TypeDir,
PAXRecords: map[string]string{
"comment": "Autogenerated by " + VersionGet(),
Format: tar.FormatPAX,
for _, e := range dirs {
// Entry names are made relative to the archive root.
hdr.Name = basePath + e.path[len(rootPath):]
hdr.ModTime = e.modTime
if err = tarWr.WriteHeader(&hdr); err != nil {
return w.CloseWithError(err)
hdr.Typeflag = tar.TypeReg
for _, e := range files {
hdr.Name = basePath + e.path[len(rootPath):]
hdr.ModTime = e.modTime
if err = tarWr.WriteHeader(&hdr); err != nil {
return w.CloseWithError(err)
fd, err := os.Open(e.path)
return w.CloseWithError(err)
if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
return w.CloseWithError(err)
// Flush the tar footer; any error is propagated through the pipe
// so the reading side sees it.
if err = tarWr.Close(); err != nil {
return w.CloseWithError(err)
// TxFile sends srcPath to the given node (or area) as one file packet,
// or — when chunkSize is set and the source is larger — as a series of
// .nncp.part chunks followed by a .nncp.meta descriptor.
func (ctx *Ctx) TxFile(
srcPath, dstPath string,
chunkSize, minSize, maxSize int64,
dstPathSpecified := false
return errors.New("Must provide destination filename")
// Default the destination name to the source's base name.
dstPath = filepath.Base(srcPath)
dstPathSpecified = true
dstPath = filepath.Clean(dstPath)
if filepath.IsAbs(dstPath) {
return errors.New("Relative destination path required")
reader, closer, srcSize, archived, err := prepareTxFile(srcPath)
// NOTE(review): presumably a .tar suffix is appended here when the
// source was archived and no explicit name was given — elided lines.
if archived && !dstPathSpecified {
// Single-packet path: no chunking requested, or the whole payload
// fits into one chunk.
if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) {
pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
_, finalSize, err := ctx.Tx(
srcSize, minSize, maxSize,
bufio.NewReader(reader), dstPath, areaId,
logMsg := func(les LEs) string {
"File %s (%s) sent to %s:%s",
humanize.IBytes(uint64(finalSize)),
ctx.NodeName(node.Id),
ctx.LogI("tx", les, logMsg)
ctx.LogE("tx", les, err, logMsg)
// Chunked path: split the stream into chunkSize pieces, hashing each
// one so the receiver can verify reassembly.
br := bufio.NewReader(reader)
checksums := [][MTHSize]byte{}
lr := io.LimitReader(br, chunkSize)
path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
// TeeReader feeds the hash while the chunk is being encrypted.
_, size, err := ctx.Tx(
io.TeeReader(lr, hsh),
logMsg := func(les LEs) string {
"File %s (%s) sent to %s:%s",
humanize.IBytes(uint64(size)),
ctx.NodeName(node.Id),
ctx.LogI("tx", les, logMsg)
ctx.LogE("tx", les, err, logMsg)
// Accumulate the plaintext size (encrypted size minus overhead).
sizeFull += size - PktOverhead
var checksum [MTHSize]byte
hsh.Sum(checksum[:0])
checksums = append(checksums, checksum)
// A short chunk, or no more buffered data, means the source is done.
if size < chunkSize {
if _, err = br.Peek(1); err != nil {
// Finally send the metadata packet describing all chunks.
metaPkt := ChunkedMeta{
Magic: MagicNNCPMv2.B,
FileSize: uint64(sizeFull),
ChunkSize: uint64(chunkSize),
Checksums: checksums,
_, err = xdr.Marshal(&buf, metaPkt)
path := dstPath + ChunkedSuffixMeta
pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
metaPktSize := int64(buf.Len())
metaPktSize, minSize, maxSize,
{"Size", metaPktSize},
logMsg := func(les LEs) string {
"File %s (%s) sent to %s:%s",
humanize.IBytes(uint64(metaPktSize)),
ctx.NodeName(node.Id),
ctx.LogI("tx", les, logMsg)
ctx.LogE("tx", les, err, logMsg)
// TxFreq sends a file-request packet: it asks node to send back its
// srcPath under our dstPath, with the reply carrying replyNice niceness.
func (ctx *Ctx) TxFreq(
nice, replyNice uint8,
srcPath, dstPath string,
// Both paths must be relative to keep spool traversal impossible.
dstPath = filepath.Clean(dstPath)
if filepath.IsAbs(dstPath) {
return errors.New("Relative destination path required")
srcPath = filepath.Clean(srcPath)
if filepath.IsAbs(srcPath) {
return errors.New("Relative source path required")
// The requested remote path travels in the packet header; the desired
// local destination name is the packet payload.
pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
src := strings.NewReader(dstPath)
size := int64(src.Len())
_, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil)
{"ReplyNice", int(replyNice)},
logMsg := func(les LEs) string {
"File request from %s:%s to %s sent",
ctx.NodeName(node.Id), srcPath,
ctx.LogI("tx", les, logMsg)
ctx.LogE("tx", les, err, logMsg)
// TxExec sends an exec packet invoking the remote handle with args,
// streaming stdin (in) through a zstd compressor unless the "fat"
// uncompressed variant is selected.
func (ctx *Ctx) TxExec(
nice, replyNice uint8,
minSize int64, maxSize int64,
// Handle and arguments are joined NUL-separated into the packet path.
path := make([][]byte, 0, 1+len(args))
path = append(path, []byte(handle))
for _, arg := range args {
path = append(path, []byte(arg))
pktType := PktTypeExec
pktType = PktTypeExecFat
pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
// Compress stdin in a background goroutine feeding a pipe; its
// completion error is reported via compressErr.
compressErr := make(chan error, 1)
compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault))
go func(r io.Reader) {
if _, err := io.Copy(compressor, r); err != nil {
compressErr <- compressor.Close()
// srcSize is 0 here: the compressed size is not known in advance.
_, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId)
dst := strings.Join(append([]string{handle}, args...), " ")
{"ReplyNice", int(replyNice)},
logMsg := func(les LEs) string {
"Exec sent to %s@%s (%s)",
ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
ctx.LogI("tx", les, logMsg)
ctx.LogE("tx", les, err, logMsg)
// TxTrns stores an already-encrypted transitional packet of known size
// directly into node's outgoing spool, without adding any encryption
// layer of our own.
func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
logMsg := func(les LEs) string {
"Transitional packet to %s (%s) (nice %s)",
ctx.NodeName(node.Id),
humanize.IBytes(uint64(size)),
ctx.LogD("tx", les, logMsg)
// Refuse when the spool lacks room for the full packet.
if !ctx.IsEnoughSpace(size) {
err := errors.New("is not enough space")
ctx.LogE("tx", les, err, logMsg)
tmp, err := ctx.NewTmpFileWHash()
// Copy the packet verbatim into the checksummed temporary file.
if _, err = CopyProgressed(
tmp.W, src, "Tx trns",
LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
nodePath := filepath.Join(ctx.Spool, node.Id.String())
err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
ctx.LogI("tx", les, logMsg)
// NOTE(review): a failed commit is logged at Info level with the Err
// field attached — verify LogE was not intended here.
ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
// Best-effort human-readable symlink; error intentionally unchecked.
os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))