]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
Raise copyright years
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "bytes"
24         "compress/zlib"
25         "crypto/rand"
26         "errors"
27         "hash"
28         "io"
29         "io/ioutil"
30         "os"
31         "path/filepath"
32         "strconv"
33         "strings"
34
35         "github.com/davecgh/go-xdr/xdr2"
36         "golang.org/x/crypto/blake2b"
37 )
38
39 func (ctx *Ctx) Tx(
40         node *Node,
41         pkt *Pkt,
42         nice uint8,
43         size, minSize int64,
44         src io.Reader) (*Node, error) {
45         tmp, err := ctx.NewTmpFileWHash()
46         if err != nil {
47                 return nil, err
48         }
49         hops := make([]*Node, 0, 1+len(node.Via))
50         hops = append(hops, node)
51         lastNode := node
52         for i := len(node.Via); i > 0; i-- {
53                 lastNode = ctx.Neigh[*node.Via[i-1]]
54                 hops = append(hops, lastNode)
55         }
56         padSize := minSize - size - int64(len(hops))*(PktOverhead+PktEncOverhead)
57         if padSize < 0 {
58                 padSize = 0
59         }
60         errs := make(chan error)
61         curSize := size
62         pipeR, pipeW := io.Pipe()
63         go func(size int64, src io.Reader, dst io.WriteCloser) {
64                 ctx.LogD("tx", SDS{
65                         "node": hops[0].Id,
66                         "nice": strconv.Itoa(int(nice)),
67                         "size": strconv.FormatInt(size, 10),
68                 }, "wrote")
69                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
70                 dst.Close()
71         }(curSize, src, pipeW)
72         curSize += padSize
73
74         var pipeRPrev io.Reader
75         for i := 1; i < len(hops); i++ {
76                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
77                 curSize += PktOverhead + PktEncOverhead
78                 pipeRPrev = pipeR
79                 pipeR, pipeW = io.Pipe()
80                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
81                         ctx.LogD("tx", SDS{
82                                 "node": node.Id,
83                                 "nice": strconv.Itoa(int(nice)),
84                                 "size": strconv.FormatInt(size, 10),
85                         }, "trns wrote")
86                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
87                         dst.Close()
88                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
89         }
90         go func() {
91                 _, err := io.Copy(tmp.W, pipeR)
92                 errs <- err
93         }()
94         for i := 0; i <= len(hops); i++ {
95                 err = <-errs
96                 if err != nil {
97                         tmp.Fd.Close()
98                         return nil, err
99                 }
100         }
101         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
102         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
103         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
104         return lastNode, err
105 }
106
107 func prepareTxFile(srcPath string) (io.Reader, *os.File, int64, error) {
108         var reader io.Reader
109         var src *os.File
110         var fileSize int64
111         var err error
112         if srcPath == "-" {
113                 src, err = ioutil.TempFile("", "nncp-file")
114                 if err != nil {
115                         return nil, nil, 0, err
116                 }
117                 os.Remove(src.Name())
118                 tmpW := bufio.NewWriter(src)
119                 tmpKey := new([32]byte)
120                 if _, err = rand.Read(tmpKey[:]); err != nil {
121                         return nil, nil, 0, err
122                 }
123                 written, err := ae(tmpKey, bufio.NewReader(os.Stdin), tmpW)
124                 if err != nil {
125                         return nil, nil, 0, err
126                 }
127                 fileSize = int64(written)
128                 tmpW.Flush()
129                 src.Seek(0, 0)
130                 r, w := io.Pipe()
131                 go ae(tmpKey, bufio.NewReader(src), w)
132                 reader = r
133         } else {
134                 src, err = os.Open(srcPath)
135                 if err != nil {
136                         return nil, nil, 0, err
137                 }
138                 srcStat, err := src.Stat()
139                 if err != nil {
140                         return nil, nil, 0, err
141                 }
142                 fileSize = srcStat.Size()
143                 reader = bufio.NewReader(src)
144         }
145         return reader, src, fileSize, nil
146 }
147
148 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
149         if dstPath == "" {
150                 if srcPath == "-" {
151                         return errors.New("Must provide destination filename")
152                 }
153                 dstPath = filepath.Base(srcPath)
154         }
155         dstPath = filepath.Clean(dstPath)
156         if filepath.IsAbs(dstPath) {
157                 return errors.New("Relative destination path required")
158         }
159         pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
160         if err != nil {
161                 return err
162         }
163         reader, src, fileSize, err := prepareTxFile(srcPath)
164         if src != nil {
165                 defer src.Close()
166         }
167         if err != nil {
168                 return err
169         }
170         _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
171         sds := SDS{
172                 "type": "file",
173                 "node": node.Id,
174                 "nice": strconv.Itoa(int(nice)),
175                 "src":  srcPath,
176                 "dst":  dstPath,
177                 "size": strconv.FormatInt(fileSize, 10),
178         }
179         if err == nil {
180                 ctx.LogI("tx", sds, "sent")
181         } else {
182                 sds["err"] = err
183                 ctx.LogE("tx", sds, "sent")
184         }
185         return err
186 }
187
188 func (ctx *Ctx) TxFileChunked(
189         node *Node,
190         nice uint8,
191         srcPath, dstPath string,
192         minSize int64,
193         chunkSize int64) error {
194         if dstPath == "" {
195                 if srcPath == "-" {
196                         return errors.New("Must provide destination filename")
197                 }
198                 dstPath = filepath.Base(srcPath)
199         }
200         dstPath = filepath.Clean(dstPath)
201         if filepath.IsAbs(dstPath) {
202                 return errors.New("Relative destination path required")
203         }
204         reader, src, fileSize, err := prepareTxFile(srcPath)
205         if src != nil {
206                 defer src.Close()
207         }
208         if err != nil {
209                 return err
210         }
211
212         if fileSize <= chunkSize {
213                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
214                 if err != nil {
215                         return err
216                 }
217                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
218                 sds := SDS{
219                         "type": "file",
220                         "node": node.Id,
221                         "nice": strconv.Itoa(int(nice)),
222                         "src":  srcPath,
223                         "dst":  dstPath,
224                         "size": strconv.FormatInt(fileSize, 10),
225                 }
226                 if err == nil {
227                         ctx.LogI("tx", sds, "sent")
228                 } else {
229                         sds["err"] = err
230                         ctx.LogE("tx", sds, "sent")
231                 }
232                 return err
233         }
234
235         leftSize := fileSize
236         metaPkt := ChunkedMeta{
237                 Magic:     MagicNNCPMv1,
238                 FileSize:  uint64(fileSize),
239                 ChunkSize: uint64(chunkSize),
240                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
241         }
242         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
243                 hsh := new([32]byte)
244                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
245         }
246         var sizeToSend int64
247         var hsh hash.Hash
248         var pkt *Pkt
249         var chunkNum int
250         var path string
251         for {
252                 if leftSize <= chunkSize {
253                         sizeToSend = leftSize
254                 } else {
255                         sizeToSend = chunkSize
256                 }
257                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
258                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
259                 if err != nil {
260                         return err
261                 }
262                 hsh, err = blake2b.New256(nil)
263                 if err != nil {
264                         return err
265                 }
266                 _, err = ctx.Tx(
267                         node,
268                         pkt,
269                         nice,
270                         sizeToSend,
271                         minSize,
272                         io.TeeReader(reader, hsh),
273                 )
274                 sds := SDS{
275                         "type": "file",
276                         "node": node.Id,
277                         "nice": strconv.Itoa(int(nice)),
278                         "src":  srcPath,
279                         "dst":  path,
280                         "size": strconv.FormatInt(sizeToSend, 10),
281                 }
282                 if err == nil {
283                         ctx.LogI("tx", sds, "sent")
284                 } else {
285                         sds["err"] = err
286                         ctx.LogE("tx", sds, "sent")
287                         return err
288                 }
289                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
290                 leftSize -= sizeToSend
291                 chunkNum++
292                 if leftSize == 0 {
293                         break
294                 }
295         }
296         var metaBuf bytes.Buffer
297         _, err = xdr.Marshal(&metaBuf, metaPkt)
298         if err != nil {
299                 return err
300         }
301         path = dstPath + ChunkedSuffixMeta
302         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
303         if err != nil {
304                 return err
305         }
306         metaPktSize := int64(metaBuf.Len())
307         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
308         sds := SDS{
309                 "type": "file",
310                 "node": node.Id,
311                 "nice": strconv.Itoa(int(nice)),
312                 "src":  srcPath,
313                 "dst":  path,
314                 "size": strconv.FormatInt(metaPktSize, 10),
315         }
316         if err == nil {
317                 ctx.LogI("tx", sds, "sent")
318         } else {
319                 sds["err"] = err
320                 ctx.LogE("tx", sds, "sent")
321         }
322         return err
323 }
324
325 func (ctx *Ctx) TxFreq(
326         node *Node,
327         nice, replyNice uint8,
328         srcPath, dstPath string,
329         minSize int64) error {
330         dstPath = filepath.Clean(dstPath)
331         if filepath.IsAbs(dstPath) {
332                 return errors.New("Relative destination path required")
333         }
334         srcPath = filepath.Clean(srcPath)
335         if filepath.IsAbs(srcPath) {
336                 return errors.New("Relative source path required")
337         }
338         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
339         if err != nil {
340                 return err
341         }
342         src := strings.NewReader(dstPath)
343         size := int64(src.Len())
344         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
345         sds := SDS{
346                 "type":      "freq",
347                 "node":      node.Id,
348                 "nice":      strconv.Itoa(int(nice)),
349                 "replynice": strconv.Itoa(int(replyNice)),
350                 "src":       srcPath,
351                 "dst":       dstPath,
352         }
353         if err == nil {
354                 ctx.LogI("tx", sds, "sent")
355         } else {
356                 sds["err"] = err
357                 ctx.LogE("tx", sds, "sent")
358         }
359         return err
360 }
361
362 func (ctx *Ctx) TxExec(
363         node *Node,
364         nice, replyNice uint8,
365         handle string,
366         args []string,
367         body []byte,
368         minSize int64) error {
369         path := make([][]byte, 0, 1+len(args))
370         path = append(path, []byte(handle))
371         for _, arg := range args {
372                 path = append(path, []byte(arg))
373         }
374         pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
375         if err != nil {
376                 return err
377         }
378         var compressed bytes.Buffer
379         compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
380         if err != nil {
381                 return err
382         }
383         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
384                 return err
385         }
386         compressor.Close()
387         size := int64(compressed.Len())
388         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
389         sds := SDS{
390                 "type":      "exec",
391                 "node":      node.Id,
392                 "nice":      strconv.Itoa(int(nice)),
393                 "replynice": strconv.Itoa(int(replyNice)),
394                 "dst":       strings.Join(append([]string{handle}, args...), " "),
395                 "size":      strconv.FormatInt(size, 10),
396         }
397         if err == nil {
398                 ctx.LogI("tx", sds, "sent")
399         } else {
400                 sds["err"] = err
401                 ctx.LogE("tx", sds, "sent")
402         }
403         return err
404 }
405
406 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
407         sds := SDS{
408                 "type": "trns",
409                 "node": node.Id,
410                 "nice": strconv.Itoa(int(nice)),
411                 "size": strconv.FormatInt(size, 10),
412         }
413         ctx.LogD("tx", sds, "taken")
414         tmp, err := ctx.NewTmpFileWHash()
415         if err != nil {
416                 return err
417         }
418         if _, err = io.Copy(tmp.W, src); err != nil {
419                 return err
420         }
421         nodePath := filepath.Join(ctx.Spool, node.Id.String())
422         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
423         if err == nil {
424                 ctx.LogI("tx", sds, "sent")
425         } else {
426                 sds["err"] = err
427                 ctx.LogI("tx", sds, "sent")
428         }
429         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
430         return err
431 }