]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
Initial
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node-to-Node CoPy
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "bytes"
24         "compress/zlib"
25         "errors"
26         "io"
27         "os"
28         "path"
29         "path/filepath"
30         "strconv"
31         "strings"
32
33         "golang.org/x/crypto/blake2b"
34 )
35
36 func (ctx *Ctx) Tx(node *Node, pkt *Pkt, nice uint8, size int64, src io.Reader) (*Node, error) {
37         tmp, err := ctx.NewTmpFileWHash()
38         if err != nil {
39                 return nil, err
40         }
41         hops := make([]*Node, 0, 1+len(node.Via))
42         hops = append(hops, node)
43         lastNode := node
44         for i := len(node.Via); i > 0; i-- {
45                 lastNode = ctx.Neigh[*node.Via[i-1]]
46                 hops = append(hops, lastNode)
47         }
48         errs := make(chan error)
49         curSize := size
50         pipeR, pipeW := io.Pipe()
51         go func(size int64, src io.Reader, dst io.WriteCloser) {
52                 ctx.LogD("tx", SDS{
53                         "node": hops[0].Id,
54                         "nice": strconv.Itoa(int(nice)),
55                         "size": strconv.FormatInt(size, 10),
56                 }, "wrote")
57                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, src, dst)
58                 dst.Close()
59         }(curSize, src, pipeW)
60
61         var pipeRPrev io.Reader
62         for i := 1; i < len(hops); i++ {
63                 pktTrans := Pkt{
64                         Magic:   MagicNNCPPv1,
65                         Type:    PktTypeTrns,
66                         PathLen: blake2b.Size256,
67                         Path:    new([MaxPathSize]byte),
68                 }
69                 copy(pktTrans.Path[:], hops[i-1].Id[:])
70                 curSize += PktOverhead + PktEncOverhead
71                 pipeRPrev = pipeR
72                 pipeR, pipeW = io.Pipe()
73                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
74                         ctx.LogD("tx", SDS{
75                                 "node": node.Id,
76                                 "nice": strconv.Itoa(int(nice)),
77                                 "size": strconv.FormatInt(size, 10),
78                         }, "trns wrote")
79                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, src, dst)
80                         dst.Close()
81                 }(hops[i], &pktTrans, curSize, pipeRPrev, pipeW)
82         }
83         go func() {
84                 _, err := io.Copy(tmp.W, pipeR)
85                 errs <- err
86         }()
87         for i := 0; i <= len(hops); i++ {
88                 err = <-errs
89                 if err != nil {
90                         tmp.Fd.Close()
91                         return nil, err
92                 }
93         }
94         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
95         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
96         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
97         return lastNode, err
98 }
99
100 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string) error {
101         dstPath = path.Clean(dstPath)
102         if path.IsAbs(dstPath) {
103                 return errors.New("Relative destination path required")
104         }
105         pkt, err := NewPkt(PktTypeFile, dstPath)
106         if err != nil {
107                 return err
108         }
109         src, err := os.Open(srcPath)
110         if err != nil {
111                 return err
112         }
113         defer src.Close()
114         srcStat, err := src.Stat()
115         if err != nil {
116                 return err
117         }
118         _, err = ctx.Tx(node, pkt, nice, srcStat.Size(), bufio.NewReader(src))
119         if err == nil {
120                 ctx.LogI("tx", SDS{
121                         "type": "file",
122                         "node": node.Id,
123                         "nice": strconv.Itoa(int(nice)),
124                         "src":  srcPath,
125                         "dst":  dstPath,
126                         "size": strconv.FormatInt(srcStat.Size(), 10),
127                 }, "sent")
128         } else {
129                 ctx.LogE("tx", SDS{
130                         "type": "file",
131                         "node": node.Id,
132                         "nice": strconv.Itoa(int(nice)),
133                         "src":  srcPath,
134                         "dst":  dstPath,
135                         "size": strconv.FormatInt(srcStat.Size(), 10),
136                         "err":  err,
137                 }, "sent")
138         }
139         return err
140 }
141
142 func (ctx *Ctx) TxFreq(node *Node, nice uint8, srcPath, dstPath string) error {
143         dstPath = path.Clean(dstPath)
144         if path.IsAbs(dstPath) {
145                 return errors.New("Relative destination path required")
146         }
147         srcPath = path.Clean(srcPath)
148         if path.IsAbs(srcPath) {
149                 return errors.New("Relative source path required")
150         }
151         pkt, err := NewPkt(PktTypeFreq, srcPath)
152         if err != nil {
153                 return err
154         }
155         src := strings.NewReader(dstPath)
156         size := int64(src.Len())
157         _, err = ctx.Tx(node, pkt, nice, size, src)
158         if err == nil {
159                 ctx.LogI("tx", SDS{
160                         "type": "freq",
161                         "node": node.Id,
162                         "nice": strconv.Itoa(int(nice)),
163                         "src":  srcPath,
164                         "dst":  dstPath,
165                 }, "sent")
166         } else {
167                 ctx.LogE("tx", SDS{
168                         "type": "freq",
169                         "node": node.Id,
170                         "nice": strconv.Itoa(int(nice)),
171                         "src":  srcPath,
172                         "dst":  dstPath,
173                         "err":  err,
174                 }, "sent")
175         }
176         return err
177 }
178
179 func (ctx *Ctx) TxMail(node *Node, nice uint8, recipient string, body []byte) error {
180         pkt, err := NewPkt(PktTypeMail, recipient)
181         if err != nil {
182                 return err
183         }
184         var compressed bytes.Buffer
185         compressor := zlib.NewWriter(&compressed)
186         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
187                 return err
188         }
189         compressor.Close()
190         size := int64(compressed.Len())
191         _, err = ctx.Tx(node, pkt, nice, size, &compressed)
192         if err == nil {
193                 ctx.LogI("tx", SDS{
194                         "type": "mail",
195                         "node": node.Id,
196                         "nice": strconv.Itoa(int(nice)),
197                         "dst":  recipient,
198                         "size": strconv.FormatInt(size, 10),
199                 }, "sent")
200         } else {
201                 ctx.LogE("tx", SDS{
202                         "type": "mail",
203                         "node": node.Id,
204                         "nice": strconv.Itoa(int(nice)),
205                         "dst":  recipient,
206                         "size": strconv.FormatInt(size, 10),
207                         "err":  err,
208                 }, "sent")
209         }
210         return err
211 }
212
213 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
214         ctx.LogD("tx", SDS{
215                 "type": "trns",
216                 "node": node.Id,
217                 "nice": strconv.Itoa(int(nice)),
218                 "size": strconv.FormatInt(size, 10),
219         }, "taken")
220         tmp, err := ctx.NewTmpFileWHash()
221         if err != nil {
222                 return err
223         }
224         if _, err = io.Copy(tmp.W, src); err != nil {
225                 return err
226         }
227         nodePath := filepath.Join(ctx.Spool, node.Id.String())
228         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
229         if err == nil {
230                 ctx.LogI("tx", SDS{
231                         "type": "trns",
232                         "node": node.Id,
233                         "nice": strconv.Itoa(int(nice)),
234                         "size": strconv.FormatInt(size, 10),
235                 }, "sent")
236         } else {
237                 ctx.LogI("tx", SDS{
238                         "type": "trns",
239                         "node": node.Id,
240                         "nice": strconv.Itoa(int(nice)),
241                         "size": strconv.FormatInt(size, 10),
242                         "err":  err,
243                 }, "sent")
244         }
245         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
246         return err
247 }