]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/toss.go
Generate ACKs during tossing
[nncp.git] / src / toss.go
index 69ad45536e2ba0427e2ba90b5b9d104e6b924301..c4710494bc1a56d8ba5d785f89e0ce44ba23eb29 100644 (file)
@@ -1,6 +1,6 @@
 /*
 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2023 Sergey Matveev <stargrave@stargrave.org>
 
 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
@@ -24,7 +24,7 @@ import (
        "errors"
        "fmt"
        "io"
-       "io/ioutil"
+       "io/fs"
        "log"
        "mime"
        "os"
@@ -43,9 +43,27 @@ import (
 )
 
 const (
-       SeenSuffix = ".seen"
+       SeenDir = "seen"
+       ACKDir  = "ack"
 )
 
+type TossOpts struct {
+       Nice   uint8
+       DryRun bool
+       DoSeen bool
+       NoFile bool
+       NoFreq bool
+       NoExec bool
+       NoTrns bool
+       NoArea bool
+       NoACK  bool
+       GenACK bool
+}
+
+func jobPath2Seen(jobPath string) string {
+       return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
+}
+
 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
        lines := []string{
                "From: " + fromTo.From,
@@ -87,7 +105,7 @@ func jobProcess(
        pktSize uint64,
        jobPath string,
        decompressor *zstd.Decoder,
-       dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+       opts *TossOpts,
 ) error {
        defer pipeR.Close()
        sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
@@ -107,9 +125,36 @@ func jobProcess(
                        humanize.IBytes(pktSize),
                )
        })
+       if opts.GenACK && pkt.Type != PktTypeACK {
+               newPktName, err := ctx.TxACK(
+                       sender, sender.ACKNice, pktName, sender.ACKMinSize,
+               )
+               if err != nil {
+                       ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
+                               return fmt.Sprintf("Tossing %s/%s: generating ACK", sender.Name, pktName)
+                       })
+                       return err
+               }
+               ackDir := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), ACKDir)
+               os.MkdirAll(ackDir, os.FileMode(0777))
+               if fd, err := os.Create(filepath.Join(ackDir, newPktName)); err == nil {
+                       fd.Close()
+                       if err = DirSync(ackDir); err != nil {
+                               ctx.LogE("rx-genack", les, err, func(les LEs) string {
+                                       return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName)
+                               })
+                               return err
+                       }
+               } else {
+                       ctx.LogE("rx-genack", les, err, func(les LEs) string {
+                               return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName)
+                       })
+                       return err
+               }
+       }
        switch pkt.Type {
        case PktTypeExec, PktTypeExecFat:
-               if noExec {
+               if opts.NoExec {
                        return nil
                }
                path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
@@ -120,8 +165,8 @@ func jobProcess(
                }
                argsStr := strings.Join(append([]string{handle}, args...), " ")
                les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
-               cmdline, exists := sender.Exec[handle]
-               if !exists || len(cmdline) == 0 {
+               cmdline := sender.Exec[handle]
+               if len(cmdline) == 0 {
                        err = errors.New("No handle found")
                        ctx.LogE(
                                "rx-no-handle", les, err,
@@ -140,7 +185,7 @@ func jobProcess(
                                log.Fatalln(err)
                        }
                }
-               if !dryRun {
+               if !opts.DryRun {
                        cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
                        cmd.Env = append(
                                cmd.Env,
@@ -153,9 +198,12 @@ func jobProcess(
                        } else {
                                cmd.Stdin = pipeR
                        }
-                       output, err := cmd.Output()
+                       output, err := cmd.CombinedOutput()
                        if err != nil {
-                               ctx.LogE("rx-hande", les, err, func(les LEs) string {
+                               les = append(les, LE{"Output", strings.Split(
+                                       strings.Trim(string(output), "\n"), "\n"),
+                               })
+                               ctx.LogE("rx-handle", les, err, func(les LEs) string {
                                        return fmt.Sprintf(
                                                "Tossing exec %s/%s (%s): %s: handling",
                                                sender.Name, pktName,
@@ -165,11 +213,11 @@ func jobProcess(
                                return err
                        }
                        if len(sendmail) > 0 && ctx.NotifyExec != nil {
-                               notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
-                               if !exists {
-                                       notify, exists = ctx.NotifyExec["*."+handle]
+                               notify := ctx.NotifyExec[sender.Name+"."+handle]
+                               if notify == nil {
+                                       notify = ctx.NotifyExec["*."+handle]
                                }
-                               if exists {
+                               if notify != nil {
                                        cmd := exec.Command(
                                                sendmail[0],
                                                append(sendmail[1:], notify.To)...,
@@ -196,9 +244,12 @@ func jobProcess(
                                humanize.IBytes(pktSize),
                        )
                })
-               if !dryRun && jobPath != "" {
-                       if doSeen {
-                               if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
+               if !opts.DryRun && jobPath != "" {
+                       if opts.DoSeen {
+                               if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+                                       return err
+                               }
+                               if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
                                        fd.Close()
                                        if err = DirSync(filepath.Dir(jobPath)); err != nil {
                                                ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
@@ -223,12 +274,12 @@ func jobProcess(
                                })
                                return err
                        } else if ctx.HdrUsage {
-                               os.Remove(jobPath + HdrSuffix)
+                               os.Remove(JobPath2Hdr(jobPath))
                        }
                }
 
        case PktTypeFile:
-               if noFile {
+               if opts.NoFile {
                        return nil
                }
                dst := string(pkt.Path[:int(pkt.PathLen)])
@@ -273,7 +324,7 @@ func jobProcess(
                        })
                        return err
                }
-               if !dryRun {
+               if !opts.DryRun {
                        tmp, err := TempFile(dir, "file")
                        if err != nil {
                                ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
@@ -309,7 +360,7 @@ func jobProcess(
                                return err
                        }
                        if err = bufW.Flush(); err != nil {
-                               tmp.Close() // #nosec G104
+                               tmp.Close()
                                ctx.LogE("rx-flush", les, err, func(les LEs) string {
                                        return fmt.Sprintf(
                                                "Tossing file %s/%s (%s): %s: flushing",
@@ -319,16 +370,18 @@ func jobProcess(
                                })
                                return err
                        }
-                       if err = tmp.Sync(); err != nil {
-                               tmp.Close() // #nosec G104
-                               ctx.LogE("rx-sync", les, err, func(les LEs) string {
-                                       return fmt.Sprintf(
-                                               "Tossing file %s/%s (%s): %s: syncing",
-                                               sender.Name, pktName,
-                                               humanize.IBytes(pktSize), dst,
-                                       )
-                               })
-                               return err
+                       if !NoSync {
+                               if err = tmp.Sync(); err != nil {
+                                       tmp.Close()
+                                       ctx.LogE("rx-sync", les, err, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "Tossing file %s/%s (%s): %s: syncing",
+                                                       sender.Name, pktName,
+                                                       humanize.IBytes(pktSize), dst,
+                                               )
+                                       })
+                                       return err
+                               }
                        }
                        if err = tmp.Close(); err != nil {
                                ctx.LogE("rx-close", les, err, func(les LEs) string {
@@ -345,7 +398,7 @@ func jobProcess(
                        dstPathCtr := 0
                        for {
                                if _, err = os.Stat(dstPath); err != nil {
-                                       if os.IsNotExist(err) {
+                                       if errors.Is(err, fs.ErrNotExist) {
                                                break
                                        }
                                        ctx.LogE("rx-stat", les, err, func(les LEs) string {
@@ -388,10 +441,13 @@ func jobProcess(
                                dst, humanize.IBytes(pktSize), sender.Name,
                        )
                })
-               if !dryRun {
+               if !opts.DryRun {
                        if jobPath != "" {
-                               if doSeen {
-                                       if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
+                               if opts.DoSeen {
+                                       if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+                                               return err
+                                       }
+                                       if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
                                                fd.Close()
                                                if err = DirSync(filepath.Dir(jobPath)); err != nil {
                                                        ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
@@ -416,7 +472,7 @@ func jobProcess(
                                        })
                                        return err
                                } else if ctx.HdrUsage {
-                                       os.Remove(jobPath + HdrSuffix)
+                                       os.Remove(JobPath2Hdr(jobPath))
                                }
                        }
                        if len(sendmail) > 0 && ctx.NotifyFile != nil {
@@ -441,7 +497,7 @@ func jobProcess(
                }
 
        case PktTypeFreq:
-               if noFreq {
+               if opts.NoFreq {
                        return nil
                }
                src := string(pkt.Path[:int(pkt.PathLen)])
@@ -460,7 +516,7 @@ func jobProcess(
                        )
                        return err
                }
-               dstRaw, err := ioutil.ReadAll(pipeR)
+               dstRaw, err := io.ReadAll(pipeR)
                if err != nil {
                        ctx.LogE("rx-read", les, err, func(les LEs) string {
                                return fmt.Sprintf(
@@ -488,7 +544,7 @@ func jobProcess(
                        )
                        return err
                }
-               if !dryRun {
+               if !opts.DryRun {
                        err = ctx.TxFile(
                                sender,
                                pkt.Nice,
@@ -513,10 +569,13 @@ func jobProcess(
                ctx.LogI("rx", les, func(les LEs) string {
                        return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
                })
-               if !dryRun {
+               if !opts.DryRun {
                        if jobPath != "" {
-                               if doSeen {
-                                       if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
+                               if opts.DoSeen {
+                                       if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+                                               return err
+                                       }
+                                       if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
                                                fd.Close()
                                                if err = DirSync(filepath.Dir(jobPath)); err != nil {
                                                        ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
@@ -541,7 +600,7 @@ func jobProcess(
                                        })
                                        return err
                                } else if ctx.HdrUsage {
-                                       os.Remove(jobPath + HdrSuffix)
+                                       os.Remove(JobPath2Hdr(jobPath))
                                }
                        }
                        if len(sendmail) > 0 && ctx.NotifyFreq != nil {
@@ -565,7 +624,7 @@ func jobProcess(
                }
 
        case PktTypeTrns:
-               if noTrns {
+               if opts.NoTrns {
                        return nil
                }
                dst := new([MTHSize]byte)
@@ -587,12 +646,36 @@ func jobProcess(
                        return err
                }
                ctx.LogD("rx-tx", les, logMsg)
-               if !dryRun {
-                       if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
-                               ctx.LogE("rx", les, err, func(les LEs) string {
-                                       return logMsg(les) + ": txing"
-                               })
-                               return err
+               if !opts.DryRun {
+                       if len(node.Via) == 0 {
+                               if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
+                                       ctx.LogE("rx", les, err, func(les LEs) string {
+                                               return logMsg(les) + ": txing"
+                                       })
+                                       return err
+                               }
+                       } else {
+                               via := node.Via[:len(node.Via)-1]
+                               node = ctx.Neigh[*node.Via[len(node.Via)-1]]
+                               node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
+                               pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
+                               if err != nil {
+                                       panic(err)
+                               }
+                               if _, _, _, err = ctx.Tx(
+                                       node,
+                                       pktTrns,
+                                       nice,
+                                       int64(pktSize), 0, MaxFileSize,
+                                       pipeR,
+                                       pktName,
+                                       nil,
+                               ); err != nil {
+                                       ctx.LogE("rx", les, err, func(les LEs) string {
+                                               return logMsg(les) + ": txing"
+                                       })
+                                       return err
+                               }
                        }
                }
                ctx.LogI("rx", les, func(les LEs) string {
@@ -603,9 +686,12 @@ func jobProcess(
                                humanize.IBytes(pktSize),
                        )
                })
-               if !dryRun && jobPath != "" {
-                       if doSeen {
-                               if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
+               if !opts.DryRun && jobPath != "" {
+                       if opts.DoSeen {
+                               if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+                                       return err
+                               }
+                               if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
                                        fd.Close()
                                        if err = DirSync(filepath.Dir(jobPath)); err != nil {
                                                ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
@@ -631,12 +717,12 @@ func jobProcess(
                                })
                                return err
                        } else if ctx.HdrUsage {
-                               os.Remove(jobPath + HdrSuffix)
+                               os.Remove(JobPath2Hdr(jobPath))
                        }
                }
 
        case PktTypeArea:
-               if noArea {
+               if opts.NoArea {
                        return nil
                }
                areaId := new(AreaId)
@@ -667,14 +753,14 @@ func jobProcess(
                les = append(les, LE{"AreaMsg", msgHash})
                ctx.LogD("rx-area", les, logMsg)
 
-               if dryRun {
+               if opts.DryRun {
                        for _, nodeId := range area.Subs {
                                node := ctx.Neigh[*nodeId]
                                lesEcho := append(les, LE{"Echo", nodeId})
                                seenDir := filepath.Join(
                                        ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
                                )
-                               seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
+                               seenPath := filepath.Join(seenDir, msgHash)
                                logMsgNode := func(les LEs) string {
                                        return fmt.Sprintf(
                                                "%s: echoing to: %s", logMsg(les), node.Name,
@@ -695,7 +781,7 @@ func jobProcess(
                                seenDir := filepath.Join(
                                        ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
                                )
-                               seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
+                               seenPath := filepath.Join(seenDir, msgHash)
                                logMsgNode := func(les LEs) string {
                                        return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
                                }
@@ -705,10 +791,16 @@ func jobProcess(
                                        })
                                        continue
                                }
-                               if nodeId != sender.Id {
+                               if nodeId != sender.Id && nodeId != pktEnc.Sender {
                                        ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
-                                       if _, err = ctx.Tx(
-                                               node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil,
+                                       if _, _, _, err = ctx.Tx(
+                                               node,
+                                               &pkt,
+                                               nice,
+                                               int64(pktSize), 0, MaxFileSize,
+                                               fullPipeR,
+                                               pktName,
+                                               nil,
                                        ); err != nil {
                                                ctx.LogE("rx-area", lesEcho, err, logMsgNode)
                                                return err
@@ -735,12 +827,12 @@ func jobProcess(
                seenDir := filepath.Join(
                        ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
                )
-               seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
+               seenPath := filepath.Join(seenDir, msgHash)
                if _, err := os.Stat(seenPath); err == nil {
                        ctx.LogD("rx-area-seen", les, func(les LEs) string {
                                return logMsg(les) + ": already seen"
                        })
-                       if !dryRun && jobPath != "" {
+                       if !opts.DryRun && jobPath != "" {
                                if err = os.Remove(jobPath); err != nil {
                                        ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
                                                return fmt.Sprintf(
@@ -752,7 +844,7 @@ func jobProcess(
                                        })
                                        return err
                                } else if ctx.HdrUsage {
-                                       os.Remove(jobPath + HdrSuffix)
+                                       os.Remove(JobPath2Hdr(jobPath))
                                }
                        }
                        return nil
@@ -807,7 +899,7 @@ func jobProcess(
                                        uint64(pktSizeWithoutEnc(int64(pktSize))),
                                        "",
                                        decompressor,
-                                       dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
+                                       opts,
                                )
                        }()
                        _, _, _, err = PktEncRead(
@@ -819,8 +911,9 @@ func jobProcess(
                                nil,
                        )
                        if err != nil {
+                               ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
                                pipeW.CloseWithError(err)
-                               go func() { <-errs }()
+                               <-errs
                                return err
                        }
                        pipeW.Close()
@@ -829,7 +922,7 @@ func jobProcess(
                        }
                }
 
-               if !dryRun && jobPath != "" {
+               if !opts.DryRun && jobPath != "" {
                        if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
                                ctx.LogE("rx-area-mkdir", les, err, logMsg)
                                return err
@@ -852,10 +945,70 @@ func jobProcess(
                                })
                                return err
                        } else if ctx.HdrUsage {
-                               os.Remove(jobPath + HdrSuffix)
+                               os.Remove(JobPath2Hdr(jobPath))
                        }
                }
 
+       case PktTypeACK:
+               if opts.NoACK {
+                       return nil
+               }
+               hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
+               les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
+               logMsg := func(les LEs) string {
+                       return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
+               }
+               ctx.LogD("rx-ack", les, logMsg)
+               pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
+               if _, err := os.Stat(pktPath); err == nil {
+                       if !opts.DryRun {
+                               if err = os.Remove(pktPath); err != nil {
+                                       ctx.LogE("rx-ack", les, err, func(les LEs) string {
+                                               return logMsg(les) + ": removing packet"
+                                       })
+                                       return err
+                               } else if ctx.HdrUsage {
+                                       os.Remove(JobPath2Hdr(pktPath))
+                               }
+                       }
+               } else {
+                       ctx.LogD("rx-ack", les, func(les LEs) string {
+                               return logMsg(les) + ": already disappeared"
+                       })
+               }
+               if !opts.DryRun && opts.DoSeen {
+                       if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+                               return err
+                       }
+                       if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
+                               fd.Close()
+                               if err = DirSync(filepath.Dir(jobPath)); err != nil {
+                                       ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "Tossing file %s/%s (%s): %s: dirsyncing",
+                                                       sender.Name, pktName,
+                                                       humanize.IBytes(pktSize),
+                                                       filepath.Base(jobPath),
+                                               )
+                                       })
+                                       return err
+                               }
+                       }
+               }
+               if !opts.DryRun {
+                       if err = os.Remove(jobPath); err != nil {
+                               ctx.LogE("rx", les, err, func(les LEs) string {
+                                       return logMsg(les) + ": removing job"
+                               })
+                               return err
+                       } else if ctx.HdrUsage {
+                               os.Remove(JobPath2Hdr(jobPath))
+                       }
+               }
+               ctx.LogI("rx", les, func(les LEs) string {
+                       return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
+               })
+
        default:
                err = errors.New("unknown type")
                ctx.LogE(
@@ -872,12 +1025,7 @@ func jobProcess(
        return nil
 }
 
-func (ctx *Ctx) Toss(
-       nodeId *NodeId,
-       xx TRxTx,
-       nice uint8,
-       dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
-) bool {
+func (ctx *Ctx) Toss(nodeId *NodeId, xx TRxTx, opts *TossOpts) bool {
        dirLock, err := ctx.LockDir(nodeId, "toss")
        if err != nil {
                return false
@@ -896,7 +1044,7 @@ func (ctx *Ctx) Toss(
                        {"Pkt", pktName},
                        {"Nice", int(job.PktEnc.Nice)},
                }
-               if job.PktEnc.Nice > nice {
+               if job.PktEnc.Nice > opts.Nice {
                        ctx.LogD("rx-too-nice", les, func(les LEs) string {
                                return fmt.Sprintf(
                                        "Tossing %s/%s: too nice: %s",
@@ -917,6 +1065,18 @@ func (ctx *Ctx) Toss(
                        isBad = true
                        continue
                }
+               sender := ctx.Neigh[*job.PktEnc.Sender]
+               if sender == nil {
+                       err := errors.New("unknown node")
+                       ctx.LogE("rx-open", les, err, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Tossing %s/%s",
+                                       ctx.NodeName(job.PktEnc.Sender), pktName,
+                               )
+                       })
+                       isBad = true
+                       continue
+               }
                errs := make(chan error, 1)
                var sharedKey []byte
        Retry:
@@ -927,19 +1087,19 @@ func (ctx *Ctx) Toss(
                                pipeR,
                                pktName,
                                les,
-                               ctx.Neigh[*job.PktEnc.Sender],
+                               sender,
                                job.PktEnc.Nice,
                                uint64(pktSizeWithoutEnc(job.Size)),
                                job.Path,
                                decompressor,
-                               dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
+                               opts,
                        )
                }()
                pipeWB := bufio.NewWriter(pipeW)
                sharedKey, _, _, err = PktEncRead(
                        ctx.Self,
                        ctx.Neigh,
-                       bufio.NewReader(fd),
+                       bufio.NewReaderSize(fd, MTHBlockSize),
                        pipeWB,
                        sharedKey == nil,
                        sharedKey,
@@ -955,7 +1115,7 @@ func (ctx *Ctx) Toss(
                if err != nil {
                        isBad = true
                        fd.Close()
-                       go func() { <-errs }()
+                       <-errs
                        continue
                }
                if err = <-errs; err == JobRepeatProcess {
@@ -979,11 +1139,14 @@ func (ctx *Ctx) Toss(
        return isBad
 }
 
-func (ctx *Ctx) AutoToss(
-       nodeId *NodeId,
-       nice uint8,
-       doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
-) (chan struct{}, chan bool) {
+func (ctx *Ctx) AutoToss(nodeId *NodeId, opts *TossOpts) (chan struct{}, chan bool) {
+       dw, err := ctx.NewDirWatcher(
+               filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
+               time.Second,
+       )
+       if err != nil {
+               log.Fatalln(err)
+       }
        finish := make(chan struct{})
        badCode := make(chan bool)
        go func() {
@@ -991,14 +1154,12 @@ func (ctx *Ctx) AutoToss(
                for {
                        select {
                        case <-finish:
+                               dw.Close()
                                badCode <- bad
-                               break
-                       default:
+                               return
+                       case <-dw.C:
+                               bad = !ctx.Toss(nodeId, TRx, opts) || bad
                        }
-                       time.Sleep(time.Second)
-                       bad = !ctx.Toss(
-                               nodeId, TRx, nice, false,
-                               doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
                }
        }()
        return finish, badCode