]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Move .seen and .hdr to subdirectories
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
// SeenDir is the name of the subdirectory, located next to a job, under
// which jobPath2Seen places the marker file for that job.
const SeenDir = "seen"
48
49 func jobPath2Seen(jobPath string) string {
50         return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
51 }
52
53 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
54         lines := []string{
55                 "From: " + fromTo.From,
56                 "To: " + fromTo.To,
57                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
58         }
59         if len(body) > 0 {
60                 lines = append(
61                         lines,
62                         "MIME-Version: 1.0",
63                         "Content-Type: text/plain; charset=utf-8",
64                         "Content-Transfer-Encoding: base64",
65                         "",
66                         base64.StdEncoding.EncodeToString(body),
67                 )
68         }
69         return strings.NewReader(strings.Join(lines, "\n"))
70 }
71
72 func pktSizeWithoutEnc(pktSize int64) int64 {
73         pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
74         pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
75         if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
76                 pktSize -= poly1305.TagSize
77         }
78         pktSize -= pktSizeBlocks * poly1305.TagSize
79         return pktSize
80 }
81
var (
	// JobRepeatProcess is a sentinel error value carrying the message
	// "needs processing repeat". Presumably it signals that a job has to
	// be processed once more -- its users are not visible from here.
	JobRepeatProcess = errors.New("needs processing repeat")
)
83
84 func jobProcess(
85         ctx *Ctx,
86         pipeR *io.PipeReader,
87         pktName string,
88         les LEs,
89         sender *Node,
90         nice uint8,
91         pktSize uint64,
92         jobPath string,
93         decompressor *zstd.Decoder,
94         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
95 ) error {
96         defer pipeR.Close()
97         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
98         var pkt Pkt
99         _, err := xdr.Unmarshal(pipeR, &pkt)
100         if err != nil {
101                 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
102                         return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
103                 })
104                 return err
105         }
106         les = append(les, LE{"Size", int64(pktSize)})
107         ctx.LogD("rx", les, func(les LEs) string {
108                 return fmt.Sprintf(
109                         "Tossing %s/%s (%s)",
110                         sender.Name, pktName,
111                         humanize.IBytes(pktSize),
112                 )
113         })
114         switch pkt.Type {
115         case PktTypeExec, PktTypeExecFat:
116                 if noExec {
117                         return nil
118                 }
119                 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
120                 handle := string(path[0])
121                 args := make([]string, 0, len(path)-1)
122                 for _, p := range path[1:] {
123                         args = append(args, string(p))
124                 }
125                 argsStr := strings.Join(append([]string{handle}, args...), " ")
126                 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
127                 cmdline := sender.Exec[handle]
128                 if len(cmdline) == 0 {
129                         err = errors.New("No handle found")
130                         ctx.LogE(
131                                 "rx-no-handle", les, err,
132                                 func(les LEs) string {
133                                         return fmt.Sprintf(
134                                                 "Tossing exec %s/%s (%s): %s",
135                                                 sender.Name, pktName,
136                                                 humanize.IBytes(pktSize), argsStr,
137                                         )
138                                 },
139                         )
140                         return err
141                 }
142                 if pkt.Type == PktTypeExec {
143                         if err = decompressor.Reset(pipeR); err != nil {
144                                 log.Fatalln(err)
145                         }
146                 }
147                 if !dryRun {
148                         cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
149                         cmd.Env = append(
150                                 cmd.Env,
151                                 "NNCP_SELF="+ctx.Self.Id.String(),
152                                 "NNCP_SENDER="+sender.Id.String(),
153                                 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
154                         )
155                         if pkt.Type == PktTypeExec {
156                                 cmd.Stdin = decompressor
157                         } else {
158                                 cmd.Stdin = pipeR
159                         }
160                         output, err := cmd.Output()
161                         if err != nil {
162                                 ctx.LogE("rx-hande", les, err, func(les LEs) string {
163                                         return fmt.Sprintf(
164                                                 "Tossing exec %s/%s (%s): %s: handling",
165                                                 sender.Name, pktName,
166                                                 humanize.IBytes(uint64(pktSize)), argsStr,
167                                         )
168                                 })
169                                 return err
170                         }
171                         if len(sendmail) > 0 && ctx.NotifyExec != nil {
172                                 notify := ctx.NotifyExec[sender.Name+"."+handle]
173                                 if notify == nil {
174                                         notify = ctx.NotifyExec["*."+handle]
175                                 }
176                                 if notify != nil {
177                                         cmd := exec.Command(
178                                                 sendmail[0],
179                                                 append(sendmail[1:], notify.To)...,
180                                         )
181                                         cmd.Stdin = newNotification(notify, fmt.Sprintf(
182                                                 "Exec from %s: %s", sender.Name, argsStr,
183                                         ), output)
184                                         if err = cmd.Run(); err != nil {
185                                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
186                                                         return fmt.Sprintf(
187                                                                 "Tossing exec %s/%s (%s): %s: notifying",
188                                                                 sender.Name, pktName,
189                                                                 humanize.IBytes(pktSize), argsStr,
190                                                         )
191                                                 })
192                                         }
193                                 }
194                         }
195                 }
196                 ctx.LogI("rx", les, func(les LEs) string {
197                         return fmt.Sprintf(
198                                 "Got exec from %s to %s (%s)",
199                                 sender.Name, argsStr,
200                                 humanize.IBytes(pktSize),
201                         )
202                 })
203                 if !dryRun && jobPath != "" {
204                         if doSeen {
205                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
206                                         return err
207                                 }
208                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
209                                         fd.Close()
210                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
211                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
212                                                         return fmt.Sprintf(
213                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
214                                                                 sender.Name, pktName,
215                                                                 humanize.IBytes(pktSize),
216                                                                 filepath.Base(jobPath),
217                                                         )
218                                                 })
219                                                 return err
220                                         }
221                                 }
222                         }
223                         if err = os.Remove(jobPath); err != nil {
224                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
225                                         return fmt.Sprintf(
226                                                 "Tossing exec %s/%s (%s): %s: notifying",
227                                                 sender.Name, pktName,
228                                                 humanize.IBytes(pktSize), argsStr,
229                                         )
230                                 })
231                                 return err
232                         } else if ctx.HdrUsage {
233                                 os.Remove(JobPath2Hdr(jobPath))
234                         }
235                 }
236
237         case PktTypeFile:
238                 if noFile {
239                         return nil
240                 }
241                 dst := string(pkt.Path[:int(pkt.PathLen)])
242                 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
243                 if filepath.IsAbs(dst) {
244                         err = errors.New("non-relative destination path")
245                         ctx.LogE(
246                                 "rx-non-rel", les, err,
247                                 func(les LEs) string {
248                                         return fmt.Sprintf(
249                                                 "Tossing file %s/%s (%s): %s",
250                                                 sender.Name, pktName,
251                                                 humanize.IBytes(pktSize), dst,
252                                         )
253                                 },
254                         )
255                         return err
256                 }
257                 incoming := sender.Incoming
258                 if incoming == nil {
259                         err = errors.New("incoming is not allowed")
260                         ctx.LogE(
261                                 "rx-no-incoming", les, err,
262                                 func(les LEs) string {
263                                         return fmt.Sprintf(
264                                                 "Tossing file %s/%s (%s): %s",
265                                                 sender.Name, pktName,
266                                                 humanize.IBytes(pktSize), dst,
267                                         )
268                                 },
269                         )
270                         return err
271                 }
272                 dir := filepath.Join(*incoming, path.Dir(dst))
273                 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
274                         ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
275                                 return fmt.Sprintf(
276                                         "Tossing file %s/%s (%s): %s: mkdir",
277                                         sender.Name, pktName,
278                                         humanize.IBytes(pktSize), dst,
279                                 )
280                         })
281                         return err
282                 }
283                 if !dryRun {
284                         tmp, err := TempFile(dir, "file")
285                         if err != nil {
286                                 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
287                                         return fmt.Sprintf(
288                                                 "Tossing file %s/%s (%s): %s: mktemp",
289                                                 sender.Name, pktName,
290                                                 humanize.IBytes(pktSize), dst,
291                                         )
292                                 })
293                                 return err
294                         }
295                         les = append(les, LE{"Tmp", tmp.Name()})
296                         ctx.LogD("rx-tmp-created", les, func(les LEs) string {
297                                 return fmt.Sprintf(
298                                         "Tossing file %s/%s (%s): %s: created: %s",
299                                         sender.Name, pktName,
300                                         humanize.IBytes(pktSize), dst, tmp.Name(),
301                                 )
302                         })
303                         bufW := bufio.NewWriter(tmp)
304                         if _, err = CopyProgressed(
305                                 bufW, pipeR, "Rx file",
306                                 append(les, LE{"FullSize", int64(pktSize)}),
307                                 ctx.ShowPrgrs,
308                         ); err != nil {
309                                 ctx.LogE("rx-copy", les, err, func(les LEs) string {
310                                         return fmt.Sprintf(
311                                                 "Tossing file %s/%s (%s): %s: copying",
312                                                 sender.Name, pktName,
313                                                 humanize.IBytes(pktSize), dst,
314                                         )
315                                 })
316                                 return err
317                         }
318                         if err = bufW.Flush(); err != nil {
319                                 tmp.Close()
320                                 ctx.LogE("rx-flush", les, err, func(les LEs) string {
321                                         return fmt.Sprintf(
322                                                 "Tossing file %s/%s (%s): %s: flushing",
323                                                 sender.Name, pktName,
324                                                 humanize.IBytes(pktSize), dst,
325                                         )
326                                 })
327                                 return err
328                         }
329                         if err = tmp.Sync(); err != nil {
330                                 tmp.Close()
331                                 ctx.LogE("rx-sync", les, err, func(les LEs) string {
332                                         return fmt.Sprintf(
333                                                 "Tossing file %s/%s (%s): %s: syncing",
334                                                 sender.Name, pktName,
335                                                 humanize.IBytes(pktSize), dst,
336                                         )
337                                 })
338                                 return err
339                         }
340                         if err = tmp.Close(); err != nil {
341                                 ctx.LogE("rx-close", les, err, func(les LEs) string {
342                                         return fmt.Sprintf(
343                                                 "Tossing file %s/%s (%s): %s: closing",
344                                                 sender.Name, pktName,
345                                                 humanize.IBytes(pktSize), dst,
346                                         )
347                                 })
348                                 return err
349                         }
350                         dstPathOrig := filepath.Join(*incoming, dst)
351                         dstPath := dstPathOrig
352                         dstPathCtr := 0
353                         for {
354                                 if _, err = os.Stat(dstPath); err != nil {
355                                         if os.IsNotExist(err) {
356                                                 break
357                                         }
358                                         ctx.LogE("rx-stat", les, err, func(les LEs) string {
359                                                 return fmt.Sprintf(
360                                                         "Tossing file %s/%s (%s): %s: stating: %s",
361                                                         sender.Name, pktName,
362                                                         humanize.IBytes(pktSize), dst, dstPath,
363                                                 )
364                                         })
365                                         return err
366                                 }
367                                 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
368                                 dstPathCtr++
369                         }
370                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
371                                 ctx.LogE("rx-rename", les, err, func(les LEs) string {
372                                         return fmt.Sprintf(
373                                                 "Tossing file %s/%s (%s): %s: renaming",
374                                                 sender.Name, pktName,
375                                                 humanize.IBytes(pktSize), dst,
376                                         )
377                                 })
378                                 return err
379                         }
380                         if err = DirSync(*incoming); err != nil {
381                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
382                                         return fmt.Sprintf(
383                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
384                                                 sender.Name, pktName,
385                                                 humanize.IBytes(pktSize), dst,
386                                         )
387                                 })
388                                 return err
389                         }
390                         les = les[:len(les)-1] // delete Tmp
391                 }
392                 ctx.LogI("rx", les, func(les LEs) string {
393                         return fmt.Sprintf(
394                                 "Got file %s (%s) from %s",
395                                 dst, humanize.IBytes(pktSize), sender.Name,
396                         )
397                 })
398                 if !dryRun {
399                         if jobPath != "" {
400                                 if doSeen {
401                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
402                                                 return err
403                                         }
404                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
405                                                 fd.Close()
406                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
407                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
408                                                                 return fmt.Sprintf(
409                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
410                                                                         sender.Name, pktName,
411                                                                         humanize.IBytes(pktSize),
412                                                                         filepath.Base(jobPath),
413                                                                 )
414                                                         })
415                                                         return err
416                                                 }
417                                         }
418                                 }
419                                 if err = os.Remove(jobPath); err != nil {
420                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
421                                                 return fmt.Sprintf(
422                                                         "Tossing file %s/%s (%s): %s: removing",
423                                                         sender.Name, pktName,
424                                                         humanize.IBytes(pktSize), dst,
425                                                 )
426                                         })
427                                         return err
428                                 } else if ctx.HdrUsage {
429                                         os.Remove(JobPath2Hdr(jobPath))
430                                 }
431                         }
432                         if len(sendmail) > 0 && ctx.NotifyFile != nil {
433                                 cmd := exec.Command(
434                                         sendmail[0],
435                                         append(sendmail[1:], ctx.NotifyFile.To)...,
436                                 )
437                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
438                                         "File from %s: %s (%s)",
439                                         sender.Name, dst, humanize.IBytes(pktSize),
440                                 ), nil)
441                                 if err = cmd.Run(); err != nil {
442                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
443                                                 return fmt.Sprintf(
444                                                         "Tossing file %s/%s (%s): %s: notifying",
445                                                         sender.Name, pktName,
446                                                         humanize.IBytes(pktSize), dst,
447                                                 )
448                                         })
449                                 }
450                         }
451                 }
452
453         case PktTypeFreq:
454                 if noFreq {
455                         return nil
456                 }
457                 src := string(pkt.Path[:int(pkt.PathLen)])
458                 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
459                 if filepath.IsAbs(src) {
460                         err = errors.New("non-relative source path")
461                         ctx.LogE(
462                                 "rx-non-rel", les, err,
463                                 func(les LEs) string {
464                                         return fmt.Sprintf(
465                                                 "Tossing freq %s/%s (%s): %s: notifying",
466                                                 sender.Name, pktName,
467                                                 humanize.IBytes(pktSize), src,
468                                         )
469                                 },
470                         )
471                         return err
472                 }
473                 dstRaw, err := ioutil.ReadAll(pipeR)
474                 if err != nil {
475                         ctx.LogE("rx-read", les, err, func(les LEs) string {
476                                 return fmt.Sprintf(
477                                         "Tossing freq %s/%s (%s): %s: reading",
478                                         sender.Name, pktName,
479                                         humanize.IBytes(pktSize), src,
480                                 )
481                         })
482                         return err
483                 }
484                 dst := string(dstRaw)
485                 les = append(les, LE{"Dst", dst})
486                 freqPath := sender.FreqPath
487                 if freqPath == nil {
488                         err = errors.New("freqing is not allowed")
489                         ctx.LogE(
490                                 "rx-no-freq", les, err,
491                                 func(les LEs) string {
492                                         return fmt.Sprintf(
493                                                 "Tossing freq %s/%s (%s): %s -> %s",
494                                                 sender.Name, pktName,
495                                                 humanize.IBytes(pktSize), src, dst,
496                                         )
497                                 },
498                         )
499                         return err
500                 }
501                 if !dryRun {
502                         err = ctx.TxFile(
503                                 sender,
504                                 pkt.Nice,
505                                 filepath.Join(*freqPath, src),
506                                 dst,
507                                 sender.FreqChunked,
508                                 sender.FreqMinSize,
509                                 sender.FreqMaxSize,
510                                 nil,
511                         )
512                         if err != nil {
513                                 ctx.LogE("rx-tx", les, err, func(les LEs) string {
514                                         return fmt.Sprintf(
515                                                 "Tossing freq %s/%s (%s): %s -> %s: txing",
516                                                 sender.Name, pktName,
517                                                 humanize.IBytes(pktSize), src, dst,
518                                         )
519                                 })
520                                 return err
521                         }
522                 }
523                 ctx.LogI("rx", les, func(les LEs) string {
524                         return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
525                 })
526                 if !dryRun {
527                         if jobPath != "" {
528                                 if doSeen {
529                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
530                                                 return err
531                                         }
532                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
533                                                 fd.Close()
534                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
535                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
536                                                                 return fmt.Sprintf(
537                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
538                                                                         sender.Name, pktName,
539                                                                         humanize.IBytes(pktSize),
540                                                                         filepath.Base(jobPath),
541                                                                 )
542                                                         })
543                                                         return err
544                                                 }
545                                         }
546                                 }
547                                 if err = os.Remove(jobPath); err != nil {
548                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
549                                                 return fmt.Sprintf(
550                                                         "Tossing freq %s/%s (%s): %s -> %s: removing",
551                                                         sender.Name, pktName,
552                                                         humanize.IBytes(pktSize), src, dst,
553                                                 )
554                                         })
555                                         return err
556                                 } else if ctx.HdrUsage {
557                                         os.Remove(JobPath2Hdr(jobPath))
558                                 }
559                         }
560                         if len(sendmail) > 0 && ctx.NotifyFreq != nil {
561                                 cmd := exec.Command(
562                                         sendmail[0],
563                                         append(sendmail[1:], ctx.NotifyFreq.To)...,
564                                 )
565                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
566                                         "Freq from %s: %s", sender.Name, src,
567                                 ), nil)
568                                 if err = cmd.Run(); err != nil {
569                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
570                                                 return fmt.Sprintf(
571                                                         "Tossing freq %s/%s (%s): %s -> %s: notifying",
572                                                         sender.Name, pktName,
573                                                         humanize.IBytes(pktSize), src, dst,
574                                                 )
575                                         })
576                                 }
577                         }
578                 }
579
580         case PktTypeTrns:
581                 if noTrns {
582                         return nil
583                 }
584                 dst := new([MTHSize]byte)
585                 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
586                 nodeId := NodeId(*dst)
587                 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
588                 logMsg := func(les LEs) string {
589                         return fmt.Sprintf(
590                                 "Tossing trns %s/%s (%s): %s",
591                                 sender.Name, pktName,
592                                 humanize.IBytes(pktSize),
593                                 nodeId.String(),
594                         )
595                 }
596                 node := ctx.Neigh[nodeId]
597                 if node == nil {
598                         err = errors.New("unknown node")
599                         ctx.LogE("rx-unknown", les, err, logMsg)
600                         return err
601                 }
602                 ctx.LogD("rx-tx", les, logMsg)
603                 if !dryRun {
604                         if len(node.Via) == 0 {
605                                 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
606                                         ctx.LogE("rx", les, err, func(les LEs) string {
607                                                 return logMsg(les) + ": txing"
608                                         })
609                                         return err
610                                 }
611                         } else {
612                                 via := node.Via[:len(node.Via)-1]
613                                 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
614                                 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
615                                 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
616                                 if err != nil {
617                                         panic(err)
618                                 }
619                                 if _, err = ctx.Tx(
620                                         node,
621                                         pktTrns,
622                                         nice,
623                                         int64(pktSize), 0,
624                                         pipeR,
625                                         pktName,
626                                         nil,
627                                 ); err != nil {
628                                         ctx.LogE("rx", les, err, func(les LEs) string {
629                                                 return logMsg(les) + ": txing"
630                                         })
631                                         return err
632                                 }
633                         }
634                 }
635                 ctx.LogI("rx", les, func(les LEs) string {
636                         return fmt.Sprintf(
637                                 "Got transitional packet from %s to %s (%s)",
638                                 sender.Name,
639                                 ctx.NodeName(&nodeId),
640                                 humanize.IBytes(pktSize),
641                         )
642                 })
643                 if !dryRun && jobPath != "" {
644                         if doSeen {
645                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
646                                         return err
647                                 }
648                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
649                                         fd.Close()
650                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
651                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
652                                                         return fmt.Sprintf(
653                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
654                                                                 sender.Name, pktName,
655                                                                 humanize.IBytes(pktSize),
656                                                                 filepath.Base(jobPath),
657                                                         )
658                                                 })
659                                                 return err
660                                         }
661                                 }
662                         }
663                         if err = os.Remove(jobPath); err != nil {
664                                 ctx.LogE("rx", les, err, func(les LEs) string {
665                                         return fmt.Sprintf(
666                                                 "Tossing trns %s/%s (%s): %s: removing",
667                                                 sender.Name, pktName,
668                                                 humanize.IBytes(pktSize),
669                                                 ctx.NodeName(&nodeId),
670                                         )
671                                 })
672                                 return err
673                         } else if ctx.HdrUsage {
674                                 os.Remove(JobPath2Hdr(jobPath))
675                         }
676                 }
677
678         case PktTypeArea:
679                 if noArea {
680                         return nil
681                 }
682                 areaId := new(AreaId)
683                 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
684                 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
685                 logMsg := func(les LEs) string {
686                         return fmt.Sprintf(
687                                 "Tossing %s/%s (%s): area %s",
688                                 sender.Name, pktName,
689                                 humanize.IBytes(pktSize),
690                                 ctx.AreaName(areaId),
691                         )
692                 }
693                 area := ctx.AreaId2Area[*areaId]
694                 if area == nil {
695                         err = errors.New("unknown area")
696                         ctx.LogE("rx-area-unknown", les, err, logMsg)
697                         return err
698                 }
699                 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
700                 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
701                 if err != nil {
702                         ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
703                         return err
704                 }
705                 msgHashRaw := blake2b.Sum256(pktEncRaw)
706                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
707                 les = append(les, LE{"AreaMsg", msgHash})
708                 ctx.LogD("rx-area", les, logMsg)
709
710                 if dryRun {
711                         for _, nodeId := range area.Subs {
712                                 node := ctx.Neigh[*nodeId]
713                                 lesEcho := append(les, LE{"Echo", nodeId})
714                                 seenDir := filepath.Join(
715                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
716                                 )
717                                 seenPath := filepath.Join(seenDir, msgHash)
718                                 logMsgNode := func(les LEs) string {
719                                         return fmt.Sprintf(
720                                                 "%s: echoing to: %s", logMsg(les), node.Name,
721                                         )
722                                 }
723                                 if _, err := os.Stat(seenPath); err == nil {
724                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
725                                                 return logMsgNode(les) + ": already sent"
726                                         })
727                                         continue
728                                 }
729                                 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
730                         }
731                 } else {
732                         for _, nodeId := range area.Subs {
733                                 node := ctx.Neigh[*nodeId]
734                                 lesEcho := append(les, LE{"Echo", nodeId})
735                                 seenDir := filepath.Join(
736                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
737                                 )
738                                 seenPath := filepath.Join(seenDir, msgHash)
739                                 logMsgNode := func(les LEs) string {
740                                         return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
741                                 }
742                                 if _, err := os.Stat(seenPath); err == nil {
743                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
744                                                 return logMsgNode(les) + ": already sent"
745                                         })
746                                         continue
747                                 }
748                                 if nodeId != sender.Id && nodeId != pktEnc.Sender {
749                                         ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
750                                         if _, err = ctx.Tx(
751                                                 node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil,
752                                         ); err != nil {
753                                                 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
754                                                 return err
755                                         }
756                                 }
757                                 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
758                                         ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
759                                         return err
760                                 }
761                                 if fd, err := os.Create(seenPath); err == nil {
762                                         fd.Close()
763                                         if err = DirSync(seenDir); err != nil {
764                                                 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
765                                                 return err
766                                         }
767                                 } else {
768                                         ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
769                                         return err
770                                 }
771                                 return JobRepeatProcess
772                         }
773                 }
774
775                 seenDir := filepath.Join(
776                         ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
777                 )
778                 seenPath := filepath.Join(seenDir, msgHash)
779                 if _, err := os.Stat(seenPath); err == nil {
780                         ctx.LogD("rx-area-seen", les, func(les LEs) string {
781                                 return logMsg(les) + ": already seen"
782                         })
783                         if !dryRun && jobPath != "" {
784                                 if err = os.Remove(jobPath); err != nil {
785                                         ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
786                                                 return fmt.Sprintf(
787                                                         "Tossing area %s/%s (%s): %s: removing",
788                                                         sender.Name, pktName,
789                                                         humanize.IBytes(pktSize),
790                                                         msgHash,
791                                                 )
792                                         })
793                                         return err
794                                 } else if ctx.HdrUsage {
795                                         os.Remove(JobPath2Hdr(jobPath))
796                                 }
797                         }
798                         return nil
799                 }
800
801                 if area.Prv == nil {
802                         ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
803                                 return logMsg(les) + ": no private key for decoding"
804                         })
805                 } else {
806                         signatureVerify := true
807                         if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
808                                 if !area.AllowUnknown {
809                                         err = errors.New("unknown sender")
810                                         ctx.LogE(
811                                                 "rx-area-unknown",
812                                                 append(les, LE{"Sender", pktEnc.Sender}),
813                                                 err,
814                                                 func(les LEs) string {
815                                                         return logMsg(les) + ": sender: " + pktEnc.Sender.String()
816                                                 },
817                                         )
818                                         return err
819                                 }
820                                 signatureVerify = false
821                         }
822                         areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
823                         copy(areaNodeOur.Id[:], area.Id[:])
824                         copy(areaNodeOur.ExchPrv[:], area.Prv[:])
825                         areaNode := Node{
826                                 Id:       new(NodeId),
827                                 Name:     area.Name,
828                                 Incoming: area.Incoming,
829                                 Exec:     area.Exec,
830                         }
831                         copy(areaNode.Id[:], area.Id[:])
832                         pktName := fmt.Sprintf(
833                                 "area/%s/%s",
834                                 Base32Codec.EncodeToString(areaId[:]), msgHash,
835                         )
836
837                         pipeR, pipeW := io.Pipe()
838                         errs := make(chan error, 1)
839                         go func() {
840                                 errs <- jobProcess(
841                                         ctx,
842                                         pipeR,
843                                         pktName,
844                                         les,
845                                         &areaNode,
846                                         nice,
847                                         uint64(pktSizeWithoutEnc(int64(pktSize))),
848                                         "",
849                                         decompressor,
850                                         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
851                                 )
852                         }()
853                         _, _, _, err = PktEncRead(
854                                 &areaNodeOur,
855                                 ctx.Neigh,
856                                 fullPipeR,
857                                 pipeW,
858                                 signatureVerify,
859                                 nil,
860                         )
861                         if err != nil {
862                                 pipeW.CloseWithError(err)
863                                 <-errs
864                                 return err
865                         }
866                         pipeW.Close()
867                         if err = <-errs; err != nil {
868                                 return err
869                         }
870                 }
871
872                 if !dryRun && jobPath != "" {
873                         if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
874                                 ctx.LogE("rx-area-mkdir", les, err, logMsg)
875                                 return err
876                         }
877                         if fd, err := os.Create(seenPath); err == nil {
878                                 fd.Close()
879                                 if err = DirSync(seenDir); err != nil {
880                                         ctx.LogE("rx-area-dirsync", les, err, logMsg)
881                                         return err
882                                 }
883                         }
884                         if err = os.Remove(jobPath); err != nil {
885                                 ctx.LogE("rx", les, err, func(les LEs) string {
886                                         return fmt.Sprintf(
887                                                 "Tossing area %s/%s (%s): %s: removing",
888                                                 sender.Name, pktName,
889                                                 humanize.IBytes(pktSize),
890                                                 msgHash,
891                                         )
892                                 })
893                                 return err
894                         } else if ctx.HdrUsage {
895                                 os.Remove(JobPath2Hdr(jobPath))
896                         }
897                 }
898
899         default:
900                 err = errors.New("unknown type")
901                 ctx.LogE(
902                         "rx-type-unknown", les, err,
903                         func(les LEs) string {
904                                 return fmt.Sprintf(
905                                         "Tossing %s/%s (%s)",
906                                         sender.Name, pktName, humanize.IBytes(pktSize),
907                                 )
908                         },
909                 )
910                 return err
911         }
912         return nil
913 }
914
915 func (ctx *Ctx) Toss(
916         nodeId *NodeId,
917         xx TRxTx,
918         nice uint8,
919         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
920 ) bool {
921         dirLock, err := ctx.LockDir(nodeId, "toss")
922         if err != nil {
923                 return false
924         }
925         defer ctx.UnlockDir(dirLock)
926         isBad := false
927         decompressor, err := zstd.NewReader(nil)
928         if err != nil {
929                 panic(err)
930         }
931         defer decompressor.Close()
932         for job := range ctx.Jobs(nodeId, xx) {
933                 pktName := filepath.Base(job.Path)
934                 les := LEs{
935                         {"Node", job.PktEnc.Sender},
936                         {"Pkt", pktName},
937                         {"Nice", int(job.PktEnc.Nice)},
938                 }
939                 if job.PktEnc.Nice > nice {
940                         ctx.LogD("rx-too-nice", les, func(les LEs) string {
941                                 return fmt.Sprintf(
942                                         "Tossing %s/%s: too nice: %s",
943                                         ctx.NodeName(job.PktEnc.Sender), pktName,
944                                         NicenessFmt(job.PktEnc.Nice),
945                                 )
946                         })
947                         continue
948                 }
949                 fd, err := os.Open(job.Path)
950                 if err != nil {
951                         ctx.LogE("rx-open", les, err, func(les LEs) string {
952                                 return fmt.Sprintf(
953                                         "Tossing %s/%s: opening %s",
954                                         ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
955                                 )
956                         })
957                         isBad = true
958                         continue
959                 }
960                 sender := ctx.Neigh[*job.PktEnc.Sender]
961                 if sender == nil {
962                         err := errors.New("unknown node")
963                         ctx.LogE("rx-open", les, err, func(les LEs) string {
964                                 return fmt.Sprintf(
965                                         "Tossing %s/%s",
966                                         ctx.NodeName(job.PktEnc.Sender), pktName,
967                                 )
968                         })
969                         isBad = true
970                         continue
971                 }
972                 errs := make(chan error, 1)
973                 var sharedKey []byte
974         Retry:
975                 pipeR, pipeW := io.Pipe()
976                 go func() {
977                         errs <- jobProcess(
978                                 ctx,
979                                 pipeR,
980                                 pktName,
981                                 les,
982                                 sender,
983                                 job.PktEnc.Nice,
984                                 uint64(pktSizeWithoutEnc(job.Size)),
985                                 job.Path,
986                                 decompressor,
987                                 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
988                         )
989                 }()
990                 pipeWB := bufio.NewWriter(pipeW)
991                 sharedKey, _, _, err = PktEncRead(
992                         ctx.Self,
993                         ctx.Neigh,
994                         bufio.NewReader(fd),
995                         pipeWB,
996                         sharedKey == nil,
997                         sharedKey,
998                 )
999                 if err != nil {
1000                         pipeW.CloseWithError(err)
1001                 }
1002                 if err := pipeWB.Flush(); err != nil {
1003                         pipeW.CloseWithError(err)
1004                 }
1005                 pipeW.Close()
1006
1007                 if err != nil {
1008                         isBad = true
1009                         fd.Close()
1010                         <-errs
1011                         continue
1012                 }
1013                 if err = <-errs; err == JobRepeatProcess {
1014                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1015                                 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1016                                         return fmt.Sprintf(
1017                                                 "Tossing %s/%s: can not seek",
1018                                                 ctx.NodeName(job.PktEnc.Sender),
1019                                                 pktName,
1020                                         )
1021                                 })
1022                                 isBad = true
1023                                 break
1024                         }
1025                         goto Retry
1026                 } else if err != nil {
1027                         isBad = true
1028                 }
1029                 fd.Close()
1030         }
1031         return isBad
1032 }
1033
1034 func (ctx *Ctx) AutoToss(
1035         nodeId *NodeId,
1036         nice uint8,
1037         doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
1038 ) (chan struct{}, chan bool) {
1039         dw, err := ctx.NewDirWatcher(
1040                 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1041                 time.Second,
1042         )
1043         if err != nil {
1044                 log.Fatalln(err)
1045         }
1046         finish := make(chan struct{})
1047         badCode := make(chan bool)
1048         go func() {
1049                 bad := false
1050                 for {
1051                         select {
1052                         case <-finish:
1053                                 dw.Close()
1054                                 badCode <- bad
1055                                 return
1056                         case <-dw.C:
1057                                 bad = !ctx.Toss(
1058                                         nodeId, TRx, nice, false,
1059                                         doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
1060                         }
1061                 }
1062         }()
1063         return finish, badCode
1064 }