]> Cypherpunks.ru repositories - nncp.git/commitdiff
Finish SP related goroutines
authorSergey Matveev <stargrave@stargrave.org>
Fri, 20 Dec 2019 08:05:58 +0000 (11:05 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Sun, 22 Dec 2019 11:49:43 +0000 (14:49 +0300)
doc/news.ru.texi
doc/news.texi
src/sp.go

index 28c092229fb9c9cc49d21adcd402ffc89d69a0bb..71c82d8a3896471d7a60a9ea4299a1e5bc508a6e 100644 (file)
@@ -11,6 +11,9 @@
 @item
 Исправлено не происходящее дополнение (padding) handshake сообщений.
 
+@item
+Завершать все порождаемые в SP протоколе горутины, меньше утечек памяти.
+
 @end itemize
 
 @node Релиз 5.2.1
index 9af9ba49681ecc013875463f0f1e96f55636eca0..7c725402c656f1e159d8717ffe2232b2754432b1 100644 (file)
@@ -13,6 +13,9 @@ Progress messages contain prefix, describing the running action.
 @item
 Fixed not occurring handshake messages padding.
 
+@item
+Finish all SP protocol related goroutines, less memory leak.
+
 @end itemize
 
 @node Release 5.2.1
index 580b97efb725c6024c80321196164393afbc2b15..2aebb07b381e668c2bcebea3544a8f2fc86af24d 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -38,7 +38,7 @@ const (
        PartSuffix      = ".part"
        DefaultDeadline = 10
 
-       SPHeadOverhead  = 4
+       SPHeadOverhead = 4
 )
 
 var (
@@ -195,15 +195,37 @@ type SPState struct {
        xxOnly         TRxTx
        rxRate         int
        txRate         int
-       isDead         bool
+       isDead         chan struct{}
        listOnly       bool
        onlyPkts       map[[32]byte]bool
        sync.RWMutex
 }
 
+func (state *SPState) SetDead() {
+       state.Lock()
+       defer state.Unlock()
+       select {
+       case _, ok := <-state.isDead:
+               if !ok {
+                       // Already closed channel, dead
+                       return
+               }
+       default:
+       }
+       close(state.isDead)
+       go func() {
+               for _ = range state.payloads {
+               }
+       }()
+}
+
 func (state *SPState) NotAlive() bool {
-       if state.isDead {
-               return true
+       select {
+       case _, ok := <-state.isDead:
+               if !ok {
+                       return true
+               }
+       default:
        }
        now := time.Now()
        if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) {
@@ -488,9 +510,13 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
 func (state *SPState) StartWorkers(
        conn ConnDeadlined,
        infosPayloads [][]byte,
-       payload []byte) error {
+       payload []byte,
+) error {
+       state.isDead = make(chan struct{})
        sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
+
        if len(infosPayloads) > 1 {
+               state.wg.Add(1)
                go func() {
                        for _, payload := range infosPayloads[1:] {
                                state.Ctx.LogD(
@@ -500,6 +526,7 @@ func (state *SPState) StartWorkers(
                                )
                                state.payloads <- payload
                        }
+                       state.wg.Done()
                }()
        }
        state.Ctx.LogD(
@@ -513,6 +540,7 @@ func (state *SPState) StartWorkers(
                return err
        }
 
+       state.wg.Add(1)
        go func() {
                for _, reply := range replies {
                        state.Ctx.LogD(
@@ -522,25 +550,34 @@ func (state *SPState) StartWorkers(
                        )
                        state.payloads <- reply
                }
+               state.wg.Done()
        }()
 
        if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
+               state.wg.Add(1)
                go func() {
-                       for range time.Tick(time.Second) {
-                               if state.NotAlive() {
-                                       return
-                               }
-                               for _, payload := range state.Ctx.infosOur(
-                                       state.Node.Id,
-                                       state.Nice,
-                                       &state.infosOurSeen,
-                               ) {
-                                       state.Ctx.LogD(
-                                               "sp-work",
-                                               SdsAdd(sds, SDS{"size": len(payload)}),
-                                               "queuing new info",
-                                       )
-                                       state.payloads <- payload
+                       ticker := time.NewTicker(time.Second)
+                       for {
+                               select {
+                               case _, ok := <-state.isDead:
+                                       if !ok {
+                                               state.wg.Done()
+                                               ticker.Stop()
+                                               return
+                                       }
+                               case <-ticker.C:
+                                       for _, payload := range state.Ctx.infosOur(
+                                               state.Node.Id,
+                                               state.Nice,
+                                               &state.infosOurSeen,
+                                       ) {
+                                               state.Ctx.LogD(
+                                                       "sp-work",
+                                                       SdsAdd(sds, SDS{"size": len(payload)}),
+                                                       "queuing new info",
+                                               )
+                                               state.payloads <- payload
+                                       }
                                }
                        }
                }()
@@ -548,13 +585,9 @@ func (state *SPState) StartWorkers(
 
        state.wg.Add(1)
        go func() {
-               defer func() {
-                       state.isDead = true
-                       state.wg.Done()
-               }()
                for {
                        if state.NotAlive() {
-                               return
+                               break
                        }
                        var payload []byte
                        select {
@@ -662,17 +695,15 @@ func (state *SPState) StartWorkers(
                                break
                        }
                }
+               state.SetDead()
+               state.wg.Done()
        }()
 
        state.wg.Add(1)
        go func() {
-               defer func() {
-                       state.isDead = true
-                       state.wg.Done()
-               }()
                for {
                        if state.NotAlive() {
-                               return
+                               break
                        }
                        state.Ctx.LogD("sp-recv", sds, "waiting for payload")
                        conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
@@ -712,6 +743,7 @@ func (state *SPState) StartWorkers(
                                state.Ctx.LogE("sp-recv", sds, err, "")
                                break
                        }
+                       state.wg.Add(1)
                        go func() {
                                for _, reply := range replies {
                                        state.Ctx.LogD(
@@ -721,11 +753,14 @@ func (state *SPState) StartWorkers(
                                        )
                                        state.payloads <- reply
                                }
+                               state.wg.Done()
                        }()
                        if state.rxRate > 0 {
                                time.Sleep(time.Second / time.Duration(state.rxRate))
                        }
                }
+               state.SetDead()
+               state.wg.Done()
        }()
 
        return nil
@@ -733,6 +768,7 @@ func (state *SPState) StartWorkers(
 
 func (state *SPState) Wait() {
        state.wg.Wait()
+       close(state.payloads)
        state.dirUnlock()
        state.Duration = time.Now().Sub(state.started)
        state.RxSpeed = state.RxBytes
@@ -915,8 +951,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                delete(state.infosTheir, *file.Hash)
                                state.Unlock()
                                spWorkersGroup.Done()
+                               state.wg.Add(1)
                                go func() {
                                        state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
+                                       state.wg.Done()
                                }()
                        }()
                case SPTypeDone: