2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
36 xdr "github.com/davecgh/go-xdr/xdr2"
37 "github.com/dustin/go-humanize"
38 "github.com/klauspost/compress/zstd"
39 "golang.org/x/crypto/blake2b"
40 "golang.org/x/crypto/chacha20poly1305"
// NOTE(review): this is an elided view of (*Ctx).Tx — the function signature
// and many intervening lines are missing. Comments below describe only what
// the visible lines establish; hedged notes mark inferences to confirm.
// Overall shape: onion-encrypt a packet through the chain of via-hops,
// stream the result into a spool temporary file, commit it, and (for area
// packets) record a "seen" marker.
// Resolve the destination area, if an area id was given.
61 area = ctx.AreaId2Area[*areaId]
63 return nil, errors.New("unknown area id")
// Build the hop chain: the final recipient first, then each via-node in
// reverse order, so hops[len-1] (lastNode) is the first physical hop.
66 hops := make([]*Node, 0, 1+len(node.Via))
67 hops = append(hops, node)
69 for i := len(node.Via); i > 0; i-- {
70 lastNode = ctx.Neigh[*node.Via[i-1]]
71 hops = append(hops, lastNode)
// Estimate the fully-wrapped size: each encryption wrapper adds
// PktEncOverhead plus per-chunk authentication tags (sizeWithTags).
78 for i := 0; i < wrappers; i++ {
79 expectedSize = PktEncOverhead +
81 sizeWithTags(PktOverhead+expectedSize)
// Pad up to minSize so traffic analysis cannot see the true payload size.
83 padSize := minSize - expectedSize
// Refuse to start if the spool lacks room for payload plus padding.
87 if !ctx.IsEnoughSpace(size + padSize) {
88 return nil, errors.New("is not enough space")
// Temporary spool file whose hash is computed while writing.
90 tmp, err := ctx.NewTmpFileWHash()
// Each wrapping layer runs in its own goroutine, connected by io.Pipe:
// errors come back on errs, raw encrypted headers on pktEncRaws.
95 errs := make(chan error)
96 pktEncRaws := make(chan []byte)
98 pipeR, pipeW := io.Pipe()
99 var pipeRPrev io.Reader
// Innermost layer (non-area path): encrypt the payload packet for the
// final recipient hops[0], applying the padding computed above.
101 go func(size int64, src io.Reader, dst io.WriteCloser) {
103 {"Node", hops[0].Id},
106 }, func(les LEs) string {
108 "Tx packet to %s (%s) nice: %s",
109 ctx.NodeName(hops[0].Id),
110 humanize.IBytes(uint64(size)),
114 pktEncRaw, err := PktEncWrite(
115 ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
117 pktEncRaws <- pktEncRaw
119 dst.Close() // #nosec G104
120 }(curSize, src, pipeW)
121 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
// Area path: encrypt for a synthetic "node" made from the area's
// id/public key, so any area subscriber can decrypt.
124 go func(size, padSize int64, src io.Reader, dst io.WriteCloser) {
129 }, func(les LEs) string {
131 "Tx area packet to %s (%s) nice: %s",
132 ctx.AreaName(areaId),
133 humanize.IBytes(uint64(size)),
137 areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
138 copy(areaNode.Id[:], area.Id[:])
139 copy(areaNode.ExchPub[:], area.Pub[:])
140 pktEncRaw, err := PktEncWrite(
141 ctx.Self, &areaNode, pkt, nice, size, padSize, src, dst,
143 pktEncRaws <- pktEncRaw
145 dst.Close() // #nosec G104
146 }(curSize, padSize, src, pipeW)
147 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
// Wrap the area-encrypted stream into a PktTypeArea packet addressed to
// the first real recipient hops[0] (no extra padding at this layer).
150 pipeR, pipeW = io.Pipe()
151 go func(size int64, src io.Reader, dst io.WriteCloser) {
152 pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
157 {"Node", hops[0].Id},
160 }, func(les LEs) string {
162 "Tx packet to %s (%s) nice: %s",
163 ctx.NodeName(hops[0].Id),
164 humanize.IBytes(uint64(size)),
168 pktEncRaw, err := PktEncWrite(
169 ctx.Self, hops[0], pktArea, nice, size, 0, src, dst,
171 pktEncRaws <- pktEncRaw
173 dst.Close() // #nosec G104
174 }(curSize, pipeRPrev, pipeW)
175 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
// One transitional (PktTypeTrns) wrapper per intermediate hop: each layer
// is encrypted to hops[i] and carries the previous hop's id as payload path.
177 for i := 1; i < len(hops); i++ {
178 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
183 pipeR, pipeW = io.Pipe()
184 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
189 }, func(les LEs) string {
191 "Tx trns packet to %s (%s) nice: %s",
192 ctx.NodeName(node.Id),
193 humanize.IBytes(uint64(size)),
197 pktEncRaw, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
198 pktEncRaws <- pktEncRaw
200 dst.Close() // #nosec G104
201 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
202 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
// Drain the outermost pipe into the spool tmp file with progress logging.
205 _, err := CopyProgressed(
207 LEs{{"Pkt", pktName}, {"FullSize", curSize}},
// Collect the raw encrypted headers produced by each goroutine; the first
// receive presumably corresponds to the innermost (message) layer.
215 pktEncMsg = <-pktEncRaws
217 for i := 0; i < len(hops); i++ {
218 pktEncRaw = <-pktEncRaws
// Wait for every wrapper goroutine to report completion (one per layer).
220 for i := 0; i <= wrappers; i++ {
223 tmp.Fd.Close() // #nosec G104
// Commit the tmp file into the first hop's spool "tx" directory and keep a
// best-effort human-readable symlink named after the node.
227 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
228 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
229 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
// Cache the outermost encrypted header next to the committed packet.
234 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
// Area bookkeeping: hash the inner message and create an empty ".seen"
// marker so this node never re-processes the same area message.
237 msgHashRaw := blake2b.Sum256(pktEncMsg)
238 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
239 seenDir := filepath.Join(
240 ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
242 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
248 {"AreaMsg", msgHash},
250 logMsg := func(les LEs) string {
252 "Tx area packet to %s (%s) nice: %s, area %s: %s",
253 ctx.NodeName(node.Id),
254 humanize.IBytes(uint64(size)),
// Marker creation is logged but non-fatal; 0777 is clipped by umask.
260 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
261 ctx.LogE("tx-mkdir", les, err, logMsg)
264 if fd, err := os.Create(seenPath); err == nil {
// fsync the directory so the marker survives a crash.
266 if err = DirSync(seenDir); err != nil {
267 ctx.LogE("tx-dirsync", les, err, logMsg)
271 ctx.LogI("tx-area", les, logMsg)
// DummyCloser is a no-op io.Closer. It is handed to callers that expect
// something closable when the underlying reader has no resource to release.
type DummyCloser struct{}

// Close implements io.Closer and always succeeds.
func (dc DummyCloser) Close() error {
	return nil
}
// NOTE(review): elided view — return parameter list and error-handling lines
// are missing. throughTmpFile spools r into an anonymous temporary file,
// encrypting it with a one-shot ChaCha20-Poly1305 key so no plaintext ever
// touches disk, then streams the decryption back to the caller
// (presumably via an io.Pipe created in the missing lines — confirm).
280 func throughTmpFile(r io.Reader) (
286 src, err := ioutil.TempFile("", "nncp-file")
// Unlink immediately: the file stays usable through the open fd and
// vanishes automatically when the fd is closed.
291 os.Remove(src.Name()) // #nosec G104
292 tmpW := bufio.NewWriter(src)
// Fresh random key per call; the key never leaves this function.
293 tmpKey := make([]byte, chacha20poly1305.KeySize)
294 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
297 aead, err := chacha20poly1305.New(tmpKey)
// Zero nonce is safe here because the key is single-use.
302 nonce := make([]byte, aead.NonceSize())
// true = seal (encrypt) r into the buffered tmp file.
303 written, err := aeadProcess(aead, nonce, nil, true, r, tmpW)
306 fileSize = int64(written)
// Hmm — prefixes preserved; fileSize reported is the plaintext byte count.
309 if err = tmpW.Flush(); err != nil {
// Rewind for the read-back pass.
313 if _, err = src.Seek(0, io.SeekStart); err != nil {
// Reset the nonce (loop over its bytes) before decrypting — TODO confirm
// the missing loop body zeroes it.
319 for i := 0; i < aead.NonceSize(); i++ {
// false = open (decrypt) the tmp file back out to the pipe writer w.
322 if _, err := aeadProcess(aead, nonce, nil, false, bufio.NewReader(src), w); err != nil {
323 w.CloseWithError(err) // #nosec G104
// NOTE(review): elided view — return parameter list and error paths are
// missing. prepareTxFile turns srcPath into a (reader, closer, fileSize,
// archived?, err) bundle: stdin is spooled through an encrypted tmp file,
// a regular file is streamed directly, and a directory is packed into a
// PAX tar archive whose exact size is pre-computed.
331 func prepareTxFile(srcPath string) (
// Special case: srcPath designating stdin (the check is in missing lines)
// must be buffered to a tmp file because its size is unknown up front.
339 reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin))
343 srcStat, err := os.Stat(srcPath)
348 mode := srcStat.Mode()
350 if mode.IsRegular() {
351 // It is regular file, just send it
352 src, err := os.Open(srcPath)
357 fileSize = srcStat.Size()
358 reader = bufio.NewReader(src)
// Anything that is neither regular file nor directory is rejected.
364 rerr = errors.New("unsupported file type")
368 // It is directory, create PAX archive with its contents
370 basePath := filepath.Base(srcPath)
371 rootPath, err := filepath.Abs(srcPath)
// First pass: walk the tree and pre-compute the archive's total size so
// the transmission layer knows fileSize before any tar bytes are written.
381 dirs := make([]einfo, 0, 1<<10)
382 files := make([]einfo, 0, 1<<10)
383 rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
388 // directory header, PAX record header+contents
389 fileSize += TarBlockSize + 2*TarBlockSize
390 dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
392 // file header, PAX record header+contents, file content
393 fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
// Tar pads each file's data up to a whole 512-byte block.
394 if n := info.Size() % TarBlockSize; n != 0 {
395 fileSize += TarBlockSize - n // padding
397 files = append(files, einfo{
399 modTime: info.ModTime(),
411 closer = DummyCloser{}
412 fileSize += 2 * TarBlockSize // termination block
// Second pass (in a goroutine started in missing lines — confirm): stream
// the actual PAX archive into the pipe writer w; one reusable hdr struct
// is mutated per entry.
415 tarWr := tar.NewWriter(w)
417 Typeflag: tar.TypeDir,
419 PAXRecords: map[string]string{
420 "comment": "Autogenerated by " + VersionGet(),
422 Format: tar.FormatPAX,
// Emit all directory entries first, then all regular files.
424 for _, e := range dirs {
// Entry names are made relative to rootPath, rooted at basePath.
425 hdr.Name = basePath + e.path[len(rootPath):]
426 hdr.ModTime = e.modTime
427 if err = tarWr.WriteHeader(&hdr); err != nil {
428 return w.CloseWithError(err)
431 hdr.Typeflag = tar.TypeReg
433 for _, e := range files {
434 hdr.Name = basePath + e.path[len(rootPath):]
435 hdr.ModTime = e.modTime
437 if err = tarWr.WriteHeader(&hdr); err != nil {
438 return w.CloseWithError(err)
440 fd, err := os.Open(e.path)
442 fd.Close() // #nosec G104
443 return w.CloseWithError(err)
445 if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
446 fd.Close() // #nosec G104
447 return w.CloseWithError(err)
449 fd.Close() // #nosec G104
// Close flushes the two zero-filled termination blocks accounted above.
451 if err = tarWr.Close(); err != nil {
452 return w.CloseWithError(err)
// NOTE(review): elided view — parts of the signature and error paths are
// missing. TxFile sends srcPath to node as one file packet when it fits in
// chunkSize, otherwise as numbered ".part" chunks plus a chunked-meta
// packet carrying per-chunk MTH checksums.
459 func (ctx *Ctx) TxFile(
462 srcPath, dstPath string,
464 minSize, maxSize int64,
467 dstPathSpecified := false
// dstPath defaults to the source's base name; an explicit dstPath is
// remembered so archives are not silently renamed below.
470 return errors.New("Must provide destination filename")
472 dstPath = filepath.Base(srcPath)
474 dstPathSpecified = true
476 dstPath = filepath.Clean(dstPath)
// Reject absolute destinations: the remote side resolves them relative
// to its own incoming directory.
477 if filepath.IsAbs(dstPath) {
478 return errors.New("Relative destination path required")
480 reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
487 if fileSize > maxSize {
488 return errors.New("Too big than allowed")
// Directories were tar-archived; append a suffix (in missing line —
// presumably ".tar") unless the caller chose the name explicitly.
490 if archived && !dstPathSpecified {
// Single-packet path: the whole file fits into one chunk.
494 if fileSize <= chunkSize {
495 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
499 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath, areaId)
508 logMsg := func(les LEs) string {
510 "File %s (%s) sent to %s:%s",
512 humanize.IBytes(uint64(fileSize)),
513 ctx.NodeName(node.Id),
518 ctx.LogI("tx", les, logMsg)
520 ctx.LogE("tx", les, err, logMsg)
// Chunked path: pre-build the meta packet with one checksum slot per chunk.
526 metaPkt := ChunkedMeta{
527 Magic: MagicNNCPMv2.B,
528 FileSize: uint64(fileSize),
529 ChunkSize: uint64(chunkSize),
530 Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1),
532 for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
533 hsh := new([MTHSize]byte)
534 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
// Per-chunk send loop (loop header in missing lines): the last chunk may
// be shorter than chunkSize.
542 if leftSize <= chunkSize {
543 sizeToSend = leftSize
545 sizeToSend = chunkSize
// Chunk names are dstPath + ".nncp.chunk" + index — TODO confirm suffix.
547 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
548 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
// TeeReader hashes the chunk while it is being transmitted.
559 io.TeeReader(reader, hsh),
569 {"Size", sizeToSend},
571 logMsg := func(les LEs) string {
573 "File %s (%s) sent to %s:%s",
575 humanize.IBytes(uint64(sizeToSend)),
576 ctx.NodeName(node.Id),
581 ctx.LogI("tx", les, logMsg)
583 ctx.LogE("tx", les, err, logMsg)
// Store this chunk's digest into its pre-allocated slot.
586 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
587 leftSize -= sizeToSend
// Finally XDR-serialize the meta packet and send it as its own file
// packet named dstPath + meta suffix.
593 var metaBuf bytes.Buffer
594 _, err = xdr.Marshal(&metaBuf, metaPkt)
598 path = dstPath + ChunkedSuffixMeta
599 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
603 metaPktSize := int64(metaBuf.Len())
604 _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path, areaId)
611 {"Size", metaPktSize},
613 logMsg := func(les LEs) string {
615 "File %s (%s) sent to %s:%s",
617 humanize.IBytes(uint64(metaPktSize)),
618 ctx.NodeName(node.Id),
623 ctx.LogI("tx", les, logMsg)
625 ctx.LogE("tx", les, err, logMsg)
// NOTE(review): elided view — some error-path lines are missing.
// TxFreq sends a file-request ("freq") packet: it asks node to send us
// srcPath, to be saved locally under dstPath. Note the inversion relative
// to TxFile: the packet's path field is srcPath (what we want), while the
// payload body carries dstPath (where we want it).
630 func (ctx *Ctx) TxFreq(
632 nice, replyNice uint8,
633 srcPath, dstPath string,
634 minSize int64) error {
// Both paths must be relative for the same spool-safety reason as TxFile.
635 dstPath = filepath.Clean(dstPath)
636 if filepath.IsAbs(dstPath) {
637 return errors.New("Relative destination path required")
639 srcPath = filepath.Clean(srcPath)
640 if filepath.IsAbs(srcPath) {
641 return errors.New("Relative source path required")
// replyNice is the niceness the remote should use when answering.
643 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
// The packet payload is just the destination path string.
647 src := strings.NewReader(dstPath)
648 size := int64(src.Len())
// Freq packets never target an area, hence the nil areaId.
649 _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath, nil)
654 {"ReplyNice", int(replyNice)},
658 logMsg := func(les LEs) string {
660 "File request from %s:%s to %s sent",
661 ctx.NodeName(node.Id), srcPath,
666 ctx.LogI("tx", les, logMsg)
668 ctx.LogE("tx", les, err, logMsg)
// NOTE(review): elided view — parts of the signature (handle, args, in,
// noCompress, useTmp flags appear in the missing lines) and error paths are
// absent. TxExec sends an exec packet whose path is the handle plus its
// arguments NUL-joined; the body is `in`, optionally zstd-compressed,
// optionally spooled through an encrypted tmp file when its size is
// unknown in advance. The four if-branches below cover every
// (noCompress, useTmp) combination.
673 func (ctx *Ctx) TxExec(
675 nice, replyNice uint8,
// Exec path = handle followed by each argument, joined with NUL bytes.
684 path := make([][]byte, 0, 1+len(args))
685 path = append(path, []byte(handle))
686 for _, arg := range args {
687 path = append(path, []byte(arg))
// PktTypeExecFat marks an uncompressed exec payload — TODO confirm the
// condition guarding line 691 in the missing text.
689 pktType := PktTypeExec
691 pktType = PktTypeExecFat
693 pkt, rerr := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
// Case 1: compress fully in memory, then send the buffer.
699 if !noCompress && !useTmp {
700 var compressed bytes.Buffer
701 compressor, err := zstd.NewWriter(
703 zstd.WithEncoderLevel(zstd.SpeedDefault),
708 if _, err = io.Copy(compressor, in); err != nil {
709 compressor.Close() // #nosec G104
// Close must succeed: it flushes the final zstd frame.
712 if err = compressor.Close(); err != nil {
715 size = int64(compressed.Len())
716 _, rerr = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle, areaId)
// Case 2: buffer raw input in memory, no compression.
718 if noCompress && !useTmp {
719 var data bytes.Buffer
720 if _, err := io.Copy(&data, in); err != nil {
723 size = int64(data.Len())
724 _, rerr = ctx.Tx(node, pkt, nice, size, minSize, &data, handle, areaId)
// Case 3: compress through a pipe into an encrypted tmp file — used when
// the input is too large or unsized for in-memory buffering.
726 if !noCompress && useTmp {
728 compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault))
732 copyErr := make(chan error)
734 _, err := io.Copy(compressor, in)
736 compressor.Close() // #nosec G104
739 err = compressor.Close()
743 tmpReader, closer, fileSize, err := throughTmpFile(r)
755 _, rerr = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId)
// Case 4: raw input straight through the encrypted tmp file.
757 if noCompress && useTmp {
758 tmpReader, closer, fileSize, err := throughTmpFile(in)
766 _, rerr = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId)
// Common logging tail: human-readable "handle arg1 arg2 ..." destination.
769 dst := strings.Join(append([]string{handle}, args...), " ")
774 {"ReplyNice", int(replyNice)},
778 logMsg := func(les LEs) string {
780 "Exec sent to %s@%s (%s)",
781 ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
785 ctx.LogI("tx", les, logMsg)
787 ctx.LogE("tx", les, rerr, logMsg)
// NOTE(review): elided view — runs past the end of this chunk; trailing
// lines are missing. TxTrns spools an already-encrypted transitional
// packet (read verbatim from src) into node's outgoing spool directory.
// Unlike Tx it performs no encryption or wrapping of its own.
792 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
799 logMsg := func(les LEs) string {
801 "Transitional packet to %s (%s) (nice %s)",
802 ctx.NodeName(node.Id),
803 humanize.IBytes(uint64(size)),
807 ctx.LogD("tx", les, logMsg)
// Refuse when the spool cannot hold the advertised size.
808 if !ctx.IsEnoughSpace(size) {
809 err := errors.New("is not enough space")
810 ctx.LogE("tx", les, err, logMsg)
// Stream src into a hash-computing tmp file with progress reporting.
813 tmp, err := ctx.NewTmpFileWHash()
817 if _, err = CopyProgressed(
818 tmp.W, src, "Tx trns",
819 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
// Commit into the node's "tx" spool dir; the commit error is logged
// either way (LogI with an Err entry on failure).
824 nodePath := filepath.Join(ctx.Spool, node.Id.String())
825 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
827 ctx.LogI("tx", les, logMsg)
829 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
// Best-effort human-readable symlink named after the node.
831 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104