]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/toss.go
ACK
[nncp.git] / src / toss.go
index e383257db77c43e31ddd8127160276526376983e..ebe20fbf0dd159c626d7b91801e814ddbc34ce0b 100644 (file)
@@ -91,7 +91,7 @@ func jobProcess(
        pktSize uint64,
        jobPath string,
        decompressor *zstd.Decoder,
-       dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+       dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
 ) error {
        defer pipeR.Close()
        sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
@@ -858,7 +858,7 @@ func jobProcess(
                                        uint64(pktSizeWithoutEnc(int64(pktSize))),
                                        "",
                                        decompressor,
-                                       dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
+                                       dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
                                )
                        }()
                        _, _, _, err = PktEncRead(
@@ -908,6 +908,64 @@ func jobProcess(
                        }
                }
 
+       case PktTypeACK:
+               if noACK {
+                       return nil
+               }
+               hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
+               les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
+               logMsg := func(les LEs) string {
+                       return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
+               }
+               ctx.LogD("rx-ack", les, logMsg)
+               pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
+               if _, err := os.Stat(pktPath); err == nil {
+                       if !dryRun {
+                               if err = os.Remove(pktPath); err != nil {
+                                       ctx.LogE("rx-ack", les, err, func(les LEs) string {
+                                               return logMsg(les) + ": removing packet"
+                                       })
+                                       return err
+                               }
+                       }
+               } else {
+                       ctx.LogD("rx-ack", les, func(les LEs) string {
+                               return logMsg(les) + ": already disappeared"
+                       })
+               }
+               if !dryRun && doSeen {
+                       if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+                               return err
+                       }
+                       if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
+                               fd.Close()
+                               if err = DirSync(filepath.Dir(jobPath)); err != nil {
+                                       ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "Tossing file %s/%s (%s): %s: dirsyncing",
+                                                       sender.Name, pktName,
+                                                       humanize.IBytes(pktSize),
+                                                       filepath.Base(jobPath),
+                                               )
+                                       })
+                                       return err
+                               }
+                       }
+               }
+               if !dryRun {
+                       if err = os.Remove(jobPath); err != nil {
+                               ctx.LogE("rx", les, err, func(les LEs) string {
+                                       return logMsg(les) + ": removing job"
+                               })
+                               return err
+                       } else if ctx.HdrUsage {
+                               os.Remove(JobPath2Hdr(jobPath))
+                       }
+               }
+               ctx.LogI("rx", les, func(les LEs) string {
+                       return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
+               })
+
        default:
                err = errors.New("unknown type")
                ctx.LogE(
@@ -928,7 +986,7 @@ func (ctx *Ctx) Toss(
        nodeId *NodeId,
        xx TRxTx,
        nice uint8,
-       dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+       dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
 ) bool {
        dirLock, err := ctx.LockDir(nodeId, "toss")
        if err != nil {
@@ -996,7 +1054,7 @@ func (ctx *Ctx) Toss(
                                uint64(pktSizeWithoutEnc(job.Size)),
                                job.Path,
                                decompressor,
-                               dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
+                               dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
                        )
                }()
                pipeWB := bufio.NewWriter(pipeW)
@@ -1046,7 +1104,7 @@ func (ctx *Ctx) Toss(
 func (ctx *Ctx) AutoToss(
        nodeId *NodeId,
        nice uint8,
-       doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+       doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
 ) (chan struct{}, chan bool) {
        dw, err := ctx.NewDirWatcher(
                filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
@@ -1068,7 +1126,7 @@ func (ctx *Ctx) AutoToss(
                        case <-dw.C:
                                bad = !ctx.Toss(
                                        nodeId, TRx, nice, false,
-                                       doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
+                                       doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK) || bad
                        }
                }
        }()