]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/tx.go
Ability to omit destination path
[nncp.git] / src / cypherpunks.ru / nncp / tx.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "bytes"
24         "compress/zlib"
25         "errors"
26         "io"
27         "os"
28         "path/filepath"
29         "strconv"
30         "strings"
31
32         "golang.org/x/crypto/blake2b"
33 )
34
35 func (ctx *Ctx) Tx(node *Node, pkt *Pkt, nice uint8, size int64, src io.Reader) (*Node, error) {
36         tmp, err := ctx.NewTmpFileWHash()
37         if err != nil {
38                 return nil, err
39         }
40         hops := make([]*Node, 0, 1+len(node.Via))
41         hops = append(hops, node)
42         lastNode := node
43         for i := len(node.Via); i > 0; i-- {
44                 lastNode = ctx.Neigh[*node.Via[i-1]]
45                 hops = append(hops, lastNode)
46         }
47         errs := make(chan error)
48         curSize := size
49         pipeR, pipeW := io.Pipe()
50         go func(size int64, src io.Reader, dst io.WriteCloser) {
51                 ctx.LogD("tx", SDS{
52                         "node": hops[0].Id,
53                         "nice": strconv.Itoa(int(nice)),
54                         "size": strconv.FormatInt(size, 10),
55                 }, "wrote")
56                 errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, src, dst)
57                 dst.Close()
58         }(curSize, src, pipeW)
59
60         var pipeRPrev io.Reader
61         for i := 1; i < len(hops); i++ {
62                 pktTrans := Pkt{
63                         Magic:   MagicNNCPPv1,
64                         Type:    PktTypeTrns,
65                         PathLen: blake2b.Size256,
66                         Path:    new([MaxPathSize]byte),
67                 }
68                 copy(pktTrans.Path[:], hops[i-1].Id[:])
69                 curSize += PktOverhead + PktEncOverhead
70                 pipeRPrev = pipeR
71                 pipeR, pipeW = io.Pipe()
72                 go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
73                         ctx.LogD("tx", SDS{
74                                 "node": node.Id,
75                                 "nice": strconv.Itoa(int(nice)),
76                                 "size": strconv.FormatInt(size, 10),
77                         }, "trns wrote")
78                         errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, src, dst)
79                         dst.Close()
80                 }(hops[i], &pktTrans, curSize, pipeRPrev, pipeW)
81         }
82         go func() {
83                 _, err := io.Copy(tmp.W, pipeR)
84                 errs <- err
85         }()
86         for i := 0; i <= len(hops); i++ {
87                 err = <-errs
88                 if err != nil {
89                         tmp.Fd.Close()
90                         return nil, err
91                 }
92         }
93         nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
94         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
95         os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
96         return lastNode, err
97 }
98
99 func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string) error {
100         if dstPath == "" {
101                 dstPath = filepath.Base(srcPath)
102         }
103         dstPath = filepath.Clean(dstPath)
104         if filepath.IsAbs(dstPath) {
105                 return errors.New("Relative destination path required")
106         }
107         pkt, err := NewPkt(PktTypeFile, dstPath)
108         if err != nil {
109                 return err
110         }
111         src, err := os.Open(srcPath)
112         if err != nil {
113                 return err
114         }
115         defer src.Close()
116         srcStat, err := src.Stat()
117         if err != nil {
118                 return err
119         }
120         _, err = ctx.Tx(node, pkt, nice, srcStat.Size(), bufio.NewReader(src))
121         if err == nil {
122                 ctx.LogI("tx", SDS{
123                         "type": "file",
124                         "node": node.Id,
125                         "nice": strconv.Itoa(int(nice)),
126                         "src":  srcPath,
127                         "dst":  dstPath,
128                         "size": strconv.FormatInt(srcStat.Size(), 10),
129                 }, "sent")
130         } else {
131                 ctx.LogE("tx", SDS{
132                         "type": "file",
133                         "node": node.Id,
134                         "nice": strconv.Itoa(int(nice)),
135                         "src":  srcPath,
136                         "dst":  dstPath,
137                         "size": strconv.FormatInt(srcStat.Size(), 10),
138                         "err":  err,
139                 }, "sent")
140         }
141         return err
142 }
143
144 func (ctx *Ctx) TxFreq(node *Node, nice uint8, srcPath, dstPath string) error {
145         dstPath = filepath.Clean(dstPath)
146         if filepath.IsAbs(dstPath) {
147                 return errors.New("Relative destination path required")
148         }
149         srcPath = filepath.Clean(srcPath)
150         if filepath.IsAbs(srcPath) {
151                 return errors.New("Relative source path required")
152         }
153         pkt, err := NewPkt(PktTypeFreq, srcPath)
154         if err != nil {
155                 return err
156         }
157         src := strings.NewReader(dstPath)
158         size := int64(src.Len())
159         _, err = ctx.Tx(node, pkt, nice, size, src)
160         if err == nil {
161                 ctx.LogI("tx", SDS{
162                         "type": "freq",
163                         "node": node.Id,
164                         "nice": strconv.Itoa(int(nice)),
165                         "src":  srcPath,
166                         "dst":  dstPath,
167                 }, "sent")
168         } else {
169                 ctx.LogE("tx", SDS{
170                         "type": "freq",
171                         "node": node.Id,
172                         "nice": strconv.Itoa(int(nice)),
173                         "src":  srcPath,
174                         "dst":  dstPath,
175                         "err":  err,
176                 }, "sent")
177         }
178         return err
179 }
180
181 func (ctx *Ctx) TxMail(node *Node, nice uint8, recipient string, body []byte) error {
182         pkt, err := NewPkt(PktTypeMail, recipient)
183         if err != nil {
184                 return err
185         }
186         var compressed bytes.Buffer
187         compressor := zlib.NewWriter(&compressed)
188         if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
189                 return err
190         }
191         compressor.Close()
192         size := int64(compressed.Len())
193         _, err = ctx.Tx(node, pkt, nice, size, &compressed)
194         if err == nil {
195                 ctx.LogI("tx", SDS{
196                         "type": "mail",
197                         "node": node.Id,
198                         "nice": strconv.Itoa(int(nice)),
199                         "dst":  recipient,
200                         "size": strconv.FormatInt(size, 10),
201                 }, "sent")
202         } else {
203                 ctx.LogE("tx", SDS{
204                         "type": "mail",
205                         "node": node.Id,
206                         "nice": strconv.Itoa(int(nice)),
207                         "dst":  recipient,
208                         "size": strconv.FormatInt(size, 10),
209                         "err":  err,
210                 }, "sent")
211         }
212         return err
213 }
214
215 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
216         ctx.LogD("tx", SDS{
217                 "type": "trns",
218                 "node": node.Id,
219                 "nice": strconv.Itoa(int(nice)),
220                 "size": strconv.FormatInt(size, 10),
221         }, "taken")
222         tmp, err := ctx.NewTmpFileWHash()
223         if err != nil {
224                 return err
225         }
226         if _, err = io.Copy(tmp.W, src); err != nil {
227                 return err
228         }
229         nodePath := filepath.Join(ctx.Spool, node.Id.String())
230         err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
231         if err == nil {
232                 ctx.LogI("tx", SDS{
233                         "type": "trns",
234                         "node": node.Id,
235                         "nice": strconv.Itoa(int(nice)),
236                         "size": strconv.FormatInt(size, 10),
237                 }, "sent")
238         } else {
239                 ctx.LogI("tx", SDS{
240                         "type": "trns",
241                         "node": node.Id,
242                         "nice": strconv.Itoa(int(nice)),
243                         "size": strconv.FormatInt(size, 10),
244                         "err":  err,
245                 }, "sent")
246         }
247         os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
248         return err
249 }