2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
35 "github.com/davecgh/go-xdr/xdr2"
36 "golang.org/x/crypto/blake2b"
37 "golang.org/x/crypto/chacha20poly1305"
45 src io.Reader) (*Node, error) {
46 tmp, err := ctx.NewTmpFileWHash()
50 hops := make([]*Node, 0, 1+len(node.Via))
51 hops = append(hops, node)
53 for i := len(node.Via); i > 0; i-- {
54 lastNode = ctx.Neigh[*node.Via[i-1]]
55 hops = append(hops, lastNode)
58 for i := 0; i < len(hops); i++ {
59 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
61 padSize := minSize - expectedSize
65 errs := make(chan error)
67 pipeR, pipeW := io.Pipe()
68 go func(size int64, src io.Reader, dst io.WriteCloser) {
71 "nice": strconv.Itoa(int(nice)),
72 "size": strconv.FormatInt(size, 10),
74 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
76 }(curSize, src, pipeW)
77 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
79 var pipeRPrev io.Reader
80 for i := 1; i < len(hops); i++ {
81 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
83 pipeR, pipeW = io.Pipe()
84 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
87 "nice": strconv.Itoa(int(nice)),
88 "size": strconv.FormatInt(size, 10),
90 errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
92 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
93 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
96 _, err := io.Copy(tmp.W, pipeR)
99 for i := 0; i <= len(hops); i++ {
106 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
107 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
108 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
112 func prepareTxFile(srcPath string) (io.Reader, *os.File, int64, error) {
118 src, err = ioutil.TempFile("", "nncp-file")
120 return nil, nil, 0, err
122 os.Remove(src.Name())
123 tmpW := bufio.NewWriter(src)
124 tmpKey := make([]byte, chacha20poly1305.KeySize)
125 if _, err = rand.Read(tmpKey[:]); err != nil {
126 return nil, nil, 0, err
128 aead, err := chacha20poly1305.New(tmpKey)
130 return nil, nil, 0, err
132 nonce := make([]byte, aead.NonceSize())
133 written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
135 return nil, nil, 0, err
137 fileSize = int64(written)
138 if err = tmpW.Flush(); err != nil {
139 return nil, nil, 0, err
141 src.Seek(0, io.SeekStart)
144 if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
150 src, err = os.Open(srcPath)
152 return nil, nil, 0, err
154 srcStat, err := src.Stat()
156 return nil, nil, 0, err
158 fileSize = srcStat.Size()
159 reader = bufio.NewReader(src)
161 return reader, src, fileSize, nil
164 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
167 return errors.New("Must provide destination filename")
169 dstPath = filepath.Base(srcPath)
171 dstPath = filepath.Clean(dstPath)
172 if filepath.IsAbs(dstPath) {
173 return errors.New("Relative destination path required")
175 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
179 reader, src, fileSize, err := prepareTxFile(srcPath)
186 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
190 "nice": strconv.Itoa(int(nice)),
193 "size": strconv.FormatInt(fileSize, 10),
196 ctx.LogI("tx", sds, "sent")
199 ctx.LogE("tx", sds, "sent")
204 func (ctx *Ctx) TxFileChunked(
207 srcPath, dstPath string,
209 chunkSize int64) error {
212 return errors.New("Must provide destination filename")
214 dstPath = filepath.Base(srcPath)
216 dstPath = filepath.Clean(dstPath)
217 if filepath.IsAbs(dstPath) {
218 return errors.New("Relative destination path required")
220 reader, src, fileSize, err := prepareTxFile(srcPath)
228 if fileSize <= chunkSize {
229 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
233 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
237 "nice": strconv.Itoa(int(nice)),
240 "size": strconv.FormatInt(fileSize, 10),
243 ctx.LogI("tx", sds, "sent")
246 ctx.LogE("tx", sds, "sent")
252 metaPkt := ChunkedMeta{
254 FileSize: uint64(fileSize),
255 ChunkSize: uint64(chunkSize),
256 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
258 for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
260 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
268 if leftSize <= chunkSize {
269 sizeToSend = leftSize
271 sizeToSend = chunkSize
273 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
274 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
278 hsh, err = blake2b.New256(nil)
288 io.TeeReader(reader, hsh),
293 "nice": strconv.Itoa(int(nice)),
296 "size": strconv.FormatInt(sizeToSend, 10),
299 ctx.LogI("tx", sds, "sent")
302 ctx.LogE("tx", sds, "sent")
305 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
306 leftSize -= sizeToSend
312 var metaBuf bytes.Buffer
313 _, err = xdr.Marshal(&metaBuf, metaPkt)
317 path = dstPath + ChunkedSuffixMeta
318 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
322 metaPktSize := int64(metaBuf.Len())
323 _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
327 "nice": strconv.Itoa(int(nice)),
330 "size": strconv.FormatInt(metaPktSize, 10),
333 ctx.LogI("tx", sds, "sent")
336 ctx.LogE("tx", sds, "sent")
341 func (ctx *Ctx) TxFreq(
343 nice, replyNice uint8,
344 srcPath, dstPath string,
345 minSize int64) error {
346 dstPath = filepath.Clean(dstPath)
347 if filepath.IsAbs(dstPath) {
348 return errors.New("Relative destination path required")
350 srcPath = filepath.Clean(srcPath)
351 if filepath.IsAbs(srcPath) {
352 return errors.New("Relative source path required")
354 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
358 src := strings.NewReader(dstPath)
359 size := int64(src.Len())
360 _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
364 "nice": strconv.Itoa(int(nice)),
365 "replynice": strconv.Itoa(int(replyNice)),
370 ctx.LogI("tx", sds, "sent")
373 ctx.LogE("tx", sds, "sent")
378 func (ctx *Ctx) TxExec(
380 nice, replyNice uint8,
384 minSize int64) error {
385 path := make([][]byte, 0, 1+len(args))
386 path = append(path, []byte(handle))
387 for _, arg := range args {
388 path = append(path, []byte(arg))
390 pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
394 var compressed bytes.Buffer
395 compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
399 if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
403 size := int64(compressed.Len())
404 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
408 "nice": strconv.Itoa(int(nice)),
409 "replynice": strconv.Itoa(int(replyNice)),
410 "dst": strings.Join(append([]string{handle}, args...), " "),
411 "size": strconv.FormatInt(size, 10),
414 ctx.LogI("tx", sds, "sent")
417 ctx.LogE("tx", sds, "sent")
422 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
426 "nice": strconv.Itoa(int(nice)),
427 "size": strconv.FormatInt(size, 10),
429 ctx.LogD("tx", sds, "taken")
430 tmp, err := ctx.NewTmpFileWHash()
434 if _, err = io.Copy(tmp.W, src); err != nil {
437 nodePath := filepath.Join(ctx.Spool, node.Id.String())
438 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
440 ctx.LogI("tx", sds, "sent")
443 ctx.LogI("tx", sds, "sent")
445 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))