]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
Initial nncp-reass utility
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "bytes"
24         "compress/zlib"
25         "errors"
26         "hash"
27         "io"
28         "os"
29         "path/filepath"
30         "strconv"
31         "strings"
32
33         "github.com/davecgh/go-xdr/xdr2"
34         "golang.org/x/crypto/blake2b"
35 )
36
37 func (ctx *Ctx) Tx(node *Node, pkt *Pkt, nice uint8, size, minSize int64, src io.Reader) (*Node, error) {
38         tmp, err := ctx.NewTmpFileWHash()
39         if err != nil {
40                 return nil, err
41         }
42         hops := make([]*Node, 0, 1+len(node.Via))
43         hops = append(hops, node)
44         lastNode := node
45         for i := len(node.Via); i > 0; i-- {
46                 lastNode = ctx.Neigh[*node.Via[i-1]]
47                 hops = append(hops, lastNode)
48         }
49         padSize := minSize - size - int64(len(hops))*(PktOverhead+PktEncOverhead)
50         if padSize < 0 {
51                 padSize = 0
52         }
53         errs := make(chan error)
54         curSize := size
55         pipeR, pipeW := io.Pipe()
56         go func(size int64, src io.Reader, dst io.WriteCloser) {
57                 ctx.LogD("tx", SDS{
58                         "node": hops[0].Id,
59                         "nice": strconv.Itoa(int(nice)),
60                         "size": strconv.FormatInt(size, 10),
61                 }, "wrote")
62                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
63                 dst.Close()
64         }(curSize, src, pipeW)
65         curSize += padSize
66
67         var pipeRPrev io.Reader
68         for i := 1; i < len(hops); i++ {
69                 pktTrans := Pkt{
70                         Magic:   MagicNNCPPv1,
71                         Type:    PktTypeTrns,
72                         PathLen: blake2b.Size256,
73                         Path:    new([MaxPathSize]byte),
74                 }
75                 copy(pktTrans.Path[:], hops[i-1].Id[:])
76                 curSize += PktOverhead + PktEncOverhead
77                 pipeRPrev = pipeR
78                 pipeR, pipeW = io.Pipe()
79                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
80                         ctx.LogD("tx", SDS{
81                                 "node": node.Id,
82                                 "nice": strconv.Itoa(int(nice)),
83                                 "size": strconv.FormatInt(size, 10),
84                         }, "trns wrote")
85                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
86                         dst.Close()
87                 }(hops[i], &pktTrans, curSize, pipeRPrev, pipeW)
88         }
89         go func() {
90                 _, err := io.Copy(tmp.W, pipeR)
91                 errs <- err
92         }()
93         for i := 0; i <= len(hops); i++ {
94                 err = <-errs
95                 if err != nil {
96                         tmp.Fd.Close()
97                         return nil, err
98                 }
99         }
100         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
101         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
102         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
103         return lastNode, err
104 }
105
106 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
107         if dstPath == "" {
108                 dstPath = filepath.Base(srcPath)
109         }
110         dstPath = filepath.Clean(dstPath)
111         if filepath.IsAbs(dstPath) {
112                 return errors.New("Relative destination path required")
113         }
114         pkt, err := NewPkt(PktTypeFile, dstPath)
115         if err != nil {
116                 return err
117         }
118         src, err := os.Open(srcPath)
119         if err != nil {
120                 return err
121         }
122         defer src.Close()
123         srcStat, err := src.Stat()
124         if err != nil {
125                 return err
126         }
127         _, err = ctx.Tx(node, pkt, nice, srcStat.Size(), minSize, bufio.NewReader(src))
128         if err == nil {
129                 ctx.LogI("tx", SDS{
130                         "type": "file",
131                         "node": node.Id,
132                         "nice": strconv.Itoa(int(nice)),
133                         "src":  srcPath,
134                         "dst":  dstPath,
135                         "size": strconv.FormatInt(srcStat.Size(), 10),
136                 }, "sent")
137         } else {
138                 ctx.LogE("tx", SDS{
139                         "type": "file",
140                         "node": node.Id,
141                         "nice": strconv.Itoa(int(nice)),
142                         "src":  srcPath,
143                         "dst":  dstPath,
144                         "size": strconv.FormatInt(srcStat.Size(), 10),
145                         "err":  err,
146                 }, "sent")
147         }
148         return err
149 }
150
151 func (ctx *Ctx) TxFileChunked(node *Node, nice uint8, srcPath, dstPath string, minSize int64, chunkSize int64) error {
152         if dstPath == "" {
153                 dstPath = filepath.Base(srcPath)
154         }
155         dstPath = filepath.Clean(dstPath)
156         if filepath.IsAbs(dstPath) {
157                 return errors.New("Relative destination path required")
158         }
159         src, err := os.Open(srcPath)
160         if err != nil {
161                 return err
162         }
163         defer src.Close()
164         srcStat, err := src.Stat()
165         if err != nil {
166                 return err
167         }
168         srcReader := bufio.NewReader(src)
169         fileSize := srcStat.Size()
170         leftSize := fileSize
171         metaPkt := ChunkedMeta{
172                 Magic:     MagicNNCPMv1,
173                 FileSize:  uint64(fileSize),
174                 ChunkSize: uint64(chunkSize),
175                 Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
176         }
177         for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
178                 hsh := new([32]byte)
179                 metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
180         }
181         var sizeToSend int64
182         var hsh hash.Hash
183         var pkt *Pkt
184         var chunkNum int
185         var path string
186         for {
187                 if leftSize <= chunkSize {
188                         sizeToSend = leftSize
189                 } else {
190                         sizeToSend = chunkSize
191                 }
192                 path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
193                 pkt, err = NewPkt(PktTypeFile, path)
194                 if err != nil {
195                         return err
196                 }
197                 hsh, err = blake2b.New256(nil)
198                 if err != nil {
199                         return err
200                 }
201                 _, err = ctx.Tx(
202                         node,
203                         pkt,
204                         nice,
205                         sizeToSend,
206                         minSize,
207                         io.TeeReader(srcReader, hsh),
208                 )
209                 if err == nil {
210                         ctx.LogD("tx", SDS{
211                                 "type": "file",
212                                 "node": node.Id,
213                                 "nice": strconv.Itoa(int(nice)),
214                                 "src":  srcPath,
215                                 "dst":  path,
216                                 "size": strconv.FormatInt(sizeToSend, 10),
217                         }, "sent")
218                 } else {
219                         ctx.LogE("tx", SDS{
220                                 "type": "file",
221                                 "node": node.Id,
222                                 "nice": strconv.Itoa(int(nice)),
223                                 "src":  srcPath,
224                                 "dst":  path,
225                                 "size": strconv.FormatInt(sizeToSend, 10),
226                                 "err":  err,
227                         }, "sent")
228                         return err
229                 }
230                 hsh.Sum(metaPkt.Checksums[chunkNum][:0])
231                 leftSize -= sizeToSend
232                 chunkNum++
233                 if leftSize == 0 {
234                         break
235                 }
236         }
237         var metaBuf bytes.Buffer
238         _, err = xdr.Marshal(&metaBuf, metaPkt)
239         if err != nil {
240                 return err
241         }
242         path = dstPath + ChunkedSuffixMeta
243         pkt, err = NewPkt(PktTypeFile, path)
244         if err != nil {
245                 return err
246         }
247         metaPktSize := int64(metaBuf.Len())
248         _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
249         if err == nil {
250                 ctx.LogD("tx", SDS{
251                         "type": "file",
252                         "node": node.Id,
253                         "nice": strconv.Itoa(int(nice)),
254                         "src":  srcPath,
255                         "dst":  path,
256                         "size": strconv.FormatInt(metaPktSize, 10),
257                 }, "sent")
258                 ctx.LogI("tx", SDS{
259                         "type": "file",
260                         "node": node.Id,
261                         "nice": strconv.Itoa(int(nice)),
262                         "src":  srcPath,
263                         "dst":  dstPath,
264                         "size": strconv.FormatInt(fileSize, 10),
265                 }, "sent")
266         } else {
267                 ctx.LogE("tx", SDS{
268                         "type": "file",
269                         "node": node.Id,
270                         "nice": strconv.Itoa(int(nice)),
271                         "src":  srcPath,
272                         "dst":  path,
273                         "size": strconv.FormatInt(metaPktSize, 10),
274                         "err":  err,
275                 }, "sent")
276         }
277         return err
278 }
279
280 func (ctx *Ctx) TxFreq(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
281         dstPath = filepath.Clean(dstPath)
282         if filepath.IsAbs(dstPath) {
283                 return errors.New("Relative destination path required")
284         }
285         srcPath = filepath.Clean(srcPath)
286         if filepath.IsAbs(srcPath) {
287                 return errors.New("Relative source path required")
288         }
289         pkt, err := NewPkt(PktTypeFreq, srcPath)
290         if err != nil {
291                 return err
292         }
293         src := strings.NewReader(dstPath)
294         size := int64(src.Len())
295         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
296         if err == nil {
297                 ctx.LogI("tx", SDS{
298                         "type": "freq",
299                         "node": node.Id,
300                         "nice": strconv.Itoa(int(nice)),
301                         "src":  srcPath,
302                         "dst":  dstPath,
303                 }, "sent")
304         } else {
305                 ctx.LogE("tx", SDS{
306                         "type": "freq",
307                         "node": node.Id,
308                         "nice": strconv.Itoa(int(nice)),
309                         "src":  srcPath,
310                         "dst":  dstPath,
311                         "err":  err,
312                 }, "sent")
313         }
314         return err
315 }
316
317 func (ctx *Ctx) TxMail(node *Node, nice uint8, recipient string, body []byte, minSize int64) error {
318         pkt, err := NewPkt(PktTypeMail, recipient)
319         if err != nil {
320                 return err
321         }
322         var compressed bytes.Buffer
323         compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
324         if err != nil {
325                 return err
326         }
327         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
328                 return err
329         }
330         compressor.Close()
331         size := int64(compressed.Len())
332         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
333         if err == nil {
334                 ctx.LogI("tx", SDS{
335                         "type": "mail",
336                         "node": node.Id,
337                         "nice": strconv.Itoa(int(nice)),
338                         "dst":  recipient,
339                         "size": strconv.FormatInt(size, 10),
340                 }, "sent")
341         } else {
342                 ctx.LogE("tx", SDS{
343                         "type": "mail",
344                         "node": node.Id,
345                         "nice": strconv.Itoa(int(nice)),
346                         "dst":  recipient,
347                         "size": strconv.FormatInt(size, 10),
348                         "err":  err,
349                 }, "sent")
350         }
351         return err
352 }
353
354 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
355         ctx.LogD("tx", SDS{
356                 "type": "trns",
357                 "node": node.Id,
358                 "nice": strconv.Itoa(int(nice)),
359                 "size": strconv.FormatInt(size, 10),
360         }, "taken")
361         tmp, err := ctx.NewTmpFileWHash()
362         if err != nil {
363                 return err
364         }
365         if _, err = io.Copy(tmp.W, src); err != nil {
366                 return err
367         }
368         nodePath := filepath.Join(ctx.Spool, node.Id.String())
369         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
370         if err == nil {
371                 ctx.LogI("tx", SDS{
372                         "type": "trns",
373                         "node": node.Id,
374                         "nice": strconv.Itoa(int(nice)),
375                         "size": strconv.FormatInt(size, 10),
376                 }, "sent")
377         } else {
378                 ctx.LogI("tx", SDS{
379                         "type": "trns",
380                         "node": node.Id,
381                         "nice": strconv.Itoa(int(nice)),
382                         "size": strconv.FormatInt(size, 10),
383                         "err":  err,
384                 }, "sent")
385         }
386         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
387         return err
388 }