]> Cypherpunks.ru repositories - nncp.git/commitdiff
SPCheckers are always running
authorSergey Matveev <stargrave@stargrave.org>
Sun, 21 Feb 2021 10:29:03 +0000 (13:29 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Sun, 21 Feb 2021 10:29:03 +0000 (13:29 +0300)
src/sp.go

index e1207693cd9c985d13c9d67e9d250e15a5691304..6f1af9ff7e2ed6815b4740afdf55946f1cce48c9 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -40,6 +40,11 @@ const (
        SPHeadOverhead = 4
 )
 
+type SPCheckerQueues struct {
+       appeared chan *[32]byte
+       checked  chan *[32]byte
+}
+
 var (
        MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
 
@@ -57,6 +62,8 @@ var (
 
        DefaultDeadline = 10 * time.Second
        PingTimeout     = time.Minute
+
+       spCheckers = make(map[NodeId]*SPCheckerQueues)
 )
 
 type FdAndFullSize struct {
@@ -225,7 +232,7 @@ type SPState struct {
        writeSPBuf     bytes.Buffer
        fds            map[string]FdAndFullSize
        fileHashers    map[string]*HasherAndOffset
-       checkerJobs    chan *[32]byte
+       checkerQueues  SPCheckerQueues
        sync.RWMutex
 }
 
@@ -252,9 +259,6 @@ func (state *SPState) SetDead() {
                        s.fd.Close()
                }
        }()
-       if !state.NoCK {
-               close(state.checkerJobs)
-       }
 }
 
 func (state *SPState) NotAlive() bool {
@@ -271,28 +275,22 @@ func (state *SPState) dirUnlock() {
        state.Ctx.UnlockDir(state.txLock)
 }
 
-func (state *SPState) SPChecker() {
-       for hshValue := range state.checkerJobs {
+func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
+       for hshValue := range appeared {
                les := LEs{
                        {"XX", string(TRx)},
-                       {"Node", state.Node.Id},
+                       {"Node", nodeId},
                        {"Pkt", Base32Codec.EncodeToString(hshValue[:])},
                }
-               state.Ctx.LogD("sp-file", les, "checking")
-               size, err := state.Ctx.CheckNoCK(state.Node.Id, hshValue)
+               ctx.LogD("sp-checker", les, "checking")
+               size, err := ctx.CheckNoCK(nodeId, hshValue)
                les = append(les, LE{"Size", size})
                if err != nil {
-                       state.Ctx.LogE("sp-file", les, err, "")
+                       ctx.LogE("sp-checker", les, err, "")
                        continue
                }
-               state.Ctx.LogI("sp-done", les, "")
-               state.wg.Add(1)
-               go func(hsh *[32]byte) {
-                       if !state.NotAlive() {
-                               state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
-                       }
-                       state.wg.Done()
-               }(hshValue)
+               ctx.LogI("sp-done", les, "")
+               go func(hsh *[32]byte) { checked <- hsh }(hshValue)
        }
 }
 
@@ -583,20 +581,6 @@ func (state *SPState) closeFd(pth string) {
        }
 }
 
-func (state *SPState) FillExistingNoCK() {
-       checkerJobs := make([]*[32]byte, 0)
-       for job := range state.Ctx.JobsNoCK(state.Node.Id) {
-               if job.PktEnc.Nice > state.Nice {
-                       continue
-               }
-               checkerJobs = append(checkerJobs, job.HshValue)
-       }
-       for _, job := range checkerJobs {
-               state.checkerJobs <- job
-       }
-       state.wg.Done()
-}
-
 func (state *SPState) StartWorkers(
        conn ConnDeadlined,
        infosPayloads [][]byte,
@@ -612,10 +596,35 @@ func (state *SPState) StartWorkers(
 
        // Checker
        if !state.NoCK {
-               state.checkerJobs = make(chan *[32]byte)
-               go state.SPChecker()
+               queues := spCheckers[*state.Node.Id]
+               if queues == nil {
+                       queues = &SPCheckerQueues{
+                               appeared: make(chan *[32]byte),
+                               checked:  make(chan *[32]byte),
+                       }
+                       spCheckers[*state.Node.Id] = queues
+                       go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
+               }
+               state.checkerQueues = *queues
+               go func() {
+                       for job := range state.Ctx.JobsNoCK(state.Node.Id) {
+                               if job.PktEnc.Nice <= state.Nice {
+                                       state.checkerQueues.appeared <- job.HshValue
+                               }
+                       }
+               }()
                state.wg.Add(1)
-               go state.FillExistingNoCK()
+               go func() {
+                       defer state.wg.Done()
+                       for {
+                               select {
+                               case <-state.isDead:
+                                       return
+                               case hsh := <-state.checkerQueues.checked:
+                                       state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
+                               }
+                       }
+               }()
        }
 
        // Remaining handshake payload sending
@@ -1166,7 +1175,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        delete(state.infosTheir, *file.Hash)
                        state.Unlock()
                        if !state.NoCK {
-                               state.checkerJobs <- file.Hash
+                               state.checkerQueues.appeared <- file.Hash
                        }
 
                case SPTypeDone:
@@ -1178,6 +1187,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                return nil, err
                        }
                        lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
+                       lesp = append(lesp, LE{"XX", string(TTx)})
                        state.Ctx.LogD("sp-done", lesp, "removing")
                        pth := filepath.Join(
                                state.Ctx.Spool,
@@ -1185,9 +1195,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                string(TTx),
                                Base32Codec.EncodeToString(done.Hash[:]),
                        )
-                       err := os.Remove(pth)
-                       lesp = append(lesp, LE{"XX", string(TTx)})
-                       if err == nil {
+                       if err = os.Remove(pth); err == nil {
                                state.Ctx.LogI("sp-done", lesp, "")
                                if state.Ctx.HdrUsage {
                                        os.Remove(pth + HdrSuffix)