Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Remove unused #nosec-s
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
// SeenSuffix is the filename suffix appended to a job's path to create
// its "seen" marker file.
const SeenSuffix = ".seen"
48
49 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
50         lines := []string{
51                 "From: " + fromTo.From,
52                 "To: " + fromTo.To,
53                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
54         }
55         if len(body) > 0 {
56                 lines = append(
57                         lines,
58                         "MIME-Version: 1.0",
59                         "Content-Type: text/plain; charset=utf-8",
60                         "Content-Transfer-Encoding: base64",
61                         "",
62                         base64.StdEncoding.EncodeToString(body),
63                 )
64         }
65         return strings.NewReader(strings.Join(lines, "\n"))
66 }
67
68 func pktSizeWithoutEnc(pktSize int64) int64 {
69         pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
70         pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
71         if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
72                 pktSize -= poly1305.TagSize
73         }
74         pktSize -= pktSizeBlocks * poly1305.TagSize
75         return pktSize
76 }
77
// JobRepeatProcess is a sentinel error value carrying the message
// "needs processing repeat".
// NOTE(review): presumably returned by job handling to request another
// processing pass -- its users are not visible here, confirm against callers.
var (
	JobRepeatProcess = errors.New("needs processing repeat")
)
79
80 func jobProcess(
81         ctx *Ctx,
82         pipeR *io.PipeReader,
83         pktName string,
84         les LEs,
85         sender *Node,
86         nice uint8,
87         pktSize uint64,
88         jobPath string,
89         decompressor *zstd.Decoder,
90         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
91 ) error {
92         defer pipeR.Close()
93         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
94         var pkt Pkt
95         _, err := xdr.Unmarshal(pipeR, &pkt)
96         if err != nil {
97                 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
98                         return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
99                 })
100                 return err
101         }
102         les = append(les, LE{"Size", int64(pktSize)})
103         ctx.LogD("rx", les, func(les LEs) string {
104                 return fmt.Sprintf(
105                         "Tossing %s/%s (%s)",
106                         sender.Name, pktName,
107                         humanize.IBytes(pktSize),
108                 )
109         })
110         switch pkt.Type {
111         case PktTypeExec, PktTypeExecFat:
112                 if noExec {
113                         return nil
114                 }
115                 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
116                 handle := string(path[0])
117                 args := make([]string, 0, len(path)-1)
118                 for _, p := range path[1:] {
119                         args = append(args, string(p))
120                 }
121                 argsStr := strings.Join(append([]string{handle}, args...), " ")
122                 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
123                 cmdline := sender.Exec[handle]
124                 if len(cmdline) == 0 {
125                         err = errors.New("No handle found")
126                         ctx.LogE(
127                                 "rx-no-handle", les, err,
128                                 func(les LEs) string {
129                                         return fmt.Sprintf(
130                                                 "Tossing exec %s/%s (%s): %s",
131                                                 sender.Name, pktName,
132                                                 humanize.IBytes(pktSize), argsStr,
133                                         )
134                                 },
135                         )
136                         return err
137                 }
138                 if pkt.Type == PktTypeExec {
139                         if err = decompressor.Reset(pipeR); err != nil {
140                                 log.Fatalln(err)
141                         }
142                 }
143                 if !dryRun {
144                         cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
145                         cmd.Env = append(
146                                 cmd.Env,
147                                 "NNCP_SELF="+ctx.Self.Id.String(),
148                                 "NNCP_SENDER="+sender.Id.String(),
149                                 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
150                         )
151                         if pkt.Type == PktTypeExec {
152                                 cmd.Stdin = decompressor
153                         } else {
154                                 cmd.Stdin = pipeR
155                         }
156                         output, err := cmd.Output()
157                         if err != nil {
158                                 ctx.LogE("rx-hande", les, err, func(les LEs) string {
159                                         return fmt.Sprintf(
160                                                 "Tossing exec %s/%s (%s): %s: handling",
161                                                 sender.Name, pktName,
162                                                 humanize.IBytes(uint64(pktSize)), argsStr,
163                                         )
164                                 })
165                                 return err
166                         }
167                         if len(sendmail) > 0 && ctx.NotifyExec != nil {
168                                 notify := ctx.NotifyExec[sender.Name+"."+handle]
169                                 if notify == nil {
170                                         notify = ctx.NotifyExec["*."+handle]
171                                 }
172                                 if notify != nil {
173                                         cmd := exec.Command(
174                                                 sendmail[0],
175                                                 append(sendmail[1:], notify.To)...,
176                                         )
177                                         cmd.Stdin = newNotification(notify, fmt.Sprintf(
178                                                 "Exec from %s: %s", sender.Name, argsStr,
179                                         ), output)
180                                         if err = cmd.Run(); err != nil {
181                                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
182                                                         return fmt.Sprintf(
183                                                                 "Tossing exec %s/%s (%s): %s: notifying",
184                                                                 sender.Name, pktName,
185                                                                 humanize.IBytes(pktSize), argsStr,
186                                                         )
187                                                 })
188                                         }
189                                 }
190                         }
191                 }
192                 ctx.LogI("rx", les, func(les LEs) string {
193                         return fmt.Sprintf(
194                                 "Got exec from %s to %s (%s)",
195                                 sender.Name, argsStr,
196                                 humanize.IBytes(pktSize),
197                         )
198                 })
199                 if !dryRun && jobPath != "" {
200                         if doSeen {
201                                 if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
202                                         fd.Close()
203                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
204                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
205                                                         return fmt.Sprintf(
206                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
207                                                                 sender.Name, pktName,
208                                                                 humanize.IBytes(pktSize),
209                                                                 filepath.Base(jobPath),
210                                                         )
211                                                 })
212                                                 return err
213                                         }
214                                 }
215                         }
216                         if err = os.Remove(jobPath); err != nil {
217                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
218                                         return fmt.Sprintf(
219                                                 "Tossing exec %s/%s (%s): %s: notifying",
220                                                 sender.Name, pktName,
221                                                 humanize.IBytes(pktSize), argsStr,
222                                         )
223                                 })
224                                 return err
225                         } else if ctx.HdrUsage {
226                                 os.Remove(jobPath + HdrSuffix)
227                         }
228                 }
229
230         case PktTypeFile:
231                 if noFile {
232                         return nil
233                 }
234                 dst := string(pkt.Path[:int(pkt.PathLen)])
235                 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
236                 if filepath.IsAbs(dst) {
237                         err = errors.New("non-relative destination path")
238                         ctx.LogE(
239                                 "rx-non-rel", les, err,
240                                 func(les LEs) string {
241                                         return fmt.Sprintf(
242                                                 "Tossing file %s/%s (%s): %s",
243                                                 sender.Name, pktName,
244                                                 humanize.IBytes(pktSize), dst,
245                                         )
246                                 },
247                         )
248                         return err
249                 }
250                 incoming := sender.Incoming
251                 if incoming == nil {
252                         err = errors.New("incoming is not allowed")
253                         ctx.LogE(
254                                 "rx-no-incoming", les, err,
255                                 func(les LEs) string {
256                                         return fmt.Sprintf(
257                                                 "Tossing file %s/%s (%s): %s",
258                                                 sender.Name, pktName,
259                                                 humanize.IBytes(pktSize), dst,
260                                         )
261                                 },
262                         )
263                         return err
264                 }
265                 dir := filepath.Join(*incoming, path.Dir(dst))
266                 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
267                         ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
268                                 return fmt.Sprintf(
269                                         "Tossing file %s/%s (%s): %s: mkdir",
270                                         sender.Name, pktName,
271                                         humanize.IBytes(pktSize), dst,
272                                 )
273                         })
274                         return err
275                 }
276                 if !dryRun {
277                         tmp, err := TempFile(dir, "file")
278                         if err != nil {
279                                 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
280                                         return fmt.Sprintf(
281                                                 "Tossing file %s/%s (%s): %s: mktemp",
282                                                 sender.Name, pktName,
283                                                 humanize.IBytes(pktSize), dst,
284                                         )
285                                 })
286                                 return err
287                         }
288                         les = append(les, LE{"Tmp", tmp.Name()})
289                         ctx.LogD("rx-tmp-created", les, func(les LEs) string {
290                                 return fmt.Sprintf(
291                                         "Tossing file %s/%s (%s): %s: created: %s",
292                                         sender.Name, pktName,
293                                         humanize.IBytes(pktSize), dst, tmp.Name(),
294                                 )
295                         })
296                         bufW := bufio.NewWriter(tmp)
297                         if _, err = CopyProgressed(
298                                 bufW, pipeR, "Rx file",
299                                 append(les, LE{"FullSize", int64(pktSize)}),
300                                 ctx.ShowPrgrs,
301                         ); err != nil {
302                                 ctx.LogE("rx-copy", les, err, func(les LEs) string {
303                                         return fmt.Sprintf(
304                                                 "Tossing file %s/%s (%s): %s: copying",
305                                                 sender.Name, pktName,
306                                                 humanize.IBytes(pktSize), dst,
307                                         )
308                                 })
309                                 return err
310                         }
311                         if err = bufW.Flush(); err != nil {
312                                 tmp.Close()
313                                 ctx.LogE("rx-flush", les, err, func(les LEs) string {
314                                         return fmt.Sprintf(
315                                                 "Tossing file %s/%s (%s): %s: flushing",
316                                                 sender.Name, pktName,
317                                                 humanize.IBytes(pktSize), dst,
318                                         )
319                                 })
320                                 return err
321                         }
322                         if err = tmp.Sync(); err != nil {
323                                 tmp.Close()
324                                 ctx.LogE("rx-sync", les, err, func(les LEs) string {
325                                         return fmt.Sprintf(
326                                                 "Tossing file %s/%s (%s): %s: syncing",
327                                                 sender.Name, pktName,
328                                                 humanize.IBytes(pktSize), dst,
329                                         )
330                                 })
331                                 return err
332                         }
333                         if err = tmp.Close(); err != nil {
334                                 ctx.LogE("rx-close", les, err, func(les LEs) string {
335                                         return fmt.Sprintf(
336                                                 "Tossing file %s/%s (%s): %s: closing",
337                                                 sender.Name, pktName,
338                                                 humanize.IBytes(pktSize), dst,
339                                         )
340                                 })
341                                 return err
342                         }
343                         dstPathOrig := filepath.Join(*incoming, dst)
344                         dstPath := dstPathOrig
345                         dstPathCtr := 0
346                         for {
347                                 if _, err = os.Stat(dstPath); err != nil {
348                                         if os.IsNotExist(err) {
349                                                 break
350                                         }
351                                         ctx.LogE("rx-stat", les, err, func(les LEs) string {
352                                                 return fmt.Sprintf(
353                                                         "Tossing file %s/%s (%s): %s: stating: %s",
354                                                         sender.Name, pktName,
355                                                         humanize.IBytes(pktSize), dst, dstPath,
356                                                 )
357                                         })
358                                         return err
359                                 }
360                                 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
361                                 dstPathCtr++
362                         }
363                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
364                                 ctx.LogE("rx-rename", les, err, func(les LEs) string {
365                                         return fmt.Sprintf(
366                                                 "Tossing file %s/%s (%s): %s: renaming",
367                                                 sender.Name, pktName,
368                                                 humanize.IBytes(pktSize), dst,
369                                         )
370                                 })
371                                 return err
372                         }
373                         if err = DirSync(*incoming); err != nil {
374                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
375                                         return fmt.Sprintf(
376                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
377                                                 sender.Name, pktName,
378                                                 humanize.IBytes(pktSize), dst,
379                                         )
380                                 })
381                                 return err
382                         }
383                         les = les[:len(les)-1] // delete Tmp
384                 }
385                 ctx.LogI("rx", les, func(les LEs) string {
386                         return fmt.Sprintf(
387                                 "Got file %s (%s) from %s",
388                                 dst, humanize.IBytes(pktSize), sender.Name,
389                         )
390                 })
391                 if !dryRun {
392                         if jobPath != "" {
393                                 if doSeen {
394                                         if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
395                                                 fd.Close()
396                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
397                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
398                                                                 return fmt.Sprintf(
399                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
400                                                                         sender.Name, pktName,
401                                                                         humanize.IBytes(pktSize),
402                                                                         filepath.Base(jobPath),
403                                                                 )
404                                                         })
405                                                         return err
406                                                 }
407                                         }
408                                 }
409                                 if err = os.Remove(jobPath); err != nil {
410                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
411                                                 return fmt.Sprintf(
412                                                         "Tossing file %s/%s (%s): %s: removing",
413                                                         sender.Name, pktName,
414                                                         humanize.IBytes(pktSize), dst,
415                                                 )
416                                         })
417                                         return err
418                                 } else if ctx.HdrUsage {
419                                         os.Remove(jobPath + HdrSuffix)
420                                 }
421                         }
422                         if len(sendmail) > 0 && ctx.NotifyFile != nil {
423                                 cmd := exec.Command(
424                                         sendmail[0],
425                                         append(sendmail[1:], ctx.NotifyFile.To)...,
426                                 )
427                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
428                                         "File from %s: %s (%s)",
429                                         sender.Name, dst, humanize.IBytes(pktSize),
430                                 ), nil)
431                                 if err = cmd.Run(); err != nil {
432                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
433                                                 return fmt.Sprintf(
434                                                         "Tossing file %s/%s (%s): %s: notifying",
435                                                         sender.Name, pktName,
436                                                         humanize.IBytes(pktSize), dst,
437                                                 )
438                                         })
439                                 }
440                         }
441                 }
442
443         case PktTypeFreq:
444                 if noFreq {
445                         return nil
446                 }
447                 src := string(pkt.Path[:int(pkt.PathLen)])
448                 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
449                 if filepath.IsAbs(src) {
450                         err = errors.New("non-relative source path")
451                         ctx.LogE(
452                                 "rx-non-rel", les, err,
453                                 func(les LEs) string {
454                                         return fmt.Sprintf(
455                                                 "Tossing freq %s/%s (%s): %s: notifying",
456                                                 sender.Name, pktName,
457                                                 humanize.IBytes(pktSize), src,
458                                         )
459                                 },
460                         )
461                         return err
462                 }
463                 dstRaw, err := ioutil.ReadAll(pipeR)
464                 if err != nil {
465                         ctx.LogE("rx-read", les, err, func(les LEs) string {
466                                 return fmt.Sprintf(
467                                         "Tossing freq %s/%s (%s): %s: reading",
468                                         sender.Name, pktName,
469                                         humanize.IBytes(pktSize), src,
470                                 )
471                         })
472                         return err
473                 }
474                 dst := string(dstRaw)
475                 les = append(les, LE{"Dst", dst})
476                 freqPath := sender.FreqPath
477                 if freqPath == nil {
478                         err = errors.New("freqing is not allowed")
479                         ctx.LogE(
480                                 "rx-no-freq", les, err,
481                                 func(les LEs) string {
482                                         return fmt.Sprintf(
483                                                 "Tossing freq %s/%s (%s): %s -> %s",
484                                                 sender.Name, pktName,
485                                                 humanize.IBytes(pktSize), src, dst,
486                                         )
487                                 },
488                         )
489                         return err
490                 }
491                 if !dryRun {
492                         err = ctx.TxFile(
493                                 sender,
494                                 pkt.Nice,
495                                 filepath.Join(*freqPath, src),
496                                 dst,
497                                 sender.FreqChunked,
498                                 sender.FreqMinSize,
499                                 sender.FreqMaxSize,
500                                 nil,
501                         )
502                         if err != nil {
503                                 ctx.LogE("rx-tx", les, err, func(les LEs) string {
504                                         return fmt.Sprintf(
505                                                 "Tossing freq %s/%s (%s): %s -> %s: txing",
506                                                 sender.Name, pktName,
507                                                 humanize.IBytes(pktSize), src, dst,
508                                         )
509                                 })
510                                 return err
511                         }
512                 }
513                 ctx.LogI("rx", les, func(les LEs) string {
514                         return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
515                 })
516                 if !dryRun {
517                         if jobPath != "" {
518                                 if doSeen {
519                                         if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
520                                                 fd.Close()
521                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
522                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
523                                                                 return fmt.Sprintf(
524                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
525                                                                         sender.Name, pktName,
526                                                                         humanize.IBytes(pktSize),
527                                                                         filepath.Base(jobPath),
528                                                                 )
529                                                         })
530                                                         return err
531                                                 }
532                                         }
533                                 }
534                                 if err = os.Remove(jobPath); err != nil {
535                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
536                                                 return fmt.Sprintf(
537                                                         "Tossing freq %s/%s (%s): %s -> %s: removing",
538                                                         sender.Name, pktName,
539                                                         humanize.IBytes(pktSize), src, dst,
540                                                 )
541                                         })
542                                         return err
543                                 } else if ctx.HdrUsage {
544                                         os.Remove(jobPath + HdrSuffix)
545                                 }
546                         }
547                         if len(sendmail) > 0 && ctx.NotifyFreq != nil {
548                                 cmd := exec.Command(
549                                         sendmail[0],
550                                         append(sendmail[1:], ctx.NotifyFreq.To)...,
551                                 )
552                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
553                                         "Freq from %s: %s", sender.Name, src,
554                                 ), nil)
555                                 if err = cmd.Run(); err != nil {
556                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
557                                                 return fmt.Sprintf(
558                                                         "Tossing freq %s/%s (%s): %s -> %s: notifying",
559                                                         sender.Name, pktName,
560                                                         humanize.IBytes(pktSize), src, dst,
561                                                 )
562                                         })
563                                 }
564                         }
565                 }
566
567         case PktTypeTrns:
568                 if noTrns {
569                         return nil
570                 }
571                 dst := new([MTHSize]byte)
572                 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
573                 nodeId := NodeId(*dst)
574                 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
575                 logMsg := func(les LEs) string {
576                         return fmt.Sprintf(
577                                 "Tossing trns %s/%s (%s): %s",
578                                 sender.Name, pktName,
579                                 humanize.IBytes(pktSize),
580                                 nodeId.String(),
581                         )
582                 }
583                 node := ctx.Neigh[nodeId]
584                 if node == nil {
585                         err = errors.New("unknown node")
586                         ctx.LogE("rx-unknown", les, err, logMsg)
587                         return err
588                 }
589                 ctx.LogD("rx-tx", les, logMsg)
590                 if !dryRun {
591                         if len(node.Via) == 0 {
592                                 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
593                                         ctx.LogE("rx", les, err, func(les LEs) string {
594                                                 return logMsg(les) + ": txing"
595                                         })
596                                         return err
597                                 }
598                         } else {
599                                 via := node.Via[:len(node.Via)-1]
600                                 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
601                                 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
602                                 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
603                                 if err != nil {
604                                         panic(err)
605                                 }
606                                 if _, err = ctx.Tx(
607                                         node,
608                                         pktTrns,
609                                         nice,
610                                         int64(pktSize), 0,
611                                         pipeR,
612                                         pktName,
613                                         nil,
614                                 ); err != nil {
615                                         ctx.LogE("rx", les, err, func(les LEs) string {
616                                                 return logMsg(les) + ": txing"
617                                         })
618                                         return err
619                                 }
620                         }
621                 }
622                 ctx.LogI("rx", les, func(les LEs) string {
623                         return fmt.Sprintf(
624                                 "Got transitional packet from %s to %s (%s)",
625                                 sender.Name,
626                                 ctx.NodeName(&nodeId),
627                                 humanize.IBytes(pktSize),
628                         )
629                 })
630                 if !dryRun && jobPath != "" {
631                         if doSeen {
632                                 if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
633                                         fd.Close()
634                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
635                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
636                                                         return fmt.Sprintf(
637                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
638                                                                 sender.Name, pktName,
639                                                                 humanize.IBytes(pktSize),
640                                                                 filepath.Base(jobPath),
641                                                         )
642                                                 })
643                                                 return err
644                                         }
645                                 }
646                         }
647                         if err = os.Remove(jobPath); err != nil {
648                                 ctx.LogE("rx", les, err, func(les LEs) string {
649                                         return fmt.Sprintf(
650                                                 "Tossing trns %s/%s (%s): %s: removing",
651                                                 sender.Name, pktName,
652                                                 humanize.IBytes(pktSize),
653                                                 ctx.NodeName(&nodeId),
654                                         )
655                                 })
656                                 return err
657                         } else if ctx.HdrUsage {
658                                 os.Remove(jobPath + HdrSuffix)
659                         }
660                 }
661
662         case PktTypeArea:
663                 if noArea {
664                         return nil
665                 }
666                 areaId := new(AreaId)
667                 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
668                 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
669                 logMsg := func(les LEs) string {
670                         return fmt.Sprintf(
671                                 "Tossing %s/%s (%s): area %s",
672                                 sender.Name, pktName,
673                                 humanize.IBytes(pktSize),
674                                 ctx.AreaName(areaId),
675                         )
676                 }
677                 area := ctx.AreaId2Area[*areaId]
678                 if area == nil {
679                         err = errors.New("unknown area")
680                         ctx.LogE("rx-area-unknown", les, err, logMsg)
681                         return err
682                 }
683                 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
684                 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
685                 if err != nil {
686                         ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
687                         return err
688                 }
689                 msgHashRaw := blake2b.Sum256(pktEncRaw)
690                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
691                 les = append(les, LE{"AreaMsg", msgHash})
692                 ctx.LogD("rx-area", les, logMsg)
693
694                 if dryRun {
695                         for _, nodeId := range area.Subs {
696                                 node := ctx.Neigh[*nodeId]
697                                 lesEcho := append(les, LE{"Echo", nodeId})
698                                 seenDir := filepath.Join(
699                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
700                                 )
701                                 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
702                                 logMsgNode := func(les LEs) string {
703                                         return fmt.Sprintf(
704                                                 "%s: echoing to: %s", logMsg(les), node.Name,
705                                         )
706                                 }
707                                 if _, err := os.Stat(seenPath); err == nil {
708                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
709                                                 return logMsgNode(les) + ": already sent"
710                                         })
711                                         continue
712                                 }
713                                 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
714                         }
715                 } else {
716                         for _, nodeId := range area.Subs {
717                                 node := ctx.Neigh[*nodeId]
718                                 lesEcho := append(les, LE{"Echo", nodeId})
719                                 seenDir := filepath.Join(
720                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
721                                 )
722                                 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
723                                 logMsgNode := func(les LEs) string {
724                                         return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
725                                 }
726                                 if _, err := os.Stat(seenPath); err == nil {
727                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
728                                                 return logMsgNode(les) + ": already sent"
729                                         })
730                                         continue
731                                 }
732                                 if nodeId != sender.Id && nodeId != pktEnc.Sender {
733                                         ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
734                                         if _, err = ctx.Tx(
735                                                 node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil,
736                                         ); err != nil {
737                                                 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
738                                                 return err
739                                         }
740                                 }
741                                 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
742                                         ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
743                                         return err
744                                 }
745                                 if fd, err := os.Create(seenPath); err == nil {
746                                         fd.Close()
747                                         if err = DirSync(seenDir); err != nil {
748                                                 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
749                                                 return err
750                                         }
751                                 } else {
752                                         ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
753                                         return err
754                                 }
755                                 return JobRepeatProcess
756                         }
757                 }
758
759                 seenDir := filepath.Join(
760                         ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
761                 )
762                 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
763                 if _, err := os.Stat(seenPath); err == nil {
764                         ctx.LogD("rx-area-seen", les, func(les LEs) string {
765                                 return logMsg(les) + ": already seen"
766                         })
767                         if !dryRun && jobPath != "" {
768                                 if err = os.Remove(jobPath); err != nil {
769                                         ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
770                                                 return fmt.Sprintf(
771                                                         "Tossing area %s/%s (%s): %s: removing",
772                                                         sender.Name, pktName,
773                                                         humanize.IBytes(pktSize),
774                                                         msgHash,
775                                                 )
776                                         })
777                                         return err
778                                 } else if ctx.HdrUsage {
779                                         os.Remove(jobPath + HdrSuffix)
780                                 }
781                         }
782                         return nil
783                 }
784
785                 if area.Prv == nil {
786                         ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
787                                 return logMsg(les) + ": no private key for decoding"
788                         })
789                 } else {
790                         signatureVerify := true
791                         if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
792                                 if !area.AllowUnknown {
793                                         err = errors.New("unknown sender")
794                                         ctx.LogE(
795                                                 "rx-area-unknown",
796                                                 append(les, LE{"Sender", pktEnc.Sender}),
797                                                 err,
798                                                 func(les LEs) string {
799                                                         return logMsg(les) + ": sender: " + pktEnc.Sender.String()
800                                                 },
801                                         )
802                                         return err
803                                 }
804                                 signatureVerify = false
805                         }
806                         areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
807                         copy(areaNodeOur.Id[:], area.Id[:])
808                         copy(areaNodeOur.ExchPrv[:], area.Prv[:])
809                         areaNode := Node{
810                                 Id:       new(NodeId),
811                                 Name:     area.Name,
812                                 Incoming: area.Incoming,
813                                 Exec:     area.Exec,
814                         }
815                         copy(areaNode.Id[:], area.Id[:])
816                         pktName := fmt.Sprintf(
817                                 "area/%s/%s",
818                                 Base32Codec.EncodeToString(areaId[:]), msgHash,
819                         )
820
821                         pipeR, pipeW := io.Pipe()
822                         errs := make(chan error, 1)
823                         go func() {
824                                 errs <- jobProcess(
825                                         ctx,
826                                         pipeR,
827                                         pktName,
828                                         les,
829                                         &areaNode,
830                                         nice,
831                                         uint64(pktSizeWithoutEnc(int64(pktSize))),
832                                         "",
833                                         decompressor,
834                                         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
835                                 )
836                         }()
837                         _, _, _, err = PktEncRead(
838                                 &areaNodeOur,
839                                 ctx.Neigh,
840                                 fullPipeR,
841                                 pipeW,
842                                 signatureVerify,
843                                 nil,
844                         )
845                         if err != nil {
846                                 pipeW.CloseWithError(err)
847                                 <-errs
848                                 return err
849                         }
850                         pipeW.Close()
851                         if err = <-errs; err != nil {
852                                 return err
853                         }
854                 }
855
856                 if !dryRun && jobPath != "" {
857                         if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
858                                 ctx.LogE("rx-area-mkdir", les, err, logMsg)
859                                 return err
860                         }
861                         if fd, err := os.Create(seenPath); err == nil {
862                                 fd.Close()
863                                 if err = DirSync(seenDir); err != nil {
864                                         ctx.LogE("rx-area-dirsync", les, err, logMsg)
865                                         return err
866                                 }
867                         }
868                         if err = os.Remove(jobPath); err != nil {
869                                 ctx.LogE("rx", les, err, func(les LEs) string {
870                                         return fmt.Sprintf(
871                                                 "Tossing area %s/%s (%s): %s: removing",
872                                                 sender.Name, pktName,
873                                                 humanize.IBytes(pktSize),
874                                                 msgHash,
875                                         )
876                                 })
877                                 return err
878                         } else if ctx.HdrUsage {
879                                 os.Remove(jobPath + HdrSuffix)
880                         }
881                 }
882
883         default:
884                 err = errors.New("unknown type")
885                 ctx.LogE(
886                         "rx-type-unknown", les, err,
887                         func(les LEs) string {
888                                 return fmt.Sprintf(
889                                         "Tossing %s/%s (%s)",
890                                         sender.Name, pktName, humanize.IBytes(pktSize),
891                                 )
892                         },
893                 )
894                 return err
895         }
896         return nil
897 }
898
899 func (ctx *Ctx) Toss(
900         nodeId *NodeId,
901         xx TRxTx,
902         nice uint8,
903         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
904 ) bool {
905         dirLock, err := ctx.LockDir(nodeId, "toss")
906         if err != nil {
907                 return false
908         }
909         defer ctx.UnlockDir(dirLock)
910         isBad := false
911         decompressor, err := zstd.NewReader(nil)
912         if err != nil {
913                 panic(err)
914         }
915         defer decompressor.Close()
916         for job := range ctx.Jobs(nodeId, xx) {
917                 pktName := filepath.Base(job.Path)
918                 les := LEs{
919                         {"Node", job.PktEnc.Sender},
920                         {"Pkt", pktName},
921                         {"Nice", int(job.PktEnc.Nice)},
922                 }
923                 if job.PktEnc.Nice > nice {
924                         ctx.LogD("rx-too-nice", les, func(les LEs) string {
925                                 return fmt.Sprintf(
926                                         "Tossing %s/%s: too nice: %s",
927                                         ctx.NodeName(job.PktEnc.Sender), pktName,
928                                         NicenessFmt(job.PktEnc.Nice),
929                                 )
930                         })
931                         continue
932                 }
933                 fd, err := os.Open(job.Path)
934                 if err != nil {
935                         ctx.LogE("rx-open", les, err, func(les LEs) string {
936                                 return fmt.Sprintf(
937                                         "Tossing %s/%s: opening %s",
938                                         ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
939                                 )
940                         })
941                         isBad = true
942                         continue
943                 }
944                 sender := ctx.Neigh[*job.PktEnc.Sender]
945                 if sender == nil {
946                         err := errors.New("unknown node")
947                         ctx.LogE("rx-open", les, err, func(les LEs) string {
948                                 return fmt.Sprintf(
949                                         "Tossing %s/%s",
950                                         ctx.NodeName(job.PktEnc.Sender), pktName,
951                                 )
952                         })
953                         isBad = true
954                         continue
955                 }
956                 errs := make(chan error, 1)
957                 var sharedKey []byte
958         Retry:
959                 pipeR, pipeW := io.Pipe()
960                 go func() {
961                         errs <- jobProcess(
962                                 ctx,
963                                 pipeR,
964                                 pktName,
965                                 les,
966                                 sender,
967                                 job.PktEnc.Nice,
968                                 uint64(pktSizeWithoutEnc(job.Size)),
969                                 job.Path,
970                                 decompressor,
971                                 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
972                         )
973                 }()
974                 pipeWB := bufio.NewWriter(pipeW)
975                 sharedKey, _, _, err = PktEncRead(
976                         ctx.Self,
977                         ctx.Neigh,
978                         bufio.NewReader(fd),
979                         pipeWB,
980                         sharedKey == nil,
981                         sharedKey,
982                 )
983                 if err != nil {
984                         pipeW.CloseWithError(err)
985                 }
986                 if err := pipeWB.Flush(); err != nil {
987                         pipeW.CloseWithError(err)
988                 }
989                 pipeW.Close()
990
991                 if err != nil {
992                         isBad = true
993                         fd.Close()
994                         <-errs
995                         continue
996                 }
997                 if err = <-errs; err == JobRepeatProcess {
998                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
999                                 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1000                                         return fmt.Sprintf(
1001                                                 "Tossing %s/%s: can not seek",
1002                                                 ctx.NodeName(job.PktEnc.Sender),
1003                                                 pktName,
1004                                         )
1005                                 })
1006                                 isBad = true
1007                                 break
1008                         }
1009                         goto Retry
1010                 } else if err != nil {
1011                         isBad = true
1012                 }
1013                 fd.Close()
1014         }
1015         return isBad
1016 }
1017
1018 func (ctx *Ctx) AutoToss(
1019         nodeId *NodeId,
1020         nice uint8,
1021         doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
1022 ) (chan struct{}, chan bool) {
1023         finish := make(chan struct{})
1024         badCode := make(chan bool)
1025         go func() {
1026                 bad := false
1027                 for {
1028                         select {
1029                         case <-finish:
1030                                 badCode <- bad
1031                                 break
1032                         default:
1033                         }
1034                         time.Sleep(time.Second)
1035                         bad = !ctx.Toss(
1036                                 nodeId, TRx, nice, false,
1037                                 doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
1038                 }
1039         }()
1040         return finish, badCode
1041 }