]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
d365e2efd5c47c172f607bee0f8919817da83189
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2023 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/fs"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
45 const (
46         SeenDir = "seen"
47 )
48
49 type TossOpts struct {
50         Nice   uint8
51         DryRun bool
52         DoSeen bool
53         NoFile bool
54         NoFreq bool
55         NoExec bool
56         NoTrns bool
57         NoArea bool
58         NoACK  bool
59 }
60
61 func jobPath2Seen(jobPath string) string {
62         return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
63 }
64
65 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
66         lines := []string{
67                 "From: " + fromTo.From,
68                 "To: " + fromTo.To,
69                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
70         }
71         if len(body) > 0 {
72                 lines = append(
73                         lines,
74                         "MIME-Version: 1.0",
75                         "Content-Type: text/plain; charset=utf-8",
76                         "Content-Transfer-Encoding: base64",
77                         "",
78                         base64.StdEncoding.EncodeToString(body),
79                 )
80         }
81         return strings.NewReader(strings.Join(lines, "\n"))
82 }
83
84 func pktSizeWithoutEnc(pktSize int64) int64 {
85         pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
86         pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
87         if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
88                 pktSize -= poly1305.TagSize
89         }
90         pktSize -= pktSizeBlocks * poly1305.TagSize
91         return pktSize
92 }
93
94 var JobRepeatProcess = errors.New("needs processing repeat")
95
96 func jobProcess(
97         ctx *Ctx,
98         pipeR *io.PipeReader,
99         pktName string,
100         les LEs,
101         sender *Node,
102         nice uint8,
103         pktSize uint64,
104         jobPath string,
105         decompressor *zstd.Decoder,
106         opts *TossOpts,
107 ) error {
108         defer pipeR.Close()
109         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
110         var pkt Pkt
111         _, err := xdr.Unmarshal(pipeR, &pkt)
112         if err != nil {
113                 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
114                         return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
115                 })
116                 return err
117         }
118         les = append(les, LE{"Size", int64(pktSize)})
119         ctx.LogD("rx", les, func(les LEs) string {
120                 return fmt.Sprintf(
121                         "Tossing %s/%s (%s)",
122                         sender.Name, pktName,
123                         humanize.IBytes(pktSize),
124                 )
125         })
126         switch pkt.Type {
127         case PktTypeExec, PktTypeExecFat:
128                 if opts.NoExec {
129                         return nil
130                 }
131                 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
132                 handle := string(path[0])
133                 args := make([]string, 0, len(path)-1)
134                 for _, p := range path[1:] {
135                         args = append(args, string(p))
136                 }
137                 argsStr := strings.Join(append([]string{handle}, args...), " ")
138                 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
139                 cmdline := sender.Exec[handle]
140                 if len(cmdline) == 0 {
141                         err = errors.New("No handle found")
142                         ctx.LogE(
143                                 "rx-no-handle", les, err,
144                                 func(les LEs) string {
145                                         return fmt.Sprintf(
146                                                 "Tossing exec %s/%s (%s): %s",
147                                                 sender.Name, pktName,
148                                                 humanize.IBytes(pktSize), argsStr,
149                                         )
150                                 },
151                         )
152                         return err
153                 }
154                 if pkt.Type == PktTypeExec {
155                         if err = decompressor.Reset(pipeR); err != nil {
156                                 log.Fatalln(err)
157                         }
158                 }
159                 if !opts.DryRun {
160                         cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
161                         cmd.Env = append(
162                                 cmd.Env,
163                                 "NNCP_SELF="+ctx.Self.Id.String(),
164                                 "NNCP_SENDER="+sender.Id.String(),
165                                 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
166                         )
167                         if pkt.Type == PktTypeExec {
168                                 cmd.Stdin = decompressor
169                         } else {
170                                 cmd.Stdin = pipeR
171                         }
172                         output, err := cmd.CombinedOutput()
173                         if err != nil {
174                                 les = append(les, LE{"Output", strings.Split(
175                                         strings.Trim(string(output), "\n"), "\n"),
176                                 })
177                                 ctx.LogE("rx-handle", les, err, func(les LEs) string {
178                                         return fmt.Sprintf(
179                                                 "Tossing exec %s/%s (%s): %s: handling",
180                                                 sender.Name, pktName,
181                                                 humanize.IBytes(uint64(pktSize)), argsStr,
182                                         )
183                                 })
184                                 return err
185                         }
186                         if len(sendmail) > 0 && ctx.NotifyExec != nil {
187                                 notify := ctx.NotifyExec[sender.Name+"."+handle]
188                                 if notify == nil {
189                                         notify = ctx.NotifyExec["*."+handle]
190                                 }
191                                 if notify != nil {
192                                         cmd := exec.Command(
193                                                 sendmail[0],
194                                                 append(sendmail[1:], notify.To)...,
195                                         )
196                                         cmd.Stdin = newNotification(notify, fmt.Sprintf(
197                                                 "Exec from %s: %s", sender.Name, argsStr,
198                                         ), output)
199                                         if err = cmd.Run(); err != nil {
200                                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
201                                                         return fmt.Sprintf(
202                                                                 "Tossing exec %s/%s (%s): %s: notifying",
203                                                                 sender.Name, pktName,
204                                                                 humanize.IBytes(pktSize), argsStr,
205                                                         )
206                                                 })
207                                         }
208                                 }
209                         }
210                 }
211                 ctx.LogI("rx", les, func(les LEs) string {
212                         return fmt.Sprintf(
213                                 "Got exec from %s to %s (%s)",
214                                 sender.Name, argsStr,
215                                 humanize.IBytes(pktSize),
216                         )
217                 })
218                 if !opts.DryRun && jobPath != "" {
219                         if opts.DoSeen {
220                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
221                                         return err
222                                 }
223                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
224                                         fd.Close()
225                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
226                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
227                                                         return fmt.Sprintf(
228                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
229                                                                 sender.Name, pktName,
230                                                                 humanize.IBytes(pktSize),
231                                                                 filepath.Base(jobPath),
232                                                         )
233                                                 })
234                                                 return err
235                                         }
236                                 }
237                         }
238                         if err = os.Remove(jobPath); err != nil {
239                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
240                                         return fmt.Sprintf(
241                                                 "Tossing exec %s/%s (%s): %s: notifying",
242                                                 sender.Name, pktName,
243                                                 humanize.IBytes(pktSize), argsStr,
244                                         )
245                                 })
246                                 return err
247                         } else if ctx.HdrUsage {
248                                 os.Remove(JobPath2Hdr(jobPath))
249                         }
250                 }
251
252         case PktTypeFile:
253                 if opts.NoFile {
254                         return nil
255                 }
256                 dst := string(pkt.Path[:int(pkt.PathLen)])
257                 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
258                 if filepath.IsAbs(dst) {
259                         err = errors.New("non-relative destination path")
260                         ctx.LogE(
261                                 "rx-non-rel", les, err,
262                                 func(les LEs) string {
263                                         return fmt.Sprintf(
264                                                 "Tossing file %s/%s (%s): %s",
265                                                 sender.Name, pktName,
266                                                 humanize.IBytes(pktSize), dst,
267                                         )
268                                 },
269                         )
270                         return err
271                 }
272                 incoming := sender.Incoming
273                 if incoming == nil {
274                         err = errors.New("incoming is not allowed")
275                         ctx.LogE(
276                                 "rx-no-incoming", les, err,
277                                 func(les LEs) string {
278                                         return fmt.Sprintf(
279                                                 "Tossing file %s/%s (%s): %s",
280                                                 sender.Name, pktName,
281                                                 humanize.IBytes(pktSize), dst,
282                                         )
283                                 },
284                         )
285                         return err
286                 }
287                 dir := filepath.Join(*incoming, path.Dir(dst))
288                 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
289                         ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
290                                 return fmt.Sprintf(
291                                         "Tossing file %s/%s (%s): %s: mkdir",
292                                         sender.Name, pktName,
293                                         humanize.IBytes(pktSize), dst,
294                                 )
295                         })
296                         return err
297                 }
298                 if !opts.DryRun {
299                         tmp, err := TempFile(dir, "file")
300                         if err != nil {
301                                 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
302                                         return fmt.Sprintf(
303                                                 "Tossing file %s/%s (%s): %s: mktemp",
304                                                 sender.Name, pktName,
305                                                 humanize.IBytes(pktSize), dst,
306                                         )
307                                 })
308                                 return err
309                         }
310                         les = append(les, LE{"Tmp", tmp.Name()})
311                         ctx.LogD("rx-tmp-created", les, func(les LEs) string {
312                                 return fmt.Sprintf(
313                                         "Tossing file %s/%s (%s): %s: created: %s",
314                                         sender.Name, pktName,
315                                         humanize.IBytes(pktSize), dst, tmp.Name(),
316                                 )
317                         })
318                         bufW := bufio.NewWriter(tmp)
319                         if _, err = CopyProgressed(
320                                 bufW, pipeR, "Rx file",
321                                 append(les, LE{"FullSize", int64(pktSize)}),
322                                 ctx.ShowPrgrs,
323                         ); err != nil {
324                                 ctx.LogE("rx-copy", les, err, func(les LEs) string {
325                                         return fmt.Sprintf(
326                                                 "Tossing file %s/%s (%s): %s: copying",
327                                                 sender.Name, pktName,
328                                                 humanize.IBytes(pktSize), dst,
329                                         )
330                                 })
331                                 return err
332                         }
333                         if err = bufW.Flush(); err != nil {
334                                 tmp.Close()
335                                 ctx.LogE("rx-flush", les, err, func(les LEs) string {
336                                         return fmt.Sprintf(
337                                                 "Tossing file %s/%s (%s): %s: flushing",
338                                                 sender.Name, pktName,
339                                                 humanize.IBytes(pktSize), dst,
340                                         )
341                                 })
342                                 return err
343                         }
344                         if !NoSync {
345                                 if err = tmp.Sync(); err != nil {
346                                         tmp.Close()
347                                         ctx.LogE("rx-sync", les, err, func(les LEs) string {
348                                                 return fmt.Sprintf(
349                                                         "Tossing file %s/%s (%s): %s: syncing",
350                                                         sender.Name, pktName,
351                                                         humanize.IBytes(pktSize), dst,
352                                                 )
353                                         })
354                                         return err
355                                 }
356                         }
357                         if err = tmp.Close(); err != nil {
358                                 ctx.LogE("rx-close", les, err, func(les LEs) string {
359                                         return fmt.Sprintf(
360                                                 "Tossing file %s/%s (%s): %s: closing",
361                                                 sender.Name, pktName,
362                                                 humanize.IBytes(pktSize), dst,
363                                         )
364                                 })
365                                 return err
366                         }
367                         dstPathOrig := filepath.Join(*incoming, dst)
368                         dstPath := dstPathOrig
369                         dstPathCtr := 0
370                         for {
371                                 if _, err = os.Stat(dstPath); err != nil {
372                                         if errors.Is(err, fs.ErrNotExist) {
373                                                 break
374                                         }
375                                         ctx.LogE("rx-stat", les, err, func(les LEs) string {
376                                                 return fmt.Sprintf(
377                                                         "Tossing file %s/%s (%s): %s: stating: %s",
378                                                         sender.Name, pktName,
379                                                         humanize.IBytes(pktSize), dst, dstPath,
380                                                 )
381                                         })
382                                         return err
383                                 }
384                                 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
385                                 dstPathCtr++
386                         }
387                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
388                                 ctx.LogE("rx-rename", les, err, func(les LEs) string {
389                                         return fmt.Sprintf(
390                                                 "Tossing file %s/%s (%s): %s: renaming",
391                                                 sender.Name, pktName,
392                                                 humanize.IBytes(pktSize), dst,
393                                         )
394                                 })
395                                 return err
396                         }
397                         if err = DirSync(*incoming); err != nil {
398                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
399                                         return fmt.Sprintf(
400                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
401                                                 sender.Name, pktName,
402                                                 humanize.IBytes(pktSize), dst,
403                                         )
404                                 })
405                                 return err
406                         }
407                         les = les[:len(les)-1] // delete Tmp
408                 }
409                 ctx.LogI("rx", les, func(les LEs) string {
410                         return fmt.Sprintf(
411                                 "Got file %s (%s) from %s",
412                                 dst, humanize.IBytes(pktSize), sender.Name,
413                         )
414                 })
415                 if !opts.DryRun {
416                         if jobPath != "" {
417                                 if opts.DoSeen {
418                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
419                                                 return err
420                                         }
421                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
422                                                 fd.Close()
423                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
424                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
425                                                                 return fmt.Sprintf(
426                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
427                                                                         sender.Name, pktName,
428                                                                         humanize.IBytes(pktSize),
429                                                                         filepath.Base(jobPath),
430                                                                 )
431                                                         })
432                                                         return err
433                                                 }
434                                         }
435                                 }
436                                 if err = os.Remove(jobPath); err != nil {
437                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
438                                                 return fmt.Sprintf(
439                                                         "Tossing file %s/%s (%s): %s: removing",
440                                                         sender.Name, pktName,
441                                                         humanize.IBytes(pktSize), dst,
442                                                 )
443                                         })
444                                         return err
445                                 } else if ctx.HdrUsage {
446                                         os.Remove(JobPath2Hdr(jobPath))
447                                 }
448                         }
449                         if len(sendmail) > 0 && ctx.NotifyFile != nil {
450                                 cmd := exec.Command(
451                                         sendmail[0],
452                                         append(sendmail[1:], ctx.NotifyFile.To)...,
453                                 )
454                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
455                                         "File from %s: %s (%s)",
456                                         sender.Name, dst, humanize.IBytes(pktSize),
457                                 ), nil)
458                                 if err = cmd.Run(); err != nil {
459                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
460                                                 return fmt.Sprintf(
461                                                         "Tossing file %s/%s (%s): %s: notifying",
462                                                         sender.Name, pktName,
463                                                         humanize.IBytes(pktSize), dst,
464                                                 )
465                                         })
466                                 }
467                         }
468                 }
469
470         case PktTypeFreq:
471                 if opts.NoFreq {
472                         return nil
473                 }
474                 src := string(pkt.Path[:int(pkt.PathLen)])
475                 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
476                 if filepath.IsAbs(src) {
477                         err = errors.New("non-relative source path")
478                         ctx.LogE(
479                                 "rx-non-rel", les, err,
480                                 func(les LEs) string {
481                                         return fmt.Sprintf(
482                                                 "Tossing freq %s/%s (%s): %s: notifying",
483                                                 sender.Name, pktName,
484                                                 humanize.IBytes(pktSize), src,
485                                         )
486                                 },
487                         )
488                         return err
489                 }
490                 dstRaw, err := io.ReadAll(pipeR)
491                 if err != nil {
492                         ctx.LogE("rx-read", les, err, func(les LEs) string {
493                                 return fmt.Sprintf(
494                                         "Tossing freq %s/%s (%s): %s: reading",
495                                         sender.Name, pktName,
496                                         humanize.IBytes(pktSize), src,
497                                 )
498                         })
499                         return err
500                 }
501                 dst := string(dstRaw)
502                 les = append(les, LE{"Dst", dst})
503                 freqPath := sender.FreqPath
504                 if freqPath == nil {
505                         err = errors.New("freqing is not allowed")
506                         ctx.LogE(
507                                 "rx-no-freq", les, err,
508                                 func(les LEs) string {
509                                         return fmt.Sprintf(
510                                                 "Tossing freq %s/%s (%s): %s -> %s",
511                                                 sender.Name, pktName,
512                                                 humanize.IBytes(pktSize), src, dst,
513                                         )
514                                 },
515                         )
516                         return err
517                 }
518                 if !opts.DryRun {
519                         err = ctx.TxFile(
520                                 sender,
521                                 pkt.Nice,
522                                 filepath.Join(*freqPath, src),
523                                 dst,
524                                 sender.FreqChunked,
525                                 sender.FreqMinSize,
526                                 sender.FreqMaxSize,
527                                 nil,
528                         )
529                         if err != nil {
530                                 ctx.LogE("rx-tx", les, err, func(les LEs) string {
531                                         return fmt.Sprintf(
532                                                 "Tossing freq %s/%s (%s): %s -> %s: txing",
533                                                 sender.Name, pktName,
534                                                 humanize.IBytes(pktSize), src, dst,
535                                         )
536                                 })
537                                 return err
538                         }
539                 }
540                 ctx.LogI("rx", les, func(les LEs) string {
541                         return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
542                 })
543                 if !opts.DryRun {
544                         if jobPath != "" {
545                                 if opts.DoSeen {
546                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
547                                                 return err
548                                         }
549                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
550                                                 fd.Close()
551                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
552                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
553                                                                 return fmt.Sprintf(
554                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
555                                                                         sender.Name, pktName,
556                                                                         humanize.IBytes(pktSize),
557                                                                         filepath.Base(jobPath),
558                                                                 )
559                                                         })
560                                                         return err
561                                                 }
562                                         }
563                                 }
564                                 if err = os.Remove(jobPath); err != nil {
565                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
566                                                 return fmt.Sprintf(
567                                                         "Tossing freq %s/%s (%s): %s -> %s: removing",
568                                                         sender.Name, pktName,
569                                                         humanize.IBytes(pktSize), src, dst,
570                                                 )
571                                         })
572                                         return err
573                                 } else if ctx.HdrUsage {
574                                         os.Remove(JobPath2Hdr(jobPath))
575                                 }
576                         }
577                         if len(sendmail) > 0 && ctx.NotifyFreq != nil {
578                                 cmd := exec.Command(
579                                         sendmail[0],
580                                         append(sendmail[1:], ctx.NotifyFreq.To)...,
581                                 )
582                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
583                                         "Freq from %s: %s", sender.Name, src,
584                                 ), nil)
585                                 if err = cmd.Run(); err != nil {
586                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
587                                                 return fmt.Sprintf(
588                                                         "Tossing freq %s/%s (%s): %s -> %s: notifying",
589                                                         sender.Name, pktName,
590                                                         humanize.IBytes(pktSize), src, dst,
591                                                 )
592                                         })
593                                 }
594                         }
595                 }
596
597         case PktTypeTrns:
598                 if opts.NoTrns {
599                         return nil
600                 }
601                 dst := new([MTHSize]byte)
602                 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
603                 nodeId := NodeId(*dst)
604                 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
605                 logMsg := func(les LEs) string {
606                         return fmt.Sprintf(
607                                 "Tossing trns %s/%s (%s): %s",
608                                 sender.Name, pktName,
609                                 humanize.IBytes(pktSize),
610                                 nodeId.String(),
611                         )
612                 }
613                 node := ctx.Neigh[nodeId]
614                 if node == nil {
615                         err = errors.New("unknown node")
616                         ctx.LogE("rx-unknown", les, err, logMsg)
617                         return err
618                 }
619                 ctx.LogD("rx-tx", les, logMsg)
620                 if !opts.DryRun {
621                         if len(node.Via) == 0 {
622                                 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
623                                         ctx.LogE("rx", les, err, func(les LEs) string {
624                                                 return logMsg(les) + ": txing"
625                                         })
626                                         return err
627                                 }
628                         } else {
629                                 via := node.Via[:len(node.Via)-1]
630                                 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
631                                 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
632                                 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
633                                 if err != nil {
634                                         panic(err)
635                                 }
636                                 if _, _, _, err = ctx.Tx(
637                                         node,
638                                         pktTrns,
639                                         nice,
640                                         int64(pktSize), 0, MaxFileSize,
641                                         pipeR,
642                                         pktName,
643                                         nil,
644                                 ); err != nil {
645                                         ctx.LogE("rx", les, err, func(les LEs) string {
646                                                 return logMsg(les) + ": txing"
647                                         })
648                                         return err
649                                 }
650                         }
651                 }
652                 ctx.LogI("rx", les, func(les LEs) string {
653                         return fmt.Sprintf(
654                                 "Got transitional packet from %s to %s (%s)",
655                                 sender.Name,
656                                 ctx.NodeName(&nodeId),
657                                 humanize.IBytes(pktSize),
658                         )
659                 })
660                 if !opts.DryRun && jobPath != "" {
661                         if opts.DoSeen {
662                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
663                                         return err
664                                 }
665                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
666                                         fd.Close()
667                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
668                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
669                                                         return fmt.Sprintf(
670                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
671                                                                 sender.Name, pktName,
672                                                                 humanize.IBytes(pktSize),
673                                                                 filepath.Base(jobPath),
674                                                         )
675                                                 })
676                                                 return err
677                                         }
678                                 }
679                         }
680                         if err = os.Remove(jobPath); err != nil {
681                                 ctx.LogE("rx", les, err, func(les LEs) string {
682                                         return fmt.Sprintf(
683                                                 "Tossing trns %s/%s (%s): %s: removing",
684                                                 sender.Name, pktName,
685                                                 humanize.IBytes(pktSize),
686                                                 ctx.NodeName(&nodeId),
687                                         )
688                                 })
689                                 return err
690                         } else if ctx.HdrUsage {
691                                 os.Remove(JobPath2Hdr(jobPath))
692                         }
693                 }
694
695         case PktTypeArea:
696                 if opts.NoArea {
697                         return nil
698                 }
699                 areaId := new(AreaId)
700                 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
701                 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
702                 logMsg := func(les LEs) string {
703                         return fmt.Sprintf(
704                                 "Tossing %s/%s (%s): area %s",
705                                 sender.Name, pktName,
706                                 humanize.IBytes(pktSize),
707                                 ctx.AreaName(areaId),
708                         )
709                 }
710                 area := ctx.AreaId2Area[*areaId]
711                 if area == nil {
712                         err = errors.New("unknown area")
713                         ctx.LogE("rx-area-unknown", les, err, logMsg)
714                         return err
715                 }
716                 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
717                 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
718                 if err != nil {
719                         ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
720                         return err
721                 }
722                 msgHashRaw := blake2b.Sum256(pktEncRaw)
723                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
724                 les = append(les, LE{"AreaMsg", msgHash})
725                 ctx.LogD("rx-area", les, logMsg)
726
727                 if opts.DryRun {
728                         for _, nodeId := range area.Subs {
729                                 node := ctx.Neigh[*nodeId]
730                                 lesEcho := append(les, LE{"Echo", nodeId})
731                                 seenDir := filepath.Join(
732                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
733                                 )
734                                 seenPath := filepath.Join(seenDir, msgHash)
735                                 logMsgNode := func(les LEs) string {
736                                         return fmt.Sprintf(
737                                                 "%s: echoing to: %s", logMsg(les), node.Name,
738                                         )
739                                 }
740                                 if _, err := os.Stat(seenPath); err == nil {
741                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
742                                                 return logMsgNode(les) + ": already sent"
743                                         })
744                                         continue
745                                 }
746                                 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
747                         }
748                 } else {
749                         for _, nodeId := range area.Subs {
750                                 node := ctx.Neigh[*nodeId]
751                                 lesEcho := append(les, LE{"Echo", nodeId})
752                                 seenDir := filepath.Join(
753                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
754                                 )
755                                 seenPath := filepath.Join(seenDir, msgHash)
756                                 logMsgNode := func(les LEs) string {
757                                         return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
758                                 }
759                                 if _, err := os.Stat(seenPath); err == nil {
760                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
761                                                 return logMsgNode(les) + ": already sent"
762                                         })
763                                         continue
764                                 }
765                                 if nodeId != sender.Id && nodeId != pktEnc.Sender {
766                                         ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
767                                         if _, _, _, err = ctx.Tx(
768                                                 node,
769                                                 &pkt,
770                                                 nice,
771                                                 int64(pktSize), 0, MaxFileSize,
772                                                 fullPipeR,
773                                                 pktName,
774                                                 nil,
775                                         ); err != nil {
776                                                 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
777                                                 return err
778                                         }
779                                 }
780                                 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
781                                         ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
782                                         return err
783                                 }
784                                 if fd, err := os.Create(seenPath); err == nil {
785                                         fd.Close()
786                                         if err = DirSync(seenDir); err != nil {
787                                                 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
788                                                 return err
789                                         }
790                                 } else {
791                                         ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
792                                         return err
793                                 }
794                                 return JobRepeatProcess
795                         }
796                 }
797
798                 seenDir := filepath.Join(
799                         ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
800                 )
801                 seenPath := filepath.Join(seenDir, msgHash)
802                 if _, err := os.Stat(seenPath); err == nil {
803                         ctx.LogD("rx-area-seen", les, func(les LEs) string {
804                                 return logMsg(les) + ": already seen"
805                         })
806                         if !opts.DryRun && jobPath != "" {
807                                 if err = os.Remove(jobPath); err != nil {
808                                         ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
809                                                 return fmt.Sprintf(
810                                                         "Tossing area %s/%s (%s): %s: removing",
811                                                         sender.Name, pktName,
812                                                         humanize.IBytes(pktSize),
813                                                         msgHash,
814                                                 )
815                                         })
816                                         return err
817                                 } else if ctx.HdrUsage {
818                                         os.Remove(JobPath2Hdr(jobPath))
819                                 }
820                         }
821                         return nil
822                 }
823
824                 if area.Prv == nil {
825                         ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
826                                 return logMsg(les) + ": no private key for decoding"
827                         })
828                 } else {
829                         signatureVerify := true
830                         if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
831                                 if !area.AllowUnknown {
832                                         err = errors.New("unknown sender")
833                                         ctx.LogE(
834                                                 "rx-area-unknown",
835                                                 append(les, LE{"Sender", pktEnc.Sender}),
836                                                 err,
837                                                 func(les LEs) string {
838                                                         return logMsg(les) + ": sender: " + pktEnc.Sender.String()
839                                                 },
840                                         )
841                                         return err
842                                 }
843                                 signatureVerify = false
844                         }
845                         areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
846                         copy(areaNodeOur.Id[:], area.Id[:])
847                         copy(areaNodeOur.ExchPrv[:], area.Prv[:])
848                         areaNode := Node{
849                                 Id:       new(NodeId),
850                                 Name:     area.Name,
851                                 Incoming: area.Incoming,
852                                 Exec:     area.Exec,
853                         }
854                         copy(areaNode.Id[:], area.Id[:])
855                         pktName := fmt.Sprintf(
856                                 "area/%s/%s",
857                                 Base32Codec.EncodeToString(areaId[:]), msgHash,
858                         )
859
860                         pipeR, pipeW := io.Pipe()
861                         errs := make(chan error, 1)
862                         go func() {
863                                 errs <- jobProcess(
864                                         ctx,
865                                         pipeR,
866                                         pktName,
867                                         les,
868                                         &areaNode,
869                                         nice,
870                                         uint64(pktSizeWithoutEnc(int64(pktSize))),
871                                         "",
872                                         decompressor,
873                                         opts,
874                                 )
875                         }()
876                         _, _, _, err = PktEncRead(
877                                 &areaNodeOur,
878                                 ctx.Neigh,
879                                 fullPipeR,
880                                 pipeW,
881                                 signatureVerify,
882                                 nil,
883                         )
884                         if err != nil {
885                                 ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
886                                 pipeW.CloseWithError(err)
887                                 <-errs
888                                 return err
889                         }
890                         pipeW.Close()
891                         if err = <-errs; err != nil {
892                                 return err
893                         }
894                 }
895
896                 if !opts.DryRun && jobPath != "" {
897                         if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
898                                 ctx.LogE("rx-area-mkdir", les, err, logMsg)
899                                 return err
900                         }
901                         if fd, err := os.Create(seenPath); err == nil {
902                                 fd.Close()
903                                 if err = DirSync(seenDir); err != nil {
904                                         ctx.LogE("rx-area-dirsync", les, err, logMsg)
905                                         return err
906                                 }
907                         }
908                         if err = os.Remove(jobPath); err != nil {
909                                 ctx.LogE("rx", les, err, func(les LEs) string {
910                                         return fmt.Sprintf(
911                                                 "Tossing area %s/%s (%s): %s: removing",
912                                                 sender.Name, pktName,
913                                                 humanize.IBytes(pktSize),
914                                                 msgHash,
915                                         )
916                                 })
917                                 return err
918                         } else if ctx.HdrUsage {
919                                 os.Remove(JobPath2Hdr(jobPath))
920                         }
921                 }
922
923         case PktTypeACK:
924                 if opts.NoACK {
925                         return nil
926                 }
927                 hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
928                 les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
929                 logMsg := func(les LEs) string {
930                         return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
931                 }
932                 ctx.LogD("rx-ack", les, logMsg)
933                 pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
934                 if _, err := os.Stat(pktPath); err == nil {
935                         if !opts.DryRun {
936                                 if err = os.Remove(pktPath); err != nil {
937                                         ctx.LogE("rx-ack", les, err, func(les LEs) string {
938                                                 return logMsg(les) + ": removing packet"
939                                         })
940                                         return err
941                                 } else if ctx.HdrUsage {
942                                         os.Remove(JobPath2Hdr(pktPath))
943                                 }
944                         }
945                 } else {
946                         ctx.LogD("rx-ack", les, func(les LEs) string {
947                                 return logMsg(les) + ": already disappeared"
948                         })
949                 }
950                 if !opts.DryRun && opts.DoSeen {
951                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
952                                 return err
953                         }
954                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
955                                 fd.Close()
956                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
957                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
958                                                 return fmt.Sprintf(
959                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
960                                                         sender.Name, pktName,
961                                                         humanize.IBytes(pktSize),
962                                                         filepath.Base(jobPath),
963                                                 )
964                                         })
965                                         return err
966                                 }
967                         }
968                 }
969                 if !opts.DryRun {
970                         if err = os.Remove(jobPath); err != nil {
971                                 ctx.LogE("rx", les, err, func(les LEs) string {
972                                         return logMsg(les) + ": removing job"
973                                 })
974                                 return err
975                         } else if ctx.HdrUsage {
976                                 os.Remove(JobPath2Hdr(jobPath))
977                         }
978                 }
979                 ctx.LogI("rx", les, func(les LEs) string {
980                         return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
981                 })
982
983         default:
984                 err = errors.New("unknown type")
985                 ctx.LogE(
986                         "rx-type-unknown", les, err,
987                         func(les LEs) string {
988                                 return fmt.Sprintf(
989                                         "Tossing %s/%s (%s)",
990                                         sender.Name, pktName, humanize.IBytes(pktSize),
991                                 )
992                         },
993                 )
994                 return err
995         }
996         return nil
997 }
998
999 func (ctx *Ctx) Toss(nodeId *NodeId, xx TRxTx, opts *TossOpts) bool {
1000         dirLock, err := ctx.LockDir(nodeId, "toss")
1001         if err != nil {
1002                 return false
1003         }
1004         defer ctx.UnlockDir(dirLock)
1005         isBad := false
1006         decompressor, err := zstd.NewReader(nil)
1007         if err != nil {
1008                 panic(err)
1009         }
1010         defer decompressor.Close()
1011         for job := range ctx.Jobs(nodeId, xx) {
1012                 pktName := filepath.Base(job.Path)
1013                 les := LEs{
1014                         {"Node", job.PktEnc.Sender},
1015                         {"Pkt", pktName},
1016                         {"Nice", int(job.PktEnc.Nice)},
1017                 }
1018                 if job.PktEnc.Nice > opts.Nice {
1019                         ctx.LogD("rx-too-nice", les, func(les LEs) string {
1020                                 return fmt.Sprintf(
1021                                         "Tossing %s/%s: too nice: %s",
1022                                         ctx.NodeName(job.PktEnc.Sender), pktName,
1023                                         NicenessFmt(job.PktEnc.Nice),
1024                                 )
1025                         })
1026                         continue
1027                 }
1028                 fd, err := os.Open(job.Path)
1029                 if err != nil {
1030                         ctx.LogE("rx-open", les, err, func(les LEs) string {
1031                                 return fmt.Sprintf(
1032                                         "Tossing %s/%s: opening %s",
1033                                         ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
1034                                 )
1035                         })
1036                         isBad = true
1037                         continue
1038                 }
1039                 sender := ctx.Neigh[*job.PktEnc.Sender]
1040                 if sender == nil {
1041                         err := errors.New("unknown node")
1042                         ctx.LogE("rx-open", les, err, func(les LEs) string {
1043                                 return fmt.Sprintf(
1044                                         "Tossing %s/%s",
1045                                         ctx.NodeName(job.PktEnc.Sender), pktName,
1046                                 )
1047                         })
1048                         isBad = true
1049                         continue
1050                 }
1051                 errs := make(chan error, 1)
1052                 var sharedKey []byte
1053         Retry:
1054                 pipeR, pipeW := io.Pipe()
1055                 go func() {
1056                         errs <- jobProcess(
1057                                 ctx,
1058                                 pipeR,
1059                                 pktName,
1060                                 les,
1061                                 sender,
1062                                 job.PktEnc.Nice,
1063                                 uint64(pktSizeWithoutEnc(job.Size)),
1064                                 job.Path,
1065                                 decompressor,
1066                                 opts,
1067                         )
1068                 }()
1069                 pipeWB := bufio.NewWriter(pipeW)
1070                 sharedKey, _, _, err = PktEncRead(
1071                         ctx.Self,
1072                         ctx.Neigh,
1073                         bufio.NewReaderSize(fd, MTHBlockSize),
1074                         pipeWB,
1075                         sharedKey == nil,
1076                         sharedKey,
1077                 )
1078                 if err != nil {
1079                         pipeW.CloseWithError(err)
1080                 }
1081                 if err := pipeWB.Flush(); err != nil {
1082                         pipeW.CloseWithError(err)
1083                 }
1084                 pipeW.Close()
1085
1086                 if err != nil {
1087                         isBad = true
1088                         fd.Close()
1089                         <-errs
1090                         continue
1091                 }
1092                 if err = <-errs; err == JobRepeatProcess {
1093                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1094                                 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1095                                         return fmt.Sprintf(
1096                                                 "Tossing %s/%s: can not seek",
1097                                                 ctx.NodeName(job.PktEnc.Sender),
1098                                                 pktName,
1099                                         )
1100                                 })
1101                                 isBad = true
1102                                 break
1103                         }
1104                         goto Retry
1105                 } else if err != nil {
1106                         isBad = true
1107                 }
1108                 fd.Close()
1109         }
1110         return isBad
1111 }
1112
1113 func (ctx *Ctx) AutoToss(nodeId *NodeId, opts *TossOpts) (chan struct{}, chan bool) {
1114         dw, err := ctx.NewDirWatcher(
1115                 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1116                 time.Second,
1117         )
1118         if err != nil {
1119                 log.Fatalln(err)
1120         }
1121         finish := make(chan struct{})
1122         badCode := make(chan bool)
1123         go func() {
1124                 bad := false
1125                 for {
1126                         select {
1127                         case <-finish:
1128                                 dw.Close()
1129                                 badCode <- bad
1130                                 return
1131                         case <-dw.C:
1132                                 bad = !ctx.Toss(nodeId, TRx, opts) || bad
1133                         }
1134                 }
1135         }()
1136         return finish, badCode
1137 }