]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/toss.go
Remove hdr/ files during ACK tossing
[nncp.git] / src / toss.go
index 05b5a3a6a51cde6862f3f5344bf2d480681f6c1c..4cc4392a6f2404478b5893ecf34193d6b0149e5d 100644 (file)
@@ -1,6 +1,6 @@
 /*
 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2020 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
 
 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
@@ -33,6 +33,7 @@ import (
        "path/filepath"
        "strconv"
        "strings"
+       "time"
 
        xdr "github.com/davecgh/go-xdr/xdr2"
        "github.com/dustin/go-humanize"
@@ -42,9 +43,13 @@ import (
 )
 
 const (
-       SeenSuffix = ".seen"
+       SeenDir = "seen"
 )
 
+func jobPath2Seen(jobPath string) string {
+       return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
+}
+
 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
        lines := []string{
                "From: " + fromTo.From,
@@ -52,353 +57,1080 @@ func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader
                "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
        }
        if len(body) > 0 {
-               lines = append(lines, []string{
+               lines = append(
+                       lines,
                        "MIME-Version: 1.0",
                        "Content-Type: text/plain; charset=utf-8",
                        "Content-Transfer-Encoding: base64",
                        "",
                        base64.StdEncoding.EncodeToString(body),
-               }...)
+               )
        }
        return strings.NewReader(strings.Join(lines, "\n"))
 }
 
-func (ctx *Ctx) Toss(
-       nodeId *NodeId,
+func pktSizeWithoutEnc(pktSize int64) int64 {
+       pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
+       pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
+       if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
+               pktSize -= poly1305.TagSize
+       }
+       pktSize -= pktSizeBlocks * poly1305.TagSize
+       return pktSize
+}
+
+var JobRepeatProcess = errors.New("needs processing repeat")
+
+func jobProcess(
+       ctx *Ctx,
+       pipeR *io.PipeReader,
+       pktName string,
+       les LEs,
+       sender *Node,
        nice uint8,
-       dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
-) bool {
-       isBad := false
+       pktSize uint64,
+       jobPath string,
+       decompressor *zstd.Decoder,
+       dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
+) error {
+       defer pipeR.Close()
        sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
-       decompressor, err := zstd.NewReader(nil)
+       var pkt Pkt
+       _, err := xdr.Unmarshal(pipeR, &pkt)
        if err != nil {
-               panic(err)
+               ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
+                       return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
+               })
+               return err
        }
-       defer decompressor.Close()
-       for job := range ctx.Jobs(nodeId, TRx) {
-               pktName := filepath.Base(job.Fd.Name())
-               sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
-               if job.PktEnc.Nice > nice {
-                       ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice")
-                       continue
+       les = append(les, LE{"Size", int64(pktSize)})
+       ctx.LogD("rx", les, func(les LEs) string {
+               return fmt.Sprintf(
+                       "Tossing %s/%s (%s)",
+                       sender.Name, pktName,
+                       humanize.IBytes(pktSize),
+               )
+       })
+       switch pkt.Type {
+       case PktTypeExec, PktTypeExecFat:
+               if noExec {
+                       return nil
                }
-               pipeR, pipeW := io.Pipe()
-               errs := make(chan error, 1)
-               go func(job Job) {
-                       pipeWB := bufio.NewWriter(pipeW)
-                       _, _, err := PktEncRead(
-                               ctx.Self,
-                               ctx.Neigh,
-                               bufio.NewReader(job.Fd),
-                               pipeWB,
+               path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
+               handle := string(path[0])
+               args := make([]string, 0, len(path)-1)
+               for _, p := range path[1:] {
+                       args = append(args, string(p))
+               }
+               argsStr := strings.Join(append([]string{handle}, args...), " ")
+               les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
+               cmdline := sender.Exec[handle]
+               if len(cmdline) == 0 {
+                       err = errors.New("No handle found")
+                       ctx.LogE(
+                               "rx-no-handle", les, err,
+                               func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing exec %s/%s (%s): %s",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), argsStr,
+                                       )
+                               },
                        )
-                       errs <- err
-                       pipeWB.Flush()
-                       pipeW.Close()
-                       job.Fd.Close()
-                       if err != nil {
-                               ctx.LogE("rx", sds, err, "decryption")
-                       }
-               }(job)
-               var pkt Pkt
-               var err error
-               var pktSize int64
-               var pktSizeBlocks int64
-               if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
-                       ctx.LogE("rx", sds, err, "unmarshal")
-                       isBad = true
-                       goto Closing
-               }
-               pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
-               pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
-               if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
-                       pktSize -= poly1305.TagSize
-               }
-               pktSize -= pktSizeBlocks * poly1305.TagSize
-               sds["size"] = pktSize
-               ctx.LogD("rx", sds, "taken")
-               switch pkt.Type {
-               case PktTypeExec:
-                       if noExec {
-                               goto Closing
-                       }
-                       path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
-                       handle := string(path[0])
-                       args := make([]string, 0, len(path)-1)
-                       for _, p := range path[1:] {
-                               args = append(args, string(p))
-                       }
-                       argsStr := strings.Join(append([]string{handle}, args...), " ")
-                       sds := SdsAdd(sds, SDS{
-                               "type": "exec",
-                               "dst":  argsStr,
-                       })
-                       sender := ctx.Neigh[*job.PktEnc.Sender]
-                       cmdline, exists := sender.Exec[handle]
-                       if !exists || len(cmdline) == 0 {
-                               ctx.LogE("rx", sds, errors.New("No handle found"), "")
-                               isBad = true
-                               goto Closing
-                       }
+                       return err
+               }
+               if pkt.Type == PktTypeExec {
                        if err = decompressor.Reset(pipeR); err != nil {
                                log.Fatalln(err)
                        }
-                       if !dryRun {
-                               cmd := exec.Command(
-                                       cmdline[0],
-                                       append(cmdline[1:len(cmdline)], args...)...,
-                               )
-                               cmd.Env = append(
-                                       cmd.Env,
-                                       "NNCP_SELF="+ctx.Self.Id.String(),
-                                       "NNCP_SENDER="+sender.Id.String(),
-                                       "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
-                               )
+               }
+               if !dryRun {
+                       cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
+                       cmd.Env = append(
+                               cmd.Env,
+                               "NNCP_SELF="+ctx.Self.Id.String(),
+                               "NNCP_SENDER="+sender.Id.String(),
+                               "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
+                       )
+                       if pkt.Type == PktTypeExec {
                                cmd.Stdin = decompressor
-                               output, err := cmd.Output()
-                               if err != nil {
-                                       ctx.LogE("rx", sds, err, "handle")
-                                       isBad = true
-                                       goto Closing
+                       } else {
+                               cmd.Stdin = pipeR
+                       }
+                       output, err := cmd.CombinedOutput()
+                       if err != nil {
+                               les = append(les, LE{"Output", strings.Split(
+                                       strings.Trim(string(output), "\n"), "\n"),
+                               })
+                               ctx.LogE("rx-handle", les, err, func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing exec %s/%s (%s): %s: handling",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(uint64(pktSize)), argsStr,
+                                       )
+                               })
+                               return err
+                       }
+                       if len(sendmail) > 0 && ctx.NotifyExec != nil {
+                               notify := ctx.NotifyExec[sender.Name+"."+handle]
+                               if notify == nil {
+                                       notify = ctx.NotifyExec["*."+handle]
                                }
-                               if len(sendmail) > 0 && ctx.NotifyExec != nil {
-                                       notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
-                                       if !exists {
-                                               notify, exists = ctx.NotifyExec["*."+handle]
-                                       }
-                                       if exists {
-                                               cmd := exec.Command(
-                                                       sendmail[0],
-                                                       append(sendmail[1:len(sendmail)], notify.To)...,
-                                               )
-                                               cmd.Stdin = newNotification(notify, fmt.Sprintf(
-                                                       "Exec from %s: %s", sender.Name, argsStr,
-                                               ), output)
-                                               cmd.Run()
+                               if notify != nil {
+                                       cmd := exec.Command(
+                                               sendmail[0],
+                                               append(sendmail[1:], notify.To)...,
+                                       )
+                                       cmd.Stdin = newNotification(notify, fmt.Sprintf(
+                                               "Exec from %s: %s", sender.Name, argsStr,
+                                       ), output)
+                                       if err = cmd.Run(); err != nil {
+                                               ctx.LogE("rx-notify", les, err, func(les LEs) string {
+                                                       return fmt.Sprintf(
+                                                               "Tossing exec %s/%s (%s): %s: notifying",
+                                                               sender.Name, pktName,
+                                                               humanize.IBytes(pktSize), argsStr,
+                                                       )
+                                               })
                                        }
                                }
                        }
-                       ctx.LogI("rx", sds, "")
-                       if !dryRun {
-                               if doSeen {
-                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
-                                               fd.Close()
-                                       }
+               }
+               ctx.LogI("rx", les, func(les LEs) string {
+                       return fmt.Sprintf(
+                               "Got exec from %s to %s (%s)",
+                               sender.Name, argsStr,
+                               humanize.IBytes(pktSize),
+                       )
+               })
+               if !dryRun && jobPath != "" {
+                       if doSeen {
+                               if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+                                       return err
                                }
-                               if err = os.Remove(job.Fd.Name()); err != nil {
-                                       ctx.LogE("rx", sds, err, "remove")
-                                       isBad = true
+                               if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
+                                       fd.Close()
+                                       if err = DirSync(filepath.Dir(jobPath)); err != nil {
+                                               ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+                                                       return fmt.Sprintf(
+                                                               "Tossing file %s/%s (%s): %s: dirsyncing",
+                                                               sender.Name, pktName,
+                                                               humanize.IBytes(pktSize),
+                                                               filepath.Base(jobPath),
+                                                       )
+                                               })
+                                               return err
+                                       }
                                }
                        }
-               case PktTypeFile:
-                       if noFile {
-                               goto Closing
+                       if err = os.Remove(jobPath); err != nil {
+                               ctx.LogE("rx-notify", les, err, func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing exec %s/%s (%s): %s: notifying",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), argsStr,
+                                       )
+                               })
+                               return err
+                       } else if ctx.HdrUsage {
+                               os.Remove(JobPath2Hdr(jobPath))
                        }
-                       dst := string(pkt.Path[:int(pkt.PathLen)])
-                       sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
-                       if filepath.IsAbs(dst) {
-                               ctx.LogE("rx", sds, errors.New("non-relative destination path"), "")
-                               isBad = true
-                               goto Closing
+               }
+
+       case PktTypeFile:
+               if noFile {
+                       return nil
+               }
+               dst := string(pkt.Path[:int(pkt.PathLen)])
+               les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
+               if filepath.IsAbs(dst) {
+                       err = errors.New("non-relative destination path")
+                       ctx.LogE(
+                               "rx-non-rel", les, err,
+                               func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing file %s/%s (%s): %s",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), dst,
+                                       )
+                               },
+                       )
+                       return err
+               }
+               incoming := sender.Incoming
+               if incoming == nil {
+                       err = errors.New("incoming is not allowed")
+                       ctx.LogE(
+                               "rx-no-incoming", les, err,
+                               func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing file %s/%s (%s): %s",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), dst,
+                                       )
+                               },
+                       )
+                       return err
+               }
+               dir := filepath.Join(*incoming, path.Dir(dst))
+               if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
+                       ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tossing file %s/%s (%s): %s: mkdir",
+                                       sender.Name, pktName,
+                                       humanize.IBytes(pktSize), dst,
+                               )
+                       })
+                       return err
+               }
+               if !dryRun {
+                       tmp, err := TempFile(dir, "file")
+                       if err != nil {
+                               ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing file %s/%s (%s): %s: mktemp",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), dst,
+                                       )
+                               })
+                               return err
                        }
-                       incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
-                       if incoming == nil {
-                               ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "")
-                               isBad = true
-                               goto Closing
+                       les = append(les, LE{"Tmp", tmp.Name()})
+                       ctx.LogD("rx-tmp-created", les, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tossing file %s/%s (%s): %s: created: %s",
+                                       sender.Name, pktName,
+                                       humanize.IBytes(pktSize), dst, tmp.Name(),
+                               )
+                       })
+                       bufW := bufio.NewWriter(tmp)
+                       if _, err = CopyProgressed(
+                               bufW, pipeR, "Rx file",
+                               append(les, LE{"FullSize", int64(pktSize)}),
+                               ctx.ShowPrgrs,
+                       ); err != nil {
+                               ctx.LogE("rx-copy", les, err, func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing file %s/%s (%s): %s: copying",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), dst,
+                                       )
+                               })
+                               return err
                        }
-                       dir := filepath.Join(*incoming, path.Dir(dst))
-                       if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
-                               ctx.LogE("rx", sds, err, "mkdir")
-                               isBad = true
-                               goto Closing
+                       if err = bufW.Flush(); err != nil {
+                               tmp.Close()
+                               ctx.LogE("rx-flush", les, err, func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing file %s/%s (%s): %s: flushing",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), dst,
+                                       )
+                               })
+                               return err
                        }
-                       if !dryRun {
-                               tmp, err := TempFile(dir, "file")
-                               if err != nil {
-                                       ctx.LogE("rx", sds, err, "mktemp")
-                                       isBad = true
-                                       goto Closing
-                               }
-                               sds["tmp"] = tmp.Name()
-                               ctx.LogD("rx", sds, "created")
-                               bufW := bufio.NewWriter(tmp)
-                               if _, err = CopyProgressed(
-                                       bufW, pipeR, "Rx file",
-                                       SdsAdd(sds, SDS{"fullsize": sds["size"]}),
-                                       ctx.ShowPrgrs,
-                               ); err != nil {
-                                       ctx.LogE("rx", sds, err, "copy")
-                                       isBad = true
-                                       goto Closing
-                               }
-                               if err = bufW.Flush(); err != nil {
-                                       tmp.Close()
-                                       ctx.LogE("rx", sds, err, "copy")
-                                       isBad = true
-                                       goto Closing
-                               }
+                       if !NoSync {
                                if err = tmp.Sync(); err != nil {
                                        tmp.Close()
-                                       ctx.LogE("rx", sds, err, "copy")
-                                       isBad = true
-                                       goto Closing
+                                       ctx.LogE("rx-sync", les, err, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "Tossing file %s/%s (%s): %s: syncing",
+                                                       sender.Name, pktName,
+                                                       humanize.IBytes(pktSize), dst,
+                                               )
+                                       })
+                                       return err
                                }
-                               tmp.Close()
-                               dstPathOrig := filepath.Join(*incoming, dst)
-                               dstPath := dstPathOrig
-                               dstPathCtr := 0
-                               for {
-                                       if _, err = os.Stat(dstPath); err != nil {
-                                               if os.IsNotExist(err) {
-                                                       break
+                       }
+                       if err = tmp.Close(); err != nil {
+                               ctx.LogE("rx-close", les, err, func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing file %s/%s (%s): %s: closing",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), dst,
+                                       )
+                               })
+                               return err
+                       }
+                       dstPathOrig := filepath.Join(*incoming, dst)
+                       dstPath := dstPathOrig
+                       dstPathCtr := 0
+                       for {
+                               if _, err = os.Stat(dstPath); err != nil {
+                                       if os.IsNotExist(err) {
+                                               break
+                                       }
+                                       ctx.LogE("rx-stat", les, err, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "Tossing file %s/%s (%s): %s: stating: %s",
+                                                       sender.Name, pktName,
+                                                       humanize.IBytes(pktSize), dst, dstPath,
+                                               )
+                                       })
+                                       return err
+                               }
+                               dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
+                               dstPathCtr++
+                       }
+                       if err = os.Rename(tmp.Name(), dstPath); err != nil {
+                               ctx.LogE("rx-rename", les, err, func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing file %s/%s (%s): %s: renaming",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), dst,
+                                       )
+                               })
+                               return err
+                       }
+                       if err = DirSync(*incoming); err != nil {
+                               ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing file %s/%s (%s): %s: dirsyncing",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), dst,
+                                       )
+                               })
+                               return err
+                       }
+                       les = les[:len(les)-1] // delete Tmp
+               }
+               ctx.LogI("rx", les, func(les LEs) string {
+                       return fmt.Sprintf(
+                               "Got file %s (%s) from %s",
+                               dst, humanize.IBytes(pktSize), sender.Name,
+                       )
+               })
+               if !dryRun {
+                       if jobPath != "" {
+                               if doSeen {
+                                       if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+                                               return err
+                                       }
+                                       if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
+                                               fd.Close()
+                                               if err = DirSync(filepath.Dir(jobPath)); err != nil {
+                                                       ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+                                                               return fmt.Sprintf(
+                                                                       "Tossing file %s/%s (%s): %s: dirsyncing",
+                                                                       sender.Name, pktName,
+                                                                       humanize.IBytes(pktSize),
+                                                                       filepath.Base(jobPath),
+                                                               )
+                                                       })
+                                                       return err
                                                }
-                                               ctx.LogE("rx", sds, err, "stat")
-                                               isBad = true
-                                               goto Closing
                                        }
-                                       dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
-                                       dstPathCtr++
                                }
-                               if err = os.Rename(tmp.Name(), dstPath); err != nil {
-                                       ctx.LogE("rx", sds, err, "rename")
-                                       isBad = true
+                               if err = os.Remove(jobPath); err != nil {
+                                       ctx.LogE("rx-remove", les, err, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "Tossing file %s/%s (%s): %s: removing",
+                                                       sender.Name, pktName,
+                                                       humanize.IBytes(pktSize), dst,
+                                               )
+                                       })
+                                       return err
+                               } else if ctx.HdrUsage {
+                                       os.Remove(JobPath2Hdr(jobPath))
                                }
-                               if err = DirSync(*incoming); err != nil {
-                                       ctx.LogE("rx", sds, err, "sync")
-                                       isBad = true
+                       }
+                       if len(sendmail) > 0 && ctx.NotifyFile != nil {
+                               cmd := exec.Command(
+                                       sendmail[0],
+                                       append(sendmail[1:], ctx.NotifyFile.To)...,
+                               )
+                               cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
+                                       "File from %s: %s (%s)",
+                                       sender.Name, dst, humanize.IBytes(pktSize),
+                               ), nil)
+                               if err = cmd.Run(); err != nil {
+                                       ctx.LogE("rx-notify", les, err, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "Tossing file %s/%s (%s): %s: notifying",
+                                                       sender.Name, pktName,
+                                                       humanize.IBytes(pktSize), dst,
+                                               )
+                                       })
                                }
-                               delete(sds, "tmp")
                        }
-                       ctx.LogI("rx", sds, "")
-                       if !dryRun {
+               }
+
+       case PktTypeFreq:
+               if noFreq {
+                       return nil
+               }
+               src := string(pkt.Path[:int(pkt.PathLen)])
+               les := append(les, LE{"Type", "freq"}, LE{"Src", src})
+               if filepath.IsAbs(src) {
+                       err = errors.New("non-relative source path")
+                       ctx.LogE(
+                               "rx-non-rel", les, err,
+                               func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing freq %s/%s (%s): %s: notifying",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), src,
+                                       )
+                               },
+                       )
+                       return err
+               }
+               dstRaw, err := ioutil.ReadAll(pipeR)
+               if err != nil {
+                       ctx.LogE("rx-read", les, err, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tossing freq %s/%s (%s): %s: reading",
+                                       sender.Name, pktName,
+                                       humanize.IBytes(pktSize), src,
+                               )
+                       })
+                       return err
+               }
+               dst := string(dstRaw)
+               les = append(les, LE{"Dst", dst})
+               freqPath := sender.FreqPath
+               if freqPath == nil {
+                       err = errors.New("freqing is not allowed")
+                       ctx.LogE(
+                               "rx-no-freq", les, err,
+                               func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing freq %s/%s (%s): %s -> %s",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), src, dst,
+                                       )
+                               },
+                       )
+                       return err
+               }
+               if !dryRun {
+                       err = ctx.TxFile(
+                               sender,
+                               pkt.Nice,
+                               filepath.Join(*freqPath, src),
+                               dst,
+                               sender.FreqChunked,
+                               sender.FreqMinSize,
+                               sender.FreqMaxSize,
+                               nil,
+                       )
+                       if err != nil {
+                               ctx.LogE("rx-tx", les, err, func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing freq %s/%s (%s): %s -> %s: txing",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize), src, dst,
+                                       )
+                               })
+                               return err
+                       }
+               }
+               ctx.LogI("rx", les, func(les LEs) string {
+                       return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
+               })
+               if !dryRun {
+                       if jobPath != "" {
                                if doSeen {
-                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                       if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+                                               return err
+                                       }
+                                       if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
                                                fd.Close()
+                                               if err = DirSync(filepath.Dir(jobPath)); err != nil {
+                                                       ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+                                                               return fmt.Sprintf(
+                                                                       "Tossing file %s/%s (%s): %s: dirsyncing",
+                                                                       sender.Name, pktName,
+                                                                       humanize.IBytes(pktSize),
+                                                                       filepath.Base(jobPath),
+                                                               )
+                                                       })
+                                                       return err
+                                               }
                                        }
                                }
-                               if err = os.Remove(job.Fd.Name()); err != nil {
-                                       ctx.LogE("rx", sds, err, "remove")
-                                       isBad = true
+                               if err = os.Remove(jobPath); err != nil {
+                                       ctx.LogE("rx-remove", les, err, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "Tossing freq %s/%s (%s): %s -> %s: removing",
+                                                       sender.Name, pktName,
+                                                       humanize.IBytes(pktSize), src, dst,
+                                               )
+                                       })
+                                       return err
+                               } else if ctx.HdrUsage {
+                                       os.Remove(JobPath2Hdr(jobPath))
                                }
-                               if len(sendmail) > 0 && ctx.NotifyFile != nil {
-                                       cmd := exec.Command(
-                                               sendmail[0],
-                                               append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
-                                       )
-                                       cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
-                                               "File from %s: %s (%s)",
-                                               ctx.Neigh[*job.PktEnc.Sender].Name,
-                                               dst,
-                                               humanize.IBytes(uint64(pktSize)),
-                                       ), nil)
-                                       cmd.Run()
+                       }
+                       if len(sendmail) > 0 && ctx.NotifyFreq != nil {
+                               cmd := exec.Command(
+                                       sendmail[0],
+                                       append(sendmail[1:], ctx.NotifyFreq.To)...,
+                               )
+                               cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
+                                       "Freq from %s: %s", sender.Name, src,
+                               ), nil)
+                               if err = cmd.Run(); err != nil {
+                                       ctx.LogE("rx-notify", les, err, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "Tossing freq %s/%s (%s): %s -> %s: notifying",
+                                                       sender.Name, pktName,
+                                                       humanize.IBytes(pktSize), src, dst,
+                                               )
+                                       })
                                }
                        }
-               case PktTypeFreq:
-                       if noFreq {
-                               goto Closing
+               }
+
+       case PktTypeTrns:
+               if noTrns {
+                       return nil
+               }
+               dst := new([MTHSize]byte)
+               copy(dst[:], pkt.Path[:int(pkt.PathLen)])
+               nodeId := NodeId(*dst)
+               les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
+               logMsg := func(les LEs) string {
+                       return fmt.Sprintf(
+                               "Tossing trns %s/%s (%s): %s",
+                               sender.Name, pktName,
+                               humanize.IBytes(pktSize),
+                               nodeId.String(),
+                       )
+               }
+               node := ctx.Neigh[nodeId]
+               if node == nil {
+                       err = errors.New("unknown node")
+                       ctx.LogE("rx-unknown", les, err, logMsg)
+                       return err
+               }
+               ctx.LogD("rx-tx", les, logMsg)
+               if !dryRun {
+                       if len(node.Via) == 0 {
+                               if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
+                                       ctx.LogE("rx", les, err, func(les LEs) string {
+                                               return logMsg(les) + ": txing"
+                                       })
+                                       return err
+                               }
+                       } else {
+                               via := node.Via[:len(node.Via)-1]
+                               node = ctx.Neigh[*node.Via[len(node.Via)-1]]
+                               node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
+                               pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
+                               if err != nil {
+                                       panic(err)
+                               }
+                               if _, _, err = ctx.Tx(
+                                       node,
+                                       pktTrns,
+                                       nice,
+                                       int64(pktSize), 0, MaxFileSize,
+                                       pipeR,
+                                       pktName,
+                                       nil,
+                               ); err != nil {
+                                       ctx.LogE("rx", les, err, func(les LEs) string {
+                                               return logMsg(les) + ": txing"
+                                       })
+                                       return err
+                               }
                        }
-                       src := string(pkt.Path[:int(pkt.PathLen)])
-                       if filepath.IsAbs(src) {
-                               ctx.LogE("rx", sds, errors.New("non-relative source path"), "")
-                               isBad = true
-                               goto Closing
+               }
+               ctx.LogI("rx", les, func(les LEs) string {
+                       return fmt.Sprintf(
+                               "Got transitional packet from %s to %s (%s)",
+                               sender.Name,
+                               ctx.NodeName(&nodeId),
+                               humanize.IBytes(pktSize),
+                       )
+               })
+               if !dryRun && jobPath != "" {
+                       if doSeen {
+                               if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+                                       return err
+                               }
+                               if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
+                                       fd.Close()
+                                       if err = DirSync(filepath.Dir(jobPath)); err != nil {
+                                               ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+                                                       return fmt.Sprintf(
+                                                               "Tossing file %s/%s (%s): %s: dirsyncing",
+                                                               sender.Name, pktName,
+                                                               humanize.IBytes(pktSize),
+                                                               filepath.Base(jobPath),
+                                                       )
+                                               })
+                                               return err
+                                       }
+                               }
                        }
-                       sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
-                       dstRaw, err := ioutil.ReadAll(pipeR)
-                       if err != nil {
-                               ctx.LogE("rx", sds, err, "read")
-                               isBad = true
-                               goto Closing
-                       }
-                       dst := string(dstRaw)
-                       sds["dst"] = dst
-                       sender := ctx.Neigh[*job.PktEnc.Sender]
-                       freqPath := sender.FreqPath
-                       if freqPath == nil {
-                               ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "")
-                               isBad = true
-                               goto Closing
+                       if err = os.Remove(jobPath); err != nil {
+                               ctx.LogE("rx", les, err, func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing trns %s/%s (%s): %s: removing",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize),
+                                               ctx.NodeName(&nodeId),
+                                       )
+                               })
+                               return err
+                       } else if ctx.HdrUsage {
+                               os.Remove(JobPath2Hdr(jobPath))
                        }
-                       if !dryRun {
-                               err = ctx.TxFile(
-                                       sender,
-                                       pkt.Nice,
-                                       filepath.Join(*freqPath, src),
-                                       dst,
-                                       sender.FreqChunked,
-                                       sender.FreqMinSize,
-                                       sender.FreqMaxSize,
+               }
+
+       case PktTypeArea:
+               if noArea {
+                       return nil
+               }
+               areaId := new(AreaId)
+               copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
+               les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
+               logMsg := func(les LEs) string {
+                       return fmt.Sprintf(
+                               "Tossing %s/%s (%s): area %s",
+                               sender.Name, pktName,
+                               humanize.IBytes(pktSize),
+                               ctx.AreaName(areaId),
+                       )
+               }
+               area := ctx.AreaId2Area[*areaId]
+               if area == nil {
+                       err = errors.New("unknown area")
+                       ctx.LogE("rx-area-unknown", les, err, logMsg)
+                       return err
+               }
+               pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
+               fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
+               if err != nil {
+                       ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
+                       return err
+               }
+               msgHashRaw := blake2b.Sum256(pktEncRaw)
+               msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
+               les = append(les, LE{"AreaMsg", msgHash})
+               ctx.LogD("rx-area", les, logMsg)
+
+               if dryRun {
+                       for _, nodeId := range area.Subs {
+                               node := ctx.Neigh[*nodeId]
+                               lesEcho := append(les, LE{"Echo", nodeId})
+                               seenDir := filepath.Join(
+                                       ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
                                )
-                               if err != nil {
-                                       ctx.LogE("rx", sds, err, "tx file")
-                                       isBad = true
-                                       goto Closing
+                               seenPath := filepath.Join(seenDir, msgHash)
+                               logMsgNode := func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "%s: echoing to: %s", logMsg(les), node.Name,
+                                       )
+                               }
+                               if _, err := os.Stat(seenPath); err == nil {
+                                       ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
+                                               return logMsgNode(les) + ": already sent"
+                                       })
+                                       continue
                                }
+                               ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
                        }
-                       ctx.LogI("rx", sds, "")
-                       if !dryRun {
-                               if doSeen {
-                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
-                                               fd.Close()
+               } else {
+                       for _, nodeId := range area.Subs {
+                               node := ctx.Neigh[*nodeId]
+                               lesEcho := append(les, LE{"Echo", nodeId})
+                               seenDir := filepath.Join(
+                                       ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
+                               )
+                               seenPath := filepath.Join(seenDir, msgHash)
+                               logMsgNode := func(les LEs) string {
+                                       return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
+                               }
+                               if _, err := os.Stat(seenPath); err == nil {
+                                       ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
+                                               return logMsgNode(les) + ": already sent"
+                                       })
+                                       continue
+                               }
+                               if nodeId != sender.Id && nodeId != pktEnc.Sender {
+                                       ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
+                                       if _, _, err = ctx.Tx(
+                                               node,
+                                               &pkt,
+                                               nice,
+                                               int64(pktSize), 0, MaxFileSize,
+                                               fullPipeR,
+                                               pktName,
+                                               nil,
+                                       ); err != nil {
+                                               ctx.LogE("rx-area", lesEcho, err, logMsgNode)
+                                               return err
                                        }
                                }
-                               if err = os.Remove(job.Fd.Name()); err != nil {
-                                       ctx.LogE("rx", sds, err, "remove")
-                                       isBad = true
+                               if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
+                                       ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
+                                       return err
                                }
-                               if len(sendmail) > 0 && ctx.NotifyFreq != nil {
-                                       cmd := exec.Command(
-                                               sendmail[0],
-                                               append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
+                               if fd, err := os.Create(seenPath); err == nil {
+                                       fd.Close()
+                                       if err = DirSync(seenDir); err != nil {
+                                               ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
+                                               return err
+                                       }
+                               } else {
+                                       ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
+                                       return err
+                               }
+                               return JobRepeatProcess
+                       }
+               }
+
+               seenDir := filepath.Join(
+                       ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
+               )
+               seenPath := filepath.Join(seenDir, msgHash)
+               if _, err := os.Stat(seenPath); err == nil {
+                       ctx.LogD("rx-area-seen", les, func(les LEs) string {
+                               return logMsg(les) + ": already seen"
+                       })
+                       if !dryRun && jobPath != "" {
+                               if err = os.Remove(jobPath); err != nil {
+                                       ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "Tossing area %s/%s (%s): %s: removing",
+                                                       sender.Name, pktName,
+                                                       humanize.IBytes(pktSize),
+                                                       msgHash,
+                                               )
+                                       })
+                                       return err
+                               } else if ctx.HdrUsage {
+                                       os.Remove(JobPath2Hdr(jobPath))
+                               }
+                       }
+                       return nil
+               }
+
+               if area.Prv == nil {
+                       ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
+                               return logMsg(les) + ": no private key for decoding"
+                       })
+               } else {
+                       signatureVerify := true
+                       if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
+                               if !area.AllowUnknown {
+                                       err = errors.New("unknown sender")
+                                       ctx.LogE(
+                                               "rx-area-unknown",
+                                               append(les, LE{"Sender", pktEnc.Sender}),
+                                               err,
+                                               func(les LEs) string {
+                                                       return logMsg(les) + ": sender: " + pktEnc.Sender.String()
+                                               },
                                        )
-                                       cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
-                                               "Freq from %s: %s", sender.Name, src,
-                                       ), nil)
-                                       cmd.Run()
+                                       return err
                                }
+                               signatureVerify = false
                        }
-               case PktTypeTrns:
-                       if noTrns {
-                               goto Closing
-                       }
-                       dst := new([blake2b.Size256]byte)
-                       copy(dst[:], pkt.Path[:int(pkt.PathLen)])
-                       nodeId := NodeId(*dst)
-                       node, known := ctx.Neigh[nodeId]
-                       sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
-                       if !known {
-                               ctx.LogE("rx", sds, errors.New("unknown node"), "")
-                               isBad = true
-                               goto Closing
+                       areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
+                       copy(areaNodeOur.Id[:], area.Id[:])
+                       copy(areaNodeOur.ExchPrv[:], area.Prv[:])
+                       areaNode := Node{
+                               Id:       new(NodeId),
+                               Name:     area.Name,
+                               Incoming: area.Incoming,
+                               Exec:     area.Exec,
                        }
-                       ctx.LogD("rx", sds, "taken")
-                       if !dryRun {
-                               if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
-                                       ctx.LogE("rx", sds, err, "tx trns")
-                                       isBad = true
-                                       goto Closing
+                       copy(areaNode.Id[:], area.Id[:])
+                       pktName := fmt.Sprintf(
+                               "area/%s/%s",
+                               Base32Codec.EncodeToString(areaId[:]), msgHash,
+                       )
+
+                       pipeR, pipeW := io.Pipe()
+                       errs := make(chan error, 1)
+                       go func() {
+                               errs <- jobProcess(
+                                       ctx,
+                                       pipeR,
+                                       pktName,
+                                       les,
+                                       &areaNode,
+                                       nice,
+                                       uint64(pktSizeWithoutEnc(int64(pktSize))),
+                                       "",
+                                       decompressor,
+                                       dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
+                               )
+                       }()
+                       _, _, _, err = PktEncRead(
+                               &areaNodeOur,
+                               ctx.Neigh,
+                               fullPipeR,
+                               pipeW,
+                               signatureVerify,
+                               nil,
+                       )
+                       if err != nil {
+                               ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
+                               pipeW.CloseWithError(err)
+                               <-errs
+                               return err
+                       }
+                       pipeW.Close()
+                       if err = <-errs; err != nil {
+                               return err
+                       }
+               }
+
+               if !dryRun && jobPath != "" {
+                       if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
+                               ctx.LogE("rx-area-mkdir", les, err, logMsg)
+                               return err
+                       }
+                       if fd, err := os.Create(seenPath); err == nil {
+                               fd.Close()
+                               if err = DirSync(seenDir); err != nil {
+                                       ctx.LogE("rx-area-dirsync", les, err, logMsg)
+                                       return err
                                }
                        }
-                       ctx.LogI("rx", sds, "")
+                       if err = os.Remove(jobPath); err != nil {
+                               ctx.LogE("rx", les, err, func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing area %s/%s (%s): %s: removing",
+                                               sender.Name, pktName,
+                                               humanize.IBytes(pktSize),
+                                               msgHash,
+                                       )
+                               })
+                               return err
+                       } else if ctx.HdrUsage {
+                               os.Remove(JobPath2Hdr(jobPath))
+                       }
+               }
+
+       case PktTypeACK:
+               if noACK {
+                       return nil
+               }
+               hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
+               les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
+               logMsg := func(les LEs) string {
+                       return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
+               }
+               ctx.LogD("rx-ack", les, logMsg)
+               pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
+               if _, err := os.Stat(pktPath); err == nil {
                        if !dryRun {
-                               if doSeen {
-                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
-                                               fd.Close()
-                                       }
+                               if err = os.Remove(pktPath); err != nil {
+                                       ctx.LogE("rx-ack", les, err, func(les LEs) string {
+                                               return logMsg(les) + ": removing packet"
+                                       })
+                                       return err
+                               } else if ctx.HdrUsage {
+                                       os.Remove(JobPath2Hdr(pktPath))
                                }
-                               if err = os.Remove(job.Fd.Name()); err != nil {
-                                       ctx.LogE("rx", sds, err, "remove")
-                                       isBad = true
+                       }
+               } else {
+                       ctx.LogD("rx-ack", les, func(les LEs) string {
+                               return logMsg(les) + ": already disappeared"
+                       })
+               }
+               if !dryRun && doSeen {
+                       if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+                               return err
+                       }
+                       if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
+                               fd.Close()
+                               if err = DirSync(filepath.Dir(jobPath)); err != nil {
+                                       ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "Tossing file %s/%s (%s): %s: dirsyncing",
+                                                       sender.Name, pktName,
+                                                       humanize.IBytes(pktSize),
+                                                       filepath.Base(jobPath),
+                                               )
+                                       })
+                                       return err
                                }
                        }
-               default:
-                       ctx.LogE("rx", sds, errors.New("unknown type"), "")
+               }
+               if !dryRun {
+                       if err = os.Remove(jobPath); err != nil {
+                               ctx.LogE("rx", les, err, func(les LEs) string {
+                                       return logMsg(les) + ": removing job"
+                               })
+                               return err
+                       } else if ctx.HdrUsage {
+                               os.Remove(JobPath2Hdr(jobPath))
+                       }
+               }
+               ctx.LogI("rx", les, func(les LEs) string {
+                       return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
+               })
+
+       default:
+               err = errors.New("unknown type")
+               ctx.LogE(
+                       "rx-type-unknown", les, err,
+                       func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tossing %s/%s (%s)",
+                                       sender.Name, pktName, humanize.IBytes(pktSize),
+                               )
+                       },
+               )
+               return err
+       }
+       return nil
+}
+
+func (ctx *Ctx) Toss(
+       nodeId *NodeId,
+       xx TRxTx,
+       nice uint8,
+       dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
+) bool {
+       dirLock, err := ctx.LockDir(nodeId, "toss")
+       if err != nil {
+               return false
+       }
+       defer ctx.UnlockDir(dirLock)
+       isBad := false
+       decompressor, err := zstd.NewReader(nil)
+       if err != nil {
+               panic(err)
+       }
+       defer decompressor.Close()
+       for job := range ctx.Jobs(nodeId, xx) {
+               pktName := filepath.Base(job.Path)
+               les := LEs{
+                       {"Node", job.PktEnc.Sender},
+                       {"Pkt", pktName},
+                       {"Nice", int(job.PktEnc.Nice)},
+               }
+               if job.PktEnc.Nice > nice {
+                       ctx.LogD("rx-too-nice", les, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tossing %s/%s: too nice: %s",
+                                       ctx.NodeName(job.PktEnc.Sender), pktName,
+                                       NicenessFmt(job.PktEnc.Nice),
+                               )
+                       })
+                       continue
+               }
+               fd, err := os.Open(job.Path)
+               if err != nil {
+                       ctx.LogE("rx-open", les, err, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tossing %s/%s: opening %s",
+                                       ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
+                               )
+                       })
                        isBad = true
+                       continue
                }
-       Closing:
-               pipeR.Close()
+               sender := ctx.Neigh[*job.PktEnc.Sender]
+               if sender == nil {
+                       err := errors.New("unknown node")
+                       ctx.LogE("rx-open", les, err, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tossing %s/%s",
+                                       ctx.NodeName(job.PktEnc.Sender), pktName,
+                               )
+                       })
+                       isBad = true
+                       continue
+               }
+               errs := make(chan error, 1)
+               var sharedKey []byte
+       Retry:
+               pipeR, pipeW := io.Pipe()
+               go func() {
+                       errs <- jobProcess(
+                               ctx,
+                               pipeR,
+                               pktName,
+                               les,
+                               sender,
+                               job.PktEnc.Nice,
+                               uint64(pktSizeWithoutEnc(job.Size)),
+                               job.Path,
+                               decompressor,
+                               dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
+                       )
+               }()
+               pipeWB := bufio.NewWriter(pipeW)
+               sharedKey, _, _, err = PktEncRead(
+                       ctx.Self,
+                       ctx.Neigh,
+                       bufio.NewReader(fd),
+                       pipeWB,
+                       sharedKey == nil,
+                       sharedKey,
+               )
+               if err != nil {
+                       pipeW.CloseWithError(err)
+               }
+               if err := pipeWB.Flush(); err != nil {
+                       pipeW.CloseWithError(err)
+               }
+               pipeW.Close()
+
+               if err != nil {
+                       isBad = true
+                       fd.Close()
+                       <-errs
+                       continue
+               }
+               if err = <-errs; err == JobRepeatProcess {
+                       if _, err = fd.Seek(0, io.SeekStart); err != nil {
+                               ctx.LogE("rx-seek", les, err, func(les LEs) string {
+                                       return fmt.Sprintf(
+                                               "Tossing %s/%s: can not seek",
+                                               ctx.NodeName(job.PktEnc.Sender),
+                                               pktName,
+                                       )
+                               })
+                               isBad = true
+                               break
+                       }
+                       goto Retry
+               } else if err != nil {
+                       isBad = true
+               }
+               fd.Close()
        }
        return isBad
 }
+
+func (ctx *Ctx) AutoToss(
+       nodeId *NodeId,
+       nice uint8,
+       doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
+) (chan struct{}, chan bool) {
+       dw, err := ctx.NewDirWatcher(
+               filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
+               time.Second,
+       )
+       if err != nil {
+               log.Fatalln(err)
+       }
+       finish := make(chan struct{})
+       badCode := make(chan bool)
+       go func() {
+               bad := false
+               for {
+                       select {
+                       case <-finish:
+                               dw.Close()
+                               badCode <- bad
+                               return
+                       case <-dw.C:
+                               bad = !ctx.Toss(
+                                       nodeId, TRx, nice, false,
+                                       doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK) || bad
+                       }
+               }
+       }()
+       return finish, badCode
+}