Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
0c27289e2cf55359ed8398f58d086c1b1eb97c88
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
// SeenDir is the name of the subdirectory, located next to a job file,
// in which jobPath2Seen places the marker path for that job.
const SeenDir = "seen"
48
49 func jobPath2Seen(jobPath string) string {
50         return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
51 }
52
53 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
54         lines := []string{
55                 "From: " + fromTo.From,
56                 "To: " + fromTo.To,
57                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
58         }
59         if len(body) > 0 {
60                 lines = append(
61                         lines,
62                         "MIME-Version: 1.0",
63                         "Content-Type: text/plain; charset=utf-8",
64                         "Content-Transfer-Encoding: base64",
65                         "",
66                         base64.StdEncoding.EncodeToString(body),
67                 )
68         }
69         return strings.NewReader(strings.Join(lines, "\n"))
70 }
71
72 func pktSizeWithoutEnc(pktSize int64) int64 {
73         pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
74         pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
75         if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
76                 pktSize -= poly1305.TagSize
77         }
78         pktSize -= pktSizeBlocks * poly1305.TagSize
79         return pktSize
80 }
81
var (
	// JobRepeatProcess is a sentinel error whose message is
	// "needs processing repeat".
	// NOTE(review): presumably returned to tell the caller that a job
	// has to be processed again; its users are not visible here, so
	// confirm against the call sites.
	JobRepeatProcess = errors.New("needs processing repeat")
)
83
84 func jobProcess(
85         ctx *Ctx,
86         pipeR *io.PipeReader,
87         pktName string,
88         les LEs,
89         sender *Node,
90         nice uint8,
91         pktSize uint64,
92         jobPath string,
93         decompressor *zstd.Decoder,
94         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
95 ) error {
96         defer pipeR.Close()
97         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
98         var pkt Pkt
99         _, err := xdr.Unmarshal(pipeR, &pkt)
100         if err != nil {
101                 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
102                         return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
103                 })
104                 return err
105         }
106         les = append(les, LE{"Size", int64(pktSize)})
107         ctx.LogD("rx", les, func(les LEs) string {
108                 return fmt.Sprintf(
109                         "Tossing %s/%s (%s)",
110                         sender.Name, pktName,
111                         humanize.IBytes(pktSize),
112                 )
113         })
114         switch pkt.Type {
115         case PktTypeExec, PktTypeExecFat:
116                 if noExec {
117                         return nil
118                 }
119                 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
120                 handle := string(path[0])
121                 args := make([]string, 0, len(path)-1)
122                 for _, p := range path[1:] {
123                         args = append(args, string(p))
124                 }
125                 argsStr := strings.Join(append([]string{handle}, args...), " ")
126                 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
127                 cmdline := sender.Exec[handle]
128                 if len(cmdline) == 0 {
129                         err = errors.New("No handle found")
130                         ctx.LogE(
131                                 "rx-no-handle", les, err,
132                                 func(les LEs) string {
133                                         return fmt.Sprintf(
134                                                 "Tossing exec %s/%s (%s): %s",
135                                                 sender.Name, pktName,
136                                                 humanize.IBytes(pktSize), argsStr,
137                                         )
138                                 },
139                         )
140                         return err
141                 }
142                 if pkt.Type == PktTypeExec {
143                         if err = decompressor.Reset(pipeR); err != nil {
144                                 log.Fatalln(err)
145                         }
146                 }
147                 if !dryRun {
148                         cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
149                         cmd.Env = append(
150                                 cmd.Env,
151                                 "NNCP_SELF="+ctx.Self.Id.String(),
152                                 "NNCP_SENDER="+sender.Id.String(),
153                                 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
154                         )
155                         if pkt.Type == PktTypeExec {
156                                 cmd.Stdin = decompressor
157                         } else {
158                                 cmd.Stdin = pipeR
159                         }
160                         output, err := cmd.CombinedOutput()
161                         if err != nil {
162                                 les = append(les, LE{"Output", strings.Split(
163                                         strings.Trim(string(output), "\n"), "\n"),
164                                 })
165                                 ctx.LogE("rx-handle", les, err, func(les LEs) string {
166                                         return fmt.Sprintf(
167                                                 "Tossing exec %s/%s (%s): %s: handling",
168                                                 sender.Name, pktName,
169                                                 humanize.IBytes(uint64(pktSize)), argsStr,
170                                         )
171                                 })
172                                 return err
173                         }
174                         if len(sendmail) > 0 && ctx.NotifyExec != nil {
175                                 notify := ctx.NotifyExec[sender.Name+"."+handle]
176                                 if notify == nil {
177                                         notify = ctx.NotifyExec["*."+handle]
178                                 }
179                                 if notify != nil {
180                                         cmd := exec.Command(
181                                                 sendmail[0],
182                                                 append(sendmail[1:], notify.To)...,
183                                         )
184                                         cmd.Stdin = newNotification(notify, fmt.Sprintf(
185                                                 "Exec from %s: %s", sender.Name, argsStr,
186                                         ), output)
187                                         if err = cmd.Run(); err != nil {
188                                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
189                                                         return fmt.Sprintf(
190                                                                 "Tossing exec %s/%s (%s): %s: notifying",
191                                                                 sender.Name, pktName,
192                                                                 humanize.IBytes(pktSize), argsStr,
193                                                         )
194                                                 })
195                                         }
196                                 }
197                         }
198                 }
199                 ctx.LogI("rx", les, func(les LEs) string {
200                         return fmt.Sprintf(
201                                 "Got exec from %s to %s (%s)",
202                                 sender.Name, argsStr,
203                                 humanize.IBytes(pktSize),
204                         )
205                 })
206                 if !dryRun && jobPath != "" {
207                         if doSeen {
208                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
209                                         return err
210                                 }
211                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
212                                         fd.Close()
213                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
214                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
215                                                         return fmt.Sprintf(
216                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
217                                                                 sender.Name, pktName,
218                                                                 humanize.IBytes(pktSize),
219                                                                 filepath.Base(jobPath),
220                                                         )
221                                                 })
222                                                 return err
223                                         }
224                                 }
225                         }
226                         if err = os.Remove(jobPath); err != nil {
227                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
228                                         return fmt.Sprintf(
229                                                 "Tossing exec %s/%s (%s): %s: notifying",
230                                                 sender.Name, pktName,
231                                                 humanize.IBytes(pktSize), argsStr,
232                                         )
233                                 })
234                                 return err
235                         } else if ctx.HdrUsage {
236                                 os.Remove(JobPath2Hdr(jobPath))
237                         }
238                 }
239
240         case PktTypeFile:
241                 if noFile {
242                         return nil
243                 }
244                 dst := string(pkt.Path[:int(pkt.PathLen)])
245                 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
246                 if filepath.IsAbs(dst) {
247                         err = errors.New("non-relative destination path")
248                         ctx.LogE(
249                                 "rx-non-rel", les, err,
250                                 func(les LEs) string {
251                                         return fmt.Sprintf(
252                                                 "Tossing file %s/%s (%s): %s",
253                                                 sender.Name, pktName,
254                                                 humanize.IBytes(pktSize), dst,
255                                         )
256                                 },
257                         )
258                         return err
259                 }
260                 incoming := sender.Incoming
261                 if incoming == nil {
262                         err = errors.New("incoming is not allowed")
263                         ctx.LogE(
264                                 "rx-no-incoming", les, err,
265                                 func(les LEs) string {
266                                         return fmt.Sprintf(
267                                                 "Tossing file %s/%s (%s): %s",
268                                                 sender.Name, pktName,
269                                                 humanize.IBytes(pktSize), dst,
270                                         )
271                                 },
272                         )
273                         return err
274                 }
275                 dir := filepath.Join(*incoming, path.Dir(dst))
276                 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
277                         ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
278                                 return fmt.Sprintf(
279                                         "Tossing file %s/%s (%s): %s: mkdir",
280                                         sender.Name, pktName,
281                                         humanize.IBytes(pktSize), dst,
282                                 )
283                         })
284                         return err
285                 }
286                 if !dryRun {
287                         tmp, err := TempFile(dir, "file")
288                         if err != nil {
289                                 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
290                                         return fmt.Sprintf(
291                                                 "Tossing file %s/%s (%s): %s: mktemp",
292                                                 sender.Name, pktName,
293                                                 humanize.IBytes(pktSize), dst,
294                                         )
295                                 })
296                                 return err
297                         }
298                         les = append(les, LE{"Tmp", tmp.Name()})
299                         ctx.LogD("rx-tmp-created", les, func(les LEs) string {
300                                 return fmt.Sprintf(
301                                         "Tossing file %s/%s (%s): %s: created: %s",
302                                         sender.Name, pktName,
303                                         humanize.IBytes(pktSize), dst, tmp.Name(),
304                                 )
305                         })
306                         bufW := bufio.NewWriter(tmp)
307                         if _, err = CopyProgressed(
308                                 bufW, pipeR, "Rx file",
309                                 append(les, LE{"FullSize", int64(pktSize)}),
310                                 ctx.ShowPrgrs,
311                         ); err != nil {
312                                 ctx.LogE("rx-copy", les, err, func(les LEs) string {
313                                         return fmt.Sprintf(
314                                                 "Tossing file %s/%s (%s): %s: copying",
315                                                 sender.Name, pktName,
316                                                 humanize.IBytes(pktSize), dst,
317                                         )
318                                 })
319                                 return err
320                         }
321                         if err = bufW.Flush(); err != nil {
322                                 tmp.Close()
323                                 ctx.LogE("rx-flush", les, err, func(les LEs) string {
324                                         return fmt.Sprintf(
325                                                 "Tossing file %s/%s (%s): %s: flushing",
326                                                 sender.Name, pktName,
327                                                 humanize.IBytes(pktSize), dst,
328                                         )
329                                 })
330                                 return err
331                         }
332                         if err = tmp.Sync(); err != nil {
333                                 tmp.Close()
334                                 ctx.LogE("rx-sync", les, err, func(les LEs) string {
335                                         return fmt.Sprintf(
336                                                 "Tossing file %s/%s (%s): %s: syncing",
337                                                 sender.Name, pktName,
338                                                 humanize.IBytes(pktSize), dst,
339                                         )
340                                 })
341                                 return err
342                         }
343                         if err = tmp.Close(); err != nil {
344                                 ctx.LogE("rx-close", les, err, func(les LEs) string {
345                                         return fmt.Sprintf(
346                                                 "Tossing file %s/%s (%s): %s: closing",
347                                                 sender.Name, pktName,
348                                                 humanize.IBytes(pktSize), dst,
349                                         )
350                                 })
351                                 return err
352                         }
353                         dstPathOrig := filepath.Join(*incoming, dst)
354                         dstPath := dstPathOrig
355                         dstPathCtr := 0
356                         for {
357                                 if _, err = os.Stat(dstPath); err != nil {
358                                         if os.IsNotExist(err) {
359                                                 break
360                                         }
361                                         ctx.LogE("rx-stat", les, err, func(les LEs) string {
362                                                 return fmt.Sprintf(
363                                                         "Tossing file %s/%s (%s): %s: stating: %s",
364                                                         sender.Name, pktName,
365                                                         humanize.IBytes(pktSize), dst, dstPath,
366                                                 )
367                                         })
368                                         return err
369                                 }
370                                 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
371                                 dstPathCtr++
372                         }
373                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
374                                 ctx.LogE("rx-rename", les, err, func(les LEs) string {
375                                         return fmt.Sprintf(
376                                                 "Tossing file %s/%s (%s): %s: renaming",
377                                                 sender.Name, pktName,
378                                                 humanize.IBytes(pktSize), dst,
379                                         )
380                                 })
381                                 return err
382                         }
383                         if err = DirSync(*incoming); err != nil {
384                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
385                                         return fmt.Sprintf(
386                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
387                                                 sender.Name, pktName,
388                                                 humanize.IBytes(pktSize), dst,
389                                         )
390                                 })
391                                 return err
392                         }
393                         les = les[:len(les)-1] // delete Tmp
394                 }
395                 ctx.LogI("rx", les, func(les LEs) string {
396                         return fmt.Sprintf(
397                                 "Got file %s (%s) from %s",
398                                 dst, humanize.IBytes(pktSize), sender.Name,
399                         )
400                 })
401                 if !dryRun {
402                         if jobPath != "" {
403                                 if doSeen {
404                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
405                                                 return err
406                                         }
407                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
408                                                 fd.Close()
409                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
410                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
411                                                                 return fmt.Sprintf(
412                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
413                                                                         sender.Name, pktName,
414                                                                         humanize.IBytes(pktSize),
415                                                                         filepath.Base(jobPath),
416                                                                 )
417                                                         })
418                                                         return err
419                                                 }
420                                         }
421                                 }
422                                 if err = os.Remove(jobPath); err != nil {
423                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
424                                                 return fmt.Sprintf(
425                                                         "Tossing file %s/%s (%s): %s: removing",
426                                                         sender.Name, pktName,
427                                                         humanize.IBytes(pktSize), dst,
428                                                 )
429                                         })
430                                         return err
431                                 } else if ctx.HdrUsage {
432                                         os.Remove(JobPath2Hdr(jobPath))
433                                 }
434                         }
435                         if len(sendmail) > 0 && ctx.NotifyFile != nil {
436                                 cmd := exec.Command(
437                                         sendmail[0],
438                                         append(sendmail[1:], ctx.NotifyFile.To)...,
439                                 )
440                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
441                                         "File from %s: %s (%s)",
442                                         sender.Name, dst, humanize.IBytes(pktSize),
443                                 ), nil)
444                                 if err = cmd.Run(); err != nil {
445                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
446                                                 return fmt.Sprintf(
447                                                         "Tossing file %s/%s (%s): %s: notifying",
448                                                         sender.Name, pktName,
449                                                         humanize.IBytes(pktSize), dst,
450                                                 )
451                                         })
452                                 }
453                         }
454                 }
455
456         case PktTypeFreq:
457                 if noFreq {
458                         return nil
459                 }
460                 src := string(pkt.Path[:int(pkt.PathLen)])
461                 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
462                 if filepath.IsAbs(src) {
463                         err = errors.New("non-relative source path")
464                         ctx.LogE(
465                                 "rx-non-rel", les, err,
466                                 func(les LEs) string {
467                                         return fmt.Sprintf(
468                                                 "Tossing freq %s/%s (%s): %s: notifying",
469                                                 sender.Name, pktName,
470                                                 humanize.IBytes(pktSize), src,
471                                         )
472                                 },
473                         )
474                         return err
475                 }
476                 dstRaw, err := ioutil.ReadAll(pipeR)
477                 if err != nil {
478                         ctx.LogE("rx-read", les, err, func(les LEs) string {
479                                 return fmt.Sprintf(
480                                         "Tossing freq %s/%s (%s): %s: reading",
481                                         sender.Name, pktName,
482                                         humanize.IBytes(pktSize), src,
483                                 )
484                         })
485                         return err
486                 }
487                 dst := string(dstRaw)
488                 les = append(les, LE{"Dst", dst})
489                 freqPath := sender.FreqPath
490                 if freqPath == nil {
491                         err = errors.New("freqing is not allowed")
492                         ctx.LogE(
493                                 "rx-no-freq", les, err,
494                                 func(les LEs) string {
495                                         return fmt.Sprintf(
496                                                 "Tossing freq %s/%s (%s): %s -> %s",
497                                                 sender.Name, pktName,
498                                                 humanize.IBytes(pktSize), src, dst,
499                                         )
500                                 },
501                         )
502                         return err
503                 }
504                 if !dryRun {
505                         err = ctx.TxFile(
506                                 sender,
507                                 pkt.Nice,
508                                 filepath.Join(*freqPath, src),
509                                 dst,
510                                 sender.FreqChunked,
511                                 sender.FreqMinSize,
512                                 sender.FreqMaxSize,
513                                 nil,
514                         )
515                         if err != nil {
516                                 ctx.LogE("rx-tx", les, err, func(les LEs) string {
517                                         return fmt.Sprintf(
518                                                 "Tossing freq %s/%s (%s): %s -> %s: txing",
519                                                 sender.Name, pktName,
520                                                 humanize.IBytes(pktSize), src, dst,
521                                         )
522                                 })
523                                 return err
524                         }
525                 }
526                 ctx.LogI("rx", les, func(les LEs) string {
527                         return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
528                 })
529                 if !dryRun {
530                         if jobPath != "" {
531                                 if doSeen {
532                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
533                                                 return err
534                                         }
535                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
536                                                 fd.Close()
537                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
538                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
539                                                                 return fmt.Sprintf(
540                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
541                                                                         sender.Name, pktName,
542                                                                         humanize.IBytes(pktSize),
543                                                                         filepath.Base(jobPath),
544                                                                 )
545                                                         })
546                                                         return err
547                                                 }
548                                         }
549                                 }
550                                 if err = os.Remove(jobPath); err != nil {
551                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
552                                                 return fmt.Sprintf(
553                                                         "Tossing freq %s/%s (%s): %s -> %s: removing",
554                                                         sender.Name, pktName,
555                                                         humanize.IBytes(pktSize), src, dst,
556                                                 )
557                                         })
558                                         return err
559                                 } else if ctx.HdrUsage {
560                                         os.Remove(JobPath2Hdr(jobPath))
561                                 }
562                         }
563                         if len(sendmail) > 0 && ctx.NotifyFreq != nil {
564                                 cmd := exec.Command(
565                                         sendmail[0],
566                                         append(sendmail[1:], ctx.NotifyFreq.To)...,
567                                 )
568                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
569                                         "Freq from %s: %s", sender.Name, src,
570                                 ), nil)
571                                 if err = cmd.Run(); err != nil {
572                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
573                                                 return fmt.Sprintf(
574                                                         "Tossing freq %s/%s (%s): %s -> %s: notifying",
575                                                         sender.Name, pktName,
576                                                         humanize.IBytes(pktSize), src, dst,
577                                                 )
578                                         })
579                                 }
580                         }
581                 }
582
583         case PktTypeTrns:
584                 if noTrns {
585                         return nil
586                 }
587                 dst := new([MTHSize]byte)
588                 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
589                 nodeId := NodeId(*dst)
590                 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
591                 logMsg := func(les LEs) string {
592                         return fmt.Sprintf(
593                                 "Tossing trns %s/%s (%s): %s",
594                                 sender.Name, pktName,
595                                 humanize.IBytes(pktSize),
596                                 nodeId.String(),
597                         )
598                 }
599                 node := ctx.Neigh[nodeId]
600                 if node == nil {
601                         err = errors.New("unknown node")
602                         ctx.LogE("rx-unknown", les, err, logMsg)
603                         return err
604                 }
605                 ctx.LogD("rx-tx", les, logMsg)
606                 if !dryRun {
607                         if len(node.Via) == 0 {
608                                 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
609                                         ctx.LogE("rx", les, err, func(les LEs) string {
610                                                 return logMsg(les) + ": txing"
611                                         })
612                                         return err
613                                 }
614                         } else {
615                                 via := node.Via[:len(node.Via)-1]
616                                 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
617                                 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
618                                 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
619                                 if err != nil {
620                                         panic(err)
621                                 }
622                                 if _, _, err = ctx.Tx(
623                                         node,
624                                         pktTrns,
625                                         nice,
626                                         int64(pktSize), 0, MaxFileSize,
627                                         pipeR,
628                                         pktName,
629                                         nil,
630                                 ); err != nil {
631                                         ctx.LogE("rx", les, err, func(les LEs) string {
632                                                 return logMsg(les) + ": txing"
633                                         })
634                                         return err
635                                 }
636                         }
637                 }
638                 ctx.LogI("rx", les, func(les LEs) string {
639                         return fmt.Sprintf(
640                                 "Got transitional packet from %s to %s (%s)",
641                                 sender.Name,
642                                 ctx.NodeName(&nodeId),
643                                 humanize.IBytes(pktSize),
644                         )
645                 })
646                 if !dryRun && jobPath != "" {
647                         if doSeen {
648                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
649                                         return err
650                                 }
651                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
652                                         fd.Close()
653                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
654                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
655                                                         return fmt.Sprintf(
656                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
657                                                                 sender.Name, pktName,
658                                                                 humanize.IBytes(pktSize),
659                                                                 filepath.Base(jobPath),
660                                                         )
661                                                 })
662                                                 return err
663                                         }
664                                 }
665                         }
666                         if err = os.Remove(jobPath); err != nil {
667                                 ctx.LogE("rx", les, err, func(les LEs) string {
668                                         return fmt.Sprintf(
669                                                 "Tossing trns %s/%s (%s): %s: removing",
670                                                 sender.Name, pktName,
671                                                 humanize.IBytes(pktSize),
672                                                 ctx.NodeName(&nodeId),
673                                         )
674                                 })
675                                 return err
676                         } else if ctx.HdrUsage {
677                                 os.Remove(JobPath2Hdr(jobPath))
678                         }
679                 }
680
681         case PktTypeArea:
682                 if noArea {
683                         return nil
684                 }
685                 areaId := new(AreaId)
686                 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
687                 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
688                 logMsg := func(les LEs) string {
689                         return fmt.Sprintf(
690                                 "Tossing %s/%s (%s): area %s",
691                                 sender.Name, pktName,
692                                 humanize.IBytes(pktSize),
693                                 ctx.AreaName(areaId),
694                         )
695                 }
696                 area := ctx.AreaId2Area[*areaId]
697                 if area == nil {
698                         err = errors.New("unknown area")
699                         ctx.LogE("rx-area-unknown", les, err, logMsg)
700                         return err
701                 }
702                 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
703                 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
704                 if err != nil {
705                         ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
706                         return err
707                 }
708                 msgHashRaw := blake2b.Sum256(pktEncRaw)
709                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
710                 les = append(les, LE{"AreaMsg", msgHash})
711                 ctx.LogD("rx-area", les, logMsg)
712
713                 if dryRun {
714                         for _, nodeId := range area.Subs {
715                                 node := ctx.Neigh[*nodeId]
716                                 lesEcho := append(les, LE{"Echo", nodeId})
717                                 seenDir := filepath.Join(
718                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
719                                 )
720                                 seenPath := filepath.Join(seenDir, msgHash)
721                                 logMsgNode := func(les LEs) string {
722                                         return fmt.Sprintf(
723                                                 "%s: echoing to: %s", logMsg(les), node.Name,
724                                         )
725                                 }
726                                 if _, err := os.Stat(seenPath); err == nil {
727                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
728                                                 return logMsgNode(les) + ": already sent"
729                                         })
730                                         continue
731                                 }
732                                 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
733                         }
734                 } else {
735                         for _, nodeId := range area.Subs {
736                                 node := ctx.Neigh[*nodeId]
737                                 lesEcho := append(les, LE{"Echo", nodeId})
738                                 seenDir := filepath.Join(
739                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
740                                 )
741                                 seenPath := filepath.Join(seenDir, msgHash)
742                                 logMsgNode := func(les LEs) string {
743                                         return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
744                                 }
745                                 if _, err := os.Stat(seenPath); err == nil {
746                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
747                                                 return logMsgNode(les) + ": already sent"
748                                         })
749                                         continue
750                                 }
751                                 if nodeId != sender.Id && nodeId != pktEnc.Sender {
752                                         ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
753                                         if _, _, err = ctx.Tx(
754                                                 node,
755                                                 &pkt,
756                                                 nice,
757                                                 int64(pktSize), 0, MaxFileSize,
758                                                 fullPipeR,
759                                                 pktName,
760                                                 nil,
761                                         ); err != nil {
762                                                 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
763                                                 return err
764                                         }
765                                 }
766                                 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
767                                         ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
768                                         return err
769                                 }
770                                 if fd, err := os.Create(seenPath); err == nil {
771                                         fd.Close()
772                                         if err = DirSync(seenDir); err != nil {
773                                                 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
774                                                 return err
775                                         }
776                                 } else {
777                                         ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
778                                         return err
779                                 }
780                                 return JobRepeatProcess
781                         }
782                 }
783
784                 seenDir := filepath.Join(
785                         ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
786                 )
787                 seenPath := filepath.Join(seenDir, msgHash)
788                 if _, err := os.Stat(seenPath); err == nil {
789                         ctx.LogD("rx-area-seen", les, func(les LEs) string {
790                                 return logMsg(les) + ": already seen"
791                         })
792                         if !dryRun && jobPath != "" {
793                                 if err = os.Remove(jobPath); err != nil {
794                                         ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
795                                                 return fmt.Sprintf(
796                                                         "Tossing area %s/%s (%s): %s: removing",
797                                                         sender.Name, pktName,
798                                                         humanize.IBytes(pktSize),
799                                                         msgHash,
800                                                 )
801                                         })
802                                         return err
803                                 } else if ctx.HdrUsage {
804                                         os.Remove(JobPath2Hdr(jobPath))
805                                 }
806                         }
807                         return nil
808                 }
809
810                 if area.Prv == nil {
811                         ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
812                                 return logMsg(les) + ": no private key for decoding"
813                         })
814                 } else {
815                         signatureVerify := true
816                         if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
817                                 if !area.AllowUnknown {
818                                         err = errors.New("unknown sender")
819                                         ctx.LogE(
820                                                 "rx-area-unknown",
821                                                 append(les, LE{"Sender", pktEnc.Sender}),
822                                                 err,
823                                                 func(les LEs) string {
824                                                         return logMsg(les) + ": sender: " + pktEnc.Sender.String()
825                                                 },
826                                         )
827                                         return err
828                                 }
829                                 signatureVerify = false
830                         }
831                         areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
832                         copy(areaNodeOur.Id[:], area.Id[:])
833                         copy(areaNodeOur.ExchPrv[:], area.Prv[:])
834                         areaNode := Node{
835                                 Id:       new(NodeId),
836                                 Name:     area.Name,
837                                 Incoming: area.Incoming,
838                                 Exec:     area.Exec,
839                         }
840                         copy(areaNode.Id[:], area.Id[:])
841                         pktName := fmt.Sprintf(
842                                 "area/%s/%s",
843                                 Base32Codec.EncodeToString(areaId[:]), msgHash,
844                         )
845
846                         pipeR, pipeW := io.Pipe()
847                         errs := make(chan error, 1)
848                         go func() {
849                                 errs <- jobProcess(
850                                         ctx,
851                                         pipeR,
852                                         pktName,
853                                         les,
854                                         &areaNode,
855                                         nice,
856                                         uint64(pktSizeWithoutEnc(int64(pktSize))),
857                                         "",
858                                         decompressor,
859                                         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
860                                 )
861                         }()
862                         _, _, _, err = PktEncRead(
863                                 &areaNodeOur,
864                                 ctx.Neigh,
865                                 fullPipeR,
866                                 pipeW,
867                                 signatureVerify,
868                                 nil,
869                         )
870                         if err != nil {
871                                 ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
872                                 pipeW.CloseWithError(err)
873                                 <-errs
874                                 return err
875                         }
876                         pipeW.Close()
877                         if err = <-errs; err != nil {
878                                 return err
879                         }
880                 }
881
882                 if !dryRun && jobPath != "" {
883                         if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
884                                 ctx.LogE("rx-area-mkdir", les, err, logMsg)
885                                 return err
886                         }
887                         if fd, err := os.Create(seenPath); err == nil {
888                                 fd.Close()
889                                 if err = DirSync(seenDir); err != nil {
890                                         ctx.LogE("rx-area-dirsync", les, err, logMsg)
891                                         return err
892                                 }
893                         }
894                         if err = os.Remove(jobPath); err != nil {
895                                 ctx.LogE("rx", les, err, func(les LEs) string {
896                                         return fmt.Sprintf(
897                                                 "Tossing area %s/%s (%s): %s: removing",
898                                                 sender.Name, pktName,
899                                                 humanize.IBytes(pktSize),
900                                                 msgHash,
901                                         )
902                                 })
903                                 return err
904                         } else if ctx.HdrUsage {
905                                 os.Remove(JobPath2Hdr(jobPath))
906                         }
907                 }
908
909         default:
910                 err = errors.New("unknown type")
911                 ctx.LogE(
912                         "rx-type-unknown", les, err,
913                         func(les LEs) string {
914                                 return fmt.Sprintf(
915                                         "Tossing %s/%s (%s)",
916                                         sender.Name, pktName, humanize.IBytes(pktSize),
917                                 )
918                         },
919                 )
920                 return err
921         }
922         return nil
923 }
924
925 func (ctx *Ctx) Toss(
926         nodeId *NodeId,
927         xx TRxTx,
928         nice uint8,
929         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
930 ) bool {
931         dirLock, err := ctx.LockDir(nodeId, "toss")
932         if err != nil {
933                 return false
934         }
935         defer ctx.UnlockDir(dirLock)
936         isBad := false
937         decompressor, err := zstd.NewReader(nil)
938         if err != nil {
939                 panic(err)
940         }
941         defer decompressor.Close()
942         for job := range ctx.Jobs(nodeId, xx) {
943                 pktName := filepath.Base(job.Path)
944                 les := LEs{
945                         {"Node", job.PktEnc.Sender},
946                         {"Pkt", pktName},
947                         {"Nice", int(job.PktEnc.Nice)},
948                 }
949                 if job.PktEnc.Nice > nice {
950                         ctx.LogD("rx-too-nice", les, func(les LEs) string {
951                                 return fmt.Sprintf(
952                                         "Tossing %s/%s: too nice: %s",
953                                         ctx.NodeName(job.PktEnc.Sender), pktName,
954                                         NicenessFmt(job.PktEnc.Nice),
955                                 )
956                         })
957                         continue
958                 }
959                 fd, err := os.Open(job.Path)
960                 if err != nil {
961                         ctx.LogE("rx-open", les, err, func(les LEs) string {
962                                 return fmt.Sprintf(
963                                         "Tossing %s/%s: opening %s",
964                                         ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
965                                 )
966                         })
967                         isBad = true
968                         continue
969                 }
970                 sender := ctx.Neigh[*job.PktEnc.Sender]
971                 if sender == nil {
972                         err := errors.New("unknown node")
973                         ctx.LogE("rx-open", les, err, func(les LEs) string {
974                                 return fmt.Sprintf(
975                                         "Tossing %s/%s",
976                                         ctx.NodeName(job.PktEnc.Sender), pktName,
977                                 )
978                         })
979                         isBad = true
980                         continue
981                 }
982                 errs := make(chan error, 1)
983                 var sharedKey []byte
984         Retry:
985                 pipeR, pipeW := io.Pipe()
986                 go func() {
987                         errs <- jobProcess(
988                                 ctx,
989                                 pipeR,
990                                 pktName,
991                                 les,
992                                 sender,
993                                 job.PktEnc.Nice,
994                                 uint64(pktSizeWithoutEnc(job.Size)),
995                                 job.Path,
996                                 decompressor,
997                                 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
998                         )
999                 }()
1000                 pipeWB := bufio.NewWriter(pipeW)
1001                 sharedKey, _, _, err = PktEncRead(
1002                         ctx.Self,
1003                         ctx.Neigh,
1004                         bufio.NewReader(fd),
1005                         pipeWB,
1006                         sharedKey == nil,
1007                         sharedKey,
1008                 )
1009                 if err != nil {
1010                         pipeW.CloseWithError(err)
1011                 }
1012                 if err := pipeWB.Flush(); err != nil {
1013                         pipeW.CloseWithError(err)
1014                 }
1015                 pipeW.Close()
1016
1017                 if err != nil {
1018                         isBad = true
1019                         fd.Close()
1020                         <-errs
1021                         continue
1022                 }
1023                 if err = <-errs; err == JobRepeatProcess {
1024                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1025                                 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1026                                         return fmt.Sprintf(
1027                                                 "Tossing %s/%s: can not seek",
1028                                                 ctx.NodeName(job.PktEnc.Sender),
1029                                                 pktName,
1030                                         )
1031                                 })
1032                                 isBad = true
1033                                 break
1034                         }
1035                         goto Retry
1036                 } else if err != nil {
1037                         isBad = true
1038                 }
1039                 fd.Close()
1040         }
1041         return isBad
1042 }
1043
1044 func (ctx *Ctx) AutoToss(
1045         nodeId *NodeId,
1046         nice uint8,
1047         doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
1048 ) (chan struct{}, chan bool) {
1049         dw, err := ctx.NewDirWatcher(
1050                 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1051                 time.Second,
1052         )
1053         if err != nil {
1054                 log.Fatalln(err)
1055         }
1056         finish := make(chan struct{})
1057         badCode := make(chan bool)
1058         go func() {
1059                 bad := false
1060                 for {
1061                         select {
1062                         case <-finish:
1063                                 dw.Close()
1064                                 badCode <- bad
1065                                 return
1066                         case <-dw.C:
1067                                 bad = !ctx.Toss(
1068                                         nodeId, TRx, nice, false,
1069                                         doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
1070                         }
1071                 }
1072         }()
1073         return finish, badCode
1074 }