From 148e4baaa06ace0ac893f03b6c5b1cc1fc19ec3e Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sat, 14 Jan 2017 21:41:39 +0300 Subject: [PATCH] Check tx directory all the time while connected --- src/cypherpunks.ru/nncp/sp.go | 124 +++++++++++++++++++++------------- 1 file changed, 77 insertions(+), 47 deletions(-) diff --git a/src/cypherpunks.ru/nncp/sp.go b/src/cypherpunks.ru/nncp/sp.go index 59f912f..235770d 100644 --- a/src/cypherpunks.ru/nncp/sp.go +++ b/src/cypherpunks.ru/nncp/sp.go @@ -157,27 +157,28 @@ func payloadsSplit(payloads [][]byte) [][]byte { } type SPState struct { - ctx *Ctx - NodeId *NodeId - nice uint8 - hs *noise.HandshakeState - csOur *noise.CipherState - csTheir *noise.CipherState - payloads chan []byte - infosTheir map[[32]byte]*SPInfo - queueTheir []*SPFreq - wg sync.WaitGroup - RxBytes int64 - RxLastSeen time.Time - TxBytes int64 - TxLastSeen time.Time - started time.Time - Duration time.Duration - RxSpeed int64 - TxSpeed int64 - rxLock *os.File - txLock *os.File - xxOnly *TRxTx + ctx *Ctx + NodeId *NodeId + nice uint8 + hs *noise.HandshakeState + csOur *noise.CipherState + csTheir *noise.CipherState + payloads chan []byte + infosTheir map[[32]byte]*SPInfo + infosOurSeen map[[32]byte]struct{} + queueTheir []*SPFreq + wg sync.WaitGroup + RxBytes int64 + RxLastSeen time.Time + TxBytes int64 + TxLastSeen time.Time + started time.Time + Duration time.Duration + RxSpeed int64 + TxSpeed int64 + rxLock *os.File + txLock *os.File + xxOnly *TRxTx sync.RWMutex } @@ -214,7 +215,7 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { return sp.Payload, nil } -func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte { +func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte { var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { @@ -222,12 +223,16 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte { if job.PktEnc.Nice > nice { continue } + if _, known := (*seen)[*job.HshValue]; known { + continue + } totalSize += job.Size infos = append(infos, &SPInfo{ Nice: job.PktEnc.Nice, Size: uint64(job.Size), Hash: job.HshValue, }) + (*seen)[*job.HshValue] = struct{}{} } sort.Sort(ByNice(infos)) var payloads [][]byte @@ -239,12 +244,14 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte { "size": strconv.FormatInt(int64(info.Size), 10), }, "") } - ctx.LogI("sp-infos", SDS{ - "xx": string(TTx), - "node": nodeId, - "pkts": strconv.Itoa(len(payloads)), - "size": strconv.FormatInt(totalSize, 10), - }, "") + if totalSize > 0 { + ctx.LogI("sp-infos", SDS{ + "xx": string(TTx), + "node": nodeId, + "pkts": strconv.Itoa(len(payloads)), + "size": strconv.FormatInt(totalSize, 10), + }, "") + } return payloadsSplit(payloads) } @@ -279,21 +286,22 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) PeerStatic: ctx.Neigh[*nodeId].NoisePub[:], } state := SPState{ - ctx: ctx, - hs: noise.NewHandshakeState(conf), - NodeId: nodeId, - nice: nice, - payloads: make(chan []byte), - infosTheir: make(map[[32]byte]*SPInfo), - started: started, - rxLock: rxLock, - txLock: txLock, - xxOnly: xxOnly, + ctx: ctx, + hs: noise.NewHandshakeState(conf), + NodeId: nodeId, + nice: nice, + payloads: make(chan []byte), + infosTheir: make(map[[32]byte]*SPInfo), + infosOurSeen: make(map[[32]byte]struct{}), + started: started, + rxLock: rxLock, + txLock: txLock, + xxOnly: xxOnly, } var infosPayloads [][]byte if xxOnly == nil || *xxOnly != TTx { - infosPayloads = ctx.infosOur(nodeId, nice) + infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen) } var firstPayload []byte if len(infosPayloads) > 0 { @@ -350,13 +358,14 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro }, } state := SPState{ - ctx: ctx, - hs: noise.NewHandshakeState(conf), - nice: nice, - payloads: make(chan []byte), - infosTheir: make(map[[32]byte]*SPInfo), - started: started, - xxOnly: xxOnly, + ctx: ctx, + hs: noise.NewHandshakeState(conf), + nice: nice, + payloads: make(chan []byte), + infosOurSeen: make(map[[32]byte]struct{}), + infosTheir: make(map[[32]byte]*SPInfo), + started: started, + xxOnly: xxOnly, } var buf []byte var payload []byte @@ -413,7 +422,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro var infosPayloads [][]byte if xxOnly == nil || *xxOnly != TTx { - infosPayloads = ctx.infosOur(nodeId, nice) + infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen) } var firstPayload []byte if len(infosPayloads) > 0 { @@ -465,6 +474,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "") return err } + go func() { for _, reply := range replies { state.ctx.LogD( @@ -475,6 +485,24 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa state.payloads <- reply } }() + + go func() { + for range time.Tick(time.Second) { + for _, payload := range state.ctx.infosOur( + state.NodeId, + state.nice, + &state.infosOurSeen, + ) { + state.ctx.LogD( + "sp-work", + SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + "queuing new info", + ) + state.payloads <- payload + } + } + }() + state.wg.Add(1) go func() { defer state.wg.Done() @@ -583,6 +611,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa } } }() + state.wg.Add(1) go func() { defer state.wg.Done() @@ -635,6 +664,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa }() } }() + return nil } -- 2.44.0