2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
35 xdr "github.com/davecgh/go-xdr/xdr2"
36 "github.com/klauspost/compress/zstd"
37 "golang.org/x/crypto/blake2b"
38 "golang.org/x/crypto/chacha20poly1305"
// Elided interior of Ctx.Tx: wraps the payload packet in one
// encryption layer per hop on the route to node, then spools the
// result. NOTE(review): lines between the visible ones are not shown
// in this chunk.
56 hops := make([]*Node, 0, 1+len(node.Via))
57 hops = append(hops, node)
// Resolve the Via chain back-to-front; lastNode finishes as the first
// physical neighbour, whose spool directory receives the packet.
59 for i := len(node.Via); i > 0; i-- {
60 lastNode = ctx.Neigh[*node.Via[i-1]]
61 hops = append(hops, lastNode)
// Predict the size after all layers: each hop adds packet and size
// overhead plus authentication tags over the padded contents.
64 for i := 0; i < len(hops); i++ {
65 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
// Pad the innermost layer so the final size is at least minSize.
67 padSize := minSize - expectedSize
71 if !ctx.IsEnoughSpace(size + padSize) {
72 return nil, errors.New("is not enough space")
74 tmp, err := ctx.NewTmpFileWHash()
// Each encryption layer runs in its own goroutine, chained through
// io.Pipe; every layer reports its PktEncWrite result on errs.
79 errs := make(chan error)
81 pipeR, pipeW := io.Pipe()
// Innermost layer: encrypt the payload for the final destination
// hops[0]; padding is applied only at this layer.
82 go func(size int64, src io.Reader, dst io.WriteCloser) {
88 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
89 dst.Close() // #nosec G104
90 }(curSize, src, pipeW)
91 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
// Wrap each further layer in a transit packet addressed to the
// previous hop; transit layers get no extra padding (padSize 0).
93 var pipeRPrev io.Reader
94 for i := 1; i < len(hops); i++ {
95 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
97 pipeR, pipeW = io.Pipe()
98 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
104 errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
105 dst.Close() // #nosec G104
106 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
107 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
// Drain the outermost layer into the spool tmp file with progress
// reporting; curSize is now the final on-disk size.
110 _, err := CopyProgressed(
112 SDS{"pkt": pktName, "fullsize": curSize},
// Collect results from the layer goroutines. NOTE(review): the bound
// is len(hops)+1 (<=) — confirm it matches the number of goroutines
// sending on errs (some senders are in elided lines).
117 for i := 0; i <= len(hops); i++ {
120 tmp.Fd.Close() // #nosec G104
// Commit the tmp file into lastNode's outbound (TTx) spool directory
// and best-effort create a human-readable symlink named after it.
124 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
125 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
126 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
// DummyCloser provides a no-op io.Closer for readers that need a
// paired Closer but hold no resource to release.
type DummyCloser struct{}

// Close implements io.Closer; it never fails and releases nothing.
func (dc DummyCloser) Close() error {
	return nil
}
// prepareTxFile opens srcPath for transmission and reports how to
// read it: reader streams the content, closer releases it, fileSize
// is the exact byte count reader will yield, archived tells whether
// the content is a generated PAX archive, rerr is any preparation
// error. NOTE(review): intervening lines are elided in this chunk.
134 func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize int64, archived bool, rerr error) {
136 // Read content from stdin, saving to temporary file, encrypting
138 src, err := ioutil.TempFile("", "nncp-file")
// Unlink immediately: the temp file stays reachable only through the
// open descriptor and disappears once closed.
143 os.Remove(src.Name()) // #nosec G104
144 tmpW := bufio.NewWriter(src)
// Throwaway ChaCha20-Poly1305 key: the spooled stdin copy is kept
// encrypted on disk and decrypted again when re-read below.
145 tmpKey := make([]byte, chacha20poly1305.KeySize)
146 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
149 aead, err := chacha20poly1305.New(tmpKey)
154 nonce := make([]byte, aead.NonceSize())
155 written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
// Spooling stdin first makes the total size known before sending.
160 fileSize = int64(written)
161 if err = tmpW.Flush(); err != nil {
165 if _, err = src.Seek(0, io.SeekStart); err != nil {
// Decrypt back on the fly (same key and nonce) into w, which appears
// to be a pipe writer created in elided lines — confirm.
171 if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
172 w.CloseWithError(err) // #nosec G104
180 srcStat, err := os.Stat(srcPath)
185 mode := srcStat.Mode()
187 if mode.IsRegular() {
188 // It is regular file, just send it
189 src, err := os.Open(srcPath)
194 fileSize = srcStat.Size()
195 reader = bufio.NewReader(src)
// Neither a regular file nor (per the branch below) a directory.
201 rerr = errors.New("unsupported file type")
205 // It is directory, create PAX archive with its contents
207 basePath := filepath.Base(srcPath)
208 rootPath, err := filepath.Abs(srcPath)
// First pass: walk the tree and pre-compute the archive's exact size
// in tar blocks, so fileSize is known before streaming starts.
218 dirs := make([]einfo, 0, 1<<10)
219 files := make([]einfo, 0, 1<<10)
220 rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
225 // directory header, PAX record header+contents
226 fileSize += TarBlockSize + 2*TarBlockSize
227 dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
229 // file header, PAX record header+contents, file content
230 fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
231 if n := info.Size() % TarBlockSize; n != 0 {
232 fileSize += TarBlockSize - n // padding
234 files = append(files, einfo{
236 modTime: info.ModTime(),
// Nothing to release for the streamed archive case.
248 closer = DummyCloser{}
249 fileSize += 2 * TarBlockSize // termination block
// Second pass: emit the PAX archive — directories first, then files.
252 tarWr := tar.NewWriter(w)
254 Typeflag: tar.TypeDir,
256 PAXRecords: map[string]string{
257 "comment": "Autogenerated by " + VersionGet(),
259 Format: tar.FormatPAX,
261 for _, e := range dirs {
// Archive entry names are rooted at basePath.
262 hdr.Name = basePath + e.path[len(rootPath):]
263 hdr.ModTime = e.modTime
264 if err = tarWr.WriteHeader(&hdr); err != nil {
265 return w.CloseWithError(err)
// The same header struct is reused for regular-file entries.
268 hdr.Typeflag = tar.TypeReg
270 for _, e := range files {
271 hdr.Name = basePath + e.path[len(rootPath):]
272 hdr.ModTime = e.modTime
274 if err = tarWr.WriteHeader(&hdr); err != nil {
275 return w.CloseWithError(err)
277 fd, err := os.Open(e.path)
279 fd.Close() // #nosec G104
280 return w.CloseWithError(err)
282 if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
283 fd.Close() // #nosec G104
284 return w.CloseWithError(err)
286 fd.Close() // #nosec G104
// Close flushes the tar termination blocks counted above.
288 if err = tarWr.Close(); err != nil {
289 return w.CloseWithError(err)
// TxFile queues file srcPath for transmission to node under dstPath.
// Content no larger than chunkSize goes as a single file packet;
// larger content is split into numbered chunk packets followed by an
// XDR-encoded metadata packet carrying per-chunk BLAKE2b-256
// checksums. NOTE(review): intervening lines are elided in this chunk.
296 func (ctx *Ctx) TxFile(
299 srcPath, dstPath string,
301 minSize, maxSize int64,
// Default the destination name to the source's base name when the
// caller did not specify one.
303 dstPathSpecified := false
306 return errors.New("Must provide destination filename")
308 dstPath = filepath.Base(srcPath)
310 dstPathSpecified = true
312 dstPath = filepath.Clean(dstPath)
313 if filepath.IsAbs(dstPath) {
314 return errors.New("Relative destination path required")
316 reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
323 if fileSize > maxSize {
// NOTE(review): message is ungrammatical; "file is larger than
// allowed" would read better (changing it alters runtime output, so
// it is left as-is here).
324 return errors.New("Too big than allowed")
326 if archived && !dstPathSpecified {
// Small enough: send as a single file packet.
330 if fileSize <= chunkSize {
331 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
335 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
345 ctx.LogI("tx", sds, "sent")
347 ctx.LogE("tx", sds, err, "sent")
// Chunked path: pre-allocate one checksum slot per chunk.
353 metaPkt := ChunkedMeta{
355 FileSize: uint64(fileSize),
356 ChunkSize: uint64(chunkSize),
357 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
359 for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
361 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
// Each chunk is sent as dstPath + part suffix + index; the bytes
// actually transmitted are hashed via TeeReader.
369 if leftSize <= chunkSize {
370 sizeToSend = leftSize
372 sizeToSend = chunkSize
374 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
375 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
379 hsh, err = blake2b.New256(nil)
389 io.TeeReader(reader, hsh),
401 ctx.LogI("tx", sds, "sent")
403 ctx.LogE("tx", sds, err, "sent")
// Store this chunk's digest into its pre-allocated checksum slot.
406 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
407 leftSize -= sizeToSend
// Finally send the metadata packet (dstPath + meta suffix).
413 var metaBuf bytes.Buffer
414 _, err = xdr.Marshal(&metaBuf, metaPkt)
418 path = dstPath + ChunkedSuffixMeta
419 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
423 metaPktSize := int64(metaBuf.Len())
424 _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
434 ctx.LogI("tx", sds, "sent")
436 ctx.LogE("tx", sds, err, "sent")
// TxFreq queues a file-request (freq) packet asking node to send back
// its srcPath, to be stored locally under dstPath. The packet's path
// field carries srcPath; the payload is dstPath itself. NOTE(review):
// intervening lines are elided in this chunk.
441 func (ctx *Ctx) TxFreq(
443 nice, replyNice uint8,
444 srcPath, dstPath string,
445 minSize int64) error {
// Both paths must be relative after cleaning.
446 dstPath = filepath.Clean(dstPath)
447 if filepath.IsAbs(dstPath) {
448 return errors.New("Relative destination path required")
450 srcPath = filepath.Clean(srcPath)
451 if filepath.IsAbs(srcPath) {
452 return errors.New("Relative source path required")
// replyNice is the niceness level for the remote side's reply packet.
454 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
458 src := strings.NewReader(dstPath)
459 size := int64(src.Len())
460 _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
465 "replynice": int(replyNice),
470 ctx.LogI("tx", sds, "sent")
472 ctx.LogE("tx", sds, err, "sent")
// TxExec queues an exec packet: handle and args are NUL-joined into
// the packet path, and the whole in stream is zstd-compressed before
// transmission. NOTE(review): intervening lines are elided in this
// chunk.
477 func (ctx *Ctx) TxExec(
479 nice, replyNice uint8,
485 path := make([][]byte, 0, 1+len(args))
486 path = append(path, []byte(handle))
487 for _, arg := range args {
488 path = append(path, []byte(arg))
// Handle and arguments are separated by NUL bytes in the packet path.
490 pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
// Compress the entire input in memory so the final size is known
// before transmission.
494 var compressed bytes.Buffer
495 compressor, err := zstd.NewWriter(
497 zstd.WithEncoderLevel(zstd.SpeedDefault),
502 if _, err = io.Copy(compressor, in); err != nil {
503 compressor.Close() // #nosec G104
// Close flushes the final zstd frame, so its error must be checked.
506 if err = compressor.Close(); err != nil {
509 size := int64(compressed.Len())
510 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
515 "replynice": int(replyNice),
516 "dst": strings.Join(append([]string{handle}, args...), " "),
520 ctx.LogI("tx", sds, "sent")
522 ctx.LogE("tx", sds, err, "sent")
// TxTrns spools an already-encrypted transit packet of the given size
// for node, adding no further encryption layers. NOTE(review):
// intervening lines are elided in this chunk.
527 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
534 ctx.LogD("tx", sds, "taken")
535 if !ctx.IsEnoughSpace(size) {
536 err := errors.New("is not enough space")
537 ctx.LogE("tx", sds, err, err.Error())
540 tmp, err := ctx.NewTmpFileWHash()
// Copy the packet verbatim into a hashed spool tmp file.
544 if _, err = CopyProgressed(
545 tmp.W, src, "Tx trns",
546 SDS{"pkt": node.Id.String(), "fullsize": size},
// Commit under the node's outbound (TTx) spool directory.
551 nodePath := filepath.Join(ctx.Spool, node.Id.String())
552 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
554 ctx.LogI("tx", sds, "sent")
// NOTE(review): the error case also logs via LogI; the sibling Tx*
// methods use LogE here — confirm whether this should be LogE.
556 ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent")
// Best-effort convenience symlink from the node's name to its spool.
558 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104