]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
fcf1c83126123649526e54c39bbff7e22f93dcdd
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "bytes"
24         "compress/zlib"
25         "crypto/rand"
26         "errors"
27         "hash"
28         "io"
29         "io/ioutil"
30         "os"
31         "path/filepath"
32         "strconv"
33         "strings"
34
35         "github.com/davecgh/go-xdr/xdr2"
36         "golang.org/x/crypto/blake2b"
37         "golang.org/x/crypto/chacha20poly1305"
38 )
39
40 func (ctx *Ctx) Tx(
41         node *Node,
42         pkt *Pkt,
43         nice uint8,
44         size, minSize int64,
45         src io.Reader,
46 ) (*Node, error) {
47         tmp, err := ctx.NewTmpFileWHash()
48         if err != nil {
49                 return nil, err
50         }
51         hops := make([]*Node, 0, 1+len(node.Via))
52         hops = append(hops, node)
53         lastNode := node
54         for i := len(node.Via); i > 0; i-- {
55                 lastNode = ctx.Neigh[*node.Via[i-1]]
56                 hops = append(hops, lastNode)
57         }
58         expectedSize := size
59         for i := 0; i < len(hops); i++ {
60                 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
61         }
62         padSize := minSize - expectedSize
63         if padSize < 0 {
64                 padSize = 0
65         }
66         errs := make(chan error)
67         curSize := size
68         pipeR, pipeW := io.Pipe()
69         go func(size int64, src io.Reader, dst io.WriteCloser) {
70                 ctx.LogD("tx", SDS{
71                         "node": hops[0].Id,
72                         "nice": strconv.Itoa(int(nice)),
73                         "size": strconv.FormatInt(size, 10),
74                 }, "wrote")
75                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
76                 dst.Close()
77         }(curSize, src, pipeW)
78         curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
79
80         var pipeRPrev io.Reader
81         for i := 1; i < len(hops); i++ {
82                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
83                 pipeRPrev = pipeR
84                 pipeR, pipeW = io.Pipe()
85                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
86                         ctx.LogD("tx", SDS{
87                                 "node": node.Id,
88                                 "nice": strconv.Itoa(int(nice)),
89                                 "size": strconv.FormatInt(size, 10),
90                         }, "trns wrote")
91                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
92                         dst.Close()
93                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
94                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
95         }
96         go func() {
97                 _, err := io.Copy(tmp.W, pipeR)
98                 errs <- err
99         }()
100         for i := 0; i <= len(hops); i++ {
101                 err = <-errs
102                 if err != nil {
103                         tmp.Fd.Close()
104                         return nil, err
105                 }
106         }
107         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
108         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
109         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
110         return lastNode, err
111 }
112
113 func prepareTxFile(srcPath string) (io.Reader, *os.File, int64, error) {
114         var reader io.Reader
115         var src *os.File
116         var fileSize int64
117         var err error
118         if srcPath == "-" {
119                 src, err = ioutil.TempFile("", "nncp-file")
120                 if err != nil {
121                         return nil, nil, 0, err
122                 }
123                 os.Remove(src.Name())
124                 tmpW := bufio.NewWriter(src)
125                 tmpKey := make([]byte, chacha20poly1305.KeySize)
126                 if _, err = rand.Read(tmpKey[:]); err != nil {
127                         return nil, nil, 0, err
128                 }
129                 aead, err := chacha20poly1305.New(tmpKey)
130                 if err != nil {
131                         return nil, nil, 0, err
132                 }
133                 nonce := make([]byte, aead.NonceSize())
134                 written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
135                 if err != nil {
136                         return nil, nil, 0, err
137                 }
138                 fileSize = int64(written)
139                 if err = tmpW.Flush(); err != nil {
140                         return nil, nil, 0, err
141                 }
142                 src.Seek(0, io.SeekStart)
143                 r, w := io.Pipe()
144                 go func() {
145                         if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
146                                 panic(err)
147                         }
148                 }()
149                 reader = r
150         } else {
151                 src, err = os.Open(srcPath)
152                 if err != nil {
153                         return nil, nil, 0, err
154                 }
155                 srcStat, err := src.Stat()
156                 if err != nil {
157                         return nil, nil, 0, err
158                 }
159                 fileSize = srcStat.Size()
160                 reader = bufio.NewReader(src)
161         }
162         return reader, src, fileSize, nil
163 }
164
165 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
166         if dstPath == "" {
167                 if srcPath == "-" {
168                         return errors.New("Must provide destination filename")
169                 }
170                 dstPath = filepath.Base(srcPath)
171         }
172         dstPath = filepath.Clean(dstPath)
173         if filepath.IsAbs(dstPath) {
174                 return errors.New("Relative destination path required")
175         }
176         pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
177         if err != nil {
178                 return err
179         }
180         reader, src, fileSize, err := prepareTxFile(srcPath)
181         if src != nil {
182                 defer src.Close()
183         }
184         if err != nil {
185                 return err
186         }
187         _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
188         sds := SDS{
189                 "type": "file",
190                 "node": node.Id,
191                 "nice": strconv.Itoa(int(nice)),
192                 "src":  srcPath,
193                 "dst":  dstPath,
194                 "size": strconv.FormatInt(fileSize, 10),
195         }
196         if err == nil {
197                 ctx.LogI("tx", sds, "sent")
198         } else {
199                 sds["err"] = err
200                 ctx.LogE("tx", sds, "sent")
201         }
202         return err
203 }
204
205 func (ctx *Ctx) TxFileChunked(
206         node *Node,
207         nice uint8,
208         srcPath, dstPath string,
209         minSize int64,
210         chunkSize int64,
211 ) error {
212         if dstPath == "" {
213                 if srcPath == "-" {
214                         return errors.New("Must provide destination filename")
215                 }
216                 dstPath = filepath.Base(srcPath)
217         }
218         dstPath = filepath.Clean(dstPath)
219         if filepath.IsAbs(dstPath) {
220                 return errors.New("Relative destination path required")
221         }
222         reader, src, fileSize, err := prepareTxFile(srcPath)
223         if src != nil {
224                 defer src.Close()
225         }
226         if err != nil {
227                 return err
228         }
229
230         if fileSize <= chunkSize {
231                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
232                 if err != nil {
233                         return err
234                 }
235                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
236                 sds := SDS{
237                         "type": "file",
238                         "node": node.Id,
239                         "nice": strconv.Itoa(int(nice)),
240                         "src":  srcPath,
241                         "dst":  dstPath,
242                         "size": strconv.FormatInt(fileSize, 10),
243                 }
244                 if err == nil {
245                         ctx.LogI("tx", sds, "sent")
246                 } else {
247                         sds["err"] = err
248                         ctx.LogE("tx", sds, "sent")
249                 }
250                 return err
251         }
252
253         leftSize := fileSize
254         metaPkt := ChunkedMeta{
255                 Magic:     MagicNNCPMv1,
256                 FileSize:  uint64(fileSize),
257                 ChunkSize: uint64(chunkSize),
258                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
259         }
260         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
261                 hsh := new([32]byte)
262                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
263         }
264         var sizeToSend int64
265         var hsh hash.Hash
266         var pkt *Pkt
267         var chunkNum int
268         var path string
269         for {
270                 if leftSize <= chunkSize {
271                         sizeToSend = leftSize
272                 } else {
273                         sizeToSend = chunkSize
274                 }
275                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
276                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
277                 if err != nil {
278                         return err
279                 }
280                 hsh, err = blake2b.New256(nil)
281                 if err != nil {
282                         return err
283                 }
284                 _, err = ctx.Tx(
285                         node,
286                         pkt,
287                         nice,
288                         sizeToSend,
289                         minSize,
290                         io.TeeReader(reader, hsh),
291                 )
292                 sds := SDS{
293                         "type": "file",
294                         "node": node.Id,
295                         "nice": strconv.Itoa(int(nice)),
296                         "src":  srcPath,
297                         "dst":  path,
298                         "size": strconv.FormatInt(sizeToSend, 10),
299                 }
300                 if err == nil {
301                         ctx.LogI("tx", sds, "sent")
302                 } else {
303                         sds["err"] = err
304                         ctx.LogE("tx", sds, "sent")
305                         return err
306                 }
307                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
308                 leftSize -= sizeToSend
309                 chunkNum++
310                 if leftSize == 0 {
311                         break
312                 }
313         }
314         var metaBuf bytes.Buffer
315         _, err = xdr.Marshal(&metaBuf, metaPkt)
316         if err != nil {
317                 return err
318         }
319         path = dstPath + ChunkedSuffixMeta
320         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
321         if err != nil {
322                 return err
323         }
324         metaPktSize := int64(metaBuf.Len())
325         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
326         sds := SDS{
327                 "type": "file",
328                 "node": node.Id,
329                 "nice": strconv.Itoa(int(nice)),
330                 "src":  srcPath,
331                 "dst":  path,
332                 "size": strconv.FormatInt(metaPktSize, 10),
333         }
334         if err == nil {
335                 ctx.LogI("tx", sds, "sent")
336         } else {
337                 sds["err"] = err
338                 ctx.LogE("tx", sds, "sent")
339         }
340         return err
341 }
342
343 func (ctx *Ctx) TxFreq(
344         node *Node,
345         nice, replyNice uint8,
346         srcPath, dstPath string,
347         minSize int64) error {
348         dstPath = filepath.Clean(dstPath)
349         if filepath.IsAbs(dstPath) {
350                 return errors.New("Relative destination path required")
351         }
352         srcPath = filepath.Clean(srcPath)
353         if filepath.IsAbs(srcPath) {
354                 return errors.New("Relative source path required")
355         }
356         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
357         if err != nil {
358                 return err
359         }
360         src := strings.NewReader(dstPath)
361         size := int64(src.Len())
362         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
363         sds := SDS{
364                 "type":      "freq",
365                 "node":      node.Id,
366                 "nice":      strconv.Itoa(int(nice)),
367                 "replynice": strconv.Itoa(int(replyNice)),
368                 "src":       srcPath,
369                 "dst":       dstPath,
370         }
371         if err == nil {
372                 ctx.LogI("tx", sds, "sent")
373         } else {
374                 sds["err"] = err
375                 ctx.LogE("tx", sds, "sent")
376         }
377         return err
378 }
379
380 func (ctx *Ctx) TxExec(
381         node *Node,
382         nice, replyNice uint8,
383         handle string,
384         args []string,
385         body []byte,
386         minSize int64,
387 ) error {
388         path := make([][]byte, 0, 1+len(args))
389         path = append(path, []byte(handle))
390         for _, arg := range args {
391                 path = append(path, []byte(arg))
392         }
393         pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
394         if err != nil {
395                 return err
396         }
397         var compressed bytes.Buffer
398         compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
399         if err != nil {
400                 return err
401         }
402         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
403                 return err
404         }
405         compressor.Close()
406         size := int64(compressed.Len())
407         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
408         sds := SDS{
409                 "type":      "exec",
410                 "node":      node.Id,
411                 "nice":      strconv.Itoa(int(nice)),
412                 "replynice": strconv.Itoa(int(replyNice)),
413                 "dst":       strings.Join(append([]string{handle}, args...), " "),
414                 "size":      strconv.FormatInt(size, 10),
415         }
416         if err == nil {
417                 ctx.LogI("tx", sds, "sent")
418         } else {
419                 sds["err"] = err
420                 ctx.LogE("tx", sds, "sent")
421         }
422         return err
423 }
424
425 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
426         sds := SDS{
427                 "type": "trns",
428                 "node": node.Id,
429                 "nice": strconv.Itoa(int(nice)),
430                 "size": strconv.FormatInt(size, 10),
431         }
432         ctx.LogD("tx", sds, "taken")
433         tmp, err := ctx.NewTmpFileWHash()
434         if err != nil {
435                 return err
436         }
437         if _, err = io.Copy(tmp.W, src); err != nil {
438                 return err
439         }
440         nodePath := filepath.Join(ctx.Spool, node.Id.String())
441         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
442         if err == nil {
443                 ctx.LogI("tx", sds, "sent")
444         } else {
445                 sds["err"] = err
446                 ctx.LogI("tx", sds, "sent")
447         }
448         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
449         return err
450 }