2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
33 "github.com/davecgh/go-xdr/xdr2"
34 "github.com/klauspost/compress/zstd"
35 "golang.org/x/crypto/blake2b"
36 "golang.org/x/crypto/chacha20poly1305"
// NOTE(review): the enclosing function header is not visible in this excerpt;
// presumably this is the body of the ctx.Tx method that the Tx* helpers in
// this file call with (node, pkt, nice, size, minSize, src) — confirm.
//
// Temporary spool file: the encrypted stream is copied into tmp.W and the
// file is finalized with tmp.Commit at the end of this block.
46 tmp, err := ctx.NewTmpFileWHash()
// hops holds the destination node first, followed by the entries of node.Via
// walked from last to first and resolved through ctx.Neigh.
50 hops := make([]*Node, 0, 1+len(node.Via))
51 hops = append(hops, node)
// When node.Via is non-empty, lastNode ends up as the neighbour looked up for
// node.Via[0]; it is the node whose spool directory receives the packet below.
53 for i := len(node.Via); i > 0; i-- {
54 lastNode = ctx.Neigh[*node.Via[i-1]]
55 hops = append(hops, lastNode)
// Grow expectedSize by one layer of encryption overhead per hop.
58 for i := 0; i < len(hops); i++ {
59 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
// Padding is the shortfall between the requested minSize and expectedSize.
// NOTE(review): no clamp for a negative result is visible here — presumably
// handled on adjacent lines; confirm.
61 padSize := minSize - expectedSize
// Unbuffered channel on which the goroutines below report their results.
65 errs := make(chan error)
// First layer: encrypt the payload for hops[0] (the destination node),
// writing into a pipe that the next stage reads from.
67 pipeR, pipeW := io.Pipe()
68 go func(size int64, src io.Reader, dst io.WriteCloser) {
71 "nice": strconv.Itoa(int(nice)),
72 "size": strconv.FormatInt(size, 10),
74 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
76 }(curSize, src, pipeW)
// Size of the first encrypted layer, padding included; it is the payload size
// of the next layer.
77 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
// Each further hop wraps the previous layer in a PktTypeTrns packet whose path
// is the Id of the previous hop, encrypted for hops[i] with zero padding.
// NOTE(review): pipeRPrev is presumably set to the previous pipeR on a line
// not visible here — confirm.
79 var pipeRPrev io.Reader
80 for i := 1; i < len(hops); i++ {
// NOTE(review): the error returned by NewPkt is discarded here.
81 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
83 pipeR, pipeW = io.Pipe()
84 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
87 "nice": strconv.Itoa(int(nice)),
88 "size": strconv.FormatInt(size, 10),
90 errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
92 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
93 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
// Drain the outermost pipe into the temporary file.
// NOTE(review): presumably runs in its own goroutine that also reports on
// errs — confirm.
96 _, err := io.Copy(tmp.W, pipeR)
// Runs len(hops)+1 iterations.
// NOTE(review): presumably each iteration receives one result from errs —
// one per PktEncWrite goroutine (len(hops) of them) plus one for the copy
// into tmp.W; confirm against the loop body.
99 for i := 0; i <= len(hops); i++ {
// Commit the temporary file under <spool>/<lastNode.Id>/<TTx>.
106 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
107 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
// Symlink <spool>/<lastNode.Name> to the node directory; the error is ignored
// (NOTE(review): presumably deliberate best effort, e.g. the link may already
// exist — confirm).
108 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
// prepareTxFile prepares the data to be transmitted and returns a reader over
// it, the underlying *os.File, the payload size in bytes and an error.
//
// Two paths are visible. One spools os.Stdin into an anonymous temporary
// file through aeadProcess and reads it back; the other opens srcPath
// directly and takes the size from Stat.
// NOTE(review): the condition selecting between them is not visible in this
// excerpt — presumably a special srcPath value such as "-"; confirm.
112 func prepareTxFile(srcPath string) (io.Reader, *os.File, int64, error) {
// Stdin path: create the temporary file in the default temp directory.
118 src, err = ioutil.TempFile("", "nncp-file")
120 return nil, nil, 0, err
// Unlink the file by name right away while keeping the open handle.
// NOTE(review): the error from os.Remove is ignored.
122 os.Remove(src.Name())
123 tmpW := bufio.NewWriter(src)
// Fresh random ChaCha20-Poly1305 key, used only within this call.
124 tmpKey := make([]byte, chacha20poly1305.KeySize)
125 if _, err = rand.Read(tmpKey[:]); err != nil {
// NOTE(review): this and the following early returns hand back nil for the
// *os.File; no Close of src is visible on these paths — confirm whether the
// handle leaks.
126 return nil, nil, 0, err
128 aead, err := chacha20poly1305.New(tmpKey)
130 return nil, nil, 0, err
// All-zero nonce of the AEAD's nonce size, passed to both aeadProcess calls.
132 nonce := make([]byte, aead.NonceSize())
// Run stdin through aeadProcess (third argument true) into the temp file.
// NOTE(review): presumably true means "encrypt" and the returned count is the
// number of plaintext bytes — confirm against aeadProcess.
133 written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
135 return nil, nil, 0, err
137 fileSize = int64(written)
138 if err = tmpW.Flush(); err != nil {
139 return nil, nil, 0, err
// Rewind so the spooled data can be read back.
// NOTE(review): the error from Seek is ignored.
141 src.Seek(0, io.SeekStart)
// Second pass (third argument false) streams the temp file into w.
// NOTE(review): w is declared on lines not visible here — presumably the
// write end of a pipe whose read end becomes the returned reader; confirm.
144 if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
// Regular-file path: open srcPath and take its size from Stat.
150 src, err = os.Open(srcPath)
152 return nil, nil, 0, err
154 srcStat, err := src.Stat()
156 return nil, nil, 0, err
158 fileSize = srcStat.Size()
159 reader = bufio.NewReader(src)
161 return reader, src, fileSize, nil
// TxFile sends srcPath to node as a single PktTypeFile packet whose path is
// dstPath, with the given nice level and minimum size (minSize is passed
// through to ctx.Tx).
//
// dstPath is cleaned with filepath.Clean and must not be absolute.
// NOTE(review): the conditions guarding the "Must provide destination
// filename" error and the filepath.Base(srcPath) default are not visible in
// this excerpt — presumably they cover an empty dstPath; confirm.
164 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
167 return errors.New("Must provide destination filename")
// Default the destination name to the last element of the source path.
169 dstPath = filepath.Base(srcPath)
171 dstPath = filepath.Clean(dstPath)
172 if filepath.IsAbs(dstPath) {
173 return errors.New("Relative destination path required")
175 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
// Obtain the payload reader, the backing file and the payload size.
179 reader, src, fileSize, err := prepareTxFile(srcPath)
186 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
190 "nice": strconv.Itoa(int(nice)),
193 "size": strconv.FormatInt(fileSize, 10),
// Both log calls use the message "sent"; LogI and LogE differ only in level.
// NOTE(review): presumably LogI is the success branch and LogE the failure
// branch — confirm.
196 ctx.LogI("tx", sds, "sent")
199 ctx.LogE("tx", sds, "sent")
// TxFileChunked sends srcPath to node, splitting it into chunks of at most
// chunkSize bytes.
//
// If the whole file fits in one chunk it is sent as a single PktTypeFile
// packet named dstPath. Otherwise each chunk is sent as its own PktTypeFile
// packet named dstPath + ChunkedSuffixPart + <chunk number>, followed by an
// XDR-encoded ChunkedMeta packet named dstPath + ChunkedSuffixMeta that
// carries the file size, the chunk size and a 32-byte checksum per chunk.
//
// dstPath is cleaned with filepath.Clean and must not be absolute.
// NOTE(review): parts of the signature and several guard conditions are not
// visible in this excerpt.
204 func (ctx *Ctx) TxFileChunked(
207 srcPath, dstPath string,
213 return errors.New("Must provide destination filename")
// Default the destination name to the last element of the source path.
215 dstPath = filepath.Base(srcPath)
217 dstPath = filepath.Clean(dstPath)
218 if filepath.IsAbs(dstPath) {
219 return errors.New("Relative destination path required")
221 reader, src, fileSize, err := prepareTxFile(srcPath)
// Small file: no chunking, send it as one ordinary file packet.
229 if fileSize <= chunkSize {
230 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
234 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
238 "nice": strconv.Itoa(int(nice)),
241 "size": strconv.FormatInt(fileSize, 10),
244 ctx.LogI("tx", sds, "sent")
247 ctx.LogE("tx", sds, "sent")
// Metadata describing the chunked transfer.
253 metaPkt := ChunkedMeta{
255 FileSize: uint64(fileSize),
256 ChunkSize: uint64(chunkSize),
257 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
// Pre-fill one checksum slot per expected chunk so that slots can be written
// by index below.
// NOTE(review): when fileSize is an exact multiple of chunkSize,
// (fileSize/chunkSize)+1 is one more than the number of chunkSize-sized
// pieces — confirm that the chunk loop and the receiving side agree on the
// slot count.
259 for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
261 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
// Each chunk carries chunkSize bytes, except a final shorter one.
269 if leftSize <= chunkSize {
270 sizeToSend = leftSize
272 sizeToSend = chunkSize
274 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
275 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
// Fresh unkeyed BLAKE2b-256 hash for this chunk.
279 hsh, err = blake2b.New256(nil)
// Everything read from reader for this chunk is also fed to the hash.
289 io.TeeReader(reader, hsh),
294 "nice": strconv.Itoa(int(nice)),
297 "size": strconv.FormatInt(sizeToSend, 10),
300 ctx.LogI("tx", sds, "sent")
303 ctx.LogE("tx", sds, "sent")
// Sum appends the digest to the zero-length slice over the chunk's slot,
// which stores it in metaPkt.Checksums[chunkNum] in place.
306 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
307 leftSize -= sizeToSend
// Serialize the metadata and send it as the final packet.
313 var metaBuf bytes.Buffer
314 _, err = xdr.Marshal(&metaBuf, metaPkt)
318 path = dstPath + ChunkedSuffixMeta
319 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
323 metaPktSize := int64(metaBuf.Len())
324 _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
328 "nice": strconv.Itoa(int(nice)),
331 "size": strconv.FormatInt(metaPktSize, 10),
334 ctx.LogI("tx", sds, "sent")
337 ctx.LogE("tx", sds, "sent")
// TxFreq sends a PktTypeFreq packet to node.
//
// The packet path is srcPath and the packet payload is the dstPath string.
// replyNice is stored in the packet itself, while nice is the level passed to
// ctx.Tx for this transmission. Both paths are cleaned with filepath.Clean and
// must be relative.
// NOTE(review): the node parameter line of the signature is not visible in
// this excerpt.
342 func (ctx *Ctx) TxFreq(
344 nice, replyNice uint8,
345 srcPath, dstPath string,
346 minSize int64) error {
347 dstPath = filepath.Clean(dstPath)
348 if filepath.IsAbs(dstPath) {
349 return errors.New("Relative destination path required")
351 srcPath = filepath.Clean(srcPath)
352 if filepath.IsAbs(srcPath) {
353 return errors.New("Relative source path required")
355 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
// The payload is just the destination path; its size is the string's byte
// length.
359 src := strings.NewReader(dstPath)
360 size := int64(src.Len())
361 _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
365 "nice": strconv.Itoa(int(nice)),
366 "replynice": strconv.Itoa(int(replyNice)),
// NOTE(review): presumably LogI is the success branch and LogE the failure
// branch; both use the message "sent" — confirm.
371 ctx.LogI("tx", sds, "sent")
374 ctx.LogE("tx", sds, "sent")
// TxExec sends a PktTypeExec packet to node.
//
// The packet path is handle followed by each element of args, joined with NUL
// bytes. The payload is the data read from in, passed through a zstd encoder;
// its size is taken from the in-memory buffer, so the whole compressed payload
// is held in memory before it is handed to ctx.Tx. replyNice is stored in the
// packet, nice is the level passed to ctx.Tx.
// NOTE(review): several signature lines (node, handle, args, in, minSize) are
// not visible in this excerpt.
379 func (ctx *Ctx) TxExec(
381 nice, replyNice uint8,
// Build the NUL-separated path: handle first, then the arguments.
387 path := make([][]byte, 0, 1+len(args))
388 path = append(path, []byte(handle))
389 for _, arg := range args {
390 path = append(path, []byte(arg))
392 pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
// NOTE(review): the encoder's destination argument is not visible here —
// presumably &compressed; likewise the compressor is presumably closed
// before compressed.Len() is read. Confirm both.
396 var compressed bytes.Buffer
397 compressor, err := zstd.NewWriter(
399 zstd.WithEncoderLevel(zstd.SpeedDefault),
404 _, err = io.Copy(compressor, in)
409 size := int64(compressed.Len())
410 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
414 "nice": strconv.Itoa(int(nice)),
415 "replynice": strconv.Itoa(int(replyNice)),
// Logged destination is the handle and arguments joined with spaces.
416 "dst": strings.Join(append([]string{handle}, args...), " "),
417 "size": strconv.FormatInt(size, 10),
420 ctx.LogI("tx", sds, "sent")
423 ctx.LogE("tx", sds, "sent")
// TxTrns copies src verbatim into node's outbound spool.
//
// The data is written to a temporary file obtained from ctx.NewTmpFileWHash
// and committed under <spool>/<node.Id>/<TTx>. No packet is constructed or
// encrypted in the visible code. size is used only in the log fields visible
// here.
428 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
432 "nice": strconv.Itoa(int(nice)),
433 "size": strconv.FormatInt(size, 10),
435 ctx.LogD("tx", sds, "taken")
436 tmp, err := ctx.NewTmpFileWHash()
440 if _, err = io.Copy(tmp.W, src); err != nil {
443 nodePath := filepath.Join(ctx.Spool, node.Id.String())
444 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
// NOTE(review): both log calls after Commit use ctx.LogI, whereas the other
// Tx* functions in this file pair ctx.LogI with ctx.LogE. If the second call
// is the failure branch it should probably be ctx.LogE — confirm.
446 ctx.LogI("tx", sds, "sent")
449 ctx.LogI("tx", sds, "sent")
// Symlink <spool>/<node.Name> to the node directory; the error is ignored
// (NOTE(review): presumably deliberate best effort — confirm).
451 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))