]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
b3f8da5b00031021921633415fcc133f514f676c
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "bytes"
24         "compress/zlib"
25         "crypto/rand"
26         "errors"
27         "hash"
28         "io"
29         "io/ioutil"
30         "os"
31         "path/filepath"
32         "strconv"
33         "strings"
34
35         "github.com/davecgh/go-xdr/xdr2"
36         "golang.org/x/crypto/blake2b"
37 )
38
39 func (ctx *Ctx) Tx(
40         node *Node,
41         pkt *Pkt,
42         nice uint8,
43         size, minSize int64,
44         src io.Reader) (*Node, error) {
45         tmp, err := ctx.NewTmpFileWHash()
46         if err != nil {
47                 return nil, err
48         }
49         hops := make([]*Node, 0, 1+len(node.Via))
50         hops = append(hops, node)
51         lastNode := node
52         for i := len(node.Via); i > 0; i-- {
53                 lastNode = ctx.Neigh[*node.Via[i-1]]
54                 hops = append(hops, lastNode)
55         }
56         padSize := minSize - size - int64(len(hops))*(PktOverhead+PktEncOverhead)
57         if padSize < 0 {
58                 padSize = 0
59         }
60         errs := make(chan error)
61         curSize := size
62         pipeR, pipeW := io.Pipe()
63         go func(size int64, src io.Reader, dst io.WriteCloser) {
64                 ctx.LogD("tx", SDS{
65                         "node": hops[0].Id,
66                         "nice": strconv.Itoa(int(nice)),
67                         "size": strconv.FormatInt(size, 10),
68                 }, "wrote")
69                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
70                 dst.Close()
71         }(curSize, src, pipeW)
72         curSize += padSize
73
74         var pipeRPrev io.Reader
75         for i := 1; i < len(hops); i++ {
76                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
77                 curSize += PktOverhead + PktEncOverhead
78                 pipeRPrev = pipeR
79                 pipeR, pipeW = io.Pipe()
80                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
81                         ctx.LogD("tx", SDS{
82                                 "node": node.Id,
83                                 "nice": strconv.Itoa(int(nice)),
84                                 "size": strconv.FormatInt(size, 10),
85                         }, "trns wrote")
86                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
87                         dst.Close()
88                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
89         }
90         go func() {
91                 _, err := io.Copy(tmp.W, pipeR)
92                 errs <- err
93         }()
94         for i := 0; i <= len(hops); i++ {
95                 err = <-errs
96                 if err != nil {
97                         tmp.Fd.Close()
98                         return nil, err
99                 }
100         }
101         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
102         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
103         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
104         return lastNode, err
105 }
106
107 func prepareTxFile(srcPath string) (io.Reader, *os.File, int64, error) {
108         var reader io.Reader
109         var src *os.File
110         var fileSize int64
111         var err error
112         if srcPath == "-" {
113                 src, err = ioutil.TempFile("", "nncp-file")
114                 if err != nil {
115                         return nil, nil, 0, err
116                 }
117                 os.Remove(src.Name())
118                 tmpW := bufio.NewWriter(src)
119                 tmpKey := new([32]byte)
120                 if _, err = rand.Read(tmpKey[:]); err != nil {
121                         return nil, nil, 0, err
122                 }
123                 written, err := ae(tmpKey, bufio.NewReader(os.Stdin), tmpW)
124                 if err != nil {
125                         return nil, nil, 0, err
126                 }
127                 fileSize = int64(written)
128                 if err = tmpW.Flush(); err != nil {
129                         return nil, nil, 0, err
130                 }
131                 src.Seek(0, io.SeekStart)
132                 r, w := io.Pipe()
133                 go ae(tmpKey, bufio.NewReader(src), w)
134                 reader = r
135         } else {
136                 src, err = os.Open(srcPath)
137                 if err != nil {
138                         return nil, nil, 0, err
139                 }
140                 srcStat, err := src.Stat()
141                 if err != nil {
142                         return nil, nil, 0, err
143                 }
144                 fileSize = srcStat.Size()
145                 reader = bufio.NewReader(src)
146         }
147         return reader, src, fileSize, nil
148 }
149
150 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
151         if dstPath == "" {
152                 if srcPath == "-" {
153                         return errors.New("Must provide destination filename")
154                 }
155                 dstPath = filepath.Base(srcPath)
156         }
157         dstPath = filepath.Clean(dstPath)
158         if filepath.IsAbs(dstPath) {
159                 return errors.New("Relative destination path required")
160         }
161         pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
162         if err != nil {
163                 return err
164         }
165         reader, src, fileSize, err := prepareTxFile(srcPath)
166         if src != nil {
167                 defer src.Close()
168         }
169         if err != nil {
170                 return err
171         }
172         _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
173         sds := SDS{
174                 "type": "file",
175                 "node": node.Id,
176                 "nice": strconv.Itoa(int(nice)),
177                 "src":  srcPath,
178                 "dst":  dstPath,
179                 "size": strconv.FormatInt(fileSize, 10),
180         }
181         if err == nil {
182                 ctx.LogI("tx", sds, "sent")
183         } else {
184                 sds["err"] = err
185                 ctx.LogE("tx", sds, "sent")
186         }
187         return err
188 }
189
190 func (ctx *Ctx) TxFileChunked(
191         node *Node,
192         nice uint8,
193         srcPath, dstPath string,
194         minSize int64,
195         chunkSize int64) error {
196         if dstPath == "" {
197                 if srcPath == "-" {
198                         return errors.New("Must provide destination filename")
199                 }
200                 dstPath = filepath.Base(srcPath)
201         }
202         dstPath = filepath.Clean(dstPath)
203         if filepath.IsAbs(dstPath) {
204                 return errors.New("Relative destination path required")
205         }
206         reader, src, fileSize, err := prepareTxFile(srcPath)
207         if src != nil {
208                 defer src.Close()
209         }
210         if err != nil {
211                 return err
212         }
213
214         if fileSize <= chunkSize {
215                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
216                 if err != nil {
217                         return err
218                 }
219                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
220                 sds := SDS{
221                         "type": "file",
222                         "node": node.Id,
223                         "nice": strconv.Itoa(int(nice)),
224                         "src":  srcPath,
225                         "dst":  dstPath,
226                         "size": strconv.FormatInt(fileSize, 10),
227                 }
228                 if err == nil {
229                         ctx.LogI("tx", sds, "sent")
230                 } else {
231                         sds["err"] = err
232                         ctx.LogE("tx", sds, "sent")
233                 }
234                 return err
235         }
236
237         leftSize := fileSize
238         metaPkt := ChunkedMeta{
239                 Magic:     MagicNNCPMv1,
240                 FileSize:  uint64(fileSize),
241                 ChunkSize: uint64(chunkSize),
242                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
243         }
244         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
245                 hsh := new([32]byte)
246                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
247         }
248         var sizeToSend int64
249         var hsh hash.Hash
250         var pkt *Pkt
251         var chunkNum int
252         var path string
253         for {
254                 if leftSize <= chunkSize {
255                         sizeToSend = leftSize
256                 } else {
257                         sizeToSend = chunkSize
258                 }
259                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
260                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
261                 if err != nil {
262                         return err
263                 }
264                 hsh, err = blake2b.New256(nil)
265                 if err != nil {
266                         return err
267                 }
268                 _, err = ctx.Tx(
269                         node,
270                         pkt,
271                         nice,
272                         sizeToSend,
273                         minSize,
274                         io.TeeReader(reader, hsh),
275                 )
276                 sds := SDS{
277                         "type": "file",
278                         "node": node.Id,
279                         "nice": strconv.Itoa(int(nice)),
280                         "src":  srcPath,
281                         "dst":  path,
282                         "size": strconv.FormatInt(sizeToSend, 10),
283                 }
284                 if err == nil {
285                         ctx.LogI("tx", sds, "sent")
286                 } else {
287                         sds["err"] = err
288                         ctx.LogE("tx", sds, "sent")
289                         return err
290                 }
291                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
292                 leftSize -= sizeToSend
293                 chunkNum++
294                 if leftSize == 0 {
295                         break
296                 }
297         }
298         var metaBuf bytes.Buffer
299         _, err = xdr.Marshal(&metaBuf, metaPkt)
300         if err != nil {
301                 return err
302         }
303         path = dstPath + ChunkedSuffixMeta
304         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
305         if err != nil {
306                 return err
307         }
308         metaPktSize := int64(metaBuf.Len())
309         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
310         sds := SDS{
311                 "type": "file",
312                 "node": node.Id,
313                 "nice": strconv.Itoa(int(nice)),
314                 "src":  srcPath,
315                 "dst":  path,
316                 "size": strconv.FormatInt(metaPktSize, 10),
317         }
318         if err == nil {
319                 ctx.LogI("tx", sds, "sent")
320         } else {
321                 sds["err"] = err
322                 ctx.LogE("tx", sds, "sent")
323         }
324         return err
325 }
326
327 func (ctx *Ctx) TxFreq(
328         node *Node,
329         nice, replyNice uint8,
330         srcPath, dstPath string,
331         minSize int64) error {
332         dstPath = filepath.Clean(dstPath)
333         if filepath.IsAbs(dstPath) {
334                 return errors.New("Relative destination path required")
335         }
336         srcPath = filepath.Clean(srcPath)
337         if filepath.IsAbs(srcPath) {
338                 return errors.New("Relative source path required")
339         }
340         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
341         if err != nil {
342                 return err
343         }
344         src := strings.NewReader(dstPath)
345         size := int64(src.Len())
346         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
347         sds := SDS{
348                 "type":      "freq",
349                 "node":      node.Id,
350                 "nice":      strconv.Itoa(int(nice)),
351                 "replynice": strconv.Itoa(int(replyNice)),
352                 "src":       srcPath,
353                 "dst":       dstPath,
354         }
355         if err == nil {
356                 ctx.LogI("tx", sds, "sent")
357         } else {
358                 sds["err"] = err
359                 ctx.LogE("tx", sds, "sent")
360         }
361         return err
362 }
363
364 func (ctx *Ctx) TxExec(
365         node *Node,
366         nice, replyNice uint8,
367         handle string,
368         args []string,
369         body []byte,
370         minSize int64) error {
371         path := make([][]byte, 0, 1+len(args))
372         path = append(path, []byte(handle))
373         for _, arg := range args {
374                 path = append(path, []byte(arg))
375         }
376         pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
377         if err != nil {
378                 return err
379         }
380         var compressed bytes.Buffer
381         compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
382         if err != nil {
383                 return err
384         }
385         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
386                 return err
387         }
388         compressor.Close()
389         size := int64(compressed.Len())
390         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
391         sds := SDS{
392                 "type":      "exec",
393                 "node":      node.Id,
394                 "nice":      strconv.Itoa(int(nice)),
395                 "replynice": strconv.Itoa(int(replyNice)),
396                 "dst":       strings.Join(append([]string{handle}, args...), " "),
397                 "size":      strconv.FormatInt(size, 10),
398         }
399         if err == nil {
400                 ctx.LogI("tx", sds, "sent")
401         } else {
402                 sds["err"] = err
403                 ctx.LogE("tx", sds, "sent")
404         }
405         return err
406 }
407
408 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
409         sds := SDS{
410                 "type": "trns",
411                 "node": node.Id,
412                 "nice": strconv.Itoa(int(nice)),
413                 "size": strconv.FormatInt(size, 10),
414         }
415         ctx.LogD("tx", sds, "taken")
416         tmp, err := ctx.NewTmpFileWHash()
417         if err != nil {
418                 return err
419         }
420         if _, err = io.Copy(tmp.W, src); err != nil {
421                 return err
422         }
423         nodePath := filepath.Join(ctx.Spool, node.Id.String())
424         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
425         if err == nil {
426                 ctx.LogI("tx", sds, "sent")
427         } else {
428                 sds["err"] = err
429                 ctx.LogI("tx", sds, "sent")
430         }
431         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
432         return err
433 }