]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
Maximal compression level for mail by default
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "bytes"
24         "compress/zlib"
25         "errors"
26         "io"
27         "os"
28         "path/filepath"
29         "strconv"
30         "strings"
31
32         "golang.org/x/crypto/blake2b"
33 )
34
35 func (ctx *Ctx) Tx(node *Node, pkt *Pkt, nice uint8, size, minSize int64, src io.Reader) (*Node, error) {
36         tmp, err := ctx.NewTmpFileWHash()
37         if err != nil {
38                 return nil, err
39         }
40         hops := make([]*Node, 0, 1+len(node.Via))
41         hops = append(hops, node)
42         lastNode := node
43         for i := len(node.Via); i > 0; i-- {
44                 lastNode = ctx.Neigh[*node.Via[i-1]]
45                 hops = append(hops, lastNode)
46         }
47         padSize := minSize - size - int64(len(hops))*(PktOverhead+PktEncOverhead)
48         if padSize < 0 {
49                 padSize = 0
50         }
51         errs := make(chan error)
52         curSize := size
53         pipeR, pipeW := io.Pipe()
54         go func(size int64, src io.Reader, dst io.WriteCloser) {
55                 ctx.LogD("tx", SDS{
56                         "node": hops[0].Id,
57                         "nice": strconv.Itoa(int(nice)),
58                         "size": strconv.FormatInt(size, 10),
59                 }, "wrote")
60                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
61                 dst.Close()
62         }(curSize, src, pipeW)
63         curSize += padSize
64
65         var pipeRPrev io.Reader
66         for i := 1; i < len(hops); i++ {
67                 pktTrans := Pkt{
68                         Magic:   MagicNNCPPv1,
69                         Type:    PktTypeTrns,
70                         PathLen: blake2b.Size256,
71                         Path:    new([MaxPathSize]byte),
72                 }
73                 copy(pktTrans.Path[:], hops[i-1].Id[:])
74                 curSize += PktOverhead + PktEncOverhead
75                 pipeRPrev = pipeR
76                 pipeR, pipeW = io.Pipe()
77                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
78                         ctx.LogD("tx", SDS{
79                                 "node": node.Id,
80                                 "nice": strconv.Itoa(int(nice)),
81                                 "size": strconv.FormatInt(size, 10),
82                         }, "trns wrote")
83                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
84                         dst.Close()
85                 }(hops[i], &pktTrans, curSize, pipeRPrev, pipeW)
86         }
87         go func() {
88                 _, err := io.Copy(tmp.W, pipeR)
89                 errs <- err
90         }()
91         for i := 0; i <= len(hops); i++ {
92                 err = <-errs
93                 if err != nil {
94                         tmp.Fd.Close()
95                         return nil, err
96                 }
97         }
98         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
99         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
100         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
101         return lastNode, err
102 }
103
104 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
105         if dstPath == "" {
106                 dstPath = filepath.Base(srcPath)
107         }
108         dstPath = filepath.Clean(dstPath)
109         if filepath.IsAbs(dstPath) {
110                 return errors.New("Relative destination path required")
111         }
112         pkt, err := NewPkt(PktTypeFile, dstPath)
113         if err != nil {
114                 return err
115         }
116         src, err := os.Open(srcPath)
117         if err != nil {
118                 return err
119         }
120         defer src.Close()
121         srcStat, err := src.Stat()
122         if err != nil {
123                 return err
124         }
125         _, err = ctx.Tx(node, pkt, nice, srcStat.Size(), minSize, bufio.NewReader(src))
126         if err == nil {
127                 ctx.LogI("tx", SDS{
128                         "type": "file",
129                         "node": node.Id,
130                         "nice": strconv.Itoa(int(nice)),
131                         "src":  srcPath,
132                         "dst":  dstPath,
133                         "size": strconv.FormatInt(srcStat.Size(), 10),
134                 }, "sent")
135         } else {
136                 ctx.LogE("tx", SDS{
137                         "type": "file",
138                         "node": node.Id,
139                         "nice": strconv.Itoa(int(nice)),
140                         "src":  srcPath,
141                         "dst":  dstPath,
142                         "size": strconv.FormatInt(srcStat.Size(), 10),
143                         "err":  err,
144                 }, "sent")
145         }
146         return err
147 }
148
149 func (ctx *Ctx) TxFreq(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
150         dstPath = filepath.Clean(dstPath)
151         if filepath.IsAbs(dstPath) {
152                 return errors.New("Relative destination path required")
153         }
154         srcPath = filepath.Clean(srcPath)
155         if filepath.IsAbs(srcPath) {
156                 return errors.New("Relative source path required")
157         }
158         pkt, err := NewPkt(PktTypeFreq, srcPath)
159         if err != nil {
160                 return err
161         }
162         src := strings.NewReader(dstPath)
163         size := int64(src.Len())
164         _, err = ctx.Tx(node, pkt, nice, size, minSize, src)
165         if err == nil {
166                 ctx.LogI("tx", SDS{
167                         "type": "freq",
168                         "node": node.Id,
169                         "nice": strconv.Itoa(int(nice)),
170                         "src":  srcPath,
171                         "dst":  dstPath,
172                 }, "sent")
173         } else {
174                 ctx.LogE("tx", SDS{
175                         "type": "freq",
176                         "node": node.Id,
177                         "nice": strconv.Itoa(int(nice)),
178                         "src":  srcPath,
179                         "dst":  dstPath,
180                         "err":  err,
181                 }, "sent")
182         }
183         return err
184 }
185
186 func (ctx *Ctx) TxMail(node *Node, nice uint8, recipient string, body []byte, minSize int64) error {
187         pkt, err := NewPkt(PktTypeMail, recipient)
188         if err != nil {
189                 return err
190         }
191         var compressed bytes.Buffer
192         compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
193         if err != nil {
194                 return err
195         }
196         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
197                 return err
198         }
199         compressor.Close()
200         size := int64(compressed.Len())
201         _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
202         if err == nil {
203                 ctx.LogI("tx", SDS{
204                         "type": "mail",
205                         "node": node.Id,
206                         "nice": strconv.Itoa(int(nice)),
207                         "dst":  recipient,
208                         "size": strconv.FormatInt(size, 10),
209                 }, "sent")
210         } else {
211                 ctx.LogE("tx", SDS{
212                         "type": "mail",
213                         "node": node.Id,
214                         "nice": strconv.Itoa(int(nice)),
215                         "dst":  recipient,
216                         "size": strconv.FormatInt(size, 10),
217                         "err":  err,
218                 }, "sent")
219         }
220         return err
221 }
222
223 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
224         ctx.LogD("tx", SDS{
225                 "type": "trns",
226                 "node": node.Id,
227                 "nice": strconv.Itoa(int(nice)),
228                 "size": strconv.FormatInt(size, 10),
229         }, "taken")
230         tmp, err := ctx.NewTmpFileWHash()
231         if err != nil {
232                 return err
233         }
234         if _, err = io.Copy(tmp.W, src); err != nil {
235                 return err
236         }
237         nodePath := filepath.Join(ctx.Spool, node.Id.String())
238         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
239         if err == nil {
240                 ctx.LogI("tx", SDS{
241                         "type": "trns",
242                         "node": node.Id,
243                         "nice": strconv.Itoa(int(nice)),
244                         "size": strconv.FormatInt(size, 10),
245                 }, "sent")
246         } else {
247                 ctx.LogI("tx", SDS{
248                         "type": "trns",
249                         "node": node.Id,
250                         "nice": strconv.Itoa(int(nice)),
251                         "size": strconv.FormatInt(size, 10),
252                         "err":  err,
253                 }, "sent")
254         }
255         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
256         return err
257 }