]> Cypherpunks.ru repositories - nncp.git/commitdiff
Check tx directory all the time while connected
authorSergey Matveev <stargrave@stargrave.org>
Sat, 14 Jan 2017 18:41:39 +0000 (21:41 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Sat, 14 Jan 2017 19:32:55 +0000 (22:32 +0300)
src/cypherpunks.ru/nncp/sp.go

index 59f912f005bb702849c0c4a53e0afa44999a164a..235770d487896063304ca5cd53a9f9d5f472b1be 100644 (file)
@@ -157,27 +157,28 @@ func payloadsSplit(payloads [][]byte) [][]byte {
 }
 
 type SPState struct {
-       ctx        *Ctx
-       NodeId     *NodeId
-       nice       uint8
-       hs         *noise.HandshakeState
-       csOur      *noise.CipherState
-       csTheir    *noise.CipherState
-       payloads   chan []byte
-       infosTheir map[[32]byte]*SPInfo
-       queueTheir []*SPFreq
-       wg         sync.WaitGroup
-       RxBytes    int64
-       RxLastSeen time.Time
-       TxBytes    int64
-       TxLastSeen time.Time
-       started    time.Time
-       Duration   time.Duration
-       RxSpeed    int64
-       TxSpeed    int64
-       rxLock     *os.File
-       txLock     *os.File
-       xxOnly     *TRxTx
+       ctx          *Ctx
+       NodeId       *NodeId
+       nice         uint8
+       hs           *noise.HandshakeState
+       csOur        *noise.CipherState
+       csTheir      *noise.CipherState
+       payloads     chan []byte
+       infosTheir   map[[32]byte]*SPInfo
+       infosOurSeen map[[32]byte]struct{}
+       queueTheir   []*SPFreq
+       wg           sync.WaitGroup
+       RxBytes      int64
+       RxLastSeen   time.Time
+       TxBytes      int64
+       TxLastSeen   time.Time
+       started      time.Time
+       Duration     time.Duration
+       RxSpeed      int64
+       TxSpeed      int64
+       rxLock       *os.File
+       txLock       *os.File
+       xxOnly       *TRxTx
        sync.RWMutex
 }
 
@@ -214,7 +215,7 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
        return sp.Payload, nil
 }
 
-func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte {
+func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte {
        var infos []*SPInfo
        var totalSize int64
        for job := range ctx.Jobs(nodeId, TTx) {
@@ -222,12 +223,16 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte {
                if job.PktEnc.Nice > nice {
                        continue
                }
+               if _, known := (*seen)[*job.HshValue]; known {
+                       continue
+               }
                totalSize += job.Size
                infos = append(infos, &SPInfo{
                        Nice: job.PktEnc.Nice,
                        Size: uint64(job.Size),
                        Hash: job.HshValue,
                })
+               (*seen)[*job.HshValue] = struct{}{}
        }
        sort.Sort(ByNice(infos))
        var payloads [][]byte
@@ -239,12 +244,14 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte {
                        "size": strconv.FormatInt(int64(info.Size), 10),
                }, "")
        }
-       ctx.LogI("sp-infos", SDS{
-               "xx":   string(TTx),
-               "node": nodeId,
-               "pkts": strconv.Itoa(len(payloads)),
-               "size": strconv.FormatInt(totalSize, 10),
-       }, "")
+       if totalSize > 0 {
+               ctx.LogI("sp-infos", SDS{
+                       "xx":   string(TTx),
+                       "node": nodeId,
+                       "pkts": strconv.Itoa(len(payloads)),
+                       "size": strconv.FormatInt(totalSize, 10),
+               }, "")
+       }
        return payloadsSplit(payloads)
 }
 
@@ -279,21 +286,22 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx)
                PeerStatic: ctx.Neigh[*nodeId].NoisePub[:],
        }
        state := SPState{
-               ctx:        ctx,
-               hs:         noise.NewHandshakeState(conf),
-               NodeId:     nodeId,
-               nice:       nice,
-               payloads:   make(chan []byte),
-               infosTheir: make(map[[32]byte]*SPInfo),
-               started:    started,
-               rxLock:     rxLock,
-               txLock:     txLock,
-               xxOnly:     xxOnly,
+               ctx:          ctx,
+               hs:           noise.NewHandshakeState(conf),
+               NodeId:       nodeId,
+               nice:         nice,
+               payloads:     make(chan []byte),
+               infosTheir:   make(map[[32]byte]*SPInfo),
+               infosOurSeen: make(map[[32]byte]struct{}),
+               started:      started,
+               rxLock:       rxLock,
+               txLock:       txLock,
+               xxOnly:       xxOnly,
        }
 
        var infosPayloads [][]byte
        if xxOnly == nil || *xxOnly != TTx {
-               infosPayloads = ctx.infosOur(nodeId, nice)
+               infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen)
        }
        var firstPayload []byte
        if len(infosPayloads) > 0 {
@@ -350,13 +358,14 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro
                },
        }
        state := SPState{
-               ctx:        ctx,
-               hs:         noise.NewHandshakeState(conf),
-               nice:       nice,
-               payloads:   make(chan []byte),
-               infosTheir: make(map[[32]byte]*SPInfo),
-               started:    started,
-               xxOnly:     xxOnly,
+               ctx:          ctx,
+               hs:           noise.NewHandshakeState(conf),
+               nice:         nice,
+               payloads:     make(chan []byte),
+               infosOurSeen: make(map[[32]byte]struct{}),
+               infosTheir:   make(map[[32]byte]*SPInfo),
+               started:      started,
+               xxOnly:       xxOnly,
        }
        var buf []byte
        var payload []byte
@@ -413,7 +422,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro
 
        var infosPayloads [][]byte
        if xxOnly == nil || *xxOnly != TTx {
-               infosPayloads = ctx.infosOur(nodeId, nice)
+               infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen)
        }
        var firstPayload []byte
        if len(infosPayloads) > 0 {
@@ -465,6 +474,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa
                state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
                return err
        }
+
        go func() {
                for _, reply := range replies {
                        state.ctx.LogD(
@@ -475,6 +485,24 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa
                        state.payloads <- reply
                }
        }()
+
+       go func() {
+               for range time.Tick(time.Second) {
+                       for _, payload := range state.ctx.infosOur(
+                               state.NodeId,
+                               state.nice,
+                               &state.infosOurSeen,
+                       ) {
+                               state.ctx.LogD(
+                                       "sp-work",
+                                       SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
+                                       "queuing new info",
+                               )
+                               state.payloads <- payload
+                       }
+               }
+       }()
+
        state.wg.Add(1)
        go func() {
                defer state.wg.Done()
@@ -583,6 +611,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa
                        }
                }
        }()
+
        state.wg.Add(1)
        go func() {
                defer state.wg.Done()
@@ -635,6 +664,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa
                        }()
                }
        }()
+
        return nil
 }