]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
Replace Twofish/HKDF with ChaCha20/BLAKE2X for speed and simplicity
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "bytes"
24         "compress/zlib"
25         "crypto/rand"
26         "errors"
27         "hash"
28         "io"
29         "io/ioutil"
30         "os"
31         "path/filepath"
32         "strconv"
33         "strings"
34
35         "github.com/davecgh/go-xdr/xdr2"
36         "golang.org/x/crypto/blake2b"
37 )
38
39 func (ctx *Ctx) Tx(node *Node, pkt *Pkt, nice uint8, size, minSize int64, src io.Reader) (*Node, error) {
40         tmp, err := ctx.NewTmpFileWHash()
41         if err != nil {
42                 return nil, err
43         }
44         hops := make([]*Node, 0, 1+len(node.Via))
45         hops = append(hops, node)
46         lastNode := node
47         for i := len(node.Via); i > 0; i-- {
48                 lastNode = ctx.Neigh[*node.Via[i-1]]
49                 hops = append(hops, lastNode)
50         }
51         padSize := minSize - size - int64(len(hops))*(PktOverhead+PktEncOverhead)
52         if padSize < 0 {
53                 padSize = 0
54         }
55         errs := make(chan error)
56         curSize := size
57         pipeR, pipeW := io.Pipe()
58         go func(size int64, src io.Reader, dst io.WriteCloser) {
59                 ctx.LogD("tx", SDS{
60                         "node": hops[0].Id,
61                         "nice": strconv.Itoa(int(nice)),
62                         "size": strconv.FormatInt(size, 10),
63                 }, "wrote")
64                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
65                 dst.Close()
66         }(curSize, src, pipeW)
67         curSize += padSize
68
69         var pipeRPrev io.Reader
70         for i := 1; i < len(hops); i++ {
71                 pktTrans := Pkt{
72                         Magic:   MagicNNCPPv1,
73                         Type:    PktTypeTrns,
74                         PathLen: blake2b.Size256,
75                         Path:    new([MaxPathSize]byte),
76                 }
77                 copy(pktTrans.Path[:], hops[i-1].Id[:])
78                 curSize += PktOverhead + PktEncOverhead
79                 pipeRPrev = pipeR
80                 pipeR, pipeW = io.Pipe()
81                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
82                         ctx.LogD("tx", SDS{
83                                 "node": node.Id,
84                                 "nice": strconv.Itoa(int(nice)),
85                                 "size": strconv.FormatInt(size, 10),
86                         }, "trns wrote")
87                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
88                         dst.Close()
89                 }(hops[i], &pktTrans, curSize, pipeRPrev, pipeW)
90         }
91         go func() {
92                 _, err := io.Copy(tmp.W, pipeR)
93                 errs <- err
94         }()
95         for i := 0; i <= len(hops); i++ {
96                 err = <-errs
97                 if err != nil {
98                         tmp.Fd.Close()
99                         return nil, err
100                 }
101         }
102         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
103         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
104         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
105         return lastNode, err
106 }
107
108 func prepareTxFile(srcPath string) (io.Reader, *os.File, int64, error) {
109         var reader io.Reader
110         var src *os.File
111         var fileSize int64
112         var err error
113         if srcPath == "-" {
114                 src, err = ioutil.TempFile("", "nncp-file")
115                 if err != nil {
116                         return nil, nil, 0, err
117                 }
118                 os.Remove(src.Name())
119                 tmpW := bufio.NewWriter(src)
120                 tmpKey := new([32]byte)
121                 if _, err = rand.Read(tmpKey[:]); err != nil {
122                         return nil, nil, 0, err
123                 }
124                 written, err := ae(tmpKey, bufio.NewReader(os.Stdin), tmpW)
125                 if err != nil {
126                         return nil, nil, 0, err
127                 }
128                 fileSize = int64(written)
129                 tmpW.Flush()
130                 src.Seek(0, 0)
131                 r, w := io.Pipe()
132                 go ae(tmpKey, bufio.NewReader(src), w)
133                 reader = r
134         } else {
135                 src, err = os.Open(srcPath)
136                 if err != nil {
137                         return nil, nil, 0, err
138                 }
139                 srcStat, err := src.Stat()
140                 if err != nil {
141                         return nil, nil, 0, err
142                 }
143                 fileSize = srcStat.Size()
144                 reader = bufio.NewReader(src)
145         }
146         return reader, src, fileSize, nil
147 }
148
149 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
150         if dstPath == "" {
151                 if srcPath == "-" {
152                         return errors.New("Must provide destination filename")
153                 }
154                 dstPath = filepath.Base(srcPath)
155         }
156         dstPath = filepath.Clean(dstPath)
157         if filepath.IsAbs(dstPath) {
158                 return errors.New("Relative destination path required")
159         }
160         pkt, err := NewPkt(PktTypeFile, dstPath)
161         if err != nil {
162                 return err
163         }
164         reader, src, fileSize, err := prepareTxFile(srcPath)
165         if src != nil {
166                 defer src.Close()
167         }
168         if err != nil {
169                 return err
170         }
171         _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
172         if err == nil {
173                 ctx.LogI("tx", SDS{
174                         "type": "file",
175                         "node": node.Id,
176                         "nice": strconv.Itoa(int(nice)),
177                         "src":  srcPath,
178                         "dst":  dstPath,
179                         "size": strconv.FormatInt(fileSize, 10),
180                 }, "sent")
181         } else {
182                 ctx.LogE("tx", SDS{
183                         "type": "file",
184                         "node": node.Id,
185                         "nice": strconv.Itoa(int(nice)),
186                         "src":  srcPath,
187                         "dst":  dstPath,
188                         "size": strconv.FormatInt(fileSize, 10),
189                         "err":  err,
190                 }, "sent")
191         }
192         return err
193 }
194
195 func (ctx *Ctx) TxFileChunked(node *Node, nice uint8, srcPath, dstPath string, minSize int64, chunkSize int64) error {
196         if dstPath == "" {
197                 if srcPath == "-" {
198                         return errors.New("Must provide destination filename")
199                 }
200                 dstPath = filepath.Base(srcPath)
201         }
202         dstPath = filepath.Clean(dstPath)
203         if filepath.IsAbs(dstPath) {
204                 return errors.New("Relative destination path required")
205         }
206         reader, src, fileSize, err := prepareTxFile(srcPath)
207         if src != nil {
208                 defer src.Close()
209         }
210         if err != nil {
211                 return err
212         }
213
214         leftSize := fileSize
215         metaPkt := ChunkedMeta{
216                 Magic:     MagicNNCPMv1,
217                 FileSize:  uint64(fileSize),
218                 ChunkSize: uint64(chunkSize),
219                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
220         }
221         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
222                 hsh := new([32]byte)
223                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
224         }
225         var sizeToSend int64
226         var hsh hash.Hash
227         var pkt *Pkt
228         var chunkNum int
229         var path string
230         for {
231                 if leftSize <= chunkSize {
232                         sizeToSend = leftSize
233                 } else {
234                         sizeToSend = chunkSize
235                 }
236                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
237                 pkt, err = NewPkt(PktTypeFile, path)
238                 if err != nil {
239                         return err
240                 }
241                 hsh, err = blake2b.New256(nil)
242                 if err != nil {
243                         return err
244                 }
245                 _, err = ctx.Tx(
246                         node,
247                         pkt,
248                         nice,
249                         sizeToSend,
250                         minSize,
251                         io.TeeReader(reader, hsh),
252                 )
253                 if err == nil {
254                         ctx.LogI("tx", SDS{
255                                 "type": "file",
256                                 "node": node.Id,
257                                 "nice": strconv.Itoa(int(nice)),
258                                 "src":  srcPath,
259                                 "dst":  path,
260                                 "size": strconv.FormatInt(sizeToSend, 10),
261                         }, "sent")
262                 } else {
263                         ctx.LogE("tx", SDS{
264                                 "type": "file",
265                                 "node": node.Id,
266                                 "nice": strconv.Itoa(int(nice)),
267                                 "src":  srcPath,
268                                 "dst":  path,
269                                 "size": strconv.FormatInt(sizeToSend, 10),
270                                 "err":  err,
271                         }, "sent")
272                         return err
273                 }
274                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
275                 leftSize -= sizeToSend
276                 chunkNum++
277                 if leftSize == 0 {
278                         break
279                 }
280         }
281         var metaBuf bytes.Buffer
282         _, err = xdr.Marshal(&metaBuf, metaPkt)
283         if err != nil {
284                 return err
285         }
286         path = dstPath + ChunkedSuffixMeta
287         pkt, err = NewPkt(PktTypeFile, path)
288         if err != nil {
289                 return err
290         }
291         metaPktSize := int64(metaBuf.Len())
292         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
293         if err == nil {
294                 ctx.LogI("tx", SDS{
295                         "type": "file",
296                         "node": node.Id,
297                         "nice": strconv.Itoa(int(nice)),
298                         "src":  srcPath,
299                         "dst":  path,
300                         "size": strconv.FormatInt(metaPktSize, 10),
301                 }, "sent")
302         } else {
303                 ctx.LogE("tx", SDS{
304                         "type": "file",
305                         "node": node.Id,
306                         "nice": strconv.Itoa(int(nice)),
307                         "src":  srcPath,
308                         "dst":  path,
309                         "size": strconv.FormatInt(metaPktSize, 10),
310                         "err":  err,
311                 }, "sent")
312         }
313         return err
314 }
315
316 func (ctx *Ctx) TxFreq(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
317         dstPath = filepath.Clean(dstPath)
318         if filepath.IsAbs(dstPath) {
319                 return errors.New("Relative destination path required")
320         }
321         srcPath = filepath.Clean(srcPath)
322         if filepath.IsAbs(srcPath) {
323                 return errors.New("Relative source path required")
324         }
325         pkt, err := NewPkt(PktTypeFreq, srcPath)
326         if err != nil {
327                 return err
328         }
329         src := strings.NewReader(dstPath)
330         size := int64(src.Len())
331         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
332         if err == nil {
333                 ctx.LogI("tx", SDS{
334                         "type": "freq",
335                         "node": node.Id,
336                         "nice": strconv.Itoa(int(nice)),
337                         "src":  srcPath,
338                         "dst":  dstPath,
339                 }, "sent")
340         } else {
341                 ctx.LogE("tx", SDS{
342                         "type": "freq",
343                         "node": node.Id,
344                         "nice": strconv.Itoa(int(nice)),
345                         "src":  srcPath,
346                         "dst":  dstPath,
347                         "err":  err,
348                 }, "sent")
349         }
350         return err
351 }
352
353 func (ctx *Ctx) TxMail(node *Node, nice uint8, recipient string, body []byte, minSize int64) error {
354         pkt, err := NewPkt(PktTypeMail, recipient)
355         if err != nil {
356                 return err
357         }
358         var compressed bytes.Buffer
359         compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
360         if err != nil {
361                 return err
362         }
363         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
364                 return err
365         }
366         compressor.Close()
367         size := int64(compressed.Len())
368         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
369         if err == nil {
370                 ctx.LogI("tx", SDS{
371                         "type": "mail",
372                         "node": node.Id,
373                         "nice": strconv.Itoa(int(nice)),
374                         "dst":  recipient,
375                         "size": strconv.FormatInt(size, 10),
376                 }, "sent")
377         } else {
378                 ctx.LogE("tx", SDS{
379                         "type": "mail",
380                         "node": node.Id,
381                         "nice": strconv.Itoa(int(nice)),
382                         "dst":  recipient,
383                         "size": strconv.FormatInt(size, 10),
384                         "err":  err,
385                 }, "sent")
386         }
387         return err
388 }
389
390 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
391         ctx.LogD("tx", SDS{
392                 "type": "trns",
393                 "node": node.Id,
394                 "nice": strconv.Itoa(int(nice)),
395                 "size": strconv.FormatInt(size, 10),
396         }, "taken")
397         tmp, err := ctx.NewTmpFileWHash()
398         if err != nil {
399                 return err
400         }
401         if _, err = io.Copy(tmp.W, src); err != nil {
402                 return err
403         }
404         nodePath := filepath.Join(ctx.Spool, node.Id.String())
405         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
406         if err == nil {
407                 ctx.LogI("tx", SDS{
408                         "type": "trns",
409                         "node": node.Id,
410                         "nice": strconv.Itoa(int(nice)),
411                         "size": strconv.FormatInt(size, 10),
412                 }, "sent")
413         } else {
414                 ctx.LogI("tx", SDS{
415                         "type": "trns",
416                         "node": node.Id,
417                         "nice": strconv.Itoa(int(nice)),
418                         "size": strconv.FormatInt(size, 10),
419                         "err":  err,
420                 }, "sent")
421         }
422         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
423         return err
424 }