SPHeadOverhead = 4
)
+type SPCheckerQueues struct {
+ appeared chan *[32]byte
+ checked chan *[32]byte
+}
+
var (
MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
DefaultDeadline = 10 * time.Second
PingTimeout = time.Minute
+
+ spCheckers = make(map[NodeId]*SPCheckerQueues)
)
type FdAndFullSize struct {
writeSPBuf bytes.Buffer
fds map[string]FdAndFullSize
fileHashers map[string]*HasherAndOffset
- checkerJobs chan *[32]byte
+ checkerQueues SPCheckerQueues
sync.RWMutex
}
s.fd.Close()
}
}()
- if !state.NoCK {
- close(state.checkerJobs)
- }
}
func (state *SPState) NotAlive() bool {
state.Ctx.UnlockDir(state.txLock)
}
-func (state *SPState) SPChecker() {
- for hshValue := range state.checkerJobs {
+func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
+ for hshValue := range appeared {
les := LEs{
{"XX", string(TRx)},
- {"Node", state.Node.Id},
+ {"Node", nodeId},
{"Pkt", Base32Codec.EncodeToString(hshValue[:])},
}
- state.Ctx.LogD("sp-file", les, "checking")
- size, err := state.Ctx.CheckNoCK(state.Node.Id, hshValue)
+ ctx.LogD("sp-checker", les, "checking")
+ size, err := ctx.CheckNoCK(nodeId, hshValue)
les = append(les, LE{"Size", size})
if err != nil {
- state.Ctx.LogE("sp-file", les, err, "")
+ ctx.LogE("sp-checker", les, err, "")
continue
}
- state.Ctx.LogI("sp-done", les, "")
- state.wg.Add(1)
- go func(hsh *[32]byte) {
- if !state.NotAlive() {
- state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
- }
- state.wg.Done()
- }(hshValue)
+ ctx.LogI("sp-done", les, "")
+ go func(hsh *[32]byte) { checked <- hsh }(hshValue)
}
}
}
}
-func (state *SPState) FillExistingNoCK() {
- checkerJobs := make([]*[32]byte, 0)
- for job := range state.Ctx.JobsNoCK(state.Node.Id) {
- if job.PktEnc.Nice > state.Nice {
- continue
- }
- checkerJobs = append(checkerJobs, job.HshValue)
- }
- for _, job := range checkerJobs {
- state.checkerJobs <- job
- }
- state.wg.Done()
-}
-
func (state *SPState) StartWorkers(
conn ConnDeadlined,
infosPayloads [][]byte,
// Checker
if !state.NoCK {
- state.checkerJobs = make(chan *[32]byte)
- go state.SPChecker()
+ queues := spCheckers[*state.Node.Id]
+ if queues == nil {
+ queues = &SPCheckerQueues{
+ appeared: make(chan *[32]byte),
+ checked: make(chan *[32]byte),
+ }
+ spCheckers[*state.Node.Id] = queues
+ go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
+ }
+ state.checkerQueues = *queues
+ go func() {
+ for job := range state.Ctx.JobsNoCK(state.Node.Id) {
+ if job.PktEnc.Nice <= state.Nice {
+ state.checkerQueues.appeared <- job.HshValue
+ }
+ }
+ }()
state.wg.Add(1)
- go state.FillExistingNoCK()
+ go func() {
+ defer state.wg.Done()
+ for {
+ select {
+ case <-state.isDead:
+ return
+ case hsh := <-state.checkerQueues.checked:
+ state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
+ }
+ }
+ }()
}
// Remaining handshake payload sending
delete(state.infosTheir, *file.Hash)
state.Unlock()
if !state.NoCK {
- state.checkerJobs <- file.Hash
+ state.checkerQueues.appeared <- file.Hash
}
case SPTypeDone:
return nil, err
}
lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
+ lesp = append(lesp, LE{"XX", string(TTx)})
state.Ctx.LogD("sp-done", lesp, "removing")
pth := filepath.Join(
state.Ctx.Spool,
string(TTx),
Base32Codec.EncodeToString(done.Hash[:]),
)
- err := os.Remove(pth)
- lesp = append(lesp, LE{"XX", string(TTx)})
- if err == nil {
+ if err = os.Remove(pth); err == nil {
state.Ctx.LogI("sp-done", lesp, "")
if state.Ctx.HdrUsage {
os.Remove(pth + HdrSuffix)