]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Multicast areas
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
45 const (
46         SeenSuffix = ".seen"
47 )
48
49 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
50         lines := []string{
51                 "From: " + fromTo.From,
52                 "To: " + fromTo.To,
53                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
54         }
55         if len(body) > 0 {
56                 lines = append(
57                         lines,
58                         "MIME-Version: 1.0",
59                         "Content-Type: text/plain; charset=utf-8",
60                         "Content-Transfer-Encoding: base64",
61                         "",
62                         base64.StdEncoding.EncodeToString(body),
63                 )
64         }
65         return strings.NewReader(strings.Join(lines, "\n"))
66 }
67
68 func pktSizeWithoutEnc(pktSize int64) int64 {
69         pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
70         pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
71         if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
72                 pktSize -= poly1305.TagSize
73         }
74         pktSize -= pktSizeBlocks * poly1305.TagSize
75         return pktSize
76 }
77
78 var JobRepeatProcess = errors.New("needs processing repeat")
79
80 func jobProcess(
81         ctx *Ctx,
82         pipeR *io.PipeReader,
83         pktName string,
84         les LEs,
85         sender *Node,
86         nice uint8,
87         pktSize uint64,
88         jobPath string,
89         decompressor *zstd.Decoder,
90         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
91 ) error {
92         defer pipeR.Close()
93         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
94         var pkt Pkt
95         _, err := xdr.Unmarshal(pipeR, &pkt)
96         if err != nil {
97                 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
98                         return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
99                 })
100                 return err
101         }
102         les = append(les, LE{"Size", int64(pktSize)})
103         ctx.LogD("rx", les, func(les LEs) string {
104                 return fmt.Sprintf(
105                         "Tossing %s/%s (%s)",
106                         sender.Name, pktName,
107                         humanize.IBytes(pktSize),
108                 )
109         })
110         switch pkt.Type {
111         case PktTypeExec, PktTypeExecFat:
112                 if noExec {
113                         return nil
114                 }
115                 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
116                 handle := string(path[0])
117                 args := make([]string, 0, len(path)-1)
118                 for _, p := range path[1:] {
119                         args = append(args, string(p))
120                 }
121                 argsStr := strings.Join(append([]string{handle}, args...), " ")
122                 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
123                 cmdline, exists := sender.Exec[handle]
124                 if !exists || len(cmdline) == 0 {
125                         err = errors.New("No handle found")
126                         ctx.LogE(
127                                 "rx-no-handle", les, err,
128                                 func(les LEs) string {
129                                         return fmt.Sprintf(
130                                                 "Tossing exec %s/%s (%s): %s",
131                                                 sender.Name, pktName,
132                                                 humanize.IBytes(pktSize), argsStr,
133                                         )
134                                 },
135                         )
136                         return err
137                 }
138                 if pkt.Type == PktTypeExec {
139                         if err = decompressor.Reset(pipeR); err != nil {
140                                 log.Fatalln(err)
141                         }
142                 }
143                 if !dryRun {
144                         cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
145                         cmd.Env = append(
146                                 cmd.Env,
147                                 "NNCP_SELF="+ctx.Self.Id.String(),
148                                 "NNCP_SENDER="+sender.Id.String(),
149                                 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
150                         )
151                         if pkt.Type == PktTypeExec {
152                                 cmd.Stdin = decompressor
153                         } else {
154                                 cmd.Stdin = pipeR
155                         }
156                         output, err := cmd.Output()
157                         if err != nil {
158                                 ctx.LogE("rx-hande", les, err, func(les LEs) string {
159                                         return fmt.Sprintf(
160                                                 "Tossing exec %s/%s (%s): %s: handling",
161                                                 sender.Name, pktName,
162                                                 humanize.IBytes(uint64(pktSize)), argsStr,
163                                         )
164                                 })
165                                 return err
166                         }
167                         if len(sendmail) > 0 && ctx.NotifyExec != nil {
168                                 notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
169                                 if !exists {
170                                         notify, exists = ctx.NotifyExec["*."+handle]
171                                 }
172                                 if exists {
173                                         cmd := exec.Command(
174                                                 sendmail[0],
175                                                 append(sendmail[1:], notify.To)...,
176                                         )
177                                         cmd.Stdin = newNotification(notify, fmt.Sprintf(
178                                                 "Exec from %s: %s", sender.Name, argsStr,
179                                         ), output)
180                                         if err = cmd.Run(); err != nil {
181                                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
182                                                         return fmt.Sprintf(
183                                                                 "Tossing exec %s/%s (%s): %s: notifying",
184                                                                 sender.Name, pktName,
185                                                                 humanize.IBytes(pktSize), argsStr,
186                                                         )
187                                                 })
188                                         }
189                                 }
190                         }
191                 }
192                 ctx.LogI("rx", les, func(les LEs) string {
193                         return fmt.Sprintf(
194                                 "Got exec from %s to %s (%s)",
195                                 sender.Name, argsStr,
196                                 humanize.IBytes(pktSize),
197                         )
198                 })
199                 if !dryRun && jobPath != "" {
200                         if doSeen {
201                                 if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
202                                         fd.Close() // #nosec G104
203                                 }
204                         }
205                         if err = os.Remove(jobPath); err != nil {
206                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
207                                         return fmt.Sprintf(
208                                                 "Tossing exec %s/%s (%s): %s: notifying",
209                                                 sender.Name, pktName,
210                                                 humanize.IBytes(pktSize), argsStr,
211                                         )
212                                 })
213                                 return err
214                         } else if ctx.HdrUsage {
215                                 os.Remove(jobPath + HdrSuffix)
216                         }
217                 }
218
219         case PktTypeFile:
220                 if noFile {
221                         return nil
222                 }
223                 dst := string(pkt.Path[:int(pkt.PathLen)])
224                 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
225                 if filepath.IsAbs(dst) {
226                         err = errors.New("non-relative destination path")
227                         ctx.LogE(
228                                 "rx-non-rel", les, err,
229                                 func(les LEs) string {
230                                         return fmt.Sprintf(
231                                                 "Tossing file %s/%s (%s): %s",
232                                                 sender.Name, pktName,
233                                                 humanize.IBytes(pktSize), dst,
234                                         )
235                                 },
236                         )
237                         return err
238                 }
239                 incoming := sender.Incoming
240                 if incoming == nil {
241                         err = errors.New("incoming is not allowed")
242                         ctx.LogE(
243                                 "rx-no-incoming", les, err,
244                                 func(les LEs) string {
245                                         return fmt.Sprintf(
246                                                 "Tossing file %s/%s (%s): %s",
247                                                 sender.Name, pktName,
248                                                 humanize.IBytes(pktSize), dst,
249                                         )
250                                 },
251                         )
252                         return err
253                 }
254                 dir := filepath.Join(*incoming, path.Dir(dst))
255                 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
256                         ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
257                                 return fmt.Sprintf(
258                                         "Tossing file %s/%s (%s): %s: mkdir",
259                                         sender.Name, pktName,
260                                         humanize.IBytes(pktSize), dst,
261                                 )
262                         })
263                         return err
264                 }
265                 if !dryRun {
266                         tmp, err := TempFile(dir, "file")
267                         if err != nil {
268                                 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
269                                         return fmt.Sprintf(
270                                                 "Tossing file %s/%s (%s): %s: mktemp",
271                                                 sender.Name, pktName,
272                                                 humanize.IBytes(pktSize), dst,
273                                         )
274                                 })
275                                 return err
276                         }
277                         les = append(les, LE{"Tmp", tmp.Name()})
278                         ctx.LogD("rx-tmp-created", les, func(les LEs) string {
279                                 return fmt.Sprintf(
280                                         "Tossing file %s/%s (%s): %s: created: %s",
281                                         sender.Name, pktName,
282                                         humanize.IBytes(pktSize), dst, tmp.Name(),
283                                 )
284                         })
285                         bufW := bufio.NewWriter(tmp)
286                         if _, err = CopyProgressed(
287                                 bufW, pipeR, "Rx file",
288                                 append(les, LE{"FullSize", int64(pktSize)}),
289                                 ctx.ShowPrgrs,
290                         ); err != nil {
291                                 ctx.LogE("rx-copy", les, err, func(les LEs) string {
292                                         return fmt.Sprintf(
293                                                 "Tossing file %s/%s (%s): %s: copying",
294                                                 sender.Name, pktName,
295                                                 humanize.IBytes(pktSize), dst,
296                                         )
297                                 })
298                                 return err
299                         }
300                         if err = bufW.Flush(); err != nil {
301                                 tmp.Close() // #nosec G104
302                                 ctx.LogE("rx-flush", les, err, func(les LEs) string {
303                                         return fmt.Sprintf(
304                                                 "Tossing file %s/%s (%s): %s: flushing",
305                                                 sender.Name, pktName,
306                                                 humanize.IBytes(pktSize), dst,
307                                         )
308                                 })
309                                 return err
310                         }
311                         if err = tmp.Sync(); err != nil {
312                                 tmp.Close() // #nosec G104
313                                 ctx.LogE("rx-sync", les, err, func(les LEs) string {
314                                         return fmt.Sprintf(
315                                                 "Tossing file %s/%s (%s): %s: syncing",
316                                                 sender.Name, pktName,
317                                                 humanize.IBytes(pktSize), dst,
318                                         )
319                                 })
320                                 return err
321                         }
322                         if err = tmp.Close(); err != nil {
323                                 ctx.LogE("rx-close", les, err, func(les LEs) string {
324                                         return fmt.Sprintf(
325                                                 "Tossing file %s/%s (%s): %s: closing",
326                                                 sender.Name, pktName,
327                                                 humanize.IBytes(pktSize), dst,
328                                         )
329                                 })
330                                 return err
331                         }
332                         dstPathOrig := filepath.Join(*incoming, dst)
333                         dstPath := dstPathOrig
334                         dstPathCtr := 0
335                         for {
336                                 if _, err = os.Stat(dstPath); err != nil {
337                                         if os.IsNotExist(err) {
338                                                 break
339                                         }
340                                         ctx.LogE("rx-stat", les, err, func(les LEs) string {
341                                                 return fmt.Sprintf(
342                                                         "Tossing file %s/%s (%s): %s: stating: %s",
343                                                         sender.Name, pktName,
344                                                         humanize.IBytes(pktSize), dst, dstPath,
345                                                 )
346                                         })
347                                         return err
348                                 }
349                                 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
350                                 dstPathCtr++
351                         }
352                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
353                                 ctx.LogE("rx-rename", les, err, func(les LEs) string {
354                                         return fmt.Sprintf(
355                                                 "Tossing file %s/%s (%s): %s: renaming",
356                                                 sender.Name, pktName,
357                                                 humanize.IBytes(pktSize), dst,
358                                         )
359                                 })
360                                 return err
361                         }
362                         if err = DirSync(*incoming); err != nil {
363                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
364                                         return fmt.Sprintf(
365                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
366                                                 sender.Name, pktName,
367                                                 humanize.IBytes(pktSize), dst,
368                                         )
369                                 })
370                                 return err
371                         }
372                         les = les[:len(les)-1] // delete Tmp
373                 }
374                 ctx.LogI("rx", les, func(les LEs) string {
375                         return fmt.Sprintf(
376                                 "Got file %s (%s) from %s",
377                                 dst, humanize.IBytes(pktSize), sender.Name,
378                         )
379                 })
380                 if !dryRun {
381                         if jobPath != "" {
382                                 if doSeen {
383                                         if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
384                                                 fd.Close() // #nosec G104
385                                         }
386                                 }
387                                 if err = os.Remove(jobPath); err != nil {
388                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
389                                                 return fmt.Sprintf(
390                                                         "Tossing file %s/%s (%s): %s: removing",
391                                                         sender.Name, pktName,
392                                                         humanize.IBytes(pktSize), dst,
393                                                 )
394                                         })
395                                         return err
396                                 } else if ctx.HdrUsage {
397                                         os.Remove(jobPath + HdrSuffix)
398                                 }
399                         }
400                         if len(sendmail) > 0 && ctx.NotifyFile != nil {
401                                 cmd := exec.Command(
402                                         sendmail[0],
403                                         append(sendmail[1:], ctx.NotifyFile.To)...,
404                                 )
405                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
406                                         "File from %s: %s (%s)",
407                                         sender.Name, dst, humanize.IBytes(pktSize),
408                                 ), nil)
409                                 if err = cmd.Run(); err != nil {
410                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
411                                                 return fmt.Sprintf(
412                                                         "Tossing file %s/%s (%s): %s: notifying",
413                                                         sender.Name, pktName,
414                                                         humanize.IBytes(pktSize), dst,
415                                                 )
416                                         })
417                                 }
418                         }
419                 }
420
421         case PktTypeFreq:
422                 if noFreq {
423                         return nil
424                 }
425                 src := string(pkt.Path[:int(pkt.PathLen)])
426                 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
427                 if filepath.IsAbs(src) {
428                         err = errors.New("non-relative source path")
429                         ctx.LogE(
430                                 "rx-non-rel", les, err,
431                                 func(les LEs) string {
432                                         return fmt.Sprintf(
433                                                 "Tossing freq %s/%s (%s): %s: notifying",
434                                                 sender.Name, pktName,
435                                                 humanize.IBytes(pktSize), src,
436                                         )
437                                 },
438                         )
439                         return err
440                 }
441                 dstRaw, err := ioutil.ReadAll(pipeR)
442                 if err != nil {
443                         ctx.LogE("rx-read", les, err, func(les LEs) string {
444                                 return fmt.Sprintf(
445                                         "Tossing freq %s/%s (%s): %s: reading",
446                                         sender.Name, pktName,
447                                         humanize.IBytes(pktSize), src,
448                                 )
449                         })
450                         return err
451                 }
452                 dst := string(dstRaw)
453                 les = append(les, LE{"Dst", dst})
454                 freqPath := sender.FreqPath
455                 if freqPath == nil {
456                         err = errors.New("freqing is not allowed")
457                         ctx.LogE(
458                                 "rx-no-freq", les, err,
459                                 func(les LEs) string {
460                                         return fmt.Sprintf(
461                                                 "Tossing freq %s/%s (%s): %s -> %s",
462                                                 sender.Name, pktName,
463                                                 humanize.IBytes(pktSize), src, dst,
464                                         )
465                                 },
466                         )
467                         return err
468                 }
469                 if !dryRun {
470                         err = ctx.TxFile(
471                                 sender,
472                                 pkt.Nice,
473                                 filepath.Join(*freqPath, src),
474                                 dst,
475                                 sender.FreqChunked,
476                                 sender.FreqMinSize,
477                                 sender.FreqMaxSize,
478                                 nil,
479                         )
480                         if err != nil {
481                                 ctx.LogE("rx-tx", les, err, func(les LEs) string {
482                                         return fmt.Sprintf(
483                                                 "Tossing freq %s/%s (%s): %s -> %s: txing",
484                                                 sender.Name, pktName,
485                                                 humanize.IBytes(pktSize), src, dst,
486                                         )
487                                 })
488                                 return err
489                         }
490                 }
491                 ctx.LogI("rx", les, func(les LEs) string {
492                         return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
493                 })
494                 if !dryRun {
495                         if jobPath != "" {
496                                 if doSeen {
497                                         if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
498                                                 fd.Close() // #nosec G104
499                                         }
500                                 }
501                                 if err = os.Remove(jobPath); err != nil {
502                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
503                                                 return fmt.Sprintf(
504                                                         "Tossing freq %s/%s (%s): %s -> %s: removing",
505                                                         sender.Name, pktName,
506                                                         humanize.IBytes(pktSize), src, dst,
507                                                 )
508                                         })
509                                         return err
510                                 } else if ctx.HdrUsage {
511                                         os.Remove(jobPath + HdrSuffix)
512                                 }
513                         }
514                         if len(sendmail) > 0 && ctx.NotifyFreq != nil {
515                                 cmd := exec.Command(
516                                         sendmail[0],
517                                         append(sendmail[1:], ctx.NotifyFreq.To)...,
518                                 )
519                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
520                                         "Freq from %s: %s", sender.Name, src,
521                                 ), nil)
522                                 if err = cmd.Run(); err != nil {
523                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
524                                                 return fmt.Sprintf(
525                                                         "Tossing freq %s/%s (%s): %s -> %s: notifying",
526                                                         sender.Name, pktName,
527                                                         humanize.IBytes(pktSize), src, dst,
528                                                 )
529                                         })
530                                 }
531                         }
532                 }
533
534         case PktTypeTrns:
535                 if noTrns {
536                         return nil
537                 }
538                 dst := new([MTHSize]byte)
539                 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
540                 nodeId := NodeId(*dst)
541                 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
542                 logMsg := func(les LEs) string {
543                         return fmt.Sprintf(
544                                 "Tossing trns %s/%s (%s): %s",
545                                 sender.Name, pktName,
546                                 humanize.IBytes(pktSize),
547                                 nodeId.String(),
548                         )
549                 }
550                 node := ctx.Neigh[nodeId]
551                 if node == nil {
552                         err = errors.New("unknown node")
553                         ctx.LogE("rx-unknown", les, err, logMsg)
554                         return err
555                 }
556                 ctx.LogD("rx-tx", les, logMsg)
557                 if !dryRun {
558                         if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
559                                 ctx.LogE("rx", les, err, func(les LEs) string {
560                                         return logMsg(les) + ": txing"
561                                 })
562                                 return err
563                         }
564                 }
565                 ctx.LogI("rx", les, func(les LEs) string {
566                         return fmt.Sprintf(
567                                 "Got transitional packet from %s to %s (%s)",
568                                 sender.Name,
569                                 ctx.NodeName(&nodeId),
570                                 humanize.IBytes(pktSize),
571                         )
572                 })
573                 if !dryRun && jobPath != "" {
574                         if doSeen {
575                                 if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
576                                         fd.Close() // #nosec G104
577                                 }
578                         }
579                         if err = os.Remove(jobPath); err != nil {
580                                 ctx.LogE("rx", les, err, func(les LEs) string {
581                                         return fmt.Sprintf(
582                                                 "Tossing trns %s/%s (%s): %s: removing",
583                                                 sender.Name, pktName,
584                                                 humanize.IBytes(pktSize),
585                                                 ctx.NodeName(&nodeId),
586                                         )
587                                 })
588                                 return err
589                         } else if ctx.HdrUsage {
590                                 os.Remove(jobPath + HdrSuffix)
591                         }
592                 }
593
594         case PktTypeArea:
595                 if noArea {
596                         return nil
597                 }
598                 areaId := new(AreaId)
599                 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
600                 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
601                 logMsg := func(les LEs) string {
602                         return fmt.Sprintf(
603                                 "Tossing %s/%s (%s): area %s",
604                                 sender.Name, pktName,
605                                 humanize.IBytes(pktSize),
606                                 ctx.AreaName(areaId),
607                         )
608                 }
609                 area := ctx.AreaId2Area[*areaId]
610                 if area == nil {
611                         err = errors.New("unknown area")
612                         ctx.LogE("rx-area-unknown", les, err, logMsg)
613                         return err
614                 }
615                 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
616                 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
617                 if err != nil {
618                         ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
619                         return err
620                 }
621                 msgHashRaw := blake2b.Sum256(pktEncRaw)
622                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
623                 les = append(les, LE{"AreaMsg", msgHash})
624                 ctx.LogD("rx-area", les, logMsg)
625
626                 if dryRun {
627                         for _, nodeId := range area.Subs {
628                                 node := ctx.Neigh[*nodeId]
629                                 lesEcho := append(les, LE{"Echo", nodeId})
630                                 seenDir := filepath.Join(
631                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
632                                 )
633                                 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
634                                 logMsgNode := func(les LEs) string {
635                                         return fmt.Sprintf(
636                                                 "%s: echoing to: %s", logMsg(les), node.Name,
637                                         )
638                                 }
639                                 if _, err := os.Stat(seenPath); err == nil {
640                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
641                                                 return logMsgNode(les) + ": already sent"
642                                         })
643                                         continue
644                                 }
645                                 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
646                         }
647                 } else {
648                         for _, nodeId := range area.Subs {
649                                 node := ctx.Neigh[*nodeId]
650                                 lesEcho := append(les, LE{"Echo", nodeId})
651                                 seenDir := filepath.Join(
652                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
653                                 )
654                                 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
655                                 logMsgNode := func(les LEs) string {
656                                         return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
657                                 }
658                                 if _, err := os.Stat(seenPath); err == nil {
659                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
660                                                 return logMsgNode(les) + ": already sent"
661                                         })
662                                         continue
663                                 }
664                                 if nodeId != sender.Id {
665                                         ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
666                                         if _, err = ctx.Tx(
667                                                 node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil,
668                                         ); err != nil {
669                                                 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
670                                                 return err
671                                         }
672                                 }
673                                 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
674                                         ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
675                                         return err
676                                 }
677                                 if fd, err := os.Create(seenPath); err == nil {
678                                         fd.Close()
679                                         if err = DirSync(seenDir); err != nil {
680                                                 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
681                                                 return err
682                                         }
683                                 } else {
684                                         ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
685                                         return err
686                                 }
687                                 return JobRepeatProcess
688                         }
689                 }
690
691                 seenDir := filepath.Join(
692                         ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
693                 )
694                 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
695                 if _, err := os.Stat(seenPath); err == nil {
696                         ctx.LogD("rx-area-seen", les, func(les LEs) string {
697                                 return logMsg(les) + ": already seen"
698                         })
699                         if !dryRun && jobPath != "" {
700                                 if err = os.Remove(jobPath); err != nil {
701                                         ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
702                                                 return fmt.Sprintf(
703                                                         "Tossing area %s/%s (%s): %s: removing",
704                                                         sender.Name, pktName,
705                                                         humanize.IBytes(pktSize),
706                                                         msgHash,
707                                                 )
708                                         })
709                                         return err
710                                 } else if ctx.HdrUsage {
711                                         os.Remove(jobPath + HdrSuffix)
712                                 }
713                         }
714                         return nil
715                 }
716
717                 if area.Prv == nil {
718                         ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
719                                 return logMsg(les) + ": no private key for decoding"
720                         })
721                 } else {
722                         signatureVerify := true
723                         if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
724                                 if !area.AllowUnknown {
725                                         err = errors.New("unknown sender")
726                                         ctx.LogE(
727                                                 "rx-area-unknown",
728                                                 append(les, LE{"Sender", pktEnc.Sender}),
729                                                 err,
730                                                 func(les LEs) string {
731                                                         return logMsg(les) + ": sender: " + pktEnc.Sender.String()
732                                                 },
733                                         )
734                                         return err
735                                 }
736                                 signatureVerify = false
737                         }
738                         areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
739                         copy(areaNodeOur.Id[:], area.Id[:])
740                         copy(areaNodeOur.ExchPrv[:], area.Prv[:])
741                         areaNode := Node{
742                                 Id:       new(NodeId),
743                                 Name:     area.Name,
744                                 Incoming: area.Incoming,
745                                 Exec:     area.Exec,
746                         }
747                         copy(areaNode.Id[:], area.Id[:])
748                         pktName := fmt.Sprintf(
749                                 "area/%s/%s",
750                                 Base32Codec.EncodeToString(areaId[:]), msgHash,
751                         )
752
753                         pipeR, pipeW := io.Pipe()
754                         errs := make(chan error, 1)
755                         go func() {
756                                 errs <- jobProcess(
757                                         ctx,
758                                         pipeR,
759                                         pktName,
760                                         les,
761                                         &areaNode,
762                                         nice,
763                                         uint64(pktSizeWithoutEnc(int64(pktSize))),
764                                         "",
765                                         decompressor,
766                                         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
767                                 )
768                         }()
769                         _, _, _, err = PktEncRead(
770                                 &areaNodeOur,
771                                 ctx.Neigh,
772                                 fullPipeR,
773                                 pipeW,
774                                 signatureVerify,
775                                 nil,
776                         )
777                         if err != nil {
778                                 pipeW.CloseWithError(err)
779                                 go func() { <-errs }()
780                                 return err
781                         }
782                         pipeW.Close()
783                         if err = <-errs; err != nil {
784                                 return err
785                         }
786                 }
787
788                 if !dryRun && jobPath != "" {
789                         if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
790                                 ctx.LogE("rx-area-mkdir", les, err, logMsg)
791                                 return err
792                         }
793                         if fd, err := os.Create(seenPath); err == nil {
794                                 fd.Close()
795                                 if err = DirSync(seenDir); err != nil {
796                                         ctx.LogE("rx-area-dirsync", les, err, logMsg)
797                                         return err
798                                 }
799                         }
800                         if err = os.Remove(jobPath); err != nil {
801                                 ctx.LogE("rx", les, err, func(les LEs) string {
802                                         return fmt.Sprintf(
803                                                 "Tossing area %s/%s (%s): %s: removing",
804                                                 sender.Name, pktName,
805                                                 humanize.IBytes(pktSize),
806                                                 msgHash,
807                                         )
808                                 })
809                                 return err
810                         } else if ctx.HdrUsage {
811                                 os.Remove(jobPath + HdrSuffix)
812                         }
813                 }
814
815         default:
816                 err = errors.New("unknown type")
817                 ctx.LogE(
818                         "rx-type-unknown", les, err,
819                         func(les LEs) string {
820                                 return fmt.Sprintf(
821                                         "Tossing %s/%s (%s)",
822                                         sender.Name, pktName, humanize.IBytes(pktSize),
823                                 )
824                         },
825                 )
826                 return err
827         }
828         return nil
829 }
830
831 func (ctx *Ctx) Toss(
832         nodeId *NodeId,
833         xx TRxTx,
834         nice uint8,
835         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
836 ) bool {
837         dirLock, err := ctx.LockDir(nodeId, "toss")
838         if err != nil {
839                 return false
840         }
841         defer ctx.UnlockDir(dirLock)
842         isBad := false
843         decompressor, err := zstd.NewReader(nil)
844         if err != nil {
845                 panic(err)
846         }
847         defer decompressor.Close()
848         for job := range ctx.Jobs(nodeId, xx) {
849                 pktName := filepath.Base(job.Path)
850                 les := LEs{
851                         {"Node", job.PktEnc.Sender},
852                         {"Pkt", pktName},
853                         {"Nice", int(job.PktEnc.Nice)},
854                 }
855                 if job.PktEnc.Nice > nice {
856                         ctx.LogD("rx-too-nice", les, func(les LEs) string {
857                                 return fmt.Sprintf(
858                                         "Tossing %s/%s: too nice: %s",
859                                         ctx.NodeName(job.PktEnc.Sender), pktName,
860                                         NicenessFmt(job.PktEnc.Nice),
861                                 )
862                         })
863                         continue
864                 }
865                 fd, err := os.Open(job.Path)
866                 if err != nil {
867                         ctx.LogE("rx-open", les, err, func(les LEs) string {
868                                 return fmt.Sprintf(
869                                         "Tossing %s/%s: opening %s",
870                                         ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
871                                 )
872                         })
873                         isBad = true
874                         continue
875                 }
876                 errs := make(chan error, 1)
877                 var sharedKey []byte
878         Retry:
879                 pipeR, pipeW := io.Pipe()
880                 go func() {
881                         errs <- jobProcess(
882                                 ctx,
883                                 pipeR,
884                                 pktName,
885                                 les,
886                                 ctx.Neigh[*job.PktEnc.Sender],
887                                 job.PktEnc.Nice,
888                                 uint64(pktSizeWithoutEnc(job.Size)),
889                                 job.Path,
890                                 decompressor,
891                                 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
892                         )
893                 }()
894                 pipeWB := bufio.NewWriter(pipeW)
895                 sharedKey, _, _, err = PktEncRead(
896                         ctx.Self,
897                         ctx.Neigh,
898                         bufio.NewReader(fd),
899                         pipeWB,
900                         sharedKey == nil,
901                         sharedKey,
902                 )
903                 if err != nil {
904                         pipeW.CloseWithError(err)
905                 }
906                 if err := pipeWB.Flush(); err != nil {
907                         pipeW.CloseWithError(err)
908                 }
909                 pipeW.Close()
910
911                 if err != nil {
912                         isBad = true
913                         fd.Close()
914                         go func() { <-errs }()
915                         continue
916                 }
917                 if err = <-errs; err == JobRepeatProcess {
918                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
919                                 ctx.LogE("rx-seek", les, err, func(les LEs) string {
920                                         return fmt.Sprintf(
921                                                 "Tossing %s/%s: can not seek",
922                                                 ctx.NodeName(job.PktEnc.Sender),
923                                                 pktName,
924                                         )
925                                 })
926                                 isBad = true
927                                 break
928                         }
929                         goto Retry
930                 } else if err != nil {
931                         isBad = true
932                 }
933                 fd.Close()
934         }
935         return isBad
936 }
937
938 func (ctx *Ctx) AutoToss(
939         nodeId *NodeId,
940         nice uint8,
941         doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
942 ) (chan struct{}, chan bool) {
943         finish := make(chan struct{})
944         badCode := make(chan bool)
945         go func() {
946                 bad := false
947                 for {
948                         select {
949                         case <-finish:
950                                 badCode <- bad
951                                 break
952                         default:
953                         }
954                         time.Sleep(time.Second)
955                         bad = !ctx.Toss(
956                                 nodeId, TRx, nice, false,
957                                 doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
958                 }
959         }()
960         return finish, badCode
961 }