Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Merge branch 'develop'
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2020 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36
37         xdr "github.com/davecgh/go-xdr/xdr2"
38         "github.com/dustin/go-humanize"
39         "github.com/klauspost/compress/zstd"
40         "golang.org/x/crypto/blake2b"
41         "golang.org/x/crypto/poly1305"
42 )
43
// SeenSuffix is appended to the name of a processed packet file to form
// the name of the empty marker file that Toss creates when it is asked
// to remember already seen packets.
const SeenSuffix = ".seen"
47
48 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
49         lines := []string{
50                 "From: " + fromTo.From,
51                 "To: " + fromTo.To,
52                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
53         }
54         if len(body) > 0 {
55                 lines = append(lines, []string{
56                         "MIME-Version: 1.0",
57                         "Content-Type: text/plain; charset=utf-8",
58                         "Content-Transfer-Encoding: base64",
59                         "",
60                         base64.StdEncoding.EncodeToString(body),
61                 }...)
62         }
63         return strings.NewReader(strings.Join(lines, "\n"))
64 }
65
66 func (ctx *Ctx) Toss(
67         nodeId *NodeId,
68         nice uint8,
69         dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
70 ) bool {
71         dirLock, err := ctx.LockDir(nodeId, "toss")
72         if err != nil {
73                 ctx.LogE("rx", SDS{}, err, "lock")
74                 return false
75         }
76         defer ctx.UnlockDir(dirLock)
77         isBad := false
78         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
79         decompressor, err := zstd.NewReader(nil)
80         if err != nil {
81                 panic(err)
82         }
83         defer decompressor.Close()
84         for job := range ctx.Jobs(nodeId, TRx) {
85                 pktName := filepath.Base(job.Fd.Name())
86                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
87                 if job.PktEnc.Nice > nice {
88                         ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice")
89                         continue
90                 }
91                 pipeR, pipeW := io.Pipe()
92                 go func(job Job) error {
93                         pipeWB := bufio.NewWriter(pipeW)
94                         _, _, err := PktEncRead(
95                                 ctx.Self,
96                                 ctx.Neigh,
97                                 bufio.NewReader(job.Fd),
98                                 pipeWB,
99                         )
100                         job.Fd.Close() // #nosec G104
101                         if err != nil {
102                                 ctx.LogE("rx", sds, err, "decryption")
103                                 return pipeW.CloseWithError(err)
104                         }
105                         if err = pipeWB.Flush(); err != nil {
106                                 ctx.LogE("rx", sds, err, "decryption flush")
107                                 return pipeW.CloseWithError(err)
108                         }
109                         return pipeW.Close()
110                 }(job)
111                 var pkt Pkt
112                 var err error
113                 var pktSize int64
114                 var pktSizeBlocks int64
115                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
116                         ctx.LogE("rx", sds, err, "unmarshal")
117                         isBad = true
118                         goto Closing
119                 }
120                 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
121                 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
122                 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
123                         pktSize -= poly1305.TagSize
124                 }
125                 pktSize -= pktSizeBlocks * poly1305.TagSize
126                 sds["size"] = pktSize
127                 ctx.LogD("rx", sds, "taken")
128                 switch pkt.Type {
129                 case PktTypeExec:
130                         if noExec {
131                                 goto Closing
132                         }
133                         path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
134                         handle := string(path[0])
135                         args := make([]string, 0, len(path)-1)
136                         for _, p := range path[1:] {
137                                 args = append(args, string(p))
138                         }
139                         argsStr := strings.Join(append([]string{handle}, args...), " ")
140                         sds := SdsAdd(sds, SDS{
141                                 "type": "exec",
142                                 "dst":  argsStr,
143                         })
144                         sender := ctx.Neigh[*job.PktEnc.Sender]
145                         cmdline, exists := sender.Exec[handle]
146                         if !exists || len(cmdline) == 0 {
147                                 ctx.LogE("rx", sds, errors.New("No handle found"), "")
148                                 isBad = true
149                                 goto Closing
150                         }
151                         if err = decompressor.Reset(pipeR); err != nil {
152                                 log.Fatalln(err)
153                         }
154                         if !dryRun {
155                                 cmd := exec.Command(
156                                         cmdline[0],
157                                         append(cmdline[1:len(cmdline)], args...)...,
158                                 )
159                                 cmd.Env = append(
160                                         cmd.Env,
161                                         "NNCP_SELF="+ctx.Self.Id.String(),
162                                         "NNCP_SENDER="+sender.Id.String(),
163                                         "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
164                                 )
165                                 cmd.Stdin = decompressor
166                                 output, err := cmd.Output()
167                                 if err != nil {
168                                         ctx.LogE("rx", sds, err, "handle")
169                                         isBad = true
170                                         goto Closing
171                                 }
172                                 if len(sendmail) > 0 && ctx.NotifyExec != nil {
173                                         notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
174                                         if !exists {
175                                                 notify, exists = ctx.NotifyExec["*."+handle]
176                                         }
177                                         if exists {
178                                                 cmd := exec.Command(
179                                                         sendmail[0],
180                                                         append(sendmail[1:len(sendmail)], notify.To)...,
181                                                 )
182                                                 cmd.Stdin = newNotification(notify, fmt.Sprintf(
183                                                         "Exec from %s: %s", sender.Name, argsStr,
184                                                 ), output)
185                                                 if err = cmd.Run(); err != nil {
186                                                         ctx.LogE("rx", sds, err, "notify")
187                                                 }
188                                         }
189                                 }
190                         }
191                         ctx.LogI("rx", sds, "")
192                         if !dryRun {
193                                 if doSeen {
194                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
195                                                 fd.Close() // #nosec G104
196                                         }
197                                 }
198                                 if err = os.Remove(job.Fd.Name()); err != nil {
199                                         ctx.LogE("rx", sds, err, "remove")
200                                         isBad = true
201                                 }
202                         }
203                 case PktTypeFile:
204                         if noFile {
205                                 goto Closing
206                         }
207                         dst := string(pkt.Path[:int(pkt.PathLen)])
208                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
209                         if filepath.IsAbs(dst) {
210                                 ctx.LogE("rx", sds, errors.New("non-relative destination path"), "")
211                                 isBad = true
212                                 goto Closing
213                         }
214                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
215                         if incoming == nil {
216                                 ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "")
217                                 isBad = true
218                                 goto Closing
219                         }
220                         dir := filepath.Join(*incoming, path.Dir(dst))
221                         if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
222                                 ctx.LogE("rx", sds, err, "mkdir")
223                                 isBad = true
224                                 goto Closing
225                         }
226                         if !dryRun {
227                                 tmp, err := TempFile(dir, "file")
228                                 if err != nil {
229                                         ctx.LogE("rx", sds, err, "mktemp")
230                                         isBad = true
231                                         goto Closing
232                                 }
233                                 sds["tmp"] = tmp.Name()
234                                 ctx.LogD("rx", sds, "created")
235                                 bufW := bufio.NewWriter(tmp)
236                                 if _, err = CopyProgressed(
237                                         bufW, pipeR, "Rx file",
238                                         SdsAdd(sds, SDS{"fullsize": sds["size"]}),
239                                         ctx.ShowPrgrs,
240                                 ); err != nil {
241                                         ctx.LogE("rx", sds, err, "copy")
242                                         isBad = true
243                                         goto Closing
244                                 }
245                                 if err = bufW.Flush(); err != nil {
246                                         tmp.Close() // #nosec G104
247                                         ctx.LogE("rx", sds, err, "copy")
248                                         isBad = true
249                                         goto Closing
250                                 }
251                                 if err = tmp.Sync(); err != nil {
252                                         tmp.Close() // #nosec G104
253                                         ctx.LogE("rx", sds, err, "copy")
254                                         isBad = true
255                                         goto Closing
256                                 }
257                                 if err = tmp.Close(); err != nil {
258                                         ctx.LogE("rx", sds, err, "copy")
259                                         isBad = true
260                                         goto Closing
261                                 }
262                                 dstPathOrig := filepath.Join(*incoming, dst)
263                                 dstPath := dstPathOrig
264                                 dstPathCtr := 0
265                                 for {
266                                         if _, err = os.Stat(dstPath); err != nil {
267                                                 if os.IsNotExist(err) {
268                                                         break
269                                                 }
270                                                 ctx.LogE("rx", sds, err, "stat")
271                                                 isBad = true
272                                                 goto Closing
273                                         }
274                                         dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
275                                         dstPathCtr++
276                                 }
277                                 if err = os.Rename(tmp.Name(), dstPath); err != nil {
278                                         ctx.LogE("rx", sds, err, "rename")
279                                         isBad = true
280                                 }
281                                 if err = DirSync(*incoming); err != nil {
282                                         ctx.LogE("rx", sds, err, "sync")
283                                         isBad = true
284                                 }
285                                 delete(sds, "tmp")
286                         }
287                         ctx.LogI("rx", sds, "")
288                         if !dryRun {
289                                 if doSeen {
290                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
291                                                 fd.Close() // #nosec G104
292                                         }
293                                 }
294                                 if err = os.Remove(job.Fd.Name()); err != nil {
295                                         ctx.LogE("rx", sds, err, "remove")
296                                         isBad = true
297                                 }
298                                 if len(sendmail) > 0 && ctx.NotifyFile != nil {
299                                         cmd := exec.Command(
300                                                 sendmail[0],
301                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
302                                         )
303                                         cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
304                                                 "File from %s: %s (%s)",
305                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
306                                                 dst,
307                                                 humanize.IBytes(uint64(pktSize)),
308                                         ), nil)
309                                         if err = cmd.Run(); err != nil {
310                                                 ctx.LogE("rx", sds, err, "notify")
311                                         }
312                                 }
313                         }
314                 case PktTypeFreq:
315                         if noFreq {
316                                 goto Closing
317                         }
318                         src := string(pkt.Path[:int(pkt.PathLen)])
319                         if filepath.IsAbs(src) {
320                                 ctx.LogE("rx", sds, errors.New("non-relative source path"), "")
321                                 isBad = true
322                                 goto Closing
323                         }
324                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
325                         dstRaw, err := ioutil.ReadAll(pipeR)
326                         if err != nil {
327                                 ctx.LogE("rx", sds, err, "read")
328                                 isBad = true
329                                 goto Closing
330                         }
331                         dst := string(dstRaw)
332                         sds["dst"] = dst
333                         sender := ctx.Neigh[*job.PktEnc.Sender]
334                         freqPath := sender.FreqPath
335                         if freqPath == nil {
336                                 ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "")
337                                 isBad = true
338                                 goto Closing
339                         }
340                         if !dryRun {
341                                 err = ctx.TxFile(
342                                         sender,
343                                         pkt.Nice,
344                                         filepath.Join(*freqPath, src),
345                                         dst,
346                                         sender.FreqChunked,
347                                         sender.FreqMinSize,
348                                         sender.FreqMaxSize,
349                                 )
350                                 if err != nil {
351                                         ctx.LogE("rx", sds, err, "tx file")
352                                         isBad = true
353                                         goto Closing
354                                 }
355                         }
356                         ctx.LogI("rx", sds, "")
357                         if !dryRun {
358                                 if doSeen {
359                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
360                                                 fd.Close() // #nosec G104
361                                         }
362                                 }
363                                 if err = os.Remove(job.Fd.Name()); err != nil {
364                                         ctx.LogE("rx", sds, err, "remove")
365                                         isBad = true
366                                 }
367                                 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
368                                         cmd := exec.Command(
369                                                 sendmail[0],
370                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
371                                         )
372                                         cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
373                                                 "Freq from %s: %s", sender.Name, src,
374                                         ), nil)
375                                         if err = cmd.Run(); err != nil {
376                                                 ctx.LogE("rx", sds, err, "notify")
377                                         }
378                                 }
379                         }
380                 case PktTypeTrns:
381                         if noTrns {
382                                 goto Closing
383                         }
384                         dst := new([blake2b.Size256]byte)
385                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
386                         nodeId := NodeId(*dst)
387                         node, known := ctx.Neigh[nodeId]
388                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
389                         if !known {
390                                 ctx.LogE("rx", sds, errors.New("unknown node"), "")
391                                 isBad = true
392                                 goto Closing
393                         }
394                         ctx.LogD("rx", sds, "taken")
395                         if !dryRun {
396                                 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
397                                         ctx.LogE("rx", sds, err, "tx trns")
398                                         isBad = true
399                                         goto Closing
400                                 }
401                         }
402                         ctx.LogI("rx", sds, "")
403                         if !dryRun {
404                                 if doSeen {
405                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
406                                                 fd.Close() // #nosec G104
407                                         }
408                                 }
409                                 if err = os.Remove(job.Fd.Name()); err != nil {
410                                         ctx.LogE("rx", sds, err, "remove")
411                                         isBad = true
412                                 }
413                         }
414                 default:
415                         ctx.LogE("rx", sds, errors.New("unknown type"), "")
416                         isBad = true
417                 }
418         Closing:
419                 pipeR.Close() // #nosec G104
420         }
421         return isBad
422 }