}
type SPState struct {
- ctx *Ctx
- NodeId *NodeId
- nice uint8
- hs *noise.HandshakeState
- csOur *noise.CipherState
- csTheir *noise.CipherState
- payloads chan []byte
- infosTheir map[[32]byte]*SPInfo
- queueTheir []*SPFreq
- wg sync.WaitGroup
- RxBytes int64
- RxLastSeen time.Time
- TxBytes int64
- TxLastSeen time.Time
- started time.Time
- Duration time.Duration
- RxSpeed int64
- TxSpeed int64
- rxLock *os.File
- txLock *os.File
- xxOnly *TRxTx
+ ctx *Ctx
+ NodeId *NodeId
+ nice uint8
+ hs *noise.HandshakeState
+ csOur *noise.CipherState
+ csTheir *noise.CipherState
+ payloads chan []byte
+ infosTheir map[[32]byte]*SPInfo
+ infosOurSeen map[[32]byte]struct{}
+ queueTheir []*SPFreq
+ wg sync.WaitGroup
+ RxBytes int64
+ RxLastSeen time.Time
+ TxBytes int64
+ TxLastSeen time.Time
+ started time.Time
+ Duration time.Duration
+ RxSpeed int64
+ TxSpeed int64
+ rxLock *os.File
+ txLock *os.File
+ xxOnly *TRxTx
sync.RWMutex
}
return sp.Payload, nil
}
-func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte {
+func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte {
var infos []*SPInfo
var totalSize int64
for job := range ctx.Jobs(nodeId, TTx) {
if job.PktEnc.Nice > nice {
continue
}
+ if _, known := (*seen)[*job.HshValue]; known {
+ continue
+ }
totalSize += job.Size
infos = append(infos, &SPInfo{
Nice: job.PktEnc.Nice,
Size: uint64(job.Size),
Hash: job.HshValue,
})
+ (*seen)[*job.HshValue] = struct{}{}
}
sort.Sort(ByNice(infos))
var payloads [][]byte
"size": strconv.FormatInt(int64(info.Size), 10),
}, "")
}
- ctx.LogI("sp-infos", SDS{
- "xx": string(TTx),
- "node": nodeId,
- "pkts": strconv.Itoa(len(payloads)),
- "size": strconv.FormatInt(totalSize, 10),
- }, "")
+ if totalSize > 0 {
+ ctx.LogI("sp-infos", SDS{
+ "xx": string(TTx),
+ "node": nodeId,
+ "pkts": strconv.Itoa(len(payloads)),
+ "size": strconv.FormatInt(totalSize, 10),
+ }, "")
+ }
return payloadsSplit(payloads)
}
PeerStatic: ctx.Neigh[*nodeId].NoisePub[:],
}
state := SPState{
- ctx: ctx,
- hs: noise.NewHandshakeState(conf),
- NodeId: nodeId,
- nice: nice,
- payloads: make(chan []byte),
- infosTheir: make(map[[32]byte]*SPInfo),
- started: started,
- rxLock: rxLock,
- txLock: txLock,
- xxOnly: xxOnly,
+ ctx: ctx,
+ hs: noise.NewHandshakeState(conf),
+ NodeId: nodeId,
+ nice: nice,
+ payloads: make(chan []byte),
+ infosTheir: make(map[[32]byte]*SPInfo),
+ infosOurSeen: make(map[[32]byte]struct{}),
+ started: started,
+ rxLock: rxLock,
+ txLock: txLock,
+ xxOnly: xxOnly,
}
var infosPayloads [][]byte
if xxOnly == nil || *xxOnly != TTx {
- infosPayloads = ctx.infosOur(nodeId, nice)
+ infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen)
}
var firstPayload []byte
if len(infosPayloads) > 0 {
},
}
state := SPState{
- ctx: ctx,
- hs: noise.NewHandshakeState(conf),
- nice: nice,
- payloads: make(chan []byte),
- infosTheir: make(map[[32]byte]*SPInfo),
- started: started,
- xxOnly: xxOnly,
+ ctx: ctx,
+ hs: noise.NewHandshakeState(conf),
+ nice: nice,
+ payloads: make(chan []byte),
+ infosOurSeen: make(map[[32]byte]struct{}),
+ infosTheir: make(map[[32]byte]*SPInfo),
+ started: started,
+ xxOnly: xxOnly,
}
var buf []byte
var payload []byte
var infosPayloads [][]byte
if xxOnly == nil || *xxOnly != TTx {
- infosPayloads = ctx.infosOur(nodeId, nice)
+ infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen)
}
var firstPayload []byte
if len(infosPayloads) > 0 {
state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
return err
}
+
go func() {
for _, reply := range replies {
state.ctx.LogD(
state.payloads <- reply
}
}()
+
+ go func() {
+ for range time.Tick(time.Second) {
+ for _, payload := range state.ctx.infosOur(
+ state.NodeId,
+ state.nice,
+ &state.infosOurSeen,
+ ) {
+ state.ctx.LogD(
+ "sp-work",
+ SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
+ "queuing new info",
+ )
+ state.payloads <- payload
+ }
+ }
+ }()
+
state.wg.Add(1)
go func() {
defer state.wg.Done()
}
}
}()
+
state.wg.Add(1)
go func() {
defer state.wg.Done()
}()
}
}()
+
return nil
}