Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Log handler's stdout/stderr
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
// SeenDir is the name of the subdirectory, created next to the job
// files, in which empty marker files named after processed jobs are
// placed.
const SeenDir = "seen"
48
49 func jobPath2Seen(jobPath string) string {
50         return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
51 }
52
53 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
54         lines := []string{
55                 "From: " + fromTo.From,
56                 "To: " + fromTo.To,
57                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
58         }
59         if len(body) > 0 {
60                 lines = append(
61                         lines,
62                         "MIME-Version: 1.0",
63                         "Content-Type: text/plain; charset=utf-8",
64                         "Content-Transfer-Encoding: base64",
65                         "",
66                         base64.StdEncoding.EncodeToString(body),
67                 )
68         }
69         return strings.NewReader(strings.Join(lines, "\n"))
70 }
71
72 func pktSizeWithoutEnc(pktSize int64) int64 {
73         pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
74         pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
75         if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
76                 pktSize -= poly1305.TagSize
77         }
78         pktSize -= pktSizeBlocks * poly1305.TagSize
79         return pktSize
80 }
81
var (
	// JobRepeatProcess is a sentinel error value with the message
	// "needs processing repeat".
	// NOTE(review): presumably signals that a job has to be tossed
	// again -- confirm against the code that returns/compares it.
	JobRepeatProcess = errors.New("needs processing repeat")
)
83
84 func jobProcess(
85         ctx *Ctx,
86         pipeR *io.PipeReader,
87         pktName string,
88         les LEs,
89         sender *Node,
90         nice uint8,
91         pktSize uint64,
92         jobPath string,
93         decompressor *zstd.Decoder,
94         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
95 ) error {
96         defer pipeR.Close()
97         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
98         var pkt Pkt
99         _, err := xdr.Unmarshal(pipeR, &pkt)
100         if err != nil {
101                 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
102                         return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
103                 })
104                 return err
105         }
106         les = append(les, LE{"Size", int64(pktSize)})
107         ctx.LogD("rx", les, func(les LEs) string {
108                 return fmt.Sprintf(
109                         "Tossing %s/%s (%s)",
110                         sender.Name, pktName,
111                         humanize.IBytes(pktSize),
112                 )
113         })
114         switch pkt.Type {
115         case PktTypeExec, PktTypeExecFat:
116                 if noExec {
117                         return nil
118                 }
119                 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
120                 handle := string(path[0])
121                 args := make([]string, 0, len(path)-1)
122                 for _, p := range path[1:] {
123                         args = append(args, string(p))
124                 }
125                 argsStr := strings.Join(append([]string{handle}, args...), " ")
126                 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
127                 cmdline := sender.Exec[handle]
128                 if len(cmdline) == 0 {
129                         err = errors.New("No handle found")
130                         ctx.LogE(
131                                 "rx-no-handle", les, err,
132                                 func(les LEs) string {
133                                         return fmt.Sprintf(
134                                                 "Tossing exec %s/%s (%s): %s",
135                                                 sender.Name, pktName,
136                                                 humanize.IBytes(pktSize), argsStr,
137                                         )
138                                 },
139                         )
140                         return err
141                 }
142                 if pkt.Type == PktTypeExec {
143                         if err = decompressor.Reset(pipeR); err != nil {
144                                 log.Fatalln(err)
145                         }
146                 }
147                 if !dryRun {
148                         cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
149                         cmd.Env = append(
150                                 cmd.Env,
151                                 "NNCP_SELF="+ctx.Self.Id.String(),
152                                 "NNCP_SENDER="+sender.Id.String(),
153                                 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
154                         )
155                         if pkt.Type == PktTypeExec {
156                                 cmd.Stdin = decompressor
157                         } else {
158                                 cmd.Stdin = pipeR
159                         }
160                         output, err := cmd.CombinedOutput()
161                         if err != nil {
162                                 les = append(les, LE{"Output", strings.Split(
163                                         strings.Trim(string(output), "\n"), "\n"),
164                                 })
165                                 ctx.LogE("rx-handle", les, err, func(les LEs) string {
166                                         return fmt.Sprintf(
167                                                 "Tossing exec %s/%s (%s): %s: handling",
168                                                 sender.Name, pktName,
169                                                 humanize.IBytes(uint64(pktSize)), argsStr,
170                                         )
171                                 })
172                                 return err
173                         }
174                         if len(sendmail) > 0 && ctx.NotifyExec != nil {
175                                 notify := ctx.NotifyExec[sender.Name+"."+handle]
176                                 if notify == nil {
177                                         notify = ctx.NotifyExec["*."+handle]
178                                 }
179                                 if notify != nil {
180                                         cmd := exec.Command(
181                                                 sendmail[0],
182                                                 append(sendmail[1:], notify.To)...,
183                                         )
184                                         cmd.Stdin = newNotification(notify, fmt.Sprintf(
185                                                 "Exec from %s: %s", sender.Name, argsStr,
186                                         ), output)
187                                         if err = cmd.Run(); err != nil {
188                                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
189                                                         return fmt.Sprintf(
190                                                                 "Tossing exec %s/%s (%s): %s: notifying",
191                                                                 sender.Name, pktName,
192                                                                 humanize.IBytes(pktSize), argsStr,
193                                                         )
194                                                 })
195                                         }
196                                 }
197                         }
198                 }
199                 ctx.LogI("rx", les, func(les LEs) string {
200                         return fmt.Sprintf(
201                                 "Got exec from %s to %s (%s)",
202                                 sender.Name, argsStr,
203                                 humanize.IBytes(pktSize),
204                         )
205                 })
206                 if !dryRun && jobPath != "" {
207                         if doSeen {
208                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
209                                         return err
210                                 }
211                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
212                                         fd.Close()
213                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
214                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
215                                                         return fmt.Sprintf(
216                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
217                                                                 sender.Name, pktName,
218                                                                 humanize.IBytes(pktSize),
219                                                                 filepath.Base(jobPath),
220                                                         )
221                                                 })
222                                                 return err
223                                         }
224                                 }
225                         }
226                         if err = os.Remove(jobPath); err != nil {
227                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
228                                         return fmt.Sprintf(
229                                                 "Tossing exec %s/%s (%s): %s: notifying",
230                                                 sender.Name, pktName,
231                                                 humanize.IBytes(pktSize), argsStr,
232                                         )
233                                 })
234                                 return err
235                         } else if ctx.HdrUsage {
236                                 os.Remove(JobPath2Hdr(jobPath))
237                         }
238                 }
239
240         case PktTypeFile:
241                 if noFile {
242                         return nil
243                 }
244                 dst := string(pkt.Path[:int(pkt.PathLen)])
245                 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
246                 if filepath.IsAbs(dst) {
247                         err = errors.New("non-relative destination path")
248                         ctx.LogE(
249                                 "rx-non-rel", les, err,
250                                 func(les LEs) string {
251                                         return fmt.Sprintf(
252                                                 "Tossing file %s/%s (%s): %s",
253                                                 sender.Name, pktName,
254                                                 humanize.IBytes(pktSize), dst,
255                                         )
256                                 },
257                         )
258                         return err
259                 }
260                 incoming := sender.Incoming
261                 if incoming == nil {
262                         err = errors.New("incoming is not allowed")
263                         ctx.LogE(
264                                 "rx-no-incoming", les, err,
265                                 func(les LEs) string {
266                                         return fmt.Sprintf(
267                                                 "Tossing file %s/%s (%s): %s",
268                                                 sender.Name, pktName,
269                                                 humanize.IBytes(pktSize), dst,
270                                         )
271                                 },
272                         )
273                         return err
274                 }
275                 dir := filepath.Join(*incoming, path.Dir(dst))
276                 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
277                         ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
278                                 return fmt.Sprintf(
279                                         "Tossing file %s/%s (%s): %s: mkdir",
280                                         sender.Name, pktName,
281                                         humanize.IBytes(pktSize), dst,
282                                 )
283                         })
284                         return err
285                 }
286                 if !dryRun {
287                         tmp, err := TempFile(dir, "file")
288                         if err != nil {
289                                 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
290                                         return fmt.Sprintf(
291                                                 "Tossing file %s/%s (%s): %s: mktemp",
292                                                 sender.Name, pktName,
293                                                 humanize.IBytes(pktSize), dst,
294                                         )
295                                 })
296                                 return err
297                         }
298                         les = append(les, LE{"Tmp", tmp.Name()})
299                         ctx.LogD("rx-tmp-created", les, func(les LEs) string {
300                                 return fmt.Sprintf(
301                                         "Tossing file %s/%s (%s): %s: created: %s",
302                                         sender.Name, pktName,
303                                         humanize.IBytes(pktSize), dst, tmp.Name(),
304                                 )
305                         })
306                         bufW := bufio.NewWriter(tmp)
307                         if _, err = CopyProgressed(
308                                 bufW, pipeR, "Rx file",
309                                 append(les, LE{"FullSize", int64(pktSize)}),
310                                 ctx.ShowPrgrs,
311                         ); err != nil {
312                                 ctx.LogE("rx-copy", les, err, func(les LEs) string {
313                                         return fmt.Sprintf(
314                                                 "Tossing file %s/%s (%s): %s: copying",
315                                                 sender.Name, pktName,
316                                                 humanize.IBytes(pktSize), dst,
317                                         )
318                                 })
319                                 return err
320                         }
321                         if err = bufW.Flush(); err != nil {
322                                 tmp.Close()
323                                 ctx.LogE("rx-flush", les, err, func(les LEs) string {
324                                         return fmt.Sprintf(
325                                                 "Tossing file %s/%s (%s): %s: flushing",
326                                                 sender.Name, pktName,
327                                                 humanize.IBytes(pktSize), dst,
328                                         )
329                                 })
330                                 return err
331                         }
332                         if err = tmp.Sync(); err != nil {
333                                 tmp.Close()
334                                 ctx.LogE("rx-sync", les, err, func(les LEs) string {
335                                         return fmt.Sprintf(
336                                                 "Tossing file %s/%s (%s): %s: syncing",
337                                                 sender.Name, pktName,
338                                                 humanize.IBytes(pktSize), dst,
339                                         )
340                                 })
341                                 return err
342                         }
343                         if err = tmp.Close(); err != nil {
344                                 ctx.LogE("rx-close", les, err, func(les LEs) string {
345                                         return fmt.Sprintf(
346                                                 "Tossing file %s/%s (%s): %s: closing",
347                                                 sender.Name, pktName,
348                                                 humanize.IBytes(pktSize), dst,
349                                         )
350                                 })
351                                 return err
352                         }
353                         dstPathOrig := filepath.Join(*incoming, dst)
354                         dstPath := dstPathOrig
355                         dstPathCtr := 0
356                         for {
357                                 if _, err = os.Stat(dstPath); err != nil {
358                                         if os.IsNotExist(err) {
359                                                 break
360                                         }
361                                         ctx.LogE("rx-stat", les, err, func(les LEs) string {
362                                                 return fmt.Sprintf(
363                                                         "Tossing file %s/%s (%s): %s: stating: %s",
364                                                         sender.Name, pktName,
365                                                         humanize.IBytes(pktSize), dst, dstPath,
366                                                 )
367                                         })
368                                         return err
369                                 }
370                                 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
371                                 dstPathCtr++
372                         }
373                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
374                                 ctx.LogE("rx-rename", les, err, func(les LEs) string {
375                                         return fmt.Sprintf(
376                                                 "Tossing file %s/%s (%s): %s: renaming",
377                                                 sender.Name, pktName,
378                                                 humanize.IBytes(pktSize), dst,
379                                         )
380                                 })
381                                 return err
382                         }
383                         if err = DirSync(*incoming); err != nil {
384                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
385                                         return fmt.Sprintf(
386                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
387                                                 sender.Name, pktName,
388                                                 humanize.IBytes(pktSize), dst,
389                                         )
390                                 })
391                                 return err
392                         }
393                         les = les[:len(les)-1] // delete Tmp
394                 }
395                 ctx.LogI("rx", les, func(les LEs) string {
396                         return fmt.Sprintf(
397                                 "Got file %s (%s) from %s",
398                                 dst, humanize.IBytes(pktSize), sender.Name,
399                         )
400                 })
401                 if !dryRun {
402                         if jobPath != "" {
403                                 if doSeen {
404                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
405                                                 return err
406                                         }
407                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
408                                                 fd.Close()
409                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
410                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
411                                                                 return fmt.Sprintf(
412                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
413                                                                         sender.Name, pktName,
414                                                                         humanize.IBytes(pktSize),
415                                                                         filepath.Base(jobPath),
416                                                                 )
417                                                         })
418                                                         return err
419                                                 }
420                                         }
421                                 }
422                                 if err = os.Remove(jobPath); err != nil {
423                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
424                                                 return fmt.Sprintf(
425                                                         "Tossing file %s/%s (%s): %s: removing",
426                                                         sender.Name, pktName,
427                                                         humanize.IBytes(pktSize), dst,
428                                                 )
429                                         })
430                                         return err
431                                 } else if ctx.HdrUsage {
432                                         os.Remove(JobPath2Hdr(jobPath))
433                                 }
434                         }
435                         if len(sendmail) > 0 && ctx.NotifyFile != nil {
436                                 cmd := exec.Command(
437                                         sendmail[0],
438                                         append(sendmail[1:], ctx.NotifyFile.To)...,
439                                 )
440                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
441                                         "File from %s: %s (%s)",
442                                         sender.Name, dst, humanize.IBytes(pktSize),
443                                 ), nil)
444                                 if err = cmd.Run(); err != nil {
445                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
446                                                 return fmt.Sprintf(
447                                                         "Tossing file %s/%s (%s): %s: notifying",
448                                                         sender.Name, pktName,
449                                                         humanize.IBytes(pktSize), dst,
450                                                 )
451                                         })
452                                 }
453                         }
454                 }
455
456         case PktTypeFreq:
457                 if noFreq {
458                         return nil
459                 }
460                 src := string(pkt.Path[:int(pkt.PathLen)])
461                 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
462                 if filepath.IsAbs(src) {
463                         err = errors.New("non-relative source path")
464                         ctx.LogE(
465                                 "rx-non-rel", les, err,
466                                 func(les LEs) string {
467                                         return fmt.Sprintf(
468                                                 "Tossing freq %s/%s (%s): %s: notifying",
469                                                 sender.Name, pktName,
470                                                 humanize.IBytes(pktSize), src,
471                                         )
472                                 },
473                         )
474                         return err
475                 }
476                 dstRaw, err := ioutil.ReadAll(pipeR)
477                 if err != nil {
478                         ctx.LogE("rx-read", les, err, func(les LEs) string {
479                                 return fmt.Sprintf(
480                                         "Tossing freq %s/%s (%s): %s: reading",
481                                         sender.Name, pktName,
482                                         humanize.IBytes(pktSize), src,
483                                 )
484                         })
485                         return err
486                 }
487                 dst := string(dstRaw)
488                 les = append(les, LE{"Dst", dst})
489                 freqPath := sender.FreqPath
490                 if freqPath == nil {
491                         err = errors.New("freqing is not allowed")
492                         ctx.LogE(
493                                 "rx-no-freq", les, err,
494                                 func(les LEs) string {
495                                         return fmt.Sprintf(
496                                                 "Tossing freq %s/%s (%s): %s -> %s",
497                                                 sender.Name, pktName,
498                                                 humanize.IBytes(pktSize), src, dst,
499                                         )
500                                 },
501                         )
502                         return err
503                 }
504                 if !dryRun {
505                         err = ctx.TxFile(
506                                 sender,
507                                 pkt.Nice,
508                                 filepath.Join(*freqPath, src),
509                                 dst,
510                                 sender.FreqChunked,
511                                 sender.FreqMinSize,
512                                 sender.FreqMaxSize,
513                                 nil,
514                         )
515                         if err != nil {
516                                 ctx.LogE("rx-tx", les, err, func(les LEs) string {
517                                         return fmt.Sprintf(
518                                                 "Tossing freq %s/%s (%s): %s -> %s: txing",
519                                                 sender.Name, pktName,
520                                                 humanize.IBytes(pktSize), src, dst,
521                                         )
522                                 })
523                                 return err
524                         }
525                 }
526                 ctx.LogI("rx", les, func(les LEs) string {
527                         return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
528                 })
529                 if !dryRun {
530                         if jobPath != "" {
531                                 if doSeen {
532                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
533                                                 return err
534                                         }
535                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
536                                                 fd.Close()
537                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
538                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
539                                                                 return fmt.Sprintf(
540                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
541                                                                         sender.Name, pktName,
542                                                                         humanize.IBytes(pktSize),
543                                                                         filepath.Base(jobPath),
544                                                                 )
545                                                         })
546                                                         return err
547                                                 }
548                                         }
549                                 }
550                                 if err = os.Remove(jobPath); err != nil {
551                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
552                                                 return fmt.Sprintf(
553                                                         "Tossing freq %s/%s (%s): %s -> %s: removing",
554                                                         sender.Name, pktName,
555                                                         humanize.IBytes(pktSize), src, dst,
556                                                 )
557                                         })
558                                         return err
559                                 } else if ctx.HdrUsage {
560                                         os.Remove(JobPath2Hdr(jobPath))
561                                 }
562                         }
563                         if len(sendmail) > 0 && ctx.NotifyFreq != nil {
564                                 cmd := exec.Command(
565                                         sendmail[0],
566                                         append(sendmail[1:], ctx.NotifyFreq.To)...,
567                                 )
568                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
569                                         "Freq from %s: %s", sender.Name, src,
570                                 ), nil)
571                                 if err = cmd.Run(); err != nil {
572                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
573                                                 return fmt.Sprintf(
574                                                         "Tossing freq %s/%s (%s): %s -> %s: notifying",
575                                                         sender.Name, pktName,
576                                                         humanize.IBytes(pktSize), src, dst,
577                                                 )
578                                         })
579                                 }
580                         }
581                 }
582
583         case PktTypeTrns:
584                 if noTrns {
585                         return nil
586                 }
587                 dst := new([MTHSize]byte)
588                 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
589                 nodeId := NodeId(*dst)
590                 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
591                 logMsg := func(les LEs) string {
592                         return fmt.Sprintf(
593                                 "Tossing trns %s/%s (%s): %s",
594                                 sender.Name, pktName,
595                                 humanize.IBytes(pktSize),
596                                 nodeId.String(),
597                         )
598                 }
599                 node := ctx.Neigh[nodeId]
600                 if node == nil {
601                         err = errors.New("unknown node")
602                         ctx.LogE("rx-unknown", les, err, logMsg)
603                         return err
604                 }
605                 ctx.LogD("rx-tx", les, logMsg)
606                 if !dryRun {
607                         if len(node.Via) == 0 {
608                                 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
609                                         ctx.LogE("rx", les, err, func(les LEs) string {
610                                                 return logMsg(les) + ": txing"
611                                         })
612                                         return err
613                                 }
614                         } else {
615                                 via := node.Via[:len(node.Via)-1]
616                                 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
617                                 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
618                                 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
619                                 if err != nil {
620                                         panic(err)
621                                 }
622                                 if _, err = ctx.Tx(
623                                         node,
624                                         pktTrns,
625                                         nice,
626                                         int64(pktSize), 0,
627                                         pipeR,
628                                         pktName,
629                                         nil,
630                                 ); err != nil {
631                                         ctx.LogE("rx", les, err, func(les LEs) string {
632                                                 return logMsg(les) + ": txing"
633                                         })
634                                         return err
635                                 }
636                         }
637                 }
638                 ctx.LogI("rx", les, func(les LEs) string {
639                         return fmt.Sprintf(
640                                 "Got transitional packet from %s to %s (%s)",
641                                 sender.Name,
642                                 ctx.NodeName(&nodeId),
643                                 humanize.IBytes(pktSize),
644                         )
645                 })
646                 if !dryRun && jobPath != "" {
647                         if doSeen {
648                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
649                                         return err
650                                 }
651                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
652                                         fd.Close()
653                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
654                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
655                                                         return fmt.Sprintf(
656                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
657                                                                 sender.Name, pktName,
658                                                                 humanize.IBytes(pktSize),
659                                                                 filepath.Base(jobPath),
660                                                         )
661                                                 })
662                                                 return err
663                                         }
664                                 }
665                         }
666                         if err = os.Remove(jobPath); err != nil {
667                                 ctx.LogE("rx", les, err, func(les LEs) string {
668                                         return fmt.Sprintf(
669                                                 "Tossing trns %s/%s (%s): %s: removing",
670                                                 sender.Name, pktName,
671                                                 humanize.IBytes(pktSize),
672                                                 ctx.NodeName(&nodeId),
673                                         )
674                                 })
675                                 return err
676                         } else if ctx.HdrUsage {
677                                 os.Remove(JobPath2Hdr(jobPath))
678                         }
679                 }
680
681         case PktTypeArea:
682                 if noArea {
683                         return nil
684                 }
685                 areaId := new(AreaId)
686                 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
687                 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
688                 logMsg := func(les LEs) string {
689                         return fmt.Sprintf(
690                                 "Tossing %s/%s (%s): area %s",
691                                 sender.Name, pktName,
692                                 humanize.IBytes(pktSize),
693                                 ctx.AreaName(areaId),
694                         )
695                 }
696                 area := ctx.AreaId2Area[*areaId]
697                 if area == nil {
698                         err = errors.New("unknown area")
699                         ctx.LogE("rx-area-unknown", les, err, logMsg)
700                         return err
701                 }
702                 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
703                 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
704                 if err != nil {
705                         ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
706                         return err
707                 }
708                 msgHashRaw := blake2b.Sum256(pktEncRaw)
709                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
710                 les = append(les, LE{"AreaMsg", msgHash})
711                 ctx.LogD("rx-area", les, logMsg)
712
713                 if dryRun {
714                         for _, nodeId := range area.Subs {
715                                 node := ctx.Neigh[*nodeId]
716                                 lesEcho := append(les, LE{"Echo", nodeId})
717                                 seenDir := filepath.Join(
718                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
719                                 )
720                                 seenPath := filepath.Join(seenDir, msgHash)
721                                 logMsgNode := func(les LEs) string {
722                                         return fmt.Sprintf(
723                                                 "%s: echoing to: %s", logMsg(les), node.Name,
724                                         )
725                                 }
726                                 if _, err := os.Stat(seenPath); err == nil {
727                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
728                                                 return logMsgNode(les) + ": already sent"
729                                         })
730                                         continue
731                                 }
732                                 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
733                         }
734                 } else {
735                         for _, nodeId := range area.Subs {
736                                 node := ctx.Neigh[*nodeId]
737                                 lesEcho := append(les, LE{"Echo", nodeId})
738                                 seenDir := filepath.Join(
739                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
740                                 )
741                                 seenPath := filepath.Join(seenDir, msgHash)
742                                 logMsgNode := func(les LEs) string {
743                                         return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
744                                 }
745                                 if _, err := os.Stat(seenPath); err == nil {
746                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
747                                                 return logMsgNode(les) + ": already sent"
748                                         })
749                                         continue
750                                 }
751                                 if nodeId != sender.Id && nodeId != pktEnc.Sender {
752                                         ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
753                                         if _, err = ctx.Tx(
754                                                 node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil,
755                                         ); err != nil {
756                                                 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
757                                                 return err
758                                         }
759                                 }
760                                 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
761                                         ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
762                                         return err
763                                 }
764                                 if fd, err := os.Create(seenPath); err == nil {
765                                         fd.Close()
766                                         if err = DirSync(seenDir); err != nil {
767                                                 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
768                                                 return err
769                                         }
770                                 } else {
771                                         ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
772                                         return err
773                                 }
774                                 return JobRepeatProcess
775                         }
776                 }
777
778                 seenDir := filepath.Join(
779                         ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
780                 )
781                 seenPath := filepath.Join(seenDir, msgHash)
782                 if _, err := os.Stat(seenPath); err == nil {
783                         ctx.LogD("rx-area-seen", les, func(les LEs) string {
784                                 return logMsg(les) + ": already seen"
785                         })
786                         if !dryRun && jobPath != "" {
787                                 if err = os.Remove(jobPath); err != nil {
788                                         ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
789                                                 return fmt.Sprintf(
790                                                         "Tossing area %s/%s (%s): %s: removing",
791                                                         sender.Name, pktName,
792                                                         humanize.IBytes(pktSize),
793                                                         msgHash,
794                                                 )
795                                         })
796                                         return err
797                                 } else if ctx.HdrUsage {
798                                         os.Remove(JobPath2Hdr(jobPath))
799                                 }
800                         }
801                         return nil
802                 }
803
804                 if area.Prv == nil {
805                         ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
806                                 return logMsg(les) + ": no private key for decoding"
807                         })
808                 } else {
809                         signatureVerify := true
810                         if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
811                                 if !area.AllowUnknown {
812                                         err = errors.New("unknown sender")
813                                         ctx.LogE(
814                                                 "rx-area-unknown",
815                                                 append(les, LE{"Sender", pktEnc.Sender}),
816                                                 err,
817                                                 func(les LEs) string {
818                                                         return logMsg(les) + ": sender: " + pktEnc.Sender.String()
819                                                 },
820                                         )
821                                         return err
822                                 }
823                                 signatureVerify = false
824                         }
825                         areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
826                         copy(areaNodeOur.Id[:], area.Id[:])
827                         copy(areaNodeOur.ExchPrv[:], area.Prv[:])
828                         areaNode := Node{
829                                 Id:       new(NodeId),
830                                 Name:     area.Name,
831                                 Incoming: area.Incoming,
832                                 Exec:     area.Exec,
833                         }
834                         copy(areaNode.Id[:], area.Id[:])
835                         pktName := fmt.Sprintf(
836                                 "area/%s/%s",
837                                 Base32Codec.EncodeToString(areaId[:]), msgHash,
838                         )
839
840                         pipeR, pipeW := io.Pipe()
841                         errs := make(chan error, 1)
842                         go func() {
843                                 errs <- jobProcess(
844                                         ctx,
845                                         pipeR,
846                                         pktName,
847                                         les,
848                                         &areaNode,
849                                         nice,
850                                         uint64(pktSizeWithoutEnc(int64(pktSize))),
851                                         "",
852                                         decompressor,
853                                         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
854                                 )
855                         }()
856                         _, _, _, err = PktEncRead(
857                                 &areaNodeOur,
858                                 ctx.Neigh,
859                                 fullPipeR,
860                                 pipeW,
861                                 signatureVerify,
862                                 nil,
863                         )
864                         if err != nil {
865                                 pipeW.CloseWithError(err)
866                                 <-errs
867                                 return err
868                         }
869                         pipeW.Close()
870                         if err = <-errs; err != nil {
871                                 return err
872                         }
873                 }
874
875                 if !dryRun && jobPath != "" {
876                         if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
877                                 ctx.LogE("rx-area-mkdir", les, err, logMsg)
878                                 return err
879                         }
880                         if fd, err := os.Create(seenPath); err == nil {
881                                 fd.Close()
882                                 if err = DirSync(seenDir); err != nil {
883                                         ctx.LogE("rx-area-dirsync", les, err, logMsg)
884                                         return err
885                                 }
886                         }
887                         if err = os.Remove(jobPath); err != nil {
888                                 ctx.LogE("rx", les, err, func(les LEs) string {
889                                         return fmt.Sprintf(
890                                                 "Tossing area %s/%s (%s): %s: removing",
891                                                 sender.Name, pktName,
892                                                 humanize.IBytes(pktSize),
893                                                 msgHash,
894                                         )
895                                 })
896                                 return err
897                         } else if ctx.HdrUsage {
898                                 os.Remove(JobPath2Hdr(jobPath))
899                         }
900                 }
901
902         default:
903                 err = errors.New("unknown type")
904                 ctx.LogE(
905                         "rx-type-unknown", les, err,
906                         func(les LEs) string {
907                                 return fmt.Sprintf(
908                                         "Tossing %s/%s (%s)",
909                                         sender.Name, pktName, humanize.IBytes(pktSize),
910                                 )
911                         },
912                 )
913                 return err
914         }
915         return nil
916 }
917
918 func (ctx *Ctx) Toss(
919         nodeId *NodeId,
920         xx TRxTx,
921         nice uint8,
922         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
923 ) bool {
924         dirLock, err := ctx.LockDir(nodeId, "toss")
925         if err != nil {
926                 return false
927         }
928         defer ctx.UnlockDir(dirLock)
929         isBad := false
930         decompressor, err := zstd.NewReader(nil)
931         if err != nil {
932                 panic(err)
933         }
934         defer decompressor.Close()
935         for job := range ctx.Jobs(nodeId, xx) {
936                 pktName := filepath.Base(job.Path)
937                 les := LEs{
938                         {"Node", job.PktEnc.Sender},
939                         {"Pkt", pktName},
940                         {"Nice", int(job.PktEnc.Nice)},
941                 }
942                 if job.PktEnc.Nice > nice {
943                         ctx.LogD("rx-too-nice", les, func(les LEs) string {
944                                 return fmt.Sprintf(
945                                         "Tossing %s/%s: too nice: %s",
946                                         ctx.NodeName(job.PktEnc.Sender), pktName,
947                                         NicenessFmt(job.PktEnc.Nice),
948                                 )
949                         })
950                         continue
951                 }
952                 fd, err := os.Open(job.Path)
953                 if err != nil {
954                         ctx.LogE("rx-open", les, err, func(les LEs) string {
955                                 return fmt.Sprintf(
956                                         "Tossing %s/%s: opening %s",
957                                         ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
958                                 )
959                         })
960                         isBad = true
961                         continue
962                 }
963                 sender := ctx.Neigh[*job.PktEnc.Sender]
964                 if sender == nil {
965                         err := errors.New("unknown node")
966                         ctx.LogE("rx-open", les, err, func(les LEs) string {
967                                 return fmt.Sprintf(
968                                         "Tossing %s/%s",
969                                         ctx.NodeName(job.PktEnc.Sender), pktName,
970                                 )
971                         })
972                         isBad = true
973                         continue
974                 }
975                 errs := make(chan error, 1)
976                 var sharedKey []byte
977         Retry:
978                 pipeR, pipeW := io.Pipe()
979                 go func() {
980                         errs <- jobProcess(
981                                 ctx,
982                                 pipeR,
983                                 pktName,
984                                 les,
985                                 sender,
986                                 job.PktEnc.Nice,
987                                 uint64(pktSizeWithoutEnc(job.Size)),
988                                 job.Path,
989                                 decompressor,
990                                 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
991                         )
992                 }()
993                 pipeWB := bufio.NewWriter(pipeW)
994                 sharedKey, _, _, err = PktEncRead(
995                         ctx.Self,
996                         ctx.Neigh,
997                         bufio.NewReader(fd),
998                         pipeWB,
999                         sharedKey == nil,
1000                         sharedKey,
1001                 )
1002                 if err != nil {
1003                         pipeW.CloseWithError(err)
1004                 }
1005                 if err := pipeWB.Flush(); err != nil {
1006                         pipeW.CloseWithError(err)
1007                 }
1008                 pipeW.Close()
1009
1010                 if err != nil {
1011                         isBad = true
1012                         fd.Close()
1013                         <-errs
1014                         continue
1015                 }
1016                 if err = <-errs; err == JobRepeatProcess {
1017                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1018                                 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1019                                         return fmt.Sprintf(
1020                                                 "Tossing %s/%s: can not seek",
1021                                                 ctx.NodeName(job.PktEnc.Sender),
1022                                                 pktName,
1023                                         )
1024                                 })
1025                                 isBad = true
1026                                 break
1027                         }
1028                         goto Retry
1029                 } else if err != nil {
1030                         isBad = true
1031                 }
1032                 fd.Close()
1033         }
1034         return isBad
1035 }
1036
1037 func (ctx *Ctx) AutoToss(
1038         nodeId *NodeId,
1039         nice uint8,
1040         doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
1041 ) (chan struct{}, chan bool) {
1042         dw, err := ctx.NewDirWatcher(
1043                 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1044                 time.Second,
1045         )
1046         if err != nil {
1047                 log.Fatalln(err)
1048         }
1049         finish := make(chan struct{})
1050         badCode := make(chan bool)
1051         go func() {
1052                 bad := false
1053                 for {
1054                         select {
1055                         case <-finish:
1056                                 dw.Close()
1057                                 badCode <- bad
1058                                 return
1059                         case <-dw.C:
1060                                 bad = !ctx.Toss(
1061                                         nodeId, TRx, nice, false,
1062                                         doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
1063                         }
1064                 }
1065         }()
1066         return finish, badCode
1067 }