Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Fix directories path fsync
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
// SeenSuffix is appended to a job's path to name the empty marker file
// that is created once the job has been tossed with the "seen" option on.
const SeenSuffix = ".seen"
48
49 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
50         lines := []string{
51                 "From: " + fromTo.From,
52                 "To: " + fromTo.To,
53                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
54         }
55         if len(body) > 0 {
56                 lines = append(
57                         lines,
58                         "MIME-Version: 1.0",
59                         "Content-Type: text/plain; charset=utf-8",
60                         "Content-Transfer-Encoding: base64",
61                         "",
62                         base64.StdEncoding.EncodeToString(body),
63                 )
64         }
65         return strings.NewReader(strings.Join(lines, "\n"))
66 }
67
68 func pktSizeWithoutEnc(pktSize int64) int64 {
69         pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
70         pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
71         if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
72                 pktSize -= poly1305.TagSize
73         }
74         pktSize -= pktSizeBlocks * poly1305.TagSize
75         return pktSize
76 }
77
// JobRepeatProcess is a sentinel error with the message
// "needs processing repeat".
// NOTE(review): presumably returned to signal that a job has to be
// processed once more -- confirm against the callers that compare with it.
var JobRepeatProcess error = errors.New("needs processing repeat")
79
80 func jobProcess(
81         ctx *Ctx,
82         pipeR *io.PipeReader,
83         pktName string,
84         les LEs,
85         sender *Node,
86         nice uint8,
87         pktSize uint64,
88         jobPath string,
89         decompressor *zstd.Decoder,
90         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
91 ) error {
92         defer pipeR.Close()
93         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
94         var pkt Pkt
95         _, err := xdr.Unmarshal(pipeR, &pkt)
96         if err != nil {
97                 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
98                         return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
99                 })
100                 return err
101         }
102         les = append(les, LE{"Size", int64(pktSize)})
103         ctx.LogD("rx", les, func(les LEs) string {
104                 return fmt.Sprintf(
105                         "Tossing %s/%s (%s)",
106                         sender.Name, pktName,
107                         humanize.IBytes(pktSize),
108                 )
109         })
110         switch pkt.Type {
111         case PktTypeExec, PktTypeExecFat:
112                 if noExec {
113                         return nil
114                 }
115                 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
116                 handle := string(path[0])
117                 args := make([]string, 0, len(path)-1)
118                 for _, p := range path[1:] {
119                         args = append(args, string(p))
120                 }
121                 argsStr := strings.Join(append([]string{handle}, args...), " ")
122                 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
123                 cmdline, exists := sender.Exec[handle]
124                 if !exists || len(cmdline) == 0 {
125                         err = errors.New("No handle found")
126                         ctx.LogE(
127                                 "rx-no-handle", les, err,
128                                 func(les LEs) string {
129                                         return fmt.Sprintf(
130                                                 "Tossing exec %s/%s (%s): %s",
131                                                 sender.Name, pktName,
132                                                 humanize.IBytes(pktSize), argsStr,
133                                         )
134                                 },
135                         )
136                         return err
137                 }
138                 if pkt.Type == PktTypeExec {
139                         if err = decompressor.Reset(pipeR); err != nil {
140                                 log.Fatalln(err)
141                         }
142                 }
143                 if !dryRun {
144                         cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
145                         cmd.Env = append(
146                                 cmd.Env,
147                                 "NNCP_SELF="+ctx.Self.Id.String(),
148                                 "NNCP_SENDER="+sender.Id.String(),
149                                 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
150                         )
151                         if pkt.Type == PktTypeExec {
152                                 cmd.Stdin = decompressor
153                         } else {
154                                 cmd.Stdin = pipeR
155                         }
156                         output, err := cmd.Output()
157                         if err != nil {
158                                 ctx.LogE("rx-hande", les, err, func(les LEs) string {
159                                         return fmt.Sprintf(
160                                                 "Tossing exec %s/%s (%s): %s: handling",
161                                                 sender.Name, pktName,
162                                                 humanize.IBytes(uint64(pktSize)), argsStr,
163                                         )
164                                 })
165                                 return err
166                         }
167                         if len(sendmail) > 0 && ctx.NotifyExec != nil {
168                                 notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
169                                 if !exists {
170                                         notify, exists = ctx.NotifyExec["*."+handle]
171                                 }
172                                 if exists {
173                                         cmd := exec.Command(
174                                                 sendmail[0],
175                                                 append(sendmail[1:], notify.To)...,
176                                         )
177                                         cmd.Stdin = newNotification(notify, fmt.Sprintf(
178                                                 "Exec from %s: %s", sender.Name, argsStr,
179                                         ), output)
180                                         if err = cmd.Run(); err != nil {
181                                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
182                                                         return fmt.Sprintf(
183                                                                 "Tossing exec %s/%s (%s): %s: notifying",
184                                                                 sender.Name, pktName,
185                                                                 humanize.IBytes(pktSize), argsStr,
186                                                         )
187                                                 })
188                                         }
189                                 }
190                         }
191                 }
192                 ctx.LogI("rx", les, func(les LEs) string {
193                         return fmt.Sprintf(
194                                 "Got exec from %s to %s (%s)",
195                                 sender.Name, argsStr,
196                                 humanize.IBytes(pktSize),
197                         )
198                 })
199                 if !dryRun && jobPath != "" {
200                         if doSeen {
201                                 if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
202                                         fd.Close()
203                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
204                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
205                                                         return fmt.Sprintf(
206                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
207                                                                 sender.Name, pktName,
208                                                                 humanize.IBytes(pktSize),
209                                                                 filepath.Base(jobPath),
210                                                         )
211                                                 })
212                                                 return err
213                                         }
214                                 }
215                         }
216                         if err = os.Remove(jobPath); err != nil {
217                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
218                                         return fmt.Sprintf(
219                                                 "Tossing exec %s/%s (%s): %s: notifying",
220                                                 sender.Name, pktName,
221                                                 humanize.IBytes(pktSize), argsStr,
222                                         )
223                                 })
224                                 return err
225                         } else if ctx.HdrUsage {
226                                 os.Remove(jobPath + HdrSuffix)
227                         }
228                 }
229
230         case PktTypeFile:
231                 if noFile {
232                         return nil
233                 }
234                 dst := string(pkt.Path[:int(pkt.PathLen)])
235                 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
236                 if filepath.IsAbs(dst) {
237                         err = errors.New("non-relative destination path")
238                         ctx.LogE(
239                                 "rx-non-rel", les, err,
240                                 func(les LEs) string {
241                                         return fmt.Sprintf(
242                                                 "Tossing file %s/%s (%s): %s",
243                                                 sender.Name, pktName,
244                                                 humanize.IBytes(pktSize), dst,
245                                         )
246                                 },
247                         )
248                         return err
249                 }
250                 incoming := sender.Incoming
251                 if incoming == nil {
252                         err = errors.New("incoming is not allowed")
253                         ctx.LogE(
254                                 "rx-no-incoming", les, err,
255                                 func(les LEs) string {
256                                         return fmt.Sprintf(
257                                                 "Tossing file %s/%s (%s): %s",
258                                                 sender.Name, pktName,
259                                                 humanize.IBytes(pktSize), dst,
260                                         )
261                                 },
262                         )
263                         return err
264                 }
265                 dir := filepath.Join(*incoming, path.Dir(dst))
266                 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
267                         ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
268                                 return fmt.Sprintf(
269                                         "Tossing file %s/%s (%s): %s: mkdir",
270                                         sender.Name, pktName,
271                                         humanize.IBytes(pktSize), dst,
272                                 )
273                         })
274                         return err
275                 }
276                 if !dryRun {
277                         tmp, err := TempFile(dir, "file")
278                         if err != nil {
279                                 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
280                                         return fmt.Sprintf(
281                                                 "Tossing file %s/%s (%s): %s: mktemp",
282                                                 sender.Name, pktName,
283                                                 humanize.IBytes(pktSize), dst,
284                                         )
285                                 })
286                                 return err
287                         }
288                         les = append(les, LE{"Tmp", tmp.Name()})
289                         ctx.LogD("rx-tmp-created", les, func(les LEs) string {
290                                 return fmt.Sprintf(
291                                         "Tossing file %s/%s (%s): %s: created: %s",
292                                         sender.Name, pktName,
293                                         humanize.IBytes(pktSize), dst, tmp.Name(),
294                                 )
295                         })
296                         bufW := bufio.NewWriter(tmp)
297                         if _, err = CopyProgressed(
298                                 bufW, pipeR, "Rx file",
299                                 append(les, LE{"FullSize", int64(pktSize)}),
300                                 ctx.ShowPrgrs,
301                         ); err != nil {
302                                 ctx.LogE("rx-copy", les, err, func(les LEs) string {
303                                         return fmt.Sprintf(
304                                                 "Tossing file %s/%s (%s): %s: copying",
305                                                 sender.Name, pktName,
306                                                 humanize.IBytes(pktSize), dst,
307                                         )
308                                 })
309                                 return err
310                         }
311                         if err = bufW.Flush(); err != nil {
312                                 tmp.Close() // #nosec G104
313                                 ctx.LogE("rx-flush", les, err, func(les LEs) string {
314                                         return fmt.Sprintf(
315                                                 "Tossing file %s/%s (%s): %s: flushing",
316                                                 sender.Name, pktName,
317                                                 humanize.IBytes(pktSize), dst,
318                                         )
319                                 })
320                                 return err
321                         }
322                         if err = tmp.Sync(); err != nil {
323                                 tmp.Close() // #nosec G104
324                                 ctx.LogE("rx-sync", les, err, func(les LEs) string {
325                                         return fmt.Sprintf(
326                                                 "Tossing file %s/%s (%s): %s: syncing",
327                                                 sender.Name, pktName,
328                                                 humanize.IBytes(pktSize), dst,
329                                         )
330                                 })
331                                 return err
332                         }
333                         if err = tmp.Close(); err != nil {
334                                 ctx.LogE("rx-close", les, err, func(les LEs) string {
335                                         return fmt.Sprintf(
336                                                 "Tossing file %s/%s (%s): %s: closing",
337                                                 sender.Name, pktName,
338                                                 humanize.IBytes(pktSize), dst,
339                                         )
340                                 })
341                                 return err
342                         }
343                         dstPathOrig := filepath.Join(*incoming, dst)
344                         dstPath := dstPathOrig
345                         dstPathCtr := 0
346                         for {
347                                 if _, err = os.Stat(dstPath); err != nil {
348                                         if os.IsNotExist(err) {
349                                                 break
350                                         }
351                                         ctx.LogE("rx-stat", les, err, func(les LEs) string {
352                                                 return fmt.Sprintf(
353                                                         "Tossing file %s/%s (%s): %s: stating: %s",
354                                                         sender.Name, pktName,
355                                                         humanize.IBytes(pktSize), dst, dstPath,
356                                                 )
357                                         })
358                                         return err
359                                 }
360                                 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
361                                 dstPathCtr++
362                         }
363                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
364                                 ctx.LogE("rx-rename", les, err, func(les LEs) string {
365                                         return fmt.Sprintf(
366                                                 "Tossing file %s/%s (%s): %s: renaming",
367                                                 sender.Name, pktName,
368                                                 humanize.IBytes(pktSize), dst,
369                                         )
370                                 })
371                                 return err
372                         }
373                         if err = DirSync(*incoming); err != nil {
374                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
375                                         return fmt.Sprintf(
376                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
377                                                 sender.Name, pktName,
378                                                 humanize.IBytes(pktSize), dst,
379                                         )
380                                 })
381                                 return err
382                         }
383                         les = les[:len(les)-1] // delete Tmp
384                 }
385                 ctx.LogI("rx", les, func(les LEs) string {
386                         return fmt.Sprintf(
387                                 "Got file %s (%s) from %s",
388                                 dst, humanize.IBytes(pktSize), sender.Name,
389                         )
390                 })
391                 if !dryRun {
392                         if jobPath != "" {
393                                 if doSeen {
394                                         if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
395                                                 fd.Close()
396                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
397                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
398                                                                 return fmt.Sprintf(
399                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
400                                                                         sender.Name, pktName,
401                                                                         humanize.IBytes(pktSize),
402                                                                         filepath.Base(jobPath),
403                                                                 )
404                                                         })
405                                                         return err
406                                                 }
407                                         }
408                                 }
409                                 if err = os.Remove(jobPath); err != nil {
410                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
411                                                 return fmt.Sprintf(
412                                                         "Tossing file %s/%s (%s): %s: removing",
413                                                         sender.Name, pktName,
414                                                         humanize.IBytes(pktSize), dst,
415                                                 )
416                                         })
417                                         return err
418                                 } else if ctx.HdrUsage {
419                                         os.Remove(jobPath + HdrSuffix)
420                                 }
421                         }
422                         if len(sendmail) > 0 && ctx.NotifyFile != nil {
423                                 cmd := exec.Command(
424                                         sendmail[0],
425                                         append(sendmail[1:], ctx.NotifyFile.To)...,
426                                 )
427                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
428                                         "File from %s: %s (%s)",
429                                         sender.Name, dst, humanize.IBytes(pktSize),
430                                 ), nil)
431                                 if err = cmd.Run(); err != nil {
432                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
433                                                 return fmt.Sprintf(
434                                                         "Tossing file %s/%s (%s): %s: notifying",
435                                                         sender.Name, pktName,
436                                                         humanize.IBytes(pktSize), dst,
437                                                 )
438                                         })
439                                 }
440                         }
441                 }
442
443         case PktTypeFreq:
444                 if noFreq {
445                         return nil
446                 }
447                 src := string(pkt.Path[:int(pkt.PathLen)])
448                 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
449                 if filepath.IsAbs(src) {
450                         err = errors.New("non-relative source path")
451                         ctx.LogE(
452                                 "rx-non-rel", les, err,
453                                 func(les LEs) string {
454                                         return fmt.Sprintf(
455                                                 "Tossing freq %s/%s (%s): %s: notifying",
456                                                 sender.Name, pktName,
457                                                 humanize.IBytes(pktSize), src,
458                                         )
459                                 },
460                         )
461                         return err
462                 }
463                 dstRaw, err := ioutil.ReadAll(pipeR)
464                 if err != nil {
465                         ctx.LogE("rx-read", les, err, func(les LEs) string {
466                                 return fmt.Sprintf(
467                                         "Tossing freq %s/%s (%s): %s: reading",
468                                         sender.Name, pktName,
469                                         humanize.IBytes(pktSize), src,
470                                 )
471                         })
472                         return err
473                 }
474                 dst := string(dstRaw)
475                 les = append(les, LE{"Dst", dst})
476                 freqPath := sender.FreqPath
477                 if freqPath == nil {
478                         err = errors.New("freqing is not allowed")
479                         ctx.LogE(
480                                 "rx-no-freq", les, err,
481                                 func(les LEs) string {
482                                         return fmt.Sprintf(
483                                                 "Tossing freq %s/%s (%s): %s -> %s",
484                                                 sender.Name, pktName,
485                                                 humanize.IBytes(pktSize), src, dst,
486                                         )
487                                 },
488                         )
489                         return err
490                 }
491                 if !dryRun {
492                         err = ctx.TxFile(
493                                 sender,
494                                 pkt.Nice,
495                                 filepath.Join(*freqPath, src),
496                                 dst,
497                                 sender.FreqChunked,
498                                 sender.FreqMinSize,
499                                 sender.FreqMaxSize,
500                                 nil,
501                         )
502                         if err != nil {
503                                 ctx.LogE("rx-tx", les, err, func(les LEs) string {
504                                         return fmt.Sprintf(
505                                                 "Tossing freq %s/%s (%s): %s -> %s: txing",
506                                                 sender.Name, pktName,
507                                                 humanize.IBytes(pktSize), src, dst,
508                                         )
509                                 })
510                                 return err
511                         }
512                 }
513                 ctx.LogI("rx", les, func(les LEs) string {
514                         return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
515                 })
516                 if !dryRun {
517                         if jobPath != "" {
518                                 if doSeen {
519                                         if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
520                                                 fd.Close()
521                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
522                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
523                                                                 return fmt.Sprintf(
524                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
525                                                                         sender.Name, pktName,
526                                                                         humanize.IBytes(pktSize),
527                                                                         filepath.Base(jobPath),
528                                                                 )
529                                                         })
530                                                         return err
531                                                 }
532                                         }
533                                 }
534                                 if err = os.Remove(jobPath); err != nil {
535                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
536                                                 return fmt.Sprintf(
537                                                         "Tossing freq %s/%s (%s): %s -> %s: removing",
538                                                         sender.Name, pktName,
539                                                         humanize.IBytes(pktSize), src, dst,
540                                                 )
541                                         })
542                                         return err
543                                 } else if ctx.HdrUsage {
544                                         os.Remove(jobPath + HdrSuffix)
545                                 }
546                         }
547                         if len(sendmail) > 0 && ctx.NotifyFreq != nil {
548                                 cmd := exec.Command(
549                                         sendmail[0],
550                                         append(sendmail[1:], ctx.NotifyFreq.To)...,
551                                 )
552                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
553                                         "Freq from %s: %s", sender.Name, src,
554                                 ), nil)
555                                 if err = cmd.Run(); err != nil {
556                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
557                                                 return fmt.Sprintf(
558                                                         "Tossing freq %s/%s (%s): %s -> %s: notifying",
559                                                         sender.Name, pktName,
560                                                         humanize.IBytes(pktSize), src, dst,
561                                                 )
562                                         })
563                                 }
564                         }
565                 }
566
567         case PktTypeTrns:
568                 if noTrns {
569                         return nil
570                 }
571                 dst := new([MTHSize]byte)
572                 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
573                 nodeId := NodeId(*dst)
574                 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
575                 logMsg := func(les LEs) string {
576                         return fmt.Sprintf(
577                                 "Tossing trns %s/%s (%s): %s",
578                                 sender.Name, pktName,
579                                 humanize.IBytes(pktSize),
580                                 nodeId.String(),
581                         )
582                 }
583                 node := ctx.Neigh[nodeId]
584                 if node == nil {
585                         err = errors.New("unknown node")
586                         ctx.LogE("rx-unknown", les, err, logMsg)
587                         return err
588                 }
589                 ctx.LogD("rx-tx", les, logMsg)
590                 if !dryRun {
591                         if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
592                                 ctx.LogE("rx", les, err, func(les LEs) string {
593                                         return logMsg(les) + ": txing"
594                                 })
595                                 return err
596                         }
597                 }
598                 ctx.LogI("rx", les, func(les LEs) string {
599                         return fmt.Sprintf(
600                                 "Got transitional packet from %s to %s (%s)",
601                                 sender.Name,
602                                 ctx.NodeName(&nodeId),
603                                 humanize.IBytes(pktSize),
604                         )
605                 })
606                 if !dryRun && jobPath != "" {
607                         if doSeen {
608                                 if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
609                                         fd.Close()
610                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
611                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
612                                                         return fmt.Sprintf(
613                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
614                                                                 sender.Name, pktName,
615                                                                 humanize.IBytes(pktSize),
616                                                                 filepath.Base(jobPath),
617                                                         )
618                                                 })
619                                                 return err
620                                         }
621                                 }
622                         }
623                         if err = os.Remove(jobPath); err != nil {
624                                 ctx.LogE("rx", les, err, func(les LEs) string {
625                                         return fmt.Sprintf(
626                                                 "Tossing trns %s/%s (%s): %s: removing",
627                                                 sender.Name, pktName,
628                                                 humanize.IBytes(pktSize),
629                                                 ctx.NodeName(&nodeId),
630                                         )
631                                 })
632                                 return err
633                         } else if ctx.HdrUsage {
634                                 os.Remove(jobPath + HdrSuffix)
635                         }
636                 }
637
638         case PktTypeArea:
639                 if noArea {
640                         return nil
641                 }
642                 areaId := new(AreaId)
643                 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
644                 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
645                 logMsg := func(les LEs) string {
646                         return fmt.Sprintf(
647                                 "Tossing %s/%s (%s): area %s",
648                                 sender.Name, pktName,
649                                 humanize.IBytes(pktSize),
650                                 ctx.AreaName(areaId),
651                         )
652                 }
653                 area := ctx.AreaId2Area[*areaId]
654                 if area == nil {
655                         err = errors.New("unknown area")
656                         ctx.LogE("rx-area-unknown", les, err, logMsg)
657                         return err
658                 }
659                 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
660                 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
661                 if err != nil {
662                         ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
663                         return err
664                 }
665                 msgHashRaw := blake2b.Sum256(pktEncRaw)
666                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
667                 les = append(les, LE{"AreaMsg", msgHash})
668                 ctx.LogD("rx-area", les, logMsg)
669
670                 if dryRun {
671                         for _, nodeId := range area.Subs {
672                                 node := ctx.Neigh[*nodeId]
673                                 lesEcho := append(les, LE{"Echo", nodeId})
674                                 seenDir := filepath.Join(
675                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
676                                 )
677                                 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
678                                 logMsgNode := func(les LEs) string {
679                                         return fmt.Sprintf(
680                                                 "%s: echoing to: %s", logMsg(les), node.Name,
681                                         )
682                                 }
683                                 if _, err := os.Stat(seenPath); err == nil {
684                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
685                                                 return logMsgNode(les) + ": already sent"
686                                         })
687                                         continue
688                                 }
689                                 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
690                         }
691                 } else {
692                         for _, nodeId := range area.Subs {
693                                 node := ctx.Neigh[*nodeId]
694                                 lesEcho := append(les, LE{"Echo", nodeId})
695                                 seenDir := filepath.Join(
696                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
697                                 )
698                                 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
699                                 logMsgNode := func(les LEs) string {
700                                         return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
701                                 }
702                                 if _, err := os.Stat(seenPath); err == nil {
703                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
704                                                 return logMsgNode(les) + ": already sent"
705                                         })
706                                         continue
707                                 }
708                                 if nodeId != sender.Id {
709                                         ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
710                                         if _, err = ctx.Tx(
711                                                 node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil,
712                                         ); err != nil {
713                                                 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
714                                                 return err
715                                         }
716                                 }
717                                 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
718                                         ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
719                                         return err
720                                 }
721                                 if fd, err := os.Create(seenPath); err == nil {
722                                         fd.Close()
723                                         if err = DirSync(seenDir); err != nil {
724                                                 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
725                                                 return err
726                                         }
727                                 } else {
728                                         ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
729                                         return err
730                                 }
731                                 return JobRepeatProcess
732                         }
733                 }
734
735                 seenDir := filepath.Join(
736                         ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
737                 )
738                 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
739                 if _, err := os.Stat(seenPath); err == nil {
740                         ctx.LogD("rx-area-seen", les, func(les LEs) string {
741                                 return logMsg(les) + ": already seen"
742                         })
743                         if !dryRun && jobPath != "" {
744                                 if err = os.Remove(jobPath); err != nil {
745                                         ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
746                                                 return fmt.Sprintf(
747                                                         "Tossing area %s/%s (%s): %s: removing",
748                                                         sender.Name, pktName,
749                                                         humanize.IBytes(pktSize),
750                                                         msgHash,
751                                                 )
752                                         })
753                                         return err
754                                 } else if ctx.HdrUsage {
755                                         os.Remove(jobPath + HdrSuffix)
756                                 }
757                         }
758                         return nil
759                 }
760
761                 if area.Prv == nil {
762                         ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
763                                 return logMsg(les) + ": no private key for decoding"
764                         })
765                 } else {
766                         signatureVerify := true
767                         if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
768                                 if !area.AllowUnknown {
769                                         err = errors.New("unknown sender")
770                                         ctx.LogE(
771                                                 "rx-area-unknown",
772                                                 append(les, LE{"Sender", pktEnc.Sender}),
773                                                 err,
774                                                 func(les LEs) string {
775                                                         return logMsg(les) + ": sender: " + pktEnc.Sender.String()
776                                                 },
777                                         )
778                                         return err
779                                 }
780                                 signatureVerify = false
781                         }
782                         areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
783                         copy(areaNodeOur.Id[:], area.Id[:])
784                         copy(areaNodeOur.ExchPrv[:], area.Prv[:])
785                         areaNode := Node{
786                                 Id:       new(NodeId),
787                                 Name:     area.Name,
788                                 Incoming: area.Incoming,
789                                 Exec:     area.Exec,
790                         }
791                         copy(areaNode.Id[:], area.Id[:])
792                         pktName := fmt.Sprintf(
793                                 "area/%s/%s",
794                                 Base32Codec.EncodeToString(areaId[:]), msgHash,
795                         )
796
797                         pipeR, pipeW := io.Pipe()
798                         errs := make(chan error, 1)
799                         go func() {
800                                 errs <- jobProcess(
801                                         ctx,
802                                         pipeR,
803                                         pktName,
804                                         les,
805                                         &areaNode,
806                                         nice,
807                                         uint64(pktSizeWithoutEnc(int64(pktSize))),
808                                         "",
809                                         decompressor,
810                                         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
811                                 )
812                         }()
813                         _, _, _, err = PktEncRead(
814                                 &areaNodeOur,
815                                 ctx.Neigh,
816                                 fullPipeR,
817                                 pipeW,
818                                 signatureVerify,
819                                 nil,
820                         )
821                         if err != nil {
822                                 pipeW.CloseWithError(err)
823                                 go func() { <-errs }()
824                                 return err
825                         }
826                         pipeW.Close()
827                         if err = <-errs; err != nil {
828                                 return err
829                         }
830                 }
831
832                 if !dryRun && jobPath != "" {
833                         if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
834                                 ctx.LogE("rx-area-mkdir", les, err, logMsg)
835                                 return err
836                         }
837                         if fd, err := os.Create(seenPath); err == nil {
838                                 fd.Close()
839                                 if err = DirSync(seenDir); err != nil {
840                                         ctx.LogE("rx-area-dirsync", les, err, logMsg)
841                                         return err
842                                 }
843                         }
844                         if err = os.Remove(jobPath); err != nil {
845                                 ctx.LogE("rx", les, err, func(les LEs) string {
846                                         return fmt.Sprintf(
847                                                 "Tossing area %s/%s (%s): %s: removing",
848                                                 sender.Name, pktName,
849                                                 humanize.IBytes(pktSize),
850                                                 msgHash,
851                                         )
852                                 })
853                                 return err
854                         } else if ctx.HdrUsage {
855                                 os.Remove(jobPath + HdrSuffix)
856                         }
857                 }
858
859         default:
860                 err = errors.New("unknown type")
861                 ctx.LogE(
862                         "rx-type-unknown", les, err,
863                         func(les LEs) string {
864                                 return fmt.Sprintf(
865                                         "Tossing %s/%s (%s)",
866                                         sender.Name, pktName, humanize.IBytes(pktSize),
867                                 )
868                         },
869                 )
870                 return err
871         }
872         return nil
873 }
874
875 func (ctx *Ctx) Toss(
876         nodeId *NodeId,
877         xx TRxTx,
878         nice uint8,
879         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
880 ) bool {
881         dirLock, err := ctx.LockDir(nodeId, "toss")
882         if err != nil {
883                 return false
884         }
885         defer ctx.UnlockDir(dirLock)
886         isBad := false
887         decompressor, err := zstd.NewReader(nil)
888         if err != nil {
889                 panic(err)
890         }
891         defer decompressor.Close()
892         for job := range ctx.Jobs(nodeId, xx) {
893                 pktName := filepath.Base(job.Path)
894                 les := LEs{
895                         {"Node", job.PktEnc.Sender},
896                         {"Pkt", pktName},
897                         {"Nice", int(job.PktEnc.Nice)},
898                 }
899                 if job.PktEnc.Nice > nice {
900                         ctx.LogD("rx-too-nice", les, func(les LEs) string {
901                                 return fmt.Sprintf(
902                                         "Tossing %s/%s: too nice: %s",
903                                         ctx.NodeName(job.PktEnc.Sender), pktName,
904                                         NicenessFmt(job.PktEnc.Nice),
905                                 )
906                         })
907                         continue
908                 }
909                 fd, err := os.Open(job.Path)
910                 if err != nil {
911                         ctx.LogE("rx-open", les, err, func(les LEs) string {
912                                 return fmt.Sprintf(
913                                         "Tossing %s/%s: opening %s",
914                                         ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
915                                 )
916                         })
917                         isBad = true
918                         continue
919                 }
920                 errs := make(chan error, 1)
921                 var sharedKey []byte
922         Retry:
923                 pipeR, pipeW := io.Pipe()
924                 go func() {
925                         errs <- jobProcess(
926                                 ctx,
927                                 pipeR,
928                                 pktName,
929                                 les,
930                                 ctx.Neigh[*job.PktEnc.Sender],
931                                 job.PktEnc.Nice,
932                                 uint64(pktSizeWithoutEnc(job.Size)),
933                                 job.Path,
934                                 decompressor,
935                                 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
936                         )
937                 }()
938                 pipeWB := bufio.NewWriter(pipeW)
939                 sharedKey, _, _, err = PktEncRead(
940                         ctx.Self,
941                         ctx.Neigh,
942                         bufio.NewReader(fd),
943                         pipeWB,
944                         sharedKey == nil,
945                         sharedKey,
946                 )
947                 if err != nil {
948                         pipeW.CloseWithError(err)
949                 }
950                 if err := pipeWB.Flush(); err != nil {
951                         pipeW.CloseWithError(err)
952                 }
953                 pipeW.Close()
954
955                 if err != nil {
956                         isBad = true
957                         fd.Close()
958                         go func() { <-errs }()
959                         continue
960                 }
961                 if err = <-errs; err == JobRepeatProcess {
962                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
963                                 ctx.LogE("rx-seek", les, err, func(les LEs) string {
964                                         return fmt.Sprintf(
965                                                 "Tossing %s/%s: can not seek",
966                                                 ctx.NodeName(job.PktEnc.Sender),
967                                                 pktName,
968                                         )
969                                 })
970                                 isBad = true
971                                 break
972                         }
973                         goto Retry
974                 } else if err != nil {
975                         isBad = true
976                 }
977                 fd.Close()
978         }
979         return isBad
980 }
981
982 func (ctx *Ctx) AutoToss(
983         nodeId *NodeId,
984         nice uint8,
985         doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
986 ) (chan struct{}, chan bool) {
987         finish := make(chan struct{})
988         badCode := make(chan bool)
989         go func() {
990                 bad := false
991                 for {
992                         select {
993                         case <-finish:
994                                 badCode <- bad
995                                 break
996                         default:
997                         }
998                         time.Sleep(time.Second)
999                         bad = !ctx.Toss(
1000                                 nodeId, TRx, nice, false,
1001                                 doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
1002                 }
1003         }()
1004         return finish, badCode
1005 }