2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
35 xdr "github.com/davecgh/go-xdr/xdr2"
36 "github.com/klauspost/compress/zstd"
37 "golang.org/x/crypto/blake2b"
38 "golang.org/x/crypto/chacha20poly1305"
// NOTE(review): fragment of the packet-transmission routine; the enclosing
// signature and a number of intermediate lines are not visible here, so the
// comments below describe only the code that is shown.
tmp, err := ctx.NewTmpFileWHash()
// Build the chain of hops: the final destination first, then each relay
// from node.Via, walked in reverse order.
hops := make([]*Node, 0, 1+len(node.Via))
hops = append(hops, node)
for i := len(node.Via); i > 0; i-- {
lastNode = ctx.Neigh[*node.Via[i-1]]
hops = append(hops, lastNode)
// Every encryption layer adds its own framing; accumulate the overhead to
// know how large the fully wrapped packet will be.
for i := 0; i < len(hops); i++ {
expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
// Pad the innermost layer so the final packet reaches at least minSize
// (presumably to hide the true payload size — TODO confirm).
padSize := minSize - expectedSize
errs := make(chan error)
pipeR, pipeW := io.Pipe()
// Innermost layer: encrypt the payload to the first hop, streaming
// through a pipe in a background goroutine.
go func(size int64, src io.Reader, dst io.WriteCloser) {
"nice": strconv.Itoa(int(nice)),
"size": strconv.FormatInt(size, 10),
errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
}(curSize, src, pipeW)
// Stream size after this layer's encryption framing plus padding.
curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
var pipeRPrev io.Reader
// Wrap each further hop in a transitional (PktTypeTrns) packet addressed
// to the previous hop, chaining the pipes reader-to-writer.
for i := 1; i < len(hops); i++ {
pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
pipeR, pipeW = io.Pipe()
go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
"nice": strconv.Itoa(int(nice)),
"size": strconv.FormatInt(size, 10),
// Outer layers carry no padding (padSize argument is 0 here);
// padding is applied only on the innermost layer above.
errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
}(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
// Drain the outermost pipe into the spool temporary file.
_, err := io.Copy(tmp.W, pipeR)
// len(hops)+1 iterations — presumably one receive from errs per writer
// goroutine; the loop body is not visible in this fragment.
for i := 0; i <= len(hops); i++ {
// Commit the finished file under the first relay's (lastNode) spool
// directory, into its outgoing (TTx) queue.
nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
// Best-effort human-readable alias for the node directory; the
// Symlink error is deliberately ignored.
os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
119 type DummyCloser struct{}
121 func (dc DummyCloser) Close() error { return nil }
// prepareTxFile opens srcPath and returns a reader over the payload to send,
// a closer for the underlying resource, the exact payload size in bytes,
// whether the payload was archived (directory turned into a PAX tar stream),
// and an error.  NOTE(review): several lines of this function are not visible
// in this fragment; comments cover only what is shown.
func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize int64, archived bool, rerr error) {
// Read content from stdin, saving to temporary file, encrypting
// it with a throwaway key so the plaintext never rests on disk.
src, err := ioutil.TempFile("", "nncp-file")
// Unlink immediately: the file remains usable through the open fd and
// vanishes once closed.
os.Remove(src.Name())
tmpW := bufio.NewWriter(src)
// Fresh random one-shot key for the temporary spool copy.
tmpKey := make([]byte, chacha20poly1305.KeySize)
if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
aead, err := chacha20poly1305.New(tmpKey)
// Zero nonce paired with a key used exactly once.
nonce := make([]byte, aead.NonceSize())
// Sealing pass: stdin -> AEAD encrypt -> temporary file; the returned
// count is the plaintext size of the payload.
written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
fileSize = int64(written)
if err = tmpW.Flush(); err != nil {
// Rewind and stream the decrypting pass back out through a pipe
// (pipe setup is in lines not visible here).
src.Seek(0, io.SeekStart)
if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
w.CloseWithError(err)
// Filesystem path: inspect what srcPath refers to.
srcStat, err := os.Stat(srcPath)
mode := srcStat.Mode()
if mode.IsRegular() {
// It is regular file, just send it
src, err := os.Open(srcPath)
fileSize = srcStat.Size()
reader = bufio.NewReader(src)
// Neither a regular file nor (presumably) a directory: refuse.
rerr = errors.New("unsupported file type")
// It is directory, create PAX archive with its contents
basePath := filepath.Base(srcPath)
rootPath, err := filepath.Abs(srcPath)
// Pre-walk the tree to compute the exact archive size up front and to
// remember every entry, so the tar stream can be produced afterwards.
dirs := make([]einfo, 0, 1<<10)
files := make([]einfo, 0, 1<<10)
rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
// directory header, PAX record header+contents
fileSize += TarBlockSize + 2*TarBlockSize
dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
// file header, PAX record header+contents, file content
fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
// file bodies are padded to a whole number of TarBlockSize blocks
if n := info.Size() % TarBlockSize; n != 0 {
fileSize += TarBlockSize - n // padding
files = append(files, einfo{
modTime: info.ModTime(),
closer = DummyCloser{}
fileSize += 2 * TarBlockSize // termination block
// Produce the archive into the pipe writer (the surrounding goroutine
// launch is not visible in this fragment).
tarWr := tar.NewWriter(w)
Typeflag: tar.TypeDir,
PAXRecords: map[string]string{
"comment": "Autogenerated by " + VersionGet(),
Format: tar.FormatPAX,
// Emit directory entries first so extraction creates parents early.
for _, e := range dirs {
// Rebase the absolute walk path onto the archive's base name.
hdr.Name = basePath + e.path[len(rootPath):]
hdr.ModTime = e.modTime
if err = tarWr.WriteHeader(&hdr); err != nil {
w.CloseWithError(err)
// Reuse the same header value for regular-file entries.
hdr.Typeflag = tar.TypeReg
for _, e := range files {
hdr.Name = basePath + e.path[len(rootPath):]
hdr.ModTime = e.modTime
if err = tarWr.WriteHeader(&hdr); err != nil {
w.CloseWithError(err)
fd, err := os.Open(e.path)
w.CloseWithError(err)
// Stream the file body into the archive.
_, err = io.Copy(tarWr, bufio.NewReader(fd))
w.CloseWithError(err)
// TxFile queues the file (or directory, archived by prepareTxFile) at srcPath
// for transmission to node as dstPath, padding the encrypted packet up to
// minSize.  NOTE(review): several lines of this function are not visible in
// this fragment.
func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
// Tracks whether the caller named the destination explicitly (affects
// the archive-suffix handling below).
dstPathSpecified := false
return errors.New("Must provide destination filename")
// No explicit destination: fall back to the source's base name.
dstPath = filepath.Base(srcPath)
dstPathSpecified = true
dstPath = filepath.Clean(dstPath)
// Destinations are always relative to the receiver's incoming area.
if filepath.IsAbs(dstPath) {
return errors.New("Relative destination path required")
reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
// Directory was archived but no explicit name given — presumably a
// suffix (e.g. .tar) is appended here; the line is not visible.
if archived && !dstPathSpecified {
pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
_, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
"nice": strconv.Itoa(int(nice)),
"size": strconv.FormatInt(fileSize, 10),
ctx.LogI("tx", sds, "sent")
// Failure path logs the same "sent" message at error level.
ctx.LogE("tx", sds, "sent")
// TxFileChunked behaves like TxFile but splits payloads larger than chunkSize
// into numbered .part packets plus a final metadata packet carrying per-chunk
// BLAKE2b-256 checksums.  NOTE(review): parts of the signature and body are
// not visible in this fragment.
func (ctx *Ctx) TxFileChunked(
srcPath, dstPath string,
dstPathSpecified := false
return errors.New("Must provide destination filename")
dstPath = filepath.Base(srcPath)
dstPathSpecified = true
dstPath = filepath.Clean(dstPath)
if filepath.IsAbs(dstPath) {
return errors.New("Relative destination path required")
reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
if archived && !dstPathSpecified {
// Small enough to fit a single chunk: send as an ordinary file packet,
// no chunking metadata needed.
if fileSize <= chunkSize {
pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
_, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
"nice": strconv.Itoa(int(nice)),
"size": strconv.FormatInt(fileSize, 10),
ctx.LogI("tx", sds, "sent")
ctx.LogE("tx", sds, "sent")
// Chunked path: pre-build the metadata packet with one checksum slot
// per chunk (fileSize/chunkSize rounded up via +1).
metaPkt := ChunkedMeta{
FileSize: uint64(fileSize),
ChunkSize: uint64(chunkSize),
Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
// Last chunk may be shorter than chunkSize.
if leftSize <= chunkSize {
sizeToSend = leftSize
sizeToSend = chunkSize
// Each chunk is named <dstPath><part-suffix><index>.
path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
// Fresh hash per chunk; TeeReader checksums the bytes as they are
// streamed into Tx.
hsh, err = blake2b.New256(nil)
io.TeeReader(reader, hsh),
"nice": strconv.Itoa(int(nice)),
"size": strconv.FormatInt(sizeToSend, 10),
ctx.LogI("tx", sds, "sent")
ctx.LogE("tx", sds, "sent")
// Store the chunk digest in its pre-allocated slot ([:0] reuses the
// array as the Sum destination).
hsh.Sum(metaPkt.Checksums[chunkNum][:0])
leftSize -= sizeToSend
// Finally serialize the metadata with XDR and send it as
// <dstPath><meta-suffix>.
var metaBuf bytes.Buffer
_, err = xdr.Marshal(&metaBuf, metaPkt)
path = dstPath + ChunkedSuffixMeta
pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
metaPktSize := int64(metaBuf.Len())
_, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
"nice": strconv.Itoa(int(nice)),
"size": strconv.FormatInt(metaPktSize, 10),
ctx.LogI("tx", sds, "sent")
ctx.LogE("tx", sds, "sent")
// TxFreq queues a file-request ("freq") packet: it asks the remote node to
// send back its srcPath, to be stored locally as dstPath.  NOTE(review): the
// node parameter line of the signature is elided in this fragment.
func (ctx *Ctx) TxFreq(
nice, replyNice uint8,
srcPath, dstPath string,
minSize int64) error {
// Both paths must be relative — they address spool/incoming areas.
dstPath = filepath.Clean(dstPath)
if filepath.IsAbs(dstPath) {
return errors.New("Relative destination path required")
srcPath = filepath.Clean(srcPath)
if filepath.IsAbs(srcPath) {
return errors.New("Relative source path required")
// The packet header names the remote file (srcPath); replyNice is the
// niceness the remote should use for its reply.
pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
// The packet body carries only the local destination name.
src := strings.NewReader(dstPath)
size := int64(src.Len())
_, err = ctx.Tx(node, pkt, nice, size, minSize, src)
"nice": strconv.Itoa(int(nice)),
"replynice": strconv.Itoa(int(replyNice)),
ctx.LogI("tx", sds, "sent")
ctx.LogE("tx", sds, "sent")
// TxExec queues an exec packet: handle plus args form the remote command
// specification, and the stdin stream (in) is zstd-compressed and sent as the
// packet body.  NOTE(review): parts of the signature and body are not visible
// in this fragment.
func (ctx *Ctx) TxExec(
nice, replyNice uint8,
// Encode handle and arguments as NUL-separated byte strings in the
// packet path field.
path := make([][]byte, 0, 1+len(args))
path = append(path, []byte(handle))
for _, arg := range args {
path = append(path, []byte(arg))
pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
// Compress the whole input in memory first: Tx needs the exact size of
// the payload before transmission starts.
var compressed bytes.Buffer
compressor, err := zstd.NewWriter(
zstd.WithEncoderLevel(zstd.SpeedDefault),
_, err = io.Copy(compressor, in)
size := int64(compressed.Len())
_, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
"nice": strconv.Itoa(int(nice)),
"replynice": strconv.Itoa(int(replyNice)),
// Human-readable command line for the log record.
"dst": strings.Join(append([]string{handle}, args...), " "),
"size": strconv.FormatInt(size, 10),
ctx.LogI("tx", sds, "sent")
ctx.LogE("tx", sds, "sent")
// TxTrns spools an already-encrypted transitional packet read from src into
// node's outgoing (TTx) queue verbatim — no further encryption layer is
// added here.  NOTE(review): the tail of this function is not visible in
// this fragment.
func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
"nice": strconv.Itoa(int(nice)),
"size": strconv.FormatInt(size, 10),
ctx.LogD("tx", sds, "taken")
tmp, err := ctx.NewTmpFileWHash()
// Copy the packet bytes straight into the hashed temporary file.
if _, err = io.Copy(tmp.W, src); err != nil {
nodePath := filepath.Join(ctx.Spool, node.Id.String())
err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
ctx.LogI("tx", sds, "sent")
// NOTE(review): this appears to be the failure branch, yet it logs via
// LogI while the sibling Tx* functions use LogE — confirm intentional.
ctx.LogI("tx", sds, "sent")
// Best-effort human-readable alias; Symlink error deliberately ignored.
os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))