]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
Remote command execution
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2018 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "bytes"
24         "compress/zlib"
25         "crypto/rand"
26         "errors"
27         "hash"
28         "io"
29         "io/ioutil"
30         "os"
31         "path/filepath"
32         "strconv"
33         "strings"
34
35         "github.com/davecgh/go-xdr/xdr2"
36         "golang.org/x/crypto/blake2b"
37 )
38
39 func (ctx *Ctx) Tx(node *Node, pkt *Pkt, nice uint8, size, minSize int64, src io.Reader) (*Node, error) {
40         tmp, err := ctx.NewTmpFileWHash()
41         if err != nil {
42                 return nil, err
43         }
44         hops := make([]*Node, 0, 1+len(node.Via))
45         hops = append(hops, node)
46         lastNode := node
47         for i := len(node.Via); i > 0; i-- {
48                 lastNode = ctx.Neigh[*node.Via[i-1]]
49                 hops = append(hops, lastNode)
50         }
51         padSize := minSize - size - int64(len(hops))*(PktOverhead+PktEncOverhead)
52         if padSize < 0 {
53                 padSize = 0
54         }
55         errs := make(chan error)
56         curSize := size
57         pipeR, pipeW := io.Pipe()
58         go func(size int64, src io.Reader, dst io.WriteCloser) {
59                 ctx.LogD("tx", SDS{
60                         "node": hops[0].Id,
61                         "nice": strconv.Itoa(int(nice)),
62                         "size": strconv.FormatInt(size, 10),
63                 }, "wrote")
64                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
65                 dst.Close()
66         }(curSize, src, pipeW)
67         curSize += padSize
68
69         var pipeRPrev io.Reader
70         for i := 1; i < len(hops); i++ {
71                 pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
72                 curSize += PktOverhead + PktEncOverhead
73                 pipeRPrev = pipeR
74                 pipeR, pipeW = io.Pipe()
75                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
76                         ctx.LogD("tx", SDS{
77                                 "node": node.Id,
78                                 "nice": strconv.Itoa(int(nice)),
79                                 "size": strconv.FormatInt(size, 10),
80                         }, "trns wrote")
81                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
82                         dst.Close()
83                 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
84         }
85         go func() {
86                 _, err := io.Copy(tmp.W, pipeR)
87                 errs <- err
88         }()
89         for i := 0; i <= len(hops); i++ {
90                 err = <-errs
91                 if err != nil {
92                         tmp.Fd.Close()
93                         return nil, err
94                 }
95         }
96         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
97         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
98         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
99         return lastNode, err
100 }
101
102 func prepareTxFile(srcPath string) (io.Reader, *os.File, int64, error) {
103         var reader io.Reader
104         var src *os.File
105         var fileSize int64
106         var err error
107         if srcPath == "-" {
108                 src, err = ioutil.TempFile("", "nncp-file")
109                 if err != nil {
110                         return nil, nil, 0, err
111                 }
112                 os.Remove(src.Name())
113                 tmpW := bufio.NewWriter(src)
114                 tmpKey := new([32]byte)
115                 if _, err = rand.Read(tmpKey[:]); err != nil {
116                         return nil, nil, 0, err
117                 }
118                 written, err := ae(tmpKey, bufio.NewReader(os.Stdin), tmpW)
119                 if err != nil {
120                         return nil, nil, 0, err
121                 }
122                 fileSize = int64(written)
123                 tmpW.Flush()
124                 src.Seek(0, 0)
125                 r, w := io.Pipe()
126                 go ae(tmpKey, bufio.NewReader(src), w)
127                 reader = r
128         } else {
129                 src, err = os.Open(srcPath)
130                 if err != nil {
131                         return nil, nil, 0, err
132                 }
133                 srcStat, err := src.Stat()
134                 if err != nil {
135                         return nil, nil, 0, err
136                 }
137                 fileSize = srcStat.Size()
138                 reader = bufio.NewReader(src)
139         }
140         return reader, src, fileSize, nil
141 }
142
143 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
144         if dstPath == "" {
145                 if srcPath == "-" {
146                         return errors.New("Must provide destination filename")
147                 }
148                 dstPath = filepath.Base(srcPath)
149         }
150         dstPath = filepath.Clean(dstPath)
151         if filepath.IsAbs(dstPath) {
152                 return errors.New("Relative destination path required")
153         }
154         pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
155         if err != nil {
156                 return err
157         }
158         reader, src, fileSize, err := prepareTxFile(srcPath)
159         if src != nil {
160                 defer src.Close()
161         }
162         if err != nil {
163                 return err
164         }
165         _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
166         sds := SDS{
167                 "type": "file",
168                 "node": node.Id,
169                 "nice": strconv.Itoa(int(nice)),
170                 "src":  srcPath,
171                 "dst":  dstPath,
172                 "size": strconv.FormatInt(fileSize, 10),
173         }
174         if err == nil {
175                 ctx.LogI("tx", sds, "sent")
176         } else {
177                 sds["err"] = err
178                 ctx.LogE("tx", sds, "sent")
179         }
180         return err
181 }
182
183 func (ctx *Ctx) TxFileChunked(node *Node, nice uint8, srcPath, dstPath string, minSize int64, chunkSize int64) error {
184         if dstPath == "" {
185                 if srcPath == "-" {
186                         return errors.New("Must provide destination filename")
187                 }
188                 dstPath = filepath.Base(srcPath)
189         }
190         dstPath = filepath.Clean(dstPath)
191         if filepath.IsAbs(dstPath) {
192                 return errors.New("Relative destination path required")
193         }
194         reader, src, fileSize, err := prepareTxFile(srcPath)
195         if src != nil {
196                 defer src.Close()
197         }
198         if err != nil {
199                 return err
200         }
201
202         if fileSize <= chunkSize {
203                 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
204                 if err != nil {
205                         return err
206                 }
207                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
208                 sds := SDS{
209                         "type": "file",
210                         "node": node.Id,
211                         "nice": strconv.Itoa(int(nice)),
212                         "src":  srcPath,
213                         "dst":  dstPath,
214                         "size": strconv.FormatInt(fileSize, 10),
215                 }
216                 if err == nil {
217                         ctx.LogI("tx", sds, "sent")
218                 } else {
219                         sds["err"] = err
220                         ctx.LogE("tx", sds, "sent")
221                 }
222                 return err
223         }
224
225         leftSize := fileSize
226         metaPkt := ChunkedMeta{
227                 Magic:     MagicNNCPMv1,
228                 FileSize:  uint64(fileSize),
229                 ChunkSize: uint64(chunkSize),
230                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
231         }
232         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
233                 hsh := new([32]byte)
234                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
235         }
236         var sizeToSend int64
237         var hsh hash.Hash
238         var pkt *Pkt
239         var chunkNum int
240         var path string
241         for {
242                 if leftSize <= chunkSize {
243                         sizeToSend = leftSize
244                 } else {
245                         sizeToSend = chunkSize
246                 }
247                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
248                 pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
249                 if err != nil {
250                         return err
251                 }
252                 hsh, err = blake2b.New256(nil)
253                 if err != nil {
254                         return err
255                 }
256                 _, err = ctx.Tx(
257                         node,
258                         pkt,
259                         nice,
260                         sizeToSend,
261                         minSize,
262                         io.TeeReader(reader, hsh),
263                 )
264                 sds := SDS{
265                         "type": "file",
266                         "node": node.Id,
267                         "nice": strconv.Itoa(int(nice)),
268                         "src":  srcPath,
269                         "dst":  path,
270                         "size": strconv.FormatInt(sizeToSend, 10),
271                 }
272                 if err == nil {
273                         ctx.LogI("tx", sds, "sent")
274                 } else {
275                         sds["err"] = err
276                         ctx.LogE("tx", sds, "sent")
277                         return err
278                 }
279                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
280                 leftSize -= sizeToSend
281                 chunkNum++
282                 if leftSize == 0 {
283                         break
284                 }
285         }
286         var metaBuf bytes.Buffer
287         _, err = xdr.Marshal(&metaBuf, metaPkt)
288         if err != nil {
289                 return err
290         }
291         path = dstPath + ChunkedSuffixMeta
292         pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
293         if err != nil {
294                 return err
295         }
296         metaPktSize := int64(metaBuf.Len())
297         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
298         sds := SDS{
299                 "type": "file",
300                 "node": node.Id,
301                 "nice": strconv.Itoa(int(nice)),
302                 "src":  srcPath,
303                 "dst":  path,
304                 "size": strconv.FormatInt(metaPktSize, 10),
305         }
306         if err == nil {
307                 ctx.LogI("tx", sds, "sent")
308         } else {
309                 sds["err"] = err
310                 ctx.LogE("tx", sds, "sent")
311         }
312         return err
313 }
314
315 func (ctx *Ctx) TxFreq(node *Node, nice, replyNice uint8, srcPath, dstPath string, minSize int64) error {
316         dstPath = filepath.Clean(dstPath)
317         if filepath.IsAbs(dstPath) {
318                 return errors.New("Relative destination path required")
319         }
320         srcPath = filepath.Clean(srcPath)
321         if filepath.IsAbs(srcPath) {
322                 return errors.New("Relative source path required")
323         }
324         pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
325         if err != nil {
326                 return err
327         }
328         src := strings.NewReader(dstPath)
329         size := int64(src.Len())
330         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
331         sds := SDS{
332                 "type":      "freq",
333                 "node":      node.Id,
334                 "nice":      strconv.Itoa(int(nice)),
335                 "replynice": strconv.Itoa(int(replyNice)),
336                 "src":       srcPath,
337                 "dst":       dstPath,
338         }
339         if err == nil {
340                 ctx.LogI("tx", sds, "sent")
341         } else {
342                 sds["err"] = err
343                 ctx.LogE("tx", sds, "sent")
344         }
345         return err
346 }
347
348 func (ctx *Ctx) TxExec(node *Node, nice, replyNice uint8, handle string, args []string, body []byte, minSize int64) error {
349         path := make([][]byte, 0, 1+len(args))
350         path = append(path, []byte(handle))
351         for _, arg := range args {
352                 path = append(path, []byte(arg))
353         }
354         pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
355         if err != nil {
356                 return err
357         }
358         var compressed bytes.Buffer
359         compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
360         if err != nil {
361                 return err
362         }
363         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
364                 return err
365         }
366         compressor.Close()
367         size := int64(compressed.Len())
368         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
369         sds := SDS{
370                 "type":      "exec",
371                 "node":      node.Id,
372                 "nice":      strconv.Itoa(int(nice)),
373                 "replynice": strconv.Itoa(int(replyNice)),
374                 "dst":       strings.Join(append([]string{handle}, args...), " "),
375                 "size":      strconv.FormatInt(size, 10),
376         }
377         if err == nil {
378                 ctx.LogI("tx", sds, "sent")
379         } else {
380                 sds["err"] = err
381                 ctx.LogE("tx", sds, "sent")
382         }
383         return err
384 }
385
386 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
387         sds := SDS{
388                 "type": "trns",
389                 "node": node.Id,
390                 "nice": strconv.Itoa(int(nice)),
391                 "size": strconv.FormatInt(size, 10),
392         }
393         ctx.LogD("tx", sds, "taken")
394         tmp, err := ctx.NewTmpFileWHash()
395         if err != nil {
396                 return err
397         }
398         if _, err = io.Copy(tmp.W, src); err != nil {
399                 return err
400         }
401         nodePath := filepath.Join(ctx.Spool, node.Id.String())
402         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
403         if err == nil {
404                 ctx.LogI("tx", sds, "sent")
405         } else {
406                 sds["err"] = err
407                 ctx.LogI("tx", sds, "sent")
408         }
409         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
410         return err
411 }