]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Replace TxFile with TxFileChunked
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "fmt"
24         "io"
25         "io/ioutil"
26         "log"
27         "mime"
28         "os"
29         "os/exec"
30         "path"
31         "path/filepath"
32         "strconv"
33         "strings"
34
35         xdr "github.com/davecgh/go-xdr/xdr2"
36         "github.com/dustin/go-humanize"
37         "github.com/klauspost/compress/zstd"
38         "golang.org/x/crypto/blake2b"
39         "golang.org/x/crypto/poly1305"
40 )
41
// SeenSuffix is appended to a received packet's file name to build the
// name of the empty marker file that Toss creates (when asked to do so)
// right before it removes a successfully processed packet.
const SeenSuffix = ".seen"
45
46 func newNotification(fromTo *FromToJSON, subject string) io.Reader {
47         return strings.NewReader(fmt.Sprintf(
48                 "From: %s\nTo: %s\nSubject: %s\n",
49                 fromTo.From,
50                 fromTo.To,
51                 mime.BEncoding.Encode("UTF-8", subject),
52         ))
53 }
54
55 func (ctx *Ctx) Toss(
56         nodeId *NodeId,
57         nice uint8,
58         dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
59 ) bool {
60         isBad := false
61         decompressor, err := zstd.NewReader(nil)
62         if err != nil {
63                 panic(err)
64         }
65         defer decompressor.Close()
66         for job := range ctx.Jobs(nodeId, TRx) {
67                 pktName := filepath.Base(job.Fd.Name())
68                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
69                 if job.PktEnc.Nice > nice {
70                         ctx.LogD("rx", SdsAdd(sds, SDS{
71                                 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
72                         }), "too nice")
73                         continue
74                 }
75                 pipeR, pipeW := io.Pipe()
76                 errs := make(chan error, 1)
77                 go func(job Job) {
78                         pipeWB := bufio.NewWriter(pipeW)
79                         _, _, err := PktEncRead(
80                                 ctx.Self,
81                                 ctx.Neigh,
82                                 bufio.NewReader(job.Fd),
83                                 pipeWB,
84                         )
85                         errs <- err
86                         pipeWB.Flush()
87                         pipeW.Close()
88                         job.Fd.Close()
89                         if err != nil {
90                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
91                         }
92                 }(job)
93                 var pkt Pkt
94                 var err error
95                 var pktSize int64
96                 var pktSizeBlocks int64
97                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
98                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
99                         isBad = true
100                         goto Closing
101                 }
102                 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
103                 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
104                 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
105                         pktSize -= poly1305.TagSize
106                 }
107                 pktSize -= pktSizeBlocks * poly1305.TagSize
108                 sds["size"] = strconv.FormatInt(pktSize, 10)
109                 ctx.LogD("rx", sds, "taken")
110                 switch pkt.Type {
111                 case PktTypeExec:
112                         if noExec {
113                                 goto Closing
114                         }
115                         path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
116                         handle := string(path[0])
117                         args := make([]string, 0, len(path)-1)
118                         for _, p := range path[1:] {
119                                 args = append(args, string(p))
120                         }
121                         sds := SdsAdd(sds, SDS{
122                                 "type": "exec",
123                                 "dst":  strings.Join(append([]string{handle}, args...), " "),
124                         })
125                         sender := ctx.Neigh[*job.PktEnc.Sender]
126                         cmdline, exists := sender.Exec[handle]
127                         if !exists || len(cmdline) == 0 {
128                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No handle found"}), "")
129                                 isBad = true
130                                 goto Closing
131                         }
132                         if err = decompressor.Reset(pipeR); err != nil {
133                                 log.Fatalln(err)
134                         }
135                         if !dryRun {
136                                 cmd := exec.Command(
137                                         cmdline[0],
138                                         append(cmdline[1:len(cmdline)], args...)...,
139                                 )
140                                 cmd.Env = append(
141                                         cmd.Env,
142                                         "NNCP_SELF="+ctx.Self.Id.String(),
143                                         "NNCP_SENDER="+sender.Id.String(),
144                                         "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
145                                 )
146                                 cmd.Stdin = decompressor
147                                 if err = cmd.Run(); err != nil {
148                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle")
149                                         isBad = true
150                                         goto Closing
151                                 }
152                         }
153                         ctx.LogI("rx", sds, "")
154                         if !dryRun {
155                                 if doSeen {
156                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
157                                                 fd.Close()
158                                         }
159                                 }
160                                 if err = os.Remove(job.Fd.Name()); err != nil {
161                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
162                                         isBad = true
163                                 }
164                         }
165                 case PktTypeFile:
166                         if noFile {
167                                 goto Closing
168                         }
169                         dst := string(pkt.Path[:int(pkt.PathLen)])
170                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
171                         if filepath.IsAbs(dst) {
172                                 ctx.LogE("rx", sds, "non-relative destination path")
173                                 isBad = true
174                                 goto Closing
175                         }
176                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
177                         if incoming == nil {
178                                 ctx.LogE("rx", sds, "incoming is not allowed")
179                                 isBad = true
180                                 goto Closing
181                         }
182                         dir := filepath.Join(*incoming, path.Dir(dst))
183                         if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
184                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
185                                 isBad = true
186                                 goto Closing
187                         }
188                         if !dryRun {
189                                 tmp, err := TempFile(dir, "file")
190                                 if err != nil {
191                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
192                                         isBad = true
193                                         goto Closing
194                                 }
195                                 sds["tmp"] = tmp.Name()
196                                 ctx.LogD("rx", sds, "created")
197                                 bufW := bufio.NewWriter(tmp)
198                                 if _, err = io.Copy(bufW, pipeR); err != nil {
199                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
200                                         isBad = true
201                                         goto Closing
202                                 }
203                                 if err = bufW.Flush(); err != nil {
204                                         tmp.Close()
205                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
206                                         isBad = true
207                                         goto Closing
208                                 }
209                                 if err = tmp.Sync(); err != nil {
210                                         tmp.Close()
211                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
212                                         isBad = true
213                                         goto Closing
214                                 }
215                                 tmp.Close()
216                                 dstPathOrig := filepath.Join(*incoming, dst)
217                                 dstPath := dstPathOrig
218                                 dstPathCtr := 0
219                                 for {
220                                         if _, err = os.Stat(dstPath); err != nil {
221                                                 if os.IsNotExist(err) {
222                                                         break
223                                                 }
224                                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
225                                                 isBad = true
226                                                 goto Closing
227                                         }
228                                         dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
229                                         dstPathCtr++
230                                 }
231                                 if err = os.Rename(tmp.Name(), dstPath); err != nil {
232                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
233                                         isBad = true
234                                 }
235                                 delete(sds, "tmp")
236                         }
237                         ctx.LogI("rx", sds, "")
238                         if !dryRun {
239                                 if doSeen {
240                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
241                                                 fd.Close()
242                                         }
243                                 }
244                                 if err = os.Remove(job.Fd.Name()); err != nil {
245                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
246                                         isBad = true
247                                 }
248                                 sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
249                                 if exists && len(sendmail) > 0 && ctx.NotifyFile != nil {
250                                         cmd := exec.Command(
251                                                 sendmail[0],
252                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
253                                         )
254                                         cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
255                                                 "File from %s: %s (%s)",
256                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
257                                                 dst,
258                                                 humanize.IBytes(uint64(pktSize)),
259                                         ))
260                                         cmd.Run()
261                                 }
262                         }
263                 case PktTypeFreq:
264                         if noFreq {
265                                 goto Closing
266                         }
267                         src := string(pkt.Path[:int(pkt.PathLen)])
268                         if filepath.IsAbs(src) {
269                                 ctx.LogE("rx", sds, "non-relative source path")
270                                 isBad = true
271                                 goto Closing
272                         }
273                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
274                         dstRaw, err := ioutil.ReadAll(pipeR)
275                         if err != nil {
276                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
277                                 isBad = true
278                                 goto Closing
279                         }
280                         dst := string(dstRaw)
281                         sds["dst"] = dst
282                         sender := ctx.Neigh[*job.PktEnc.Sender]
283                         freqPath := sender.FreqPath
284                         if freqPath == nil {
285                                 ctx.LogE("rx", sds, "freqing is not allowed")
286                                 isBad = true
287                                 goto Closing
288                         }
289                         if !dryRun {
290                                 err = ctx.TxFile(
291                                         sender,
292                                         pkt.Nice,
293                                         filepath.Join(*freqPath, src),
294                                         dst,
295                                         sender.FreqChunked,
296                                         sender.FreqMinSize,
297                                 )
298                                 if err != nil {
299                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
300                                         isBad = true
301                                         goto Closing
302                                 }
303                         }
304                         ctx.LogI("rx", sds, "")
305                         if !dryRun {
306                                 if doSeen {
307                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
308                                                 fd.Close()
309                                         }
310                                 }
311                                 if err = os.Remove(job.Fd.Name()); err != nil {
312                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
313                                         isBad = true
314                                 }
315                                 sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
316                                 if exists && len(sendmail) > 0 && ctx.NotifyFreq != nil {
317                                         cmd := exec.Command(
318                                                 sendmail[0],
319                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
320                                         )
321                                         cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
322                                                 "Freq from %s: %s",
323                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
324                                                 src,
325                                         ))
326                                         cmd.Run()
327                                 }
328                         }
329                 case PktTypeTrns:
330                         if noTrns {
331                                 goto Closing
332                         }
333                         dst := new([blake2b.Size256]byte)
334                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
335                         nodeId := NodeId(*dst)
336                         node, known := ctx.Neigh[nodeId]
337                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
338                         if !known {
339                                 ctx.LogE("rx", sds, "unknown node")
340                                 isBad = true
341                                 goto Closing
342                         }
343                         ctx.LogD("rx", sds, "taken")
344                         if !dryRun {
345                                 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
346                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
347                                         isBad = true
348                                         goto Closing
349                                 }
350                         }
351                         ctx.LogI("rx", sds, "")
352                         if !dryRun {
353                                 if doSeen {
354                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
355                                                 fd.Close()
356                                         }
357                                 }
358                                 if err = os.Remove(job.Fd.Name()); err != nil {
359                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
360                                         isBad = true
361                                 }
362                         }
363                 default:
364                         ctx.LogE("rx", sds, "unknown type")
365                         isBad = true
366                 }
367         Closing:
368                 pipeR.Close()
369         }
370         return isBad
371 }