Payload []byte
}
+type FreqWithNice struct {
+ freq *SPFreq
+ nice uint8
+}
+
func init() {
var buf bytes.Buffer
spHead := SPHead{Type: SPTypeHalt}
csTheir *noise.CipherState
payloads chan []byte
infosTheir map[[32]byte]*SPInfo
- infosOurSeen map[[32]byte]struct{}
- queueTheir []*SPFreq
+ infosOurSeen map[[32]byte]uint8
+ queueTheir []*FreqWithNice
wg sync.WaitGroup
RxBytes int64
RxLastSeen time.Time
return sp.Payload, nil
}
-func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte {
+func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
var infos []*SPInfo
var totalSize int64
for job := range ctx.Jobs(nodeId, TTx) {
Size: uint64(job.Size),
Hash: job.HshValue,
})
- (*seen)[*job.HshValue] = struct{}{}
+ (*seen)[*job.HshValue] = job.PktEnc.Nice
}
sort.Sort(ByNice(infos))
var payloads [][]byte
nice: nice,
payloads: make(chan []byte),
infosTheir: make(map[[32]byte]*SPInfo),
- infosOurSeen: make(map[[32]byte]struct{}),
+ infosOurSeen: make(map[[32]byte]uint8),
started: started,
rxLock: rxLock,
txLock: txLock,
hs: hs,
nice: nice,
payloads: make(chan []byte),
- infosOurSeen: make(map[[32]byte]struct{}),
+ infosOurSeen: make(map[[32]byte]uint8),
infosTheir: make(map[[32]byte]*SPInfo),
started: started,
xxOnly: xxOnly,
time.Sleep(100 * time.Millisecond)
continue
}
- freq := state.queueTheir[0]
+ freq := state.queueTheir[0].freq
state.RUnlock()
sdsp := SdsAdd(sds, SDS{
"xx": string(TTx),
sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
state.ctx.LogP("sp-file", sdsp, "")
state.Lock()
- if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash {
+ if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
if ourSize == fullSize {
state.ctx.LogD("sp-file", sdsp, "finished")
if len(state.queueTheir) > 1 {
state.queueTheir = state.queueTheir[:0]
}
} else {
- state.queueTheir[0].Offset += uint64(len(buf))
+ state.queueTheir[0].freq.Offset += uint64(len(buf))
}
} else {
state.ctx.LogD("sp-file", sdsp, "queue disappeared")
"hash": ToBase32(freq.Hash[:]),
"offset": strconv.FormatInt(int64(freq.Offset), 10),
}), "queueing")
- state.Lock()
- state.queueTheir = append(state.queueTheir, &freq)
- state.Unlock()
+ nice, exists := state.infosOurSeen[*freq.Hash]
+ if exists {
+ state.Lock()
+ insertIdx := 0
+ var freqWithNice *FreqWithNice
+ for insertIdx, freqWithNice = range state.queueTheir {
+ if freqWithNice.nice > nice {
+ break
+ }
+ }
+ state.queueTheir = append(state.queueTheir, nil)
+ copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
+ state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
+ state.Unlock()
+ } else {
+ state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{
+ "hash": ToBase32(freq.Hash[:]),
+ "offset": strconv.FormatInt(int64(freq.Offset), 10),
+ }), "unknown")
+ }
case SPTypeHalt:
sdsp := SdsAdd(sds, SDS{"type": "halt"})
state.ctx.LogD("sp-process", sdsp, "")