]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
-minsize option
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "bytes"
24         "compress/zlib"
25         "errors"
26         "io"
27         "os"
28         "path/filepath"
29         "strconv"
30         "strings"
31
32         "golang.org/x/crypto/blake2b"
33 )
34
35 func (ctx *Ctx) Tx(node *Node, pkt *Pkt, nice uint8, size, minSize int64, src io.Reader) (*Node, error) {
36         tmp, err := ctx.NewTmpFileWHash()
37         if err != nil {
38                 return nil, err
39         }
40         hops := make([]*Node, 0, 1+len(node.Via))
41         hops = append(hops, node)
42         lastNode := node
43         for i := len(node.Via); i > 0; i-- {
44                 lastNode = ctx.Neigh[*node.Via[i-1]]
45                 hops = append(hops, lastNode)
46         }
47         padSize := minSize - size - int64(len(hops)) * (PktOverhead + PktEncOverhead)
48         if padSize < 0 {
49                 padSize = 0
50         }
51         errs := make(chan error)
52         curSize := size
53         pipeR, pipeW := io.Pipe()
54         go func(size int64, src io.Reader, dst io.WriteCloser) {
55                 ctx.LogD("tx", SDS{
56                         "node": hops[0].Id,
57                         "nice": strconv.Itoa(int(nice)),
58                         "size": strconv.FormatInt(size, 10),
59                 }, "wrote")
60                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
61                 dst.Close()
62         }(curSize, src, pipeW)
63         curSize += padSize
64
65         var pipeRPrev io.Reader
66         for i := 1; i < len(hops); i++ {
67                 pktTrans := Pkt{
68                         Magic:   MagicNNCPPv1,
69                         Type:    PktTypeTrns,
70                         PathLen: blake2b.Size256,
71                         Path:    new([MaxPathSize]byte),
72                 }
73                 copy(pktTrans.Path[:], hops[i-1].Id[:])
74                 curSize += PktOverhead + PktEncOverhead
75                 pipeRPrev = pipeR
76                 pipeR, pipeW = io.Pipe()
77                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
78                         ctx.LogD("tx", SDS{
79                                 "node": node.Id,
80                                 "nice": strconv.Itoa(int(nice)),
81                                 "size": strconv.FormatInt(size, 10),
82                         }, "trns wrote")
83                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
84                         dst.Close()
85                 }(hops[i], &pktTrans, curSize, pipeRPrev, pipeW)
86         }
87         go func() {
88                 _, err := io.Copy(tmp.W, pipeR)
89                 errs <- err
90         }()
91         for i := 0; i <= len(hops); i++ {
92                 err = <-errs
93                 if err != nil {
94                         tmp.Fd.Close()
95                         return nil, err
96                 }
97         }
98         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
99         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
100         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
101         return lastNode, err
102 }
103
104 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
105         if dstPath == "" {
106                 dstPath = filepath.Base(srcPath)
107         }
108         dstPath = filepath.Clean(dstPath)
109         if filepath.IsAbs(dstPath) {
110                 return errors.New("Relative destination path required")
111         }
112         pkt, err := NewPkt(PktTypeFile, dstPath)
113         if err != nil {
114                 return err
115         }
116         src, err := os.Open(srcPath)
117         if err != nil {
118                 return err
119         }
120         defer src.Close()
121         srcStat, err := src.Stat()
122         if err != nil {
123                 return err
124         }
125         _, err = ctx.Tx(node, pkt, nice, srcStat.Size(), minSize, bufio.NewReader(src))
126         if err == nil {
127                 ctx.LogI("tx", SDS{
128                         "type": "file",
129                         "node": node.Id,
130                         "nice": strconv.Itoa(int(nice)),
131                         "src":  srcPath,
132                         "dst":  dstPath,
133                         "size": strconv.FormatInt(srcStat.Size(), 10),
134                 }, "sent")
135         } else {
136                 ctx.LogE("tx", SDS{
137                         "type": "file",
138                         "node": node.Id,
139                         "nice": strconv.Itoa(int(nice)),
140                         "src":  srcPath,
141                         "dst":  dstPath,
142                         "size": strconv.FormatInt(srcStat.Size(), 10),
143                         "err":  err,
144                 }, "sent")
145         }
146         return err
147 }
148
149 func (ctx *Ctx) TxFreq(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
150         dstPath = filepath.Clean(dstPath)
151         if filepath.IsAbs(dstPath) {
152                 return errors.New("Relative destination path required")
153         }
154         srcPath = filepath.Clean(srcPath)
155         if filepath.IsAbs(srcPath) {
156                 return errors.New("Relative source path required")
157         }
158         pkt, err := NewPkt(PktTypeFreq, srcPath)
159         if err != nil {
160                 return err
161         }
162         src := strings.NewReader(dstPath)
163         size := int64(src.Len())
164         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
165         if err == nil {
166                 ctx.LogI("tx", SDS{
167                         "type": "freq",
168                         "node": node.Id,
169                         "nice": strconv.Itoa(int(nice)),
170                         "src":  srcPath,
171                         "dst":  dstPath,
172                 }, "sent")
173         } else {
174                 ctx.LogE("tx", SDS{
175                         "type": "freq",
176                         "node": node.Id,
177                         "nice": strconv.Itoa(int(nice)),
178                         "src":  srcPath,
179                         "dst":  dstPath,
180                         "err":  err,
181                 }, "sent")
182         }
183         return err
184 }
185
186 func (ctx *Ctx) TxMail(node *Node, nice uint8, recipient string, body []byte, minSize int64) error {
187         pkt, err := NewPkt(PktTypeMail, recipient)
188         if err != nil {
189                 return err
190         }
191         var compressed bytes.Buffer
192         compressor := zlib.NewWriter(&compressed)
193         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
194                 return err
195         }
196         compressor.Close()
197         size := int64(compressed.Len())
198         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
199         if err == nil {
200                 ctx.LogI("tx", SDS{
201                         "type": "mail",
202                         "node": node.Id,
203                         "nice": strconv.Itoa(int(nice)),
204                         "dst":  recipient,
205                         "size": strconv.FormatInt(size, 10),
206                 }, "sent")
207         } else {
208                 ctx.LogE("tx", SDS{
209                         "type": "mail",
210                         "node": node.Id,
211                         "nice": strconv.Itoa(int(nice)),
212                         "dst":  recipient,
213                         "size": strconv.FormatInt(size, 10),
214                         "err":  err,
215                 }, "sent")
216         }
217         return err
218 }
219
220 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
221         ctx.LogD("tx", SDS{
222                 "type": "trns",
223                 "node": node.Id,
224                 "nice": strconv.Itoa(int(nice)),
225                 "size": strconv.FormatInt(size, 10),
226         }, "taken")
227         tmp, err := ctx.NewTmpFileWHash()
228         if err != nil {
229                 return err
230         }
231         if _, err = io.Copy(tmp.W, src); err != nil {
232                 return err
233         }
234         nodePath := filepath.Join(ctx.Spool, node.Id.String())
235         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
236         if err == nil {
237                 ctx.LogI("tx", SDS{
238                         "type": "trns",
239                         "node": node.Id,
240                         "nice": strconv.Itoa(int(nice)),
241                         "size": strconv.FormatInt(size, 10),
242                 }, "sent")
243         } else {
244                 ctx.LogI("tx", SDS{
245                         "type": "trns",
246                         "node": node.Id,
247                         "nice": strconv.Itoa(int(nice)),
248                         "size": strconv.FormatInt(size, 10),
249                         "err":  err,
250                 }, "sent")
251         }
252         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
253         return err
254 }