2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
35 xdr "github.com/davecgh/go-xdr/xdr2"
36 "github.com/klauspost/compress/zstd"
37 "golang.org/x/crypto/blake2b"
38 "golang.org/x/crypto/chacha20poly1305"
53 hops := make([]*Node, 0, 1+len(node.Via))
54 hops = append(hops, node)
56 for i := len(node.Via); i > 0; i-- {
57 lastNode = ctx.Neigh[*node.Via[i-1]]
58 hops = append(hops, lastNode)
61 for i := 0; i < len(hops); i++ {
62 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
64 padSize := minSize - expectedSize
68 if !ctx.IsEnoughSpace(size + padSize) {
69 return nil, errors.New("is not enough space")
71 tmp, err := ctx.NewTmpFileWHash()
76 errs := make(chan error)
78 pipeR, pipeW := io.Pipe()
79 go func(size int64, src io.Reader, dst io.WriteCloser) {
82 "nice": strconv.Itoa(int(nice)),
83 "size": strconv.FormatInt(size, 10),
85 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
87 }(curSize, src, pipeW)
88 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
90 var pipeRPrev io.Reader
91 for i := 1; i < len(hops); i++ {
92 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
94 pipeR, pipeW = io.Pipe()
95 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
98 "nice": strconv.Itoa(int(nice)),
99 "size": strconv.FormatInt(size, 10),
101 errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
103 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
104 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
107 _, err := io.Copy(tmp.W, pipeR)
110 for i := 0; i <= len(hops); i++ {
117 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
118 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
119 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
123 type DummyCloser struct{}
125 func (dc DummyCloser) Close() error { return nil }
127 func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize int64, archived bool, rerr error) {
129 // Read content from stdin, saving to temporary file, encrypting
131 src, err := ioutil.TempFile("", "nncp-file")
136 os.Remove(src.Name())
137 tmpW := bufio.NewWriter(src)
138 tmpKey := make([]byte, chacha20poly1305.KeySize)
139 if _, rerr = rand.Read(tmpKey[:]); rerr != nil {
142 aead, err := chacha20poly1305.New(tmpKey)
147 nonce := make([]byte, aead.NonceSize())
148 written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
153 fileSize = int64(written)
154 if err = tmpW.Flush(); err != nil {
157 src.Seek(0, io.SeekStart)
160 if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
161 w.CloseWithError(err)
169 srcStat, err := os.Stat(srcPath)
174 mode := srcStat.Mode()
176 if mode.IsRegular() {
177 // It is regular file, just send it
178 src, err := os.Open(srcPath)
183 fileSize = srcStat.Size()
184 reader = bufio.NewReader(src)
190 rerr = errors.New("unsupported file type")
194 // It is directory, create PAX archive with its contents
196 basePath := filepath.Base(srcPath)
197 rootPath, err := filepath.Abs(srcPath)
207 dirs := make([]einfo, 0, 1<<10)
208 files := make([]einfo, 0, 1<<10)
209 rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
214 // directory header, PAX record header+contents
215 fileSize += TarBlockSize + 2*TarBlockSize
216 dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
218 // file header, PAX record header+contents, file content
219 fileSize += TarBlockSize + 2*TarBlockSize + info.Size()
220 if n := info.Size() % TarBlockSize; n != 0 {
221 fileSize += TarBlockSize - n // padding
223 files = append(files, einfo{
225 modTime: info.ModTime(),
237 closer = DummyCloser{}
238 fileSize += 2 * TarBlockSize // termination block
241 tarWr := tar.NewWriter(w)
243 Typeflag: tar.TypeDir,
245 PAXRecords: map[string]string{
246 "comment": "Autogenerated by " + VersionGet(),
248 Format: tar.FormatPAX,
250 for _, e := range dirs {
251 hdr.Name = basePath + e.path[len(rootPath):]
252 hdr.ModTime = e.modTime
253 if err = tarWr.WriteHeader(&hdr); err != nil {
254 w.CloseWithError(err)
257 hdr.Typeflag = tar.TypeReg
259 for _, e := range files {
260 hdr.Name = basePath + e.path[len(rootPath):]
261 hdr.ModTime = e.modTime
263 if err = tarWr.WriteHeader(&hdr); err != nil {
264 w.CloseWithError(err)
266 fd, err := os.Open(e.path)
268 w.CloseWithError(err)
270 _, err = io.Copy(tarWr, bufio.NewReader(fd))
272 w.CloseWithError(err)
282 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
283 dstPathSpecified := false
286 return errors.New("Must provide destination filename")
288 dstPath = filepath.Base(srcPath)
290 dstPathSpecified = true
292 dstPath = filepath.Clean(dstPath)
293 if filepath.IsAbs(dstPath) {
294 return errors.New("Relative destination path required")
296 reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
303 if archived && !dstPathSpecified {
306 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
310 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
314 "nice": strconv.Itoa(int(nice)),
317 "size": strconv.FormatInt(fileSize, 10),
320 ctx.LogI("tx", sds, "sent")
323 ctx.LogE("tx", sds, "sent")
328 func (ctx *Ctx) TxFileChunked(
331 srcPath, dstPath string,
335 dstPathSpecified := false
338 return errors.New("Must provide destination filename")
340 dstPath = filepath.Base(srcPath)
342 dstPathSpecified = true
344 dstPath = filepath.Clean(dstPath)
345 if filepath.IsAbs(dstPath) {
346 return errors.New("Relative destination path required")
348 reader, closer, fileSize, archived, err := prepareTxFile(srcPath)
355 if archived && !dstPathSpecified {
359 if fileSize <= chunkSize {
360 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
364 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
368 "nice": strconv.Itoa(int(nice)),
371 "size": strconv.FormatInt(fileSize, 10),
374 ctx.LogI("tx", sds, "sent")
377 ctx.LogE("tx", sds, "sent")
383 metaPkt := ChunkedMeta{
385 FileSize: uint64(fileSize),
386 ChunkSize: uint64(chunkSize),
387 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
389 for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
391 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
399 if leftSize <= chunkSize {
400 sizeToSend = leftSize
402 sizeToSend = chunkSize
404 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
405 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
409 hsh, err = blake2b.New256(nil)
419 io.TeeReader(reader, hsh),
424 "nice": strconv.Itoa(int(nice)),
427 "size": strconv.FormatInt(sizeToSend, 10),
430 ctx.LogI("tx", sds, "sent")
433 ctx.LogE("tx", sds, "sent")
436 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
437 leftSize -= sizeToSend
443 var metaBuf bytes.Buffer
444 _, err = xdr.Marshal(&metaBuf, metaPkt)
448 path = dstPath + ChunkedSuffixMeta
449 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
453 metaPktSize := int64(metaBuf.Len())
454 _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
458 "nice": strconv.Itoa(int(nice)),
461 "size": strconv.FormatInt(metaPktSize, 10),
464 ctx.LogI("tx", sds, "sent")
467 ctx.LogE("tx", sds, "sent")
472 func (ctx *Ctx) TxFreq(
474 nice, replyNice uint8,
475 srcPath, dstPath string,
476 minSize int64) error {
477 dstPath = filepath.Clean(dstPath)
478 if filepath.IsAbs(dstPath) {
479 return errors.New("Relative destination path required")
481 srcPath = filepath.Clean(srcPath)
482 if filepath.IsAbs(srcPath) {
483 return errors.New("Relative source path required")
485 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
489 src := strings.NewReader(dstPath)
490 size := int64(src.Len())
491 _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
495 "nice": strconv.Itoa(int(nice)),
496 "replynice": strconv.Itoa(int(replyNice)),
501 ctx.LogI("tx", sds, "sent")
504 ctx.LogE("tx", sds, "sent")
509 func (ctx *Ctx) TxExec(
511 nice, replyNice uint8,
517 path := make([][]byte, 0, 1+len(args))
518 path = append(path, []byte(handle))
519 for _, arg := range args {
520 path = append(path, []byte(arg))
522 pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
526 var compressed bytes.Buffer
527 compressor, err := zstd.NewWriter(
529 zstd.WithEncoderLevel(zstd.SpeedDefault),
534 _, err = io.Copy(compressor, in)
539 size := int64(compressed.Len())
540 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
544 "nice": strconv.Itoa(int(nice)),
545 "replynice": strconv.Itoa(int(replyNice)),
546 "dst": strings.Join(append([]string{handle}, args...), " "),
547 "size": strconv.FormatInt(size, 10),
550 ctx.LogI("tx", sds, "sent")
553 ctx.LogE("tx", sds, "sent")
558 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
562 "nice": strconv.Itoa(int(nice)),
563 "size": strconv.FormatInt(size, 10),
565 ctx.LogD("tx", sds, "taken")
566 if !ctx.IsEnoughSpace(size) {
567 err := errors.New("is not enough space")
568 ctx.LogE("tx", sds, err.Error())
571 tmp, err := ctx.NewTmpFileWHash()
575 if _, err = io.Copy(tmp.W, src); err != nil {
578 nodePath := filepath.Join(ctx.Spool, node.Id.String())
579 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
581 ctx.LogI("tx", sds, "sent")
584 ctx.LogI("tx", sds, "sent")
586 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))