From 86715c52c3f841abc351798b671aed6b133d0c8f Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sun, 21 Feb 2021 13:29:03 +0300 Subject: [PATCH] SPCheckers are always running --- src/sp.go | 86 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 47 insertions(+), 39 deletions(-) diff --git a/src/sp.go b/src/sp.go index e120769..6f1af9f 100644 --- a/src/sp.go +++ b/src/sp.go @@ -40,6 +40,11 @@ const ( SPHeadOverhead = 4 ) +type SPCheckerQueues struct { + appeared chan *[32]byte + checked chan *[32]byte +} + var ( MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1} @@ -57,6 +62,8 @@ var ( DefaultDeadline = 10 * time.Second PingTimeout = time.Minute + + spCheckers = make(map[NodeId]*SPCheckerQueues) ) type FdAndFullSize struct { @@ -225,7 +232,7 @@ type SPState struct { writeSPBuf bytes.Buffer fds map[string]FdAndFullSize fileHashers map[string]*HasherAndOffset - checkerJobs chan *[32]byte + checkerQueues SPCheckerQueues sync.RWMutex } @@ -252,9 +259,6 @@ func (state *SPState) SetDead() { s.fd.Close() } }() - if !state.NoCK { - close(state.checkerJobs) - } } func (state *SPState) NotAlive() bool { @@ -271,28 +275,22 @@ func (state *SPState) dirUnlock() { state.Ctx.UnlockDir(state.txLock) } -func (state *SPState) SPChecker() { - for hshValue := range state.checkerJobs { +func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) { + for hshValue := range appeared { les := LEs{ {"XX", string(TRx)}, - {"Node", state.Node.Id}, + {"Node", nodeId}, {"Pkt", Base32Codec.EncodeToString(hshValue[:])}, } - state.Ctx.LogD("sp-file", les, "checking") - size, err := state.Ctx.CheckNoCK(state.Node.Id, hshValue) + ctx.LogD("sp-checker", les, "checking") + size, err := ctx.CheckNoCK(nodeId, hshValue) les = append(les, LE{"Size", size}) if err != nil { - state.Ctx.LogE("sp-file", les, err, "") + ctx.LogE("sp-checker", les, err, "") continue } - state.Ctx.LogI("sp-done", les, "") - state.wg.Add(1) - go func(hsh *[32]byte) { - if !state.NotAlive() { - state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh}) - } - state.wg.Done() - }(hshValue) + ctx.LogI("sp-done", les, "") + go func(hsh *[32]byte) { checked <- hsh }(hshValue) } } @@ -583,20 +581,6 @@ func (state *SPState) closeFd(pth string) { } } -func (state *SPState) FillExistingNoCK() { - checkerJobs := make([]*[32]byte, 0) - for job := range state.Ctx.JobsNoCK(state.Node.Id) { - if job.PktEnc.Nice > state.Nice { - continue - } - checkerJobs = append(checkerJobs, job.HshValue) - } - for _, job := range checkerJobs { - state.checkerJobs <- job - } - state.wg.Done() -} - func (state *SPState) StartWorkers( conn ConnDeadlined, infosPayloads [][]byte, @@ -612,10 +596,35 @@ func (state *SPState) StartWorkers( // Checker if !state.NoCK { - state.checkerJobs = make(chan *[32]byte) - go state.SPChecker() + queues := spCheckers[*state.Node.Id] + if queues == nil { + queues = &SPCheckerQueues{ + appeared: make(chan *[32]byte), + checked: make(chan *[32]byte), + } + spCheckers[*state.Node.Id] = queues + go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked) + } + state.checkerQueues = *queues + go func() { + for job := range state.Ctx.JobsNoCK(state.Node.Id) { + if job.PktEnc.Nice <= state.Nice { + state.checkerQueues.appeared <- job.HshValue + } + } + }() state.wg.Add(1) - go state.FillExistingNoCK() + go func() { + defer state.wg.Done() + for { + select { + case <-state.isDead: + return + case hsh := <-state.checkerQueues.checked: + state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh}) + } + } + }() } // Remaining handshake payload sending @@ -1166,7 +1175,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { delete(state.infosTheir, *file.Hash) state.Unlock() if !state.NoCK { - state.checkerJobs <- file.Hash + state.checkerQueues.appeared <- file.Hash } case SPTypeDone: @@ -1178,6 +1187,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { return nil, err } lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])}) + lesp = append(lesp, LE{"XX", string(TTx)}) state.Ctx.LogD("sp-done", lesp, "removing") pth := filepath.Join( state.Ctx.Spool, @@ -1185,9 +1195,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { string(TTx), Base32Codec.EncodeToString(done.Hash[:]), ) - err := os.Remove(pth) - lesp = append(lesp, LE{"XX", string(TTx)}) - if err == nil { + if err = os.Remove(pth); err == nil { state.Ctx.LogI("sp-done", lesp, "") if state.Ctx.HdrUsage { os.Remove(pth + HdrSuffix) -- 2.44.0