+ }
+ if !dryRun {
+ if err = os.Remove(jobPath); err != nil {
+ ctx.LogE("rx", les, err, func(les LEs) string {
+ return logMsg(les) + ": removing job"
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(jobPath))
+ }
+ }
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
+ })
+
+ default:
+ err = errors.New("unknown type")
+ ctx.LogE(
+ "rx-type-unknown", les, err,
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s (%s)",
+ sender.Name, pktName, humanize.IBytes(pktSize),
+ )
+ },
+ )
+ return err
+ }
+ return nil
+}
+
+func (ctx *Ctx) Toss(
+ nodeId *NodeId,
+ xx TRxTx,
+ nice uint8,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
+) bool {
+ dirLock, err := ctx.LockDir(nodeId, "toss")
+ if err != nil {
+ return false
+ }
+ defer ctx.UnlockDir(dirLock)
+ isBad := false
+ decompressor, err := zstd.NewReader(nil)
+ if err != nil {
+ panic(err)
+ }
+ defer decompressor.Close()
+ for job := range ctx.Jobs(nodeId, xx) {
+ pktName := filepath.Base(job.Path)
+ les := LEs{
+ {"Node", job.PktEnc.Sender},
+ {"Pkt", pktName},
+ {"Nice", int(job.PktEnc.Nice)},
+ }
+ if job.PktEnc.Nice > nice {
+ ctx.LogD("rx-too-nice", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s: too nice: %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ NicenessFmt(job.PktEnc.Nice),
+ )
+ })
+ continue
+ }
+ fd, err := os.Open(job.Path)
+ if err != nil {
+ ctx.LogE("rx-open", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s: opening %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
+ )
+ })
+ isBad = true
+ continue
+ }
+ sender := ctx.Neigh[*job.PktEnc.Sender]
+ if sender == nil {
+ err := errors.New("unknown node")
+ ctx.LogE("rx-open", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ )
+ })
+ isBad = true
+ continue
+ }
+ errs := make(chan error, 1)
+ var sharedKey []byte
+ Retry:
+ pipeR, pipeW := io.Pipe()
+ go func() {
+ errs <- jobProcess(
+ ctx,
+ pipeR,
+ pktName,
+ les,
+ sender,
+ job.PktEnc.Nice,
+ uint64(pktSizeWithoutEnc(job.Size)),
+ job.Path,
+ decompressor,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
+ )
+ }()
+ pipeWB := bufio.NewWriter(pipeW)
+ sharedKey, _, _, err = PktEncRead(
+ ctx.Self,
+ ctx.Neigh,
+ bufio.NewReaderSize(fd, MTHBlockSize),
+ pipeWB,
+ sharedKey == nil,
+ sharedKey,
+ )
+ if err != nil {
+ pipeW.CloseWithError(err)
+ }
+ if err := pipeWB.Flush(); err != nil {
+ pipeW.CloseWithError(err)
+ }
+ pipeW.Close()
+
+ if err != nil {
+ isBad = true
+ fd.Close()
+ <-errs
+ continue
+ }
+ if err = <-errs; err == JobRepeatProcess {
+ if _, err = fd.Seek(0, io.SeekStart); err != nil {
+ ctx.LogE("rx-seek", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s: can not seek",
+ ctx.NodeName(job.PktEnc.Sender),
+ pktName,
+ )
+ })
+ isBad = true
+ break