pktSize uint64,
jobPath string,
decompressor *zstd.Decoder,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
) error {
defer pipeR.Close()
sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
})
return err
}
- if err = tmp.Sync(); err != nil {
- tmp.Close()
- ctx.LogE("rx-sync", les, err, func(les LEs) string {
- return fmt.Sprintf(
- "Tossing file %s/%s (%s): %s: syncing",
- sender.Name, pktName,
- humanize.IBytes(pktSize), dst,
- )
- })
- return err
+ if !NoSync {
+ if err = tmp.Sync(); err != nil {
+ tmp.Close()
+ ctx.LogE("rx-sync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: syncing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
+ return err
+ }
}
if err = tmp.Close(); err != nil {
ctx.LogE("rx-close", les, err, func(les LEs) string {
if err != nil {
panic(err)
}
- if _, _, err = ctx.Tx(
+ if _, _, _, err = ctx.Tx(
node,
pktTrns,
nice,
}
if nodeId != sender.Id && nodeId != pktEnc.Sender {
ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
- if _, _, err = ctx.Tx(
+ if _, _, _, err = ctx.Tx(
node,
&pkt,
nice,
uint64(pktSizeWithoutEnc(int64(pktSize))),
"",
decompressor,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
)
}()
_, _, _, err = PktEncRead(
}
}
+ case PktTypeACK:
+ if noACK {
+ return nil
+ }
+ hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
+ les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
+ }
+ ctx.LogD("rx-ack", les, logMsg)
+ pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
+ if _, err := os.Stat(pktPath); err == nil {
+ if !dryRun {
+ if err = os.Remove(pktPath); err != nil {
+ ctx.LogE("rx-ack", les, err, func(les LEs) string {
+ return logMsg(les) + ": removing packet"
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(pktPath))
+ }
+ }
+ } else {
+ ctx.LogD("rx-ack", les, func(les LEs) string {
+ return logMsg(les) + ": already disappeared"
+ })
+ }
+ if !dryRun && doSeen {
+ if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+ return err
+ }
+ if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
+ fd.Close()
+ if err = DirSync(filepath.Dir(jobPath)); err != nil {
+ ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: dirsyncing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ filepath.Base(jobPath),
+ )
+ })
+ return err
+ }
+ }
+ }
+ if !dryRun {
+ if err = os.Remove(jobPath); err != nil {
+ ctx.LogE("rx", les, err, func(les LEs) string {
+ return logMsg(les) + ": removing job"
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(jobPath))
+ }
+ }
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
+ })
+
default:
err = errors.New("unknown type")
ctx.LogE(
nodeId *NodeId,
xx TRxTx,
nice uint8,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
) bool {
dirLock, err := ctx.LockDir(nodeId, "toss")
if err != nil {
uint64(pktSizeWithoutEnc(job.Size)),
job.Path,
decompressor,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
)
}()
pipeWB := bufio.NewWriter(pipeW)
func (ctx *Ctx) AutoToss(
nodeId *NodeId,
nice uint8,
- doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+ doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
) (chan struct{}, chan bool) {
dw, err := ctx.NewDirWatcher(
filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
case <-dw.C:
bad = !ctx.Toss(
nodeId, TRx, nice, false,
- doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
+ doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK) || bad
}
}
}()