]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
2fab0a2e8cb3a304323e2e65a6c5bfdf687430d5
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2023 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "log"
28         "mime"
29         "os"
30         "os/exec"
31         "path"
32         "path/filepath"
33         "strconv"
34         "strings"
35         "time"
36
37         xdr "github.com/davecgh/go-xdr/xdr2"
38         "github.com/dustin/go-humanize"
39         "github.com/klauspost/compress/zstd"
40         "golang.org/x/crypto/blake2b"
41         "golang.org/x/crypto/poly1305"
42 )
43
// SeenDir is the name of the subdirectory, located next to the job files,
// that jobPath2Seen uses as the parent of a job's "seen" entry.
const SeenDir = "seen"
47
48 func jobPath2Seen(jobPath string) string {
49         return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
50 }
51
52 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
53         lines := []string{
54                 "From: " + fromTo.From,
55                 "To: " + fromTo.To,
56                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
57         }
58         if len(body) > 0 {
59                 lines = append(
60                         lines,
61                         "MIME-Version: 1.0",
62                         "Content-Type: text/plain; charset=utf-8",
63                         "Content-Transfer-Encoding: base64",
64                         "",
65                         base64.StdEncoding.EncodeToString(body),
66                 )
67         }
68         return strings.NewReader(strings.Join(lines, "\n"))
69 }
70
71 func pktSizeWithoutEnc(pktSize int64) int64 {
72         pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
73         pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
74         if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
75                 pktSize -= poly1305.TagSize
76         }
77         pktSize -= pktSizeBlocks * poly1305.TagSize
78         return pktSize
79 }
80
// JobRepeatProcess is a sentinel error carrying the message
// "needs processing repeat".
// NOTE(review): presumably returned by job processing to ask the caller to
// run the processing pass again -- confirm against its users.
var JobRepeatProcess = errors.New("needs processing repeat")
82
83 func jobProcess(
84         ctx *Ctx,
85         pipeR *io.PipeReader,
86         pktName string,
87         les LEs,
88         sender *Node,
89         nice uint8,
90         pktSize uint64,
91         jobPath string,
92         decompressor *zstd.Decoder,
93         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
94 ) error {
95         defer pipeR.Close()
96         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
97         var pkt Pkt
98         _, err := xdr.Unmarshal(pipeR, &pkt)
99         if err != nil {
100                 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
101                         return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
102                 })
103                 return err
104         }
105         les = append(les, LE{"Size", int64(pktSize)})
106         ctx.LogD("rx", les, func(les LEs) string {
107                 return fmt.Sprintf(
108                         "Tossing %s/%s (%s)",
109                         sender.Name, pktName,
110                         humanize.IBytes(pktSize),
111                 )
112         })
113         switch pkt.Type {
114         case PktTypeExec, PktTypeExecFat:
115                 if noExec {
116                         return nil
117                 }
118                 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
119                 handle := string(path[0])
120                 args := make([]string, 0, len(path)-1)
121                 for _, p := range path[1:] {
122                         args = append(args, string(p))
123                 }
124                 argsStr := strings.Join(append([]string{handle}, args...), " ")
125                 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
126                 cmdline := sender.Exec[handle]
127                 if len(cmdline) == 0 {
128                         err = errors.New("No handle found")
129                         ctx.LogE(
130                                 "rx-no-handle", les, err,
131                                 func(les LEs) string {
132                                         return fmt.Sprintf(
133                                                 "Tossing exec %s/%s (%s): %s",
134                                                 sender.Name, pktName,
135                                                 humanize.IBytes(pktSize), argsStr,
136                                         )
137                                 },
138                         )
139                         return err
140                 }
141                 if pkt.Type == PktTypeExec {
142                         if err = decompressor.Reset(pipeR); err != nil {
143                                 log.Fatalln(err)
144                         }
145                 }
146                 if !dryRun {
147                         cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
148                         cmd.Env = append(
149                                 cmd.Env,
150                                 "NNCP_SELF="+ctx.Self.Id.String(),
151                                 "NNCP_SENDER="+sender.Id.String(),
152                                 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
153                         )
154                         if pkt.Type == PktTypeExec {
155                                 cmd.Stdin = decompressor
156                         } else {
157                                 cmd.Stdin = pipeR
158                         }
159                         output, err := cmd.CombinedOutput()
160                         if err != nil {
161                                 les = append(les, LE{"Output", strings.Split(
162                                         strings.Trim(string(output), "\n"), "\n"),
163                                 })
164                                 ctx.LogE("rx-handle", les, err, func(les LEs) string {
165                                         return fmt.Sprintf(
166                                                 "Tossing exec %s/%s (%s): %s: handling",
167                                                 sender.Name, pktName,
168                                                 humanize.IBytes(uint64(pktSize)), argsStr,
169                                         )
170                                 })
171                                 return err
172                         }
173                         if len(sendmail) > 0 && ctx.NotifyExec != nil {
174                                 notify := ctx.NotifyExec[sender.Name+"."+handle]
175                                 if notify == nil {
176                                         notify = ctx.NotifyExec["*."+handle]
177                                 }
178                                 if notify != nil {
179                                         cmd := exec.Command(
180                                                 sendmail[0],
181                                                 append(sendmail[1:], notify.To)...,
182                                         )
183                                         cmd.Stdin = newNotification(notify, fmt.Sprintf(
184                                                 "Exec from %s: %s", sender.Name, argsStr,
185                                         ), output)
186                                         if err = cmd.Run(); err != nil {
187                                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
188                                                         return fmt.Sprintf(
189                                                                 "Tossing exec %s/%s (%s): %s: notifying",
190                                                                 sender.Name, pktName,
191                                                                 humanize.IBytes(pktSize), argsStr,
192                                                         )
193                                                 })
194                                         }
195                                 }
196                         }
197                 }
198                 ctx.LogI("rx", les, func(les LEs) string {
199                         return fmt.Sprintf(
200                                 "Got exec from %s to %s (%s)",
201                                 sender.Name, argsStr,
202                                 humanize.IBytes(pktSize),
203                         )
204                 })
205                 if !dryRun && jobPath != "" {
206                         if doSeen {
207                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
208                                         return err
209                                 }
210                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
211                                         fd.Close()
212                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
213                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
214                                                         return fmt.Sprintf(
215                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
216                                                                 sender.Name, pktName,
217                                                                 humanize.IBytes(pktSize),
218                                                                 filepath.Base(jobPath),
219                                                         )
220                                                 })
221                                                 return err
222                                         }
223                                 }
224                         }
225                         if err = os.Remove(jobPath); err != nil {
226                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
227                                         return fmt.Sprintf(
228                                                 "Tossing exec %s/%s (%s): %s: notifying",
229                                                 sender.Name, pktName,
230                                                 humanize.IBytes(pktSize), argsStr,
231                                         )
232                                 })
233                                 return err
234                         } else if ctx.HdrUsage {
235                                 os.Remove(JobPath2Hdr(jobPath))
236                         }
237                 }
238
239         case PktTypeFile:
240                 if noFile {
241                         return nil
242                 }
243                 dst := string(pkt.Path[:int(pkt.PathLen)])
244                 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
245                 if filepath.IsAbs(dst) {
246                         err = errors.New("non-relative destination path")
247                         ctx.LogE(
248                                 "rx-non-rel", les, err,
249                                 func(les LEs) string {
250                                         return fmt.Sprintf(
251                                                 "Tossing file %s/%s (%s): %s",
252                                                 sender.Name, pktName,
253                                                 humanize.IBytes(pktSize), dst,
254                                         )
255                                 },
256                         )
257                         return err
258                 }
259                 incoming := sender.Incoming
260                 if incoming == nil {
261                         err = errors.New("incoming is not allowed")
262                         ctx.LogE(
263                                 "rx-no-incoming", les, err,
264                                 func(les LEs) string {
265                                         return fmt.Sprintf(
266                                                 "Tossing file %s/%s (%s): %s",
267                                                 sender.Name, pktName,
268                                                 humanize.IBytes(pktSize), dst,
269                                         )
270                                 },
271                         )
272                         return err
273                 }
274                 dir := filepath.Join(*incoming, path.Dir(dst))
275                 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
276                         ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
277                                 return fmt.Sprintf(
278                                         "Tossing file %s/%s (%s): %s: mkdir",
279                                         sender.Name, pktName,
280                                         humanize.IBytes(pktSize), dst,
281                                 )
282                         })
283                         return err
284                 }
285                 if !dryRun {
286                         tmp, err := TempFile(dir, "file")
287                         if err != nil {
288                                 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
289                                         return fmt.Sprintf(
290                                                 "Tossing file %s/%s (%s): %s: mktemp",
291                                                 sender.Name, pktName,
292                                                 humanize.IBytes(pktSize), dst,
293                                         )
294                                 })
295                                 return err
296                         }
297                         les = append(les, LE{"Tmp", tmp.Name()})
298                         ctx.LogD("rx-tmp-created", les, func(les LEs) string {
299                                 return fmt.Sprintf(
300                                         "Tossing file %s/%s (%s): %s: created: %s",
301                                         sender.Name, pktName,
302                                         humanize.IBytes(pktSize), dst, tmp.Name(),
303                                 )
304                         })
305                         bufW := bufio.NewWriter(tmp)
306                         if _, err = CopyProgressed(
307                                 bufW, pipeR, "Rx file",
308                                 append(les, LE{"FullSize", int64(pktSize)}),
309                                 ctx.ShowPrgrs,
310                         ); err != nil {
311                                 ctx.LogE("rx-copy", les, err, func(les LEs) string {
312                                         return fmt.Sprintf(
313                                                 "Tossing file %s/%s (%s): %s: copying",
314                                                 sender.Name, pktName,
315                                                 humanize.IBytes(pktSize), dst,
316                                         )
317                                 })
318                                 return err
319                         }
320                         if err = bufW.Flush(); err != nil {
321                                 tmp.Close()
322                                 ctx.LogE("rx-flush", les, err, func(les LEs) string {
323                                         return fmt.Sprintf(
324                                                 "Tossing file %s/%s (%s): %s: flushing",
325                                                 sender.Name, pktName,
326                                                 humanize.IBytes(pktSize), dst,
327                                         )
328                                 })
329                                 return err
330                         }
331                         if !NoSync {
332                                 if err = tmp.Sync(); err != nil {
333                                         tmp.Close()
334                                         ctx.LogE("rx-sync", les, err, func(les LEs) string {
335                                                 return fmt.Sprintf(
336                                                         "Tossing file %s/%s (%s): %s: syncing",
337                                                         sender.Name, pktName,
338                                                         humanize.IBytes(pktSize), dst,
339                                                 )
340                                         })
341                                         return err
342                                 }
343                         }
344                         if err = tmp.Close(); err != nil {
345                                 ctx.LogE("rx-close", les, err, func(les LEs) string {
346                                         return fmt.Sprintf(
347                                                 "Tossing file %s/%s (%s): %s: closing",
348                                                 sender.Name, pktName,
349                                                 humanize.IBytes(pktSize), dst,
350                                         )
351                                 })
352                                 return err
353                         }
354                         dstPathOrig := filepath.Join(*incoming, dst)
355                         dstPath := dstPathOrig
356                         dstPathCtr := 0
357                         for {
358                                 if _, err = os.Stat(dstPath); err != nil {
359                                         if os.IsNotExist(err) {
360                                                 break
361                                         }
362                                         ctx.LogE("rx-stat", les, err, func(les LEs) string {
363                                                 return fmt.Sprintf(
364                                                         "Tossing file %s/%s (%s): %s: stating: %s",
365                                                         sender.Name, pktName,
366                                                         humanize.IBytes(pktSize), dst, dstPath,
367                                                 )
368                                         })
369                                         return err
370                                 }
371                                 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
372                                 dstPathCtr++
373                         }
374                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
375                                 ctx.LogE("rx-rename", les, err, func(les LEs) string {
376                                         return fmt.Sprintf(
377                                                 "Tossing file %s/%s (%s): %s: renaming",
378                                                 sender.Name, pktName,
379                                                 humanize.IBytes(pktSize), dst,
380                                         )
381                                 })
382                                 return err
383                         }
384                         if err = DirSync(*incoming); err != nil {
385                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
386                                         return fmt.Sprintf(
387                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
388                                                 sender.Name, pktName,
389                                                 humanize.IBytes(pktSize), dst,
390                                         )
391                                 })
392                                 return err
393                         }
394                         les = les[:len(les)-1] // delete Tmp
395                 }
396                 ctx.LogI("rx", les, func(les LEs) string {
397                         return fmt.Sprintf(
398                                 "Got file %s (%s) from %s",
399                                 dst, humanize.IBytes(pktSize), sender.Name,
400                         )
401                 })
402                 if !dryRun {
403                         if jobPath != "" {
404                                 if doSeen {
405                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
406                                                 return err
407                                         }
408                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
409                                                 fd.Close()
410                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
411                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
412                                                                 return fmt.Sprintf(
413                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
414                                                                         sender.Name, pktName,
415                                                                         humanize.IBytes(pktSize),
416                                                                         filepath.Base(jobPath),
417                                                                 )
418                                                         })
419                                                         return err
420                                                 }
421                                         }
422                                 }
423                                 if err = os.Remove(jobPath); err != nil {
424                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
425                                                 return fmt.Sprintf(
426                                                         "Tossing file %s/%s (%s): %s: removing",
427                                                         sender.Name, pktName,
428                                                         humanize.IBytes(pktSize), dst,
429                                                 )
430                                         })
431                                         return err
432                                 } else if ctx.HdrUsage {
433                                         os.Remove(JobPath2Hdr(jobPath))
434                                 }
435                         }
436                         if len(sendmail) > 0 && ctx.NotifyFile != nil {
437                                 cmd := exec.Command(
438                                         sendmail[0],
439                                         append(sendmail[1:], ctx.NotifyFile.To)...,
440                                 )
441                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
442                                         "File from %s: %s (%s)",
443                                         sender.Name, dst, humanize.IBytes(pktSize),
444                                 ), nil)
445                                 if err = cmd.Run(); err != nil {
446                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
447                                                 return fmt.Sprintf(
448                                                         "Tossing file %s/%s (%s): %s: notifying",
449                                                         sender.Name, pktName,
450                                                         humanize.IBytes(pktSize), dst,
451                                                 )
452                                         })
453                                 }
454                         }
455                 }
456
457         case PktTypeFreq:
458                 if noFreq {
459                         return nil
460                 }
461                 src := string(pkt.Path[:int(pkt.PathLen)])
462                 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
463                 if filepath.IsAbs(src) {
464                         err = errors.New("non-relative source path")
465                         ctx.LogE(
466                                 "rx-non-rel", les, err,
467                                 func(les LEs) string {
468                                         return fmt.Sprintf(
469                                                 "Tossing freq %s/%s (%s): %s: notifying",
470                                                 sender.Name, pktName,
471                                                 humanize.IBytes(pktSize), src,
472                                         )
473                                 },
474                         )
475                         return err
476                 }
477                 dstRaw, err := io.ReadAll(pipeR)
478                 if err != nil {
479                         ctx.LogE("rx-read", les, err, func(les LEs) string {
480                                 return fmt.Sprintf(
481                                         "Tossing freq %s/%s (%s): %s: reading",
482                                         sender.Name, pktName,
483                                         humanize.IBytes(pktSize), src,
484                                 )
485                         })
486                         return err
487                 }
488                 dst := string(dstRaw)
489                 les = append(les, LE{"Dst", dst})
490                 freqPath := sender.FreqPath
491                 if freqPath == nil {
492                         err = errors.New("freqing is not allowed")
493                         ctx.LogE(
494                                 "rx-no-freq", les, err,
495                                 func(les LEs) string {
496                                         return fmt.Sprintf(
497                                                 "Tossing freq %s/%s (%s): %s -> %s",
498                                                 sender.Name, pktName,
499                                                 humanize.IBytes(pktSize), src, dst,
500                                         )
501                                 },
502                         )
503                         return err
504                 }
505                 if !dryRun {
506                         err = ctx.TxFile(
507                                 sender,
508                                 pkt.Nice,
509                                 filepath.Join(*freqPath, src),
510                                 dst,
511                                 sender.FreqChunked,
512                                 sender.FreqMinSize,
513                                 sender.FreqMaxSize,
514                                 nil,
515                         )
516                         if err != nil {
517                                 ctx.LogE("rx-tx", les, err, func(les LEs) string {
518                                         return fmt.Sprintf(
519                                                 "Tossing freq %s/%s (%s): %s -> %s: txing",
520                                                 sender.Name, pktName,
521                                                 humanize.IBytes(pktSize), src, dst,
522                                         )
523                                 })
524                                 return err
525                         }
526                 }
527                 ctx.LogI("rx", les, func(les LEs) string {
528                         return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
529                 })
530                 if !dryRun {
531                         if jobPath != "" {
532                                 if doSeen {
533                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
534                                                 return err
535                                         }
536                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
537                                                 fd.Close()
538                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
539                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
540                                                                 return fmt.Sprintf(
541                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
542                                                                         sender.Name, pktName,
543                                                                         humanize.IBytes(pktSize),
544                                                                         filepath.Base(jobPath),
545                                                                 )
546                                                         })
547                                                         return err
548                                                 }
549                                         }
550                                 }
551                                 if err = os.Remove(jobPath); err != nil {
552                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
553                                                 return fmt.Sprintf(
554                                                         "Tossing freq %s/%s (%s): %s -> %s: removing",
555                                                         sender.Name, pktName,
556                                                         humanize.IBytes(pktSize), src, dst,
557                                                 )
558                                         })
559                                         return err
560                                 } else if ctx.HdrUsage {
561                                         os.Remove(JobPath2Hdr(jobPath))
562                                 }
563                         }
564                         if len(sendmail) > 0 && ctx.NotifyFreq != nil {
565                                 cmd := exec.Command(
566                                         sendmail[0],
567                                         append(sendmail[1:], ctx.NotifyFreq.To)...,
568                                 )
569                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
570                                         "Freq from %s: %s", sender.Name, src,
571                                 ), nil)
572                                 if err = cmd.Run(); err != nil {
573                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
574                                                 return fmt.Sprintf(
575                                                         "Tossing freq %s/%s (%s): %s -> %s: notifying",
576                                                         sender.Name, pktName,
577                                                         humanize.IBytes(pktSize), src, dst,
578                                                 )
579                                         })
580                                 }
581                         }
582                 }
583
584         case PktTypeTrns:
585                 if noTrns {
586                         return nil
587                 }
588                 dst := new([MTHSize]byte)
589                 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
590                 nodeId := NodeId(*dst)
591                 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
592                 logMsg := func(les LEs) string {
593                         return fmt.Sprintf(
594                                 "Tossing trns %s/%s (%s): %s",
595                                 sender.Name, pktName,
596                                 humanize.IBytes(pktSize),
597                                 nodeId.String(),
598                         )
599                 }
600                 node := ctx.Neigh[nodeId]
601                 if node == nil {
602                         err = errors.New("unknown node")
603                         ctx.LogE("rx-unknown", les, err, logMsg)
604                         return err
605                 }
606                 ctx.LogD("rx-tx", les, logMsg)
607                 if !dryRun {
608                         if len(node.Via) == 0 {
609                                 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
610                                         ctx.LogE("rx", les, err, func(les LEs) string {
611                                                 return logMsg(les) + ": txing"
612                                         })
613                                         return err
614                                 }
615                         } else {
616                                 via := node.Via[:len(node.Via)-1]
617                                 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
618                                 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
619                                 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
620                                 if err != nil {
621                                         panic(err)
622                                 }
623                                 if _, _, _, err = ctx.Tx(
624                                         node,
625                                         pktTrns,
626                                         nice,
627                                         int64(pktSize), 0, MaxFileSize,
628                                         pipeR,
629                                         pktName,
630                                         nil,
631                                 ); err != nil {
632                                         ctx.LogE("rx", les, err, func(les LEs) string {
633                                                 return logMsg(les) + ": txing"
634                                         })
635                                         return err
636                                 }
637                         }
638                 }
639                 ctx.LogI("rx", les, func(les LEs) string {
640                         return fmt.Sprintf(
641                                 "Got transitional packet from %s to %s (%s)",
642                                 sender.Name,
643                                 ctx.NodeName(&nodeId),
644                                 humanize.IBytes(pktSize),
645                         )
646                 })
647                 if !dryRun && jobPath != "" {
648                         if doSeen {
649                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
650                                         return err
651                                 }
652                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
653                                         fd.Close()
654                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
655                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
656                                                         return fmt.Sprintf(
657                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
658                                                                 sender.Name, pktName,
659                                                                 humanize.IBytes(pktSize),
660                                                                 filepath.Base(jobPath),
661                                                         )
662                                                 })
663                                                 return err
664                                         }
665                                 }
666                         }
667                         if err = os.Remove(jobPath); err != nil {
668                                 ctx.LogE("rx", les, err, func(les LEs) string {
669                                         return fmt.Sprintf(
670                                                 "Tossing trns %s/%s (%s): %s: removing",
671                                                 sender.Name, pktName,
672                                                 humanize.IBytes(pktSize),
673                                                 ctx.NodeName(&nodeId),
674                                         )
675                                 })
676                                 return err
677                         } else if ctx.HdrUsage {
678                                 os.Remove(JobPath2Hdr(jobPath))
679                         }
680                 }
681
682         case PktTypeArea:
683                 if noArea {
684                         return nil
685                 }
686                 areaId := new(AreaId)
687                 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
688                 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
689                 logMsg := func(les LEs) string {
690                         return fmt.Sprintf(
691                                 "Tossing %s/%s (%s): area %s",
692                                 sender.Name, pktName,
693                                 humanize.IBytes(pktSize),
694                                 ctx.AreaName(areaId),
695                         )
696                 }
697                 area := ctx.AreaId2Area[*areaId]
698                 if area == nil {
699                         err = errors.New("unknown area")
700                         ctx.LogE("rx-area-unknown", les, err, logMsg)
701                         return err
702                 }
703                 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
704                 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
705                 if err != nil {
706                         ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
707                         return err
708                 }
709                 msgHashRaw := blake2b.Sum256(pktEncRaw)
710                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
711                 les = append(les, LE{"AreaMsg", msgHash})
712                 ctx.LogD("rx-area", les, logMsg)
713
714                 if dryRun {
715                         for _, nodeId := range area.Subs {
716                                 node := ctx.Neigh[*nodeId]
717                                 lesEcho := append(les, LE{"Echo", nodeId})
718                                 seenDir := filepath.Join(
719                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
720                                 )
721                                 seenPath := filepath.Join(seenDir, msgHash)
722                                 logMsgNode := func(les LEs) string {
723                                         return fmt.Sprintf(
724                                                 "%s: echoing to: %s", logMsg(les), node.Name,
725                                         )
726                                 }
727                                 if _, err := os.Stat(seenPath); err == nil {
728                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
729                                                 return logMsgNode(les) + ": already sent"
730                                         })
731                                         continue
732                                 }
733                                 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
734                         }
735                 } else {
736                         for _, nodeId := range area.Subs {
737                                 node := ctx.Neigh[*nodeId]
738                                 lesEcho := append(les, LE{"Echo", nodeId})
739                                 seenDir := filepath.Join(
740                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
741                                 )
742                                 seenPath := filepath.Join(seenDir, msgHash)
743                                 logMsgNode := func(les LEs) string {
744                                         return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
745                                 }
746                                 if _, err := os.Stat(seenPath); err == nil {
747                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
748                                                 return logMsgNode(les) + ": already sent"
749                                         })
750                                         continue
751                                 }
752                                 if nodeId != sender.Id && nodeId != pktEnc.Sender {
753                                         ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
754                                         if _, _, _, err = ctx.Tx(
755                                                 node,
756                                                 &pkt,
757                                                 nice,
758                                                 int64(pktSize), 0, MaxFileSize,
759                                                 fullPipeR,
760                                                 pktName,
761                                                 nil,
762                                         ); err != nil {
763                                                 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
764                                                 return err
765                                         }
766                                 }
767                                 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
768                                         ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
769                                         return err
770                                 }
771                                 if fd, err := os.Create(seenPath); err == nil {
772                                         fd.Close()
773                                         if err = DirSync(seenDir); err != nil {
774                                                 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
775                                                 return err
776                                         }
777                                 } else {
778                                         ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
779                                         return err
780                                 }
781                                 return JobRepeatProcess
782                         }
783                 }
784
785                 seenDir := filepath.Join(
786                         ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
787                 )
788                 seenPath := filepath.Join(seenDir, msgHash)
789                 if _, err := os.Stat(seenPath); err == nil {
790                         ctx.LogD("rx-area-seen", les, func(les LEs) string {
791                                 return logMsg(les) + ": already seen"
792                         })
793                         if !dryRun && jobPath != "" {
794                                 if err = os.Remove(jobPath); err != nil {
795                                         ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
796                                                 return fmt.Sprintf(
797                                                         "Tossing area %s/%s (%s): %s: removing",
798                                                         sender.Name, pktName,
799                                                         humanize.IBytes(pktSize),
800                                                         msgHash,
801                                                 )
802                                         })
803                                         return err
804                                 } else if ctx.HdrUsage {
805                                         os.Remove(JobPath2Hdr(jobPath))
806                                 }
807                         }
808                         return nil
809                 }
810
811                 if area.Prv == nil {
812                         ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
813                                 return logMsg(les) + ": no private key for decoding"
814                         })
815                 } else {
816                         signatureVerify := true
817                         if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
818                                 if !area.AllowUnknown {
819                                         err = errors.New("unknown sender")
820                                         ctx.LogE(
821                                                 "rx-area-unknown",
822                                                 append(les, LE{"Sender", pktEnc.Sender}),
823                                                 err,
824                                                 func(les LEs) string {
825                                                         return logMsg(les) + ": sender: " + pktEnc.Sender.String()
826                                                 },
827                                         )
828                                         return err
829                                 }
830                                 signatureVerify = false
831                         }
832                         areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
833                         copy(areaNodeOur.Id[:], area.Id[:])
834                         copy(areaNodeOur.ExchPrv[:], area.Prv[:])
835                         areaNode := Node{
836                                 Id:       new(NodeId),
837                                 Name:     area.Name,
838                                 Incoming: area.Incoming,
839                                 Exec:     area.Exec,
840                         }
841                         copy(areaNode.Id[:], area.Id[:])
842                         pktName := fmt.Sprintf(
843                                 "area/%s/%s",
844                                 Base32Codec.EncodeToString(areaId[:]), msgHash,
845                         )
846
847                         pipeR, pipeW := io.Pipe()
848                         errs := make(chan error, 1)
849                         go func() {
850                                 errs <- jobProcess(
851                                         ctx,
852                                         pipeR,
853                                         pktName,
854                                         les,
855                                         &areaNode,
856                                         nice,
857                                         uint64(pktSizeWithoutEnc(int64(pktSize))),
858                                         "",
859                                         decompressor,
860                                         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
861                                 )
862                         }()
863                         _, _, _, err = PktEncRead(
864                                 &areaNodeOur,
865                                 ctx.Neigh,
866                                 fullPipeR,
867                                 pipeW,
868                                 signatureVerify,
869                                 nil,
870                         )
871                         if err != nil {
872                                 ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
873                                 pipeW.CloseWithError(err)
874                                 <-errs
875                                 return err
876                         }
877                         pipeW.Close()
878                         if err = <-errs; err != nil {
879                                 return err
880                         }
881                 }
882
883                 if !dryRun && jobPath != "" {
884                         if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
885                                 ctx.LogE("rx-area-mkdir", les, err, logMsg)
886                                 return err
887                         }
888                         if fd, err := os.Create(seenPath); err == nil {
889                                 fd.Close()
890                                 if err = DirSync(seenDir); err != nil {
891                                         ctx.LogE("rx-area-dirsync", les, err, logMsg)
892                                         return err
893                                 }
894                         }
895                         if err = os.Remove(jobPath); err != nil {
896                                 ctx.LogE("rx", les, err, func(les LEs) string {
897                                         return fmt.Sprintf(
898                                                 "Tossing area %s/%s (%s): %s: removing",
899                                                 sender.Name, pktName,
900                                                 humanize.IBytes(pktSize),
901                                                 msgHash,
902                                         )
903                                 })
904                                 return err
905                         } else if ctx.HdrUsage {
906                                 os.Remove(JobPath2Hdr(jobPath))
907                         }
908                 }
909
910         case PktTypeACK:
911                 if noACK {
912                         return nil
913                 }
914                 hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
915                 les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
916                 logMsg := func(les LEs) string {
917                         return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
918                 }
919                 ctx.LogD("rx-ack", les, logMsg)
920                 pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
921                 if _, err := os.Stat(pktPath); err == nil {
922                         if !dryRun {
923                                 if err = os.Remove(pktPath); err != nil {
924                                         ctx.LogE("rx-ack", les, err, func(les LEs) string {
925                                                 return logMsg(les) + ": removing packet"
926                                         })
927                                         return err
928                                 } else if ctx.HdrUsage {
929                                         os.Remove(JobPath2Hdr(pktPath))
930                                 }
931                         }
932                 } else {
933                         ctx.LogD("rx-ack", les, func(les LEs) string {
934                                 return logMsg(les) + ": already disappeared"
935                         })
936                 }
937                 if !dryRun && doSeen {
938                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
939                                 return err
940                         }
941                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
942                                 fd.Close()
943                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
944                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
945                                                 return fmt.Sprintf(
946                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
947                                                         sender.Name, pktName,
948                                                         humanize.IBytes(pktSize),
949                                                         filepath.Base(jobPath),
950                                                 )
951                                         })
952                                         return err
953                                 }
954                         }
955                 }
956                 if !dryRun {
957                         if err = os.Remove(jobPath); err != nil {
958                                 ctx.LogE("rx", les, err, func(les LEs) string {
959                                         return logMsg(les) + ": removing job"
960                                 })
961                                 return err
962                         } else if ctx.HdrUsage {
963                                 os.Remove(JobPath2Hdr(jobPath))
964                         }
965                 }
966                 ctx.LogI("rx", les, func(les LEs) string {
967                         return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
968                 })
969
970         default:
971                 err = errors.New("unknown type")
972                 ctx.LogE(
973                         "rx-type-unknown", les, err,
974                         func(les LEs) string {
975                                 return fmt.Sprintf(
976                                         "Tossing %s/%s (%s)",
977                                         sender.Name, pktName, humanize.IBytes(pktSize),
978                                 )
979                         },
980                 )
981                 return err
982         }
983         return nil
984 }
985
986 func (ctx *Ctx) Toss(
987         nodeId *NodeId,
988         xx TRxTx,
989         nice uint8,
990         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
991 ) bool {
992         dirLock, err := ctx.LockDir(nodeId, "toss")
993         if err != nil {
994                 return false
995         }
996         defer ctx.UnlockDir(dirLock)
997         isBad := false
998         decompressor, err := zstd.NewReader(nil)
999         if err != nil {
1000                 panic(err)
1001         }
1002         defer decompressor.Close()
1003         for job := range ctx.Jobs(nodeId, xx) {
1004                 pktName := filepath.Base(job.Path)
1005                 les := LEs{
1006                         {"Node", job.PktEnc.Sender},
1007                         {"Pkt", pktName},
1008                         {"Nice", int(job.PktEnc.Nice)},
1009                 }
1010                 if job.PktEnc.Nice > nice {
1011                         ctx.LogD("rx-too-nice", les, func(les LEs) string {
1012                                 return fmt.Sprintf(
1013                                         "Tossing %s/%s: too nice: %s",
1014                                         ctx.NodeName(job.PktEnc.Sender), pktName,
1015                                         NicenessFmt(job.PktEnc.Nice),
1016                                 )
1017                         })
1018                         continue
1019                 }
1020                 fd, err := os.Open(job.Path)
1021                 if err != nil {
1022                         ctx.LogE("rx-open", les, err, func(les LEs) string {
1023                                 return fmt.Sprintf(
1024                                         "Tossing %s/%s: opening %s",
1025                                         ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
1026                                 )
1027                         })
1028                         isBad = true
1029                         continue
1030                 }
1031                 sender := ctx.Neigh[*job.PktEnc.Sender]
1032                 if sender == nil {
1033                         err := errors.New("unknown node")
1034                         ctx.LogE("rx-open", les, err, func(les LEs) string {
1035                                 return fmt.Sprintf(
1036                                         "Tossing %s/%s",
1037                                         ctx.NodeName(job.PktEnc.Sender), pktName,
1038                                 )
1039                         })
1040                         isBad = true
1041                         continue
1042                 }
1043                 errs := make(chan error, 1)
1044                 var sharedKey []byte
1045         Retry:
1046                 pipeR, pipeW := io.Pipe()
1047                 go func() {
1048                         errs <- jobProcess(
1049                                 ctx,
1050                                 pipeR,
1051                                 pktName,
1052                                 les,
1053                                 sender,
1054                                 job.PktEnc.Nice,
1055                                 uint64(pktSizeWithoutEnc(job.Size)),
1056                                 job.Path,
1057                                 decompressor,
1058                                 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
1059                         )
1060                 }()
1061                 pipeWB := bufio.NewWriter(pipeW)
1062                 sharedKey, _, _, err = PktEncRead(
1063                         ctx.Self,
1064                         ctx.Neigh,
1065                         bufio.NewReaderSize(fd, MTHBlockSize),
1066                         pipeWB,
1067                         sharedKey == nil,
1068                         sharedKey,
1069                 )
1070                 if err != nil {
1071                         pipeW.CloseWithError(err)
1072                 }
1073                 if err := pipeWB.Flush(); err != nil {
1074                         pipeW.CloseWithError(err)
1075                 }
1076                 pipeW.Close()
1077
1078                 if err != nil {
1079                         isBad = true
1080                         fd.Close()
1081                         <-errs
1082                         continue
1083                 }
1084                 if err = <-errs; err == JobRepeatProcess {
1085                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1086                                 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1087                                         return fmt.Sprintf(
1088                                                 "Tossing %s/%s: can not seek",
1089                                                 ctx.NodeName(job.PktEnc.Sender),
1090                                                 pktName,
1091                                         )
1092                                 })
1093                                 isBad = true
1094                                 break
1095                         }
1096                         goto Retry
1097                 } else if err != nil {
1098                         isBad = true
1099                 }
1100                 fd.Close()
1101         }
1102         return isBad
1103 }
1104
1105 func (ctx *Ctx) AutoToss(
1106         nodeId *NodeId,
1107         nice uint8,
1108         doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
1109 ) (chan struct{}, chan bool) {
1110         dw, err := ctx.NewDirWatcher(
1111                 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1112                 time.Second,
1113         )
1114         if err != nil {
1115                 log.Fatalln(err)
1116         }
1117         finish := make(chan struct{})
1118         badCode := make(chan bool)
1119         go func() {
1120                 bad := false
1121                 for {
1122                         select {
1123                         case <-finish:
1124                                 dw.Close()
1125                                 badCode <- bad
1126                                 return
1127                         case <-dw.C:
1128                                 bad = !ctx.Toss(
1129                                         nodeId, TRx, nice, false,
1130                                         doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK) || bad
1131                         }
1132                 }
1133         }()
1134         return finish, badCode
1135 }