]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
7faef491cf0f279de5e239a7c77e9c32ee4e11d6
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "bytes"
24         "compress/zlib"
25         "crypto/cipher"
26         "crypto/rand"
27         "errors"
28         "hash"
29         "io"
30         "io/ioutil"
31         "os"
32         "path/filepath"
33         "strconv"
34         "strings"
35
36         "github.com/davecgh/go-xdr/xdr2"
37         "golang.org/x/crypto/blake2b"
38         "golang.org/x/crypto/twofish"
39 )
40
41 func (ctx *Ctx) Tx(node *Node, pkt *Pkt, nice uint8, size, minSize int64, src io.Reader) (*Node, error) {
42         tmp, err := ctx.NewTmpFileWHash()
43         if err != nil {
44                 return nil, err
45         }
46         hops := make([]*Node, 0, 1+len(node.Via))
47         hops = append(hops, node)
48         lastNode := node
49         for i := len(node.Via); i > 0; i-- {
50                 lastNode = ctx.Neigh[*node.Via[i-1]]
51                 hops = append(hops, lastNode)
52         }
53         padSize := minSize - size - int64(len(hops))*(PktOverhead+PktEncOverhead)
54         if padSize < 0 {
55                 padSize = 0
56         }
57         errs := make(chan error)
58         curSize := size
59         pipeR, pipeW := io.Pipe()
60         go func(size int64, src io.Reader, dst io.WriteCloser) {
61                 ctx.LogD("tx", SDS{
62                         "node": hops[0].Id,
63                         "nice": strconv.Itoa(int(nice)),
64                         "size": strconv.FormatInt(size, 10),
65                 }, "wrote")
66                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
67                 dst.Close()
68         }(curSize, src, pipeW)
69         curSize += padSize
70
71         var pipeRPrev io.Reader
72         for i := 1; i < len(hops); i++ {
73                 pktTrans := Pkt{
74                         Magic:   MagicNNCPPv1,
75                         Type:    PktTypeTrns,
76                         PathLen: blake2b.Size256,
77                         Path:    new([MaxPathSize]byte),
78                 }
79                 copy(pktTrans.Path[:], hops[i-1].Id[:])
80                 curSize += PktOverhead + PktEncOverhead
81                 pipeRPrev = pipeR
82                 pipeR, pipeW = io.Pipe()
83                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
84                         ctx.LogD("tx", SDS{
85                                 "node": node.Id,
86                                 "nice": strconv.Itoa(int(nice)),
87                                 "size": strconv.FormatInt(size, 10),
88                         }, "trns wrote")
89                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
90                         dst.Close()
91                 }(hops[i], &pktTrans, curSize, pipeRPrev, pipeW)
92         }
93         go func() {
94                 _, err := io.Copy(tmp.W, pipeR)
95                 errs <- err
96         }()
97         for i := 0; i <= len(hops); i++ {
98                 err = <-errs
99                 if err != nil {
100                         tmp.Fd.Close()
101                         return nil, err
102                 }
103         }
104         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
105         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
106         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
107         return lastNode, err
108 }
109
110 func prepareTxFile(srcPath string) (io.Reader, *os.File, int64, error) {
111         var reader io.Reader
112         var src *os.File
113         var fileSize int64
114         var err error
115         if srcPath == "-" {
116                 src, err = ioutil.TempFile("", "nncp-file")
117                 if err != nil {
118                         return nil, nil, 0, err
119                 }
120                 os.Remove(src.Name())
121                 tmpW := bufio.NewWriter(src)
122
123                 tmpKey := make([]byte, 32)
124                 if _, err = rand.Read(tmpKey); err != nil {
125                         return nil, nil, 0, err
126                 }
127                 ciph, err := twofish.NewCipher(tmpKey)
128                 if err != nil {
129                         return nil, nil, 0, err
130                 }
131                 ctr := cipher.NewCTR(ciph, make([]byte, twofish.BlockSize))
132                 encrypter := &cipher.StreamWriter{S: ctr, W: tmpW}
133                 fileSize, err = io.Copy(encrypter, bufio.NewReader(os.Stdin))
134                 if err != nil {
135                         return nil, nil, 0, err
136                 }
137                 tmpW.Flush()
138                 src.Seek(0, 0)
139                 ctr = cipher.NewCTR(ciph, make([]byte, twofish.BlockSize))
140                 reader = &cipher.StreamReader{S: ctr, R: bufio.NewReader(src)}
141         } else {
142                 src, err = os.Open(srcPath)
143                 if err != nil {
144                         return nil, nil, 0, err
145                 }
146                 srcStat, err := src.Stat()
147                 if err != nil {
148                         return nil, nil, 0, err
149                 }
150                 fileSize = srcStat.Size()
151                 reader = bufio.NewReader(src)
152         }
153         return reader, src, fileSize, nil
154 }
155
156 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
157         if dstPath == "" {
158                 if srcPath == "-" {
159                         return errors.New("Must provide destination filename")
160                 }
161                 dstPath = filepath.Base(srcPath)
162         }
163         dstPath = filepath.Clean(dstPath)
164         if filepath.IsAbs(dstPath) {
165                 return errors.New("Relative destination path required")
166         }
167         pkt, err := NewPkt(PktTypeFile, dstPath)
168         if err != nil {
169                 return err
170         }
171         reader, src, fileSize, err := prepareTxFile(srcPath)
172         if src != nil {
173                 defer src.Close()
174         }
175         if err != nil {
176                 return err
177         }
178         _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
179         if err == nil {
180                 ctx.LogI("tx", SDS{
181                         "type": "file",
182                         "node": node.Id,
183                         "nice": strconv.Itoa(int(nice)),
184                         "src":  srcPath,
185                         "dst":  dstPath,
186                         "size": strconv.FormatInt(fileSize, 10),
187                 }, "sent")
188         } else {
189                 ctx.LogE("tx", SDS{
190                         "type": "file",
191                         "node": node.Id,
192                         "nice": strconv.Itoa(int(nice)),
193                         "src":  srcPath,
194                         "dst":  dstPath,
195                         "size": strconv.FormatInt(fileSize, 10),
196                         "err":  err,
197                 }, "sent")
198         }
199         return err
200 }
201
202 func (ctx *Ctx) TxFileChunked(node *Node, nice uint8, srcPath, dstPath string, minSize int64, chunkSize int64) error {
203         if dstPath == "" {
204                 if srcPath == "-" {
205                         return errors.New("Must provide destination filename")
206                 }
207                 dstPath = filepath.Base(srcPath)
208         }
209         dstPath = filepath.Clean(dstPath)
210         if filepath.IsAbs(dstPath) {
211                 return errors.New("Relative destination path required")
212         }
213         reader, src, fileSize, err := prepareTxFile(srcPath)
214         if src != nil {
215                 defer src.Close()
216         }
217         if err != nil {
218                 return err
219         }
220
221         leftSize := fileSize
222         metaPkt := ChunkedMeta{
223                 Magic:     MagicNNCPMv1,
224                 FileSize:  uint64(fileSize),
225                 ChunkSize: uint64(chunkSize),
226                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
227         }
228         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
229                 hsh := new([32]byte)
230                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
231         }
232         var sizeToSend int64
233         var hsh hash.Hash
234         var pkt *Pkt
235         var chunkNum int
236         var path string
237         for {
238                 if leftSize <= chunkSize {
239                         sizeToSend = leftSize
240                 } else {
241                         sizeToSend = chunkSize
242                 }
243                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
244                 pkt, err = NewPkt(PktTypeFile, path)
245                 if err != nil {
246                         return err
247                 }
248                 hsh, err = blake2b.New256(nil)
249                 if err != nil {
250                         return err
251                 }
252                 _, err = ctx.Tx(
253                         node,
254                         pkt,
255                         nice,
256                         sizeToSend,
257                         minSize,
258                         io.TeeReader(reader, hsh),
259                 )
260                 if err == nil {
261                         ctx.LogI("tx", SDS{
262                                 "type": "file",
263                                 "node": node.Id,
264                                 "nice": strconv.Itoa(int(nice)),
265                                 "src":  srcPath,
266                                 "dst":  path,
267                                 "size": strconv.FormatInt(sizeToSend, 10),
268                         }, "sent")
269                 } else {
270                         ctx.LogE("tx", SDS{
271                                 "type": "file",
272                                 "node": node.Id,
273                                 "nice": strconv.Itoa(int(nice)),
274                                 "src":  srcPath,
275                                 "dst":  path,
276                                 "size": strconv.FormatInt(sizeToSend, 10),
277                                 "err":  err,
278                         }, "sent")
279                         return err
280                 }
281                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
282                 leftSize -= sizeToSend
283                 chunkNum++
284                 if leftSize == 0 {
285                         break
286                 }
287         }
288         var metaBuf bytes.Buffer
289         _, err = xdr.Marshal(&metaBuf, metaPkt)
290         if err != nil {
291                 return err
292         }
293         path = dstPath + ChunkedSuffixMeta
294         pkt, err = NewPkt(PktTypeFile, path)
295         if err != nil {
296                 return err
297         }
298         metaPktSize := int64(metaBuf.Len())
299         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
300         if err == nil {
301                 ctx.LogI("tx", SDS{
302                         "type": "file",
303                         "node": node.Id,
304                         "nice": strconv.Itoa(int(nice)),
305                         "src":  srcPath,
306                         "dst":  path,
307                         "size": strconv.FormatInt(metaPktSize, 10),
308                 }, "sent")
309         } else {
310                 ctx.LogE("tx", SDS{
311                         "type": "file",
312                         "node": node.Id,
313                         "nice": strconv.Itoa(int(nice)),
314                         "src":  srcPath,
315                         "dst":  path,
316                         "size": strconv.FormatInt(metaPktSize, 10),
317                         "err":  err,
318                 }, "sent")
319         }
320         return err
321 }
322
323 func (ctx *Ctx) TxFreq(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
324         dstPath = filepath.Clean(dstPath)
325         if filepath.IsAbs(dstPath) {
326                 return errors.New("Relative destination path required")
327         }
328         srcPath = filepath.Clean(srcPath)
329         if filepath.IsAbs(srcPath) {
330                 return errors.New("Relative source path required")
331         }
332         pkt, err := NewPkt(PktTypeFreq, srcPath)
333         if err != nil {
334                 return err
335         }
336         src := strings.NewReader(dstPath)
337         size := int64(src.Len())
338         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
339         if err == nil {
340                 ctx.LogI("tx", SDS{
341                         "type": "freq",
342                         "node": node.Id,
343                         "nice": strconv.Itoa(int(nice)),
344                         "src":  srcPath,
345                         "dst":  dstPath,
346                 }, "sent")
347         } else {
348                 ctx.LogE("tx", SDS{
349                         "type": "freq",
350                         "node": node.Id,
351                         "nice": strconv.Itoa(int(nice)),
352                         "src":  srcPath,
353                         "dst":  dstPath,
354                         "err":  err,
355                 }, "sent")
356         }
357         return err
358 }
359
360 func (ctx *Ctx) TxMail(node *Node, nice uint8, recipient string, body []byte, minSize int64) error {
361         pkt, err := NewPkt(PktTypeMail, recipient)
362         if err != nil {
363                 return err
364         }
365         var compressed bytes.Buffer
366         compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
367         if err != nil {
368                 return err
369         }
370         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
371                 return err
372         }
373         compressor.Close()
374         size := int64(compressed.Len())
375         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
376         if err == nil {
377                 ctx.LogI("tx", SDS{
378                         "type": "mail",
379                         "node": node.Id,
380                         "nice": strconv.Itoa(int(nice)),
381                         "dst":  recipient,
382                         "size": strconv.FormatInt(size, 10),
383                 }, "sent")
384         } else {
385                 ctx.LogE("tx", SDS{
386                         "type": "mail",
387                         "node": node.Id,
388                         "nice": strconv.Itoa(int(nice)),
389                         "dst":  recipient,
390                         "size": strconv.FormatInt(size, 10),
391                         "err":  err,
392                 }, "sent")
393         }
394         return err
395 }
396
397 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
398         ctx.LogD("tx", SDS{
399                 "type": "trns",
400                 "node": node.Id,
401                 "nice": strconv.Itoa(int(nice)),
402                 "size": strconv.FormatInt(size, 10),
403         }, "taken")
404         tmp, err := ctx.NewTmpFileWHash()
405         if err != nil {
406                 return err
407         }
408         if _, err = io.Copy(tmp.W, src); err != nil {
409                 return err
410         }
411         nodePath := filepath.Join(ctx.Spool, node.Id.String())
412         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
413         if err == nil {
414                 ctx.LogI("tx", SDS{
415                         "type": "trns",
416                         "node": node.Id,
417                         "nice": strconv.Itoa(int(nice)),
418                         "size": strconv.FormatInt(size, 10),
419                 }, "sent")
420         } else {
421                 ctx.LogI("tx", SDS{
422                         "type": "trns",
423                         "node": node.Id,
424                         "nice": strconv.Itoa(int(nice)),
425                         "size": strconv.FormatInt(size, 10),
426                         "err":  err,
427                 }, "sent")
428         }
429         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
430         return err
431 }