+ }
+ if !opts.DryRun {
+ if err = os.Remove(jobPath); err != nil {
+ ctx.LogE("rx", les, err, func(les LEs) string {
+ return logMsg(les) + ": removing job"
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(jobPath))
+ }
+ }
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
+ })
+
+ default:
+ err = errors.New("unknown type")
+ ctx.LogE(
+ "rx-type-unknown", les, err,
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s (%s)",
+ sender.Name, pktName, humanize.IBytes(pktSize),
+ )
+ },
+ )
+ return err
+ }
+ return nil
+}
+
+func (ctx *Ctx) Toss(nodeId *NodeId, xx TRxTx, opts *TossOpts) bool {
+ dirLock, err := ctx.LockDir(nodeId, "toss")
+ if err != nil {
+ return false
+ }
+ defer ctx.UnlockDir(dirLock)
+ isBad := false
+ decompressor, err := zstd.NewReader(nil)
+ if err != nil {
+ panic(err)
+ }
+ defer decompressor.Close()
+ for job := range ctx.Jobs(nodeId, xx) {
+ pktName := filepath.Base(job.Path)
+ les := LEs{
+ {"Node", job.PktEnc.Sender},
+ {"Pkt", pktName},
+ {"Nice", int(job.PktEnc.Nice)},
+ }
+ if job.PktEnc.Nice > opts.Nice {
+ ctx.LogD("rx-too-nice", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s: too nice: %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ NicenessFmt(job.PktEnc.Nice),
+ )
+ })
+ continue
+ }
+ fd, err := os.Open(job.Path)
+ if err != nil {
+ ctx.LogE("rx-open", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s: opening %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
+ )
+ })
+ isBad = true
+ continue
+ }
+ sender := ctx.Neigh[*job.PktEnc.Sender]
+ if sender == nil {
+ err := errors.New("unknown node")
+ ctx.LogE("rx-open", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ )
+ })