]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
goimports invocation, xdr alias fixed
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "fmt"
24         "io"
25         "io/ioutil"
26         "log"
27         "mime"
28         "os"
29         "os/exec"
30         "path"
31         "path/filepath"
32         "strconv"
33         "strings"
34
35         xdr "github.com/davecgh/go-xdr/xdr2"
36         "github.com/dustin/go-humanize"
37         "github.com/klauspost/compress/zstd"
38         "golang.org/x/crypto/blake2b"
39         "golang.org/x/crypto/poly1305"
40 )
41
42 const (
43         SeenSuffix = ".seen"
44 )
45
46 func newNotification(fromTo *FromToJSON, subject string) io.Reader {
47         return strings.NewReader(fmt.Sprintf(
48                 "From: %s\nTo: %s\nSubject: %s\n",
49                 fromTo.From,
50                 fromTo.To,
51                 mime.BEncoding.Encode("UTF-8", subject),
52         ))
53 }
54
55 func (ctx *Ctx) Toss(
56         nodeId *NodeId,
57         nice uint8,
58         dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
59 ) bool {
60         isBad := false
61         decompressor, err := zstd.NewReader(nil)
62         if err != nil {
63                 panic(err)
64         }
65         defer decompressor.Close()
66         for job := range ctx.Jobs(nodeId, TRx) {
67                 pktName := filepath.Base(job.Fd.Name())
68                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
69                 if job.PktEnc.Nice > nice {
70                         ctx.LogD("rx", SdsAdd(sds, SDS{
71                                 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
72                         }), "too nice")
73                         continue
74                 }
75                 pipeR, pipeW := io.Pipe()
76                 errs := make(chan error, 1)
77                 go func(job Job) {
78                         pipeWB := bufio.NewWriter(pipeW)
79                         _, _, err := PktEncRead(
80                                 ctx.Self,
81                                 ctx.Neigh,
82                                 bufio.NewReader(job.Fd),
83                                 pipeWB,
84                         )
85                         errs <- err
86                         pipeWB.Flush()
87                         pipeW.Close()
88                         job.Fd.Close()
89                         if err != nil {
90                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
91                         }
92                 }(job)
93                 var pkt Pkt
94                 var err error
95                 var pktSize int64
96                 var pktSizeBlocks int64
97                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
98                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
99                         isBad = true
100                         goto Closing
101                 }
102                 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
103                 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
104                 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
105                         pktSize -= poly1305.TagSize
106                 }
107                 pktSize -= pktSizeBlocks * poly1305.TagSize
108                 sds["size"] = strconv.FormatInt(pktSize, 10)
109                 ctx.LogD("rx", sds, "taken")
110                 switch pkt.Type {
111                 case PktTypeExec:
112                         if noExec {
113                                 goto Closing
114                         }
115                         path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
116                         handle := string(path[0])
117                         args := make([]string, 0, len(path)-1)
118                         for _, p := range path[1:] {
119                                 args = append(args, string(p))
120                         }
121                         sds := SdsAdd(sds, SDS{
122                                 "type": "exec",
123                                 "dst":  strings.Join(append([]string{handle}, args...), " "),
124                         })
125                         sender := ctx.Neigh[*job.PktEnc.Sender]
126                         cmdline, exists := sender.Exec[handle]
127                         if !exists || len(cmdline) == 0 {
128                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No handle found"}), "")
129                                 isBad = true
130                                 goto Closing
131                         }
132                         if err = decompressor.Reset(pipeR); err != nil {
133                                 log.Fatalln(err)
134                         }
135                         if !dryRun {
136                                 cmd := exec.Command(
137                                         cmdline[0],
138                                         append(cmdline[1:len(cmdline)], args...)...,
139                                 )
140                                 cmd.Env = append(
141                                         cmd.Env,
142                                         "NNCP_SELF="+ctx.Self.Id.String(),
143                                         "NNCP_SENDER="+sender.Id.String(),
144                                         "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
145                                 )
146                                 cmd.Stdin = decompressor
147                                 if err = cmd.Run(); err != nil {
148                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle")
149                                         isBad = true
150                                         goto Closing
151                                 }
152                         }
153                         ctx.LogI("rx", sds, "")
154                         if !dryRun {
155                                 if doSeen {
156                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
157                                                 fd.Close()
158                                         }
159                                 }
160                                 if err = os.Remove(job.Fd.Name()); err != nil {
161                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
162                                         isBad = true
163                                 }
164                         }
165                 case PktTypeFile:
166                         if noFile {
167                                 goto Closing
168                         }
169                         dst := string(pkt.Path[:int(pkt.PathLen)])
170                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
171                         if filepath.IsAbs(dst) {
172                                 ctx.LogE("rx", sds, "non-relative destination path")
173                                 isBad = true
174                                 goto Closing
175                         }
176                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
177                         if incoming == nil {
178                                 ctx.LogE("rx", sds, "incoming is not allowed")
179                                 isBad = true
180                                 goto Closing
181                         }
182                         dir := filepath.Join(*incoming, path.Dir(dst))
183                         if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
184                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
185                                 isBad = true
186                                 goto Closing
187                         }
188                         if !dryRun {
189                                 tmp, err := TempFile(dir, "file")
190                                 if err != nil {
191                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
192                                         isBad = true
193                                         goto Closing
194                                 }
195                                 sds["tmp"] = tmp.Name()
196                                 ctx.LogD("rx", sds, "created")
197                                 bufW := bufio.NewWriter(tmp)
198                                 if _, err = io.Copy(bufW, pipeR); err != nil {
199                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
200                                         isBad = true
201                                         goto Closing
202                                 }
203                                 if err = bufW.Flush(); err != nil {
204                                         tmp.Close()
205                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
206                                         isBad = true
207                                         goto Closing
208                                 }
209                                 if err = tmp.Sync(); err != nil {
210                                         tmp.Close()
211                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
212                                         isBad = true
213                                         goto Closing
214                                 }
215                                 tmp.Close()
216                                 dstPathOrig := filepath.Join(*incoming, dst)
217                                 dstPath := dstPathOrig
218                                 dstPathCtr := 0
219                                 for {
220                                         if _, err = os.Stat(dstPath); err != nil {
221                                                 if os.IsNotExist(err) {
222                                                         break
223                                                 }
224                                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
225                                                 isBad = true
226                                                 goto Closing
227                                         }
228                                         dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
229                                         dstPathCtr++
230                                 }
231                                 if err = os.Rename(tmp.Name(), dstPath); err != nil {
232                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
233                                         isBad = true
234                                 }
235                                 delete(sds, "tmp")
236                         }
237                         ctx.LogI("rx", sds, "")
238                         if !dryRun {
239                                 if doSeen {
240                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
241                                                 fd.Close()
242                                         }
243                                 }
244                                 if err = os.Remove(job.Fd.Name()); err != nil {
245                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
246                                         isBad = true
247                                 }
248                                 sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
249                                 if exists && len(sendmail) > 0 && ctx.NotifyFile != nil {
250                                         cmd := exec.Command(
251                                                 sendmail[0],
252                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
253                                         )
254                                         cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
255                                                 "File from %s: %s (%s)",
256                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
257                                                 dst,
258                                                 humanize.IBytes(uint64(pktSize)),
259                                         ))
260                                         cmd.Run()
261                                 }
262                         }
263                 case PktTypeFreq:
264                         if noFreq {
265                                 goto Closing
266                         }
267                         src := string(pkt.Path[:int(pkt.PathLen)])
268                         if filepath.IsAbs(src) {
269                                 ctx.LogE("rx", sds, "non-relative source path")
270                                 isBad = true
271                                 goto Closing
272                         }
273                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
274                         dstRaw, err := ioutil.ReadAll(pipeR)
275                         if err != nil {
276                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
277                                 isBad = true
278                                 goto Closing
279                         }
280                         dst := string(dstRaw)
281                         sds["dst"] = dst
282                         sender := ctx.Neigh[*job.PktEnc.Sender]
283                         freq := sender.Freq
284                         if freq == nil {
285                                 ctx.LogE("rx", sds, "freqing is not allowed")
286                                 isBad = true
287                                 goto Closing
288                         }
289                         if !dryRun {
290                                 if sender.FreqChunked == 0 {
291                                         err = ctx.TxFile(
292                                                 sender,
293                                                 pkt.Nice,
294                                                 filepath.Join(*freq, src),
295                                                 dst,
296                                                 sender.FreqMinSize,
297                                         )
298                                 } else {
299                                         err = ctx.TxFileChunked(
300                                                 sender,
301                                                 pkt.Nice,
302                                                 filepath.Join(*freq, src),
303                                                 dst,
304                                                 sender.FreqMinSize,
305                                                 sender.FreqChunked,
306                                         )
307                                 }
308                                 if err != nil {
309                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
310                                         isBad = true
311                                         goto Closing
312                                 }
313                         }
314                         ctx.LogI("rx", sds, "")
315                         if !dryRun {
316                                 if doSeen {
317                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
318                                                 fd.Close()
319                                         }
320                                 }
321                                 if err = os.Remove(job.Fd.Name()); err != nil {
322                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
323                                         isBad = true
324                                 }
325                                 sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
326                                 if exists && len(sendmail) > 0 && ctx.NotifyFreq != nil {
327                                         cmd := exec.Command(
328                                                 sendmail[0],
329                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
330                                         )
331                                         cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
332                                                 "Freq from %s: %s",
333                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
334                                                 src,
335                                         ))
336                                         cmd.Run()
337                                 }
338                         }
339                 case PktTypeTrns:
340                         if noTrns {
341                                 goto Closing
342                         }
343                         dst := new([blake2b.Size256]byte)
344                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
345                         nodeId := NodeId(*dst)
346                         node, known := ctx.Neigh[nodeId]
347                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
348                         if !known {
349                                 ctx.LogE("rx", sds, "unknown node")
350                                 isBad = true
351                                 goto Closing
352                         }
353                         ctx.LogD("rx", sds, "taken")
354                         if !dryRun {
355                                 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
356                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
357                                         isBad = true
358                                         goto Closing
359                                 }
360                         }
361                         ctx.LogI("rx", sds, "")
362                         if !dryRun {
363                                 if doSeen {
364                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
365                                                 fd.Close()
366                                         }
367                                 }
368                                 if err = os.Remove(job.Fd.Name()); err != nil {
369                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
370                                         isBad = true
371                                 }
372                         }
373                 default:
374                         ctx.LogE("rx", sds, "unknown type")
375                         isBad = true
376                 }
377         Closing:
378                 pipeR.Close()
379         }
380         return isBad
381 }