]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
Forbid any later GNU GPL versions autousage
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "compress/zlib"
24         "crypto/rand"
25         "errors"
26         "hash"
27         "io"
28         "io/ioutil"
29         "os"
30         "path/filepath"
31         "strconv"
32         "strings"
33
34         "github.com/davecgh/go-xdr/xdr2"
35         "golang.org/x/crypto/blake2b"
36         "golang.org/x/crypto/chacha20poly1305"
37 )
38
39 func (ctx *Ctx) Tx(
40         node *Node,
41         pkt *Pkt,
42         nice uint8,
43         size, minSize int64,
44         src io.Reader,
45 ) (*Node, error) {
46         tmp, err := ctx.NewTmpFileWHash()
47         if err != nil {
48                 return nil, err
49         }
50         hops := make([]*Node, 0, 1+len(node.Via))
51         hops = append(hops, node)
52         lastNode := node
53         for i := len(node.Via); i > 0; i-- {
54                 lastNode = ctx.Neigh[*node.Via[i-1]]
55                 hops = append(hops, lastNode)
56         }
57         expectedSize := size
58         for i := 0; i < len(hops); i++ {
59                 expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
60         }
61         padSize := minSize - expectedSize
62         if padSize < 0 {
63                 padSize = 0
64         }
65         errs := make(chan error)
66         curSize := size
67         pipeR, pipeW := io.Pipe()
68         go func(size int64, src io.Reader, dst io.WriteCloser) {
69                 ctx.LogD("tx", SDS{
70                         "node": hops[0].Id,
71                         "nice": strconv.Itoa(int(nice)),
72                         "size": strconv.FormatInt(size, 10),
73                 }, "wrote")
74                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
75                 dst.Close()
76         }(curSize, src, pipeW)
77         curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
78
79         var pipeRPrev io.Reader
80         for i := 1; i < len(hops); i++ {
81                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
82                 pipeRPrev = pipeR
83                 pipeR, pipeW = io.Pipe()
84                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
85                         ctx.LogD("tx", SDS{
86                                 "node": node.Id,
87                                 "nice": strconv.Itoa(int(nice)),
88                                 "size": strconv.FormatInt(size, 10),
89                         }, "trns wrote")
90                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
91                         dst.Close()
92                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
93                 curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
94         }
95         go func() {
96                 _, err := io.Copy(tmp.W, pipeR)
97                 errs <- err
98         }()
99         for i := 0; i <= len(hops); i++ {
100                 err = <-errs
101                 if err != nil {
102                         tmp.Fd.Close()
103                         return nil, err
104                 }
105         }
106         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
107         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
108         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
109         return lastNode, err
110 }
111
112 func prepareTxFile(srcPath string) (io.Reader, *os.File, int64, error) {
113         var reader io.Reader
114         var src *os.File
115         var fileSize int64
116         var err error
117         if srcPath == "-" {
118                 src, err = ioutil.TempFile("", "nncp-file")
119                 if err != nil {
120                         return nil, nil, 0, err
121                 }
122                 os.Remove(src.Name())
123                 tmpW := bufio.NewWriter(src)
124                 tmpKey := make([]byte, chacha20poly1305.KeySize)
125                 if _, err = rand.Read(tmpKey[:]); err != nil {
126                         return nil, nil, 0, err
127                 }
128                 aead, err := chacha20poly1305.New(tmpKey)
129                 if err != nil {
130                         return nil, nil, 0, err
131                 }
132                 nonce := make([]byte, aead.NonceSize())
133                 written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
134                 if err != nil {
135                         return nil, nil, 0, err
136                 }
137                 fileSize = int64(written)
138                 if err = tmpW.Flush(); err != nil {
139                         return nil, nil, 0, err
140                 }
141                 src.Seek(0, io.SeekStart)
142                 r, w := io.Pipe()
143                 go func() {
144                         if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
145                                 panic(err)
146                         }
147                 }()
148                 reader = r
149         } else {
150                 src, err = os.Open(srcPath)
151                 if err != nil {
152                         return nil, nil, 0, err
153                 }
154                 srcStat, err := src.Stat()
155                 if err != nil {
156                         return nil, nil, 0, err
157                 }
158                 fileSize = srcStat.Size()
159                 reader = bufio.NewReader(src)
160         }
161         return reader, src, fileSize, nil
162 }
163
164 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
165         if dstPath == "" {
166                 if srcPath == "-" {
167                         return errors.New("Must provide destination filename")
168                 }
169                 dstPath = filepath.Base(srcPath)
170         }
171         dstPath = filepath.Clean(dstPath)
172         if filepath.IsAbs(dstPath) {
173                 return errors.New("Relative destination path required")
174         }
175         pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
176         if err != nil {
177                 return err
178         }
179         reader, src, fileSize, err := prepareTxFile(srcPath)
180         if src != nil {
181                 defer src.Close()
182         }
183         if err != nil {
184                 return err
185         }
186         _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
187         sds := SDS{
188                 "type": "file",
189                 "node": node.Id,
190                 "nice": strconv.Itoa(int(nice)),
191                 "src":  srcPath,
192                 "dst":  dstPath,
193                 "size": strconv.FormatInt(fileSize, 10),
194         }
195         if err == nil {
196                 ctx.LogI("tx", sds, "sent")
197         } else {
198                 sds["err"] = err
199                 ctx.LogE("tx", sds, "sent")
200         }
201         return err
202 }
203
204 func (ctx *Ctx) TxFileChunked(
205         node *Node,
206         nice uint8,
207         srcPath, dstPath string,
208         minSize int64,
209         chunkSize int64,
210 ) error {
211         if dstPath == "" {
212                 if srcPath == "-" {
213                         return errors.New("Must provide destination filename")
214                 }
215                 dstPath = filepath.Base(srcPath)
216         }
217         dstPath = filepath.Clean(dstPath)
218         if filepath.IsAbs(dstPath) {
219                 return errors.New("Relative destination path required")
220         }
221         reader, src, fileSize, err := prepareTxFile(srcPath)
222         if src != nil {
223                 defer src.Close()
224         }
225         if err != nil {
226                 return err
227         }
228
229         if fileSize <= chunkSize {
230                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
231                 if err != nil {
232                         return err
233                 }
234                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
235                 sds := SDS{
236                         "type": "file",
237                         "node": node.Id,
238                         "nice": strconv.Itoa(int(nice)),
239                         "src":  srcPath,
240                         "dst":  dstPath,
241                         "size": strconv.FormatInt(fileSize, 10),
242                 }
243                 if err == nil {
244                         ctx.LogI("tx", sds, "sent")
245                 } else {
246                         sds["err"] = err
247                         ctx.LogE("tx", sds, "sent")
248                 }
249                 return err
250         }
251
252         leftSize := fileSize
253         metaPkt := ChunkedMeta{
254                 Magic:     MagicNNCPMv1,
255                 FileSize:  uint64(fileSize),
256                 ChunkSize: uint64(chunkSize),
257                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
258         }
259         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
260                 hsh := new([32]byte)
261                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
262         }
263         var sizeToSend int64
264         var hsh hash.Hash
265         var pkt *Pkt
266         var chunkNum int
267         var path string
268         for {
269                 if leftSize <= chunkSize {
270                         sizeToSend = leftSize
271                 } else {
272                         sizeToSend = chunkSize
273                 }
274                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
275                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
276                 if err != nil {
277                         return err
278                 }
279                 hsh, err = blake2b.New256(nil)
280                 if err != nil {
281                         return err
282                 }
283                 _, err = ctx.Tx(
284                         node,
285                         pkt,
286                         nice,
287                         sizeToSend,
288                         minSize,
289                         io.TeeReader(reader, hsh),
290                 )
291                 sds := SDS{
292                         "type": "file",
293                         "node": node.Id,
294                         "nice": strconv.Itoa(int(nice)),
295                         "src":  srcPath,
296                         "dst":  path,
297                         "size": strconv.FormatInt(sizeToSend, 10),
298                 }
299                 if err == nil {
300                         ctx.LogI("tx", sds, "sent")
301                 } else {
302                         sds["err"] = err
303                         ctx.LogE("tx", sds, "sent")
304                         return err
305                 }
306                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
307                 leftSize -= sizeToSend
308                 chunkNum++
309                 if leftSize == 0 {
310                         break
311                 }
312         }
313         var metaBuf bytes.Buffer
314         _, err = xdr.Marshal(&metaBuf, metaPkt)
315         if err != nil {
316                 return err
317         }
318         path = dstPath + ChunkedSuffixMeta
319         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
320         if err != nil {
321                 return err
322         }
323         metaPktSize := int64(metaBuf.Len())
324         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
325         sds := SDS{
326                 "type": "file",
327                 "node": node.Id,
328                 "nice": strconv.Itoa(int(nice)),
329                 "src":  srcPath,
330                 "dst":  path,
331                 "size": strconv.FormatInt(metaPktSize, 10),
332         }
333         if err == nil {
334                 ctx.LogI("tx", sds, "sent")
335         } else {
336                 sds["err"] = err
337                 ctx.LogE("tx", sds, "sent")
338         }
339         return err
340 }
341
342 func (ctx *Ctx) TxFreq(
343         node *Node,
344         nice, replyNice uint8,
345         srcPath, dstPath string,
346         minSize int64) error {
347         dstPath = filepath.Clean(dstPath)
348         if filepath.IsAbs(dstPath) {
349                 return errors.New("Relative destination path required")
350         }
351         srcPath = filepath.Clean(srcPath)
352         if filepath.IsAbs(srcPath) {
353                 return errors.New("Relative source path required")
354         }
355         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
356         if err != nil {
357                 return err
358         }
359         src := strings.NewReader(dstPath)
360         size := int64(src.Len())
361         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
362         sds := SDS{
363                 "type":      "freq",
364                 "node":      node.Id,
365                 "nice":      strconv.Itoa(int(nice)),
366                 "replynice": strconv.Itoa(int(replyNice)),
367                 "src":       srcPath,
368                 "dst":       dstPath,
369         }
370         if err == nil {
371                 ctx.LogI("tx", sds, "sent")
372         } else {
373                 sds["err"] = err
374                 ctx.LogE("tx", sds, "sent")
375         }
376         return err
377 }
378
379 func (ctx *Ctx) TxExec(
380         node *Node,
381         nice, replyNice uint8,
382         handle string,
383         args []string,
384         body []byte,
385         minSize int64,
386 ) error {
387         path := make([][]byte, 0, 1+len(args))
388         path = append(path, []byte(handle))
389         for _, arg := range args {
390                 path = append(path, []byte(arg))
391         }
392         pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
393         if err != nil {
394                 return err
395         }
396         var compressed bytes.Buffer
397         compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
398         if err != nil {
399                 return err
400         }
401         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
402                 return err
403         }
404         compressor.Close()
405         size := int64(compressed.Len())
406         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
407         sds := SDS{
408                 "type":      "exec",
409                 "node":      node.Id,
410                 "nice":      strconv.Itoa(int(nice)),
411                 "replynice": strconv.Itoa(int(replyNice)),
412                 "dst":       strings.Join(append([]string{handle}, args...), " "),
413                 "size":      strconv.FormatInt(size, 10),
414         }
415         if err == nil {
416                 ctx.LogI("tx", sds, "sent")
417         } else {
418                 sds["err"] = err
419                 ctx.LogE("tx", sds, "sent")
420         }
421         return err
422 }
423
424 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
425         sds := SDS{
426                 "type": "trns",
427                 "node": node.Id,
428                 "nice": strconv.Itoa(int(nice)),
429                 "size": strconv.FormatInt(size, 10),
430         }
431         ctx.LogD("tx", sds, "taken")
432         tmp, err := ctx.NewTmpFileWHash()
433         if err != nil {
434                 return err
435         }
436         if _, err = io.Copy(tmp.W, src); err != nil {
437                 return err
438         }
439         nodePath := filepath.Join(ctx.Spool, node.Id.String())
440         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
441         if err == nil {
442                 ctx.LogI("tx", sds, "sent")
443         } else {
444                 sds["err"] = err
445                 ctx.LogI("tx", sds, "sent")
446         }
447         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
448         return err
449 }