]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
a7e8b15eadb6215f6551b7dcdd59490bae5eb0a7
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2018 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "bytes"
24         "compress/zlib"
25         "crypto/rand"
26         "errors"
27         "hash"
28         "io"
29         "io/ioutil"
30         "os"
31         "path/filepath"
32         "strconv"
33         "strings"
34
35         "github.com/davecgh/go-xdr/xdr2"
36         "golang.org/x/crypto/blake2b"
37 )
38
39 func (ctx *Ctx) Tx(node *Node, pkt *Pkt, nice uint8, size, minSize int64, src io.Reader) (*Node, error) {
40         tmp, err := ctx.NewTmpFileWHash()
41         if err != nil {
42                 return nil, err
43         }
44         hops := make([]*Node, 0, 1+len(node.Via))
45         hops = append(hops, node)
46         lastNode := node
47         for i := len(node.Via); i > 0; i-- {
48                 lastNode = ctx.Neigh[*node.Via[i-1]]
49                 hops = append(hops, lastNode)
50         }
51         padSize := minSize - size - int64(len(hops))*(PktOverhead+PktEncOverhead)
52         if padSize < 0 {
53                 padSize = 0
54         }
55         errs := make(chan error)
56         curSize := size
57         pipeR, pipeW := io.Pipe()
58         go func(size int64, src io.Reader, dst io.WriteCloser) {
59                 ctx.LogD("tx", SDS{
60                         "node": hops[0].Id,
61                         "nice": strconv.Itoa(int(nice)),
62                         "size": strconv.FormatInt(size, 10),
63                 }, "wrote")
64                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
65                 dst.Close()
66         }(curSize, src, pipeW)
67         curSize += padSize
68
69         var pipeRPrev io.Reader
70         for i := 1; i < len(hops); i++ {
71                 pktTrans := Pkt{
72                         Magic:   MagicNNCPPv2,
73                         Type:    PktTypeTrns,
74                         PathLen: blake2b.Size256,
75                         Path:    new([MaxPathSize]byte),
76                 }
77                 copy(pktTrans.Path[:], hops[i-1].Id[:])
78                 curSize += PktOverhead + PktEncOverhead
79                 pipeRPrev = pipeR
80                 pipeR, pipeW = io.Pipe()
81                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
82                         ctx.LogD("tx", SDS{
83                                 "node": node.Id,
84                                 "nice": strconv.Itoa(int(nice)),
85                                 "size": strconv.FormatInt(size, 10),
86                         }, "trns wrote")
87                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
88                         dst.Close()
89                 }(hops[i], &pktTrans, curSize, pipeRPrev, pipeW)
90         }
91         go func() {
92                 _, err := io.Copy(tmp.W, pipeR)
93                 errs <- err
94         }()
95         for i := 0; i <= len(hops); i++ {
96                 err = <-errs
97                 if err != nil {
98                         tmp.Fd.Close()
99                         return nil, err
100                 }
101         }
102         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
103         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
104         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
105         return lastNode, err
106 }
107
108 func prepareTxFile(srcPath string) (io.Reader, *os.File, int64, error) {
109         var reader io.Reader
110         var src *os.File
111         var fileSize int64
112         var err error
113         if srcPath == "-" {
114                 src, err = ioutil.TempFile("", "nncp-file")
115                 if err != nil {
116                         return nil, nil, 0, err
117                 }
118                 os.Remove(src.Name())
119                 tmpW := bufio.NewWriter(src)
120                 tmpKey := new([32]byte)
121                 if _, err = rand.Read(tmpKey[:]); err != nil {
122                         return nil, nil, 0, err
123                 }
124                 written, err := ae(tmpKey, bufio.NewReader(os.Stdin), tmpW)
125                 if err != nil {
126                         return nil, nil, 0, err
127                 }
128                 fileSize = int64(written)
129                 tmpW.Flush()
130                 src.Seek(0, 0)
131                 r, w := io.Pipe()
132                 go ae(tmpKey, bufio.NewReader(src), w)
133                 reader = r
134         } else {
135                 src, err = os.Open(srcPath)
136                 if err != nil {
137                         return nil, nil, 0, err
138                 }
139                 srcStat, err := src.Stat()
140                 if err != nil {
141                         return nil, nil, 0, err
142                 }
143                 fileSize = srcStat.Size()
144                 reader = bufio.NewReader(src)
145         }
146         return reader, src, fileSize, nil
147 }
148
149 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
150         if dstPath == "" {
151                 if srcPath == "-" {
152                         return errors.New("Must provide destination filename")
153                 }
154                 dstPath = filepath.Base(srcPath)
155         }
156         dstPath = filepath.Clean(dstPath)
157         if filepath.IsAbs(dstPath) {
158                 return errors.New("Relative destination path required")
159         }
160         pkt, err := NewPkt(PktTypeFile, nice, dstPath)
161         if err != nil {
162                 return err
163         }
164         reader, src, fileSize, err := prepareTxFile(srcPath)
165         if src != nil {
166                 defer src.Close()
167         }
168         if err != nil {
169                 return err
170         }
171         _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
172         if err == nil {
173                 ctx.LogI("tx", SDS{
174                         "type": "file",
175                         "node": node.Id,
176                         "nice": strconv.Itoa(int(nice)),
177                         "src":  srcPath,
178                         "dst":  dstPath,
179                         "size": strconv.FormatInt(fileSize, 10),
180                 }, "sent")
181         } else {
182                 ctx.LogE("tx", SDS{
183                         "type": "file",
184                         "node": node.Id,
185                         "nice": strconv.Itoa(int(nice)),
186                         "src":  srcPath,
187                         "dst":  dstPath,
188                         "size": strconv.FormatInt(fileSize, 10),
189                         "err":  err,
190                 }, "sent")
191         }
192         return err
193 }
194
195 func (ctx *Ctx) TxFileChunked(node *Node, nice uint8, srcPath, dstPath string, minSize int64, chunkSize int64) error {
196         if dstPath == "" {
197                 if srcPath == "-" {
198                         return errors.New("Must provide destination filename")
199                 }
200                 dstPath = filepath.Base(srcPath)
201         }
202         dstPath = filepath.Clean(dstPath)
203         if filepath.IsAbs(dstPath) {
204                 return errors.New("Relative destination path required")
205         }
206         reader, src, fileSize, err := prepareTxFile(srcPath)
207         if src != nil {
208                 defer src.Close()
209         }
210         if err != nil {
211                 return err
212         }
213
214         if fileSize <= chunkSize {
215                 pkt, err := NewPkt(PktTypeFile, nice, dstPath)
216                 if err != nil {
217                         return err
218                 }
219                 _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
220                 if err == nil {
221                         ctx.LogI("tx", SDS{
222                                 "type": "file",
223                                 "node": node.Id,
224                                 "nice": strconv.Itoa(int(nice)),
225                                 "src":  srcPath,
226                                 "dst":  dstPath,
227                                 "size": strconv.FormatInt(fileSize, 10),
228                         }, "sent")
229                 } else {
230                         ctx.LogE("tx", SDS{
231                                 "type": "file",
232                                 "node": node.Id,
233                                 "nice": strconv.Itoa(int(nice)),
234                                 "src":  srcPath,
235                                 "dst":  dstPath,
236                                 "size": strconv.FormatInt(fileSize, 10),
237                                 "err":  err,
238                         }, "sent")
239                 }
240                 return err
241         }
242
243         leftSize := fileSize
244         metaPkt := ChunkedMeta{
245                 Magic:     MagicNNCPMv1,
246                 FileSize:  uint64(fileSize),
247                 ChunkSize: uint64(chunkSize),
248                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
249         }
250         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
251                 hsh := new([32]byte)
252                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
253         }
254         var sizeToSend int64
255         var hsh hash.Hash
256         var pkt *Pkt
257         var chunkNum int
258         var path string
259         for {
260                 if leftSize <= chunkSize {
261                         sizeToSend = leftSize
262                 } else {
263                         sizeToSend = chunkSize
264                 }
265                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
266                 pkt, err = NewPkt(PktTypeFile, nice, path)
267                 if err != nil {
268                         return err
269                 }
270                 hsh, err = blake2b.New256(nil)
271                 if err != nil {
272                         return err
273                 }
274                 _, err = ctx.Tx(
275                         node,
276                         pkt,
277                         nice,
278                         sizeToSend,
279                         minSize,
280                         io.TeeReader(reader, hsh),
281                 )
282                 if err == nil {
283                         ctx.LogI("tx", SDS{
284                                 "type": "file",
285                                 "node": node.Id,
286                                 "nice": strconv.Itoa(int(nice)),
287                                 "src":  srcPath,
288                                 "dst":  path,
289                                 "size": strconv.FormatInt(sizeToSend, 10),
290                         }, "sent")
291                 } else {
292                         ctx.LogE("tx", SDS{
293                                 "type": "file",
294                                 "node": node.Id,
295                                 "nice": strconv.Itoa(int(nice)),
296                                 "src":  srcPath,
297                                 "dst":  path,
298                                 "size": strconv.FormatInt(sizeToSend, 10),
299                                 "err":  err,
300                         }, "sent")
301                         return err
302                 }
303                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
304                 leftSize -= sizeToSend
305                 chunkNum++
306                 if leftSize == 0 {
307                         break
308                 }
309         }
310         var metaBuf bytes.Buffer
311         _, err = xdr.Marshal(&metaBuf, metaPkt)
312         if err != nil {
313                 return err
314         }
315         path = dstPath + ChunkedSuffixMeta
316         pkt, err = NewPkt(PktTypeFile, nice, path)
317         if err != nil {
318                 return err
319         }
320         metaPktSize := int64(metaBuf.Len())
321         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
322         if err == nil {
323                 ctx.LogI("tx", SDS{
324                         "type": "file",
325                         "node": node.Id,
326                         "nice": strconv.Itoa(int(nice)),
327                         "src":  srcPath,
328                         "dst":  path,
329                         "size": strconv.FormatInt(metaPktSize, 10),
330                 }, "sent")
331         } else {
332                 ctx.LogE("tx", SDS{
333                         "type": "file",
334                         "node": node.Id,
335                         "nice": strconv.Itoa(int(nice)),
336                         "src":  srcPath,
337                         "dst":  path,
338                         "size": strconv.FormatInt(metaPktSize, 10),
339                         "err":  err,
340                 }, "sent")
341         }
342         return err
343 }
344
345 func (ctx *Ctx) TxFreq(node *Node, nice, replyNice uint8, srcPath, dstPath string, minSize int64) error {
346         dstPath = filepath.Clean(dstPath)
347         if filepath.IsAbs(dstPath) {
348                 return errors.New("Relative destination path required")
349         }
350         srcPath = filepath.Clean(srcPath)
351         if filepath.IsAbs(srcPath) {
352                 return errors.New("Relative source path required")
353         }
354         pkt, err := NewPkt(PktTypeFreq, replyNice, srcPath)
355         if err != nil {
356                 return err
357         }
358         src := strings.NewReader(dstPath)
359         size := int64(src.Len())
360         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
361         if err == nil {
362                 ctx.LogI("tx", SDS{
363                         "type": "freq",
364                         "node": node.Id,
365                         "nice": strconv.Itoa(int(nice)),
366                         "src":  srcPath,
367                         "dst":  dstPath,
368                 }, "sent")
369         } else {
370                 ctx.LogE("tx", SDS{
371                         "type": "freq",
372                         "node": node.Id,
373                         "nice": strconv.Itoa(int(nice)),
374                         "src":  srcPath,
375                         "dst":  dstPath,
376                         "err":  err,
377                 }, "sent")
378         }
379         return err
380 }
381
382 func (ctx *Ctx) TxMail(node *Node, nice uint8, recipient string, body []byte, minSize int64) error {
383         pkt, err := NewPkt(PktTypeMail, nice, recipient)
384         if err != nil {
385                 return err
386         }
387         var compressed bytes.Buffer
388         compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
389         if err != nil {
390                 return err
391         }
392         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
393                 return err
394         }
395         compressor.Close()
396         size := int64(compressed.Len())
397         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
398         if err == nil {
399                 ctx.LogI("tx", SDS{
400                         "type": "mail",
401                         "node": node.Id,
402                         "nice": strconv.Itoa(int(nice)),
403                         "dst":  recipient,
404                         "size": strconv.FormatInt(size, 10),
405                 }, "sent")
406         } else {
407                 ctx.LogE("tx", SDS{
408                         "type": "mail",
409                         "node": node.Id,
410                         "nice": strconv.Itoa(int(nice)),
411                         "dst":  recipient,
412                         "size": strconv.FormatInt(size, 10),
413                         "err":  err,
414                 }, "sent")
415         }
416         return err
417 }
418
419 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
420         ctx.LogD("tx", SDS{
421                 "type": "trns",
422                 "node": node.Id,
423                 "nice": strconv.Itoa(int(nice)),
424                 "size": strconv.FormatInt(size, 10),
425         }, "taken")
426         tmp, err := ctx.NewTmpFileWHash()
427         if err != nil {
428                 return err
429         }
430         if _, err = io.Copy(tmp.W, src); err != nil {
431                 return err
432         }
433         nodePath := filepath.Join(ctx.Spool, node.Id.String())
434         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
435         if err == nil {
436                 ctx.LogI("tx", SDS{
437                         "type": "trns",
438                         "node": node.Id,
439                         "nice": strconv.Itoa(int(nice)),
440                         "size": strconv.FormatInt(size, 10),
441                 }, "sent")
442         } else {
443                 ctx.LogI("tx", SDS{
444                         "type": "trns",
445                         "node": node.Id,
446                         "nice": strconv.Itoa(int(nice)),
447                         "size": strconv.FormatInt(size, 10),
448                         "err":  err,
449                 }, "sent")
450         }
451         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
452         return err
453 }