2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
33 "github.com/davecgh/go-xdr/xdr2"
34 "golang.org/x/crypto/blake2b"
37 func (ctx *Ctx) Tx(node *Node, pkt *Pkt, nice uint8, size, minSize int64, src io.Reader) (*Node, error) {
38 tmp, err := ctx.NewTmpFileWHash()
42 hops := make([]*Node, 0, 1+len(node.Via))
43 hops = append(hops, node)
45 for i := len(node.Via); i > 0; i-- {
46 lastNode = ctx.Neigh[*node.Via[i-1]]
47 hops = append(hops, lastNode)
49 padSize := minSize - size - int64(len(hops))*(PktOverhead+PktEncOverhead)
53 errs := make(chan error)
55 pipeR, pipeW := io.Pipe()
56 go func(size int64, src io.Reader, dst io.WriteCloser) {
59 "nice": strconv.Itoa(int(nice)),
60 "size": strconv.FormatInt(size, 10),
62 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
64 }(curSize, src, pipeW)
67 var pipeRPrev io.Reader
68 for i := 1; i < len(hops); i++ {
72 PathLen: blake2b.Size256,
73 Path: new([MaxPathSize]byte),
75 copy(pktTrans.Path[:], hops[i-1].Id[:])
76 curSize += PktOverhead + PktEncOverhead
78 pipeR, pipeW = io.Pipe()
79 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
82 "nice": strconv.Itoa(int(nice)),
83 "size": strconv.FormatInt(size, 10),
85 errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
87 }(hops[i], &pktTrans, curSize, pipeRPrev, pipeW)
90 _, err := io.Copy(tmp.W, pipeR)
93 for i := 0; i <= len(hops); i++ {
100 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
101 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
102 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
106 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
108 dstPath = filepath.Base(srcPath)
110 dstPath = filepath.Clean(dstPath)
111 if filepath.IsAbs(dstPath) {
112 return errors.New("Relative destination path required")
114 pkt, err := NewPkt(PktTypeFile, dstPath)
118 src, err := os.Open(srcPath)
123 srcStat, err := src.Stat()
127 _, err = ctx.Tx(node, pkt, nice, srcStat.Size(), minSize, bufio.NewReader(src))
132 "nice": strconv.Itoa(int(nice)),
135 "size": strconv.FormatInt(srcStat.Size(), 10),
141 "nice": strconv.Itoa(int(nice)),
144 "size": strconv.FormatInt(srcStat.Size(), 10),
151 func (ctx *Ctx) TxFileChunked(node *Node, nice uint8, srcPath, dstPath string, minSize int64, chunkSize int64) error {
153 dstPath = filepath.Base(srcPath)
155 dstPath = filepath.Clean(dstPath)
156 if filepath.IsAbs(dstPath) {
157 return errors.New("Relative destination path required")
159 src, err := os.Open(srcPath)
164 srcStat, err := src.Stat()
168 srcReader := bufio.NewReader(src)
169 fileSize := srcStat.Size()
171 metaPkt := ChunkedMeta{
173 FileSize: uint64(fileSize),
174 ChunkSize: uint64(chunkSize),
175 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
177 for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
179 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
187 if leftSize <= chunkSize {
188 sizeToSend = leftSize
190 sizeToSend = chunkSize
192 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
193 pkt, err = NewPkt(PktTypeFile, path)
197 hsh, err = blake2b.New256(nil)
207 io.TeeReader(srcReader, hsh),
213 "nice": strconv.Itoa(int(nice)),
216 "size": strconv.FormatInt(sizeToSend, 10),
222 "nice": strconv.Itoa(int(nice)),
225 "size": strconv.FormatInt(sizeToSend, 10),
230 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
231 leftSize -= sizeToSend
237 var metaBuf bytes.Buffer
238 _, err = xdr.Marshal(&metaBuf, metaPkt)
242 path = dstPath + ChunkedSuffixMeta
243 pkt, err = NewPkt(PktTypeFile, path)
247 metaPktSize := int64(metaBuf.Len())
248 _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
253 "nice": strconv.Itoa(int(nice)),
256 "size": strconv.FormatInt(metaPktSize, 10),
261 "nice": strconv.Itoa(int(nice)),
264 "size": strconv.FormatInt(fileSize, 10),
270 "nice": strconv.Itoa(int(nice)),
273 "size": strconv.FormatInt(metaPktSize, 10),
280 func (ctx *Ctx) TxFreq(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
281 dstPath = filepath.Clean(dstPath)
282 if filepath.IsAbs(dstPath) {
283 return errors.New("Relative destination path required")
285 srcPath = filepath.Clean(srcPath)
286 if filepath.IsAbs(srcPath) {
287 return errors.New("Relative source path required")
289 pkt, err := NewPkt(PktTypeFreq, srcPath)
293 src := strings.NewReader(dstPath)
294 size := int64(src.Len())
295 _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
300 "nice": strconv.Itoa(int(nice)),
308 "nice": strconv.Itoa(int(nice)),
317 func (ctx *Ctx) TxMail(node *Node, nice uint8, recipient string, body []byte, minSize int64) error {
318 pkt, err := NewPkt(PktTypeMail, recipient)
322 var compressed bytes.Buffer
323 compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
327 if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
331 size := int64(compressed.Len())
332 _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
337 "nice": strconv.Itoa(int(nice)),
339 "size": strconv.FormatInt(size, 10),
345 "nice": strconv.Itoa(int(nice)),
347 "size": strconv.FormatInt(size, 10),
354 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
358 "nice": strconv.Itoa(int(nice)),
359 "size": strconv.FormatInt(size, 10),
361 tmp, err := ctx.NewTmpFileWHash()
365 if _, err = io.Copy(tmp.W, src); err != nil {
368 nodePath := filepath.Join(ctx.Spool, node.Id.String())
369 err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
374 "nice": strconv.Itoa(int(nice)),
375 "size": strconv.FormatInt(size, 10),
381 "nice": strconv.Itoa(int(nice)),
382 "size": strconv.FormatInt(size, 10),
386 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))