]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/sp.go
Wait for background checksummers completion
[nncp.git] / src / sp.go
index d7e4882b4a757f8863e08928912e950a4d87162d..00eaac6b50d58ee0dc8665fa04d2a0a54b6c913b 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -65,7 +65,8 @@ var (
        DefaultDeadline = 10 * time.Second
        PingTimeout     = time.Minute
 
-       spCheckers = make(map[NodeId]*SPCheckerQueues)
+       spCheckers   = make(map[NodeId]*SPCheckerQueues)
+       SPCheckersWg sync.WaitGroup
 )
 
 type FdAndFullSize struct {
@@ -258,11 +259,6 @@ func (state *SPState) SetDead() {
                for range state.pings {
                }
        }()
-       go func() {
-               for _, s := range state.fds {
-                       s.fd.Close()
-               }
-       }()
 }
 
 func (state *SPState) NotAlive() bool {
@@ -287,6 +283,7 @@ func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
                        {"Node", nodeId},
                        {"Pkt", pktName},
                }
+               SPCheckersWg.Add(1)
                ctx.LogD("sp-checker", les, func(les LEs) string {
                        return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
                })
@@ -307,6 +304,7 @@ func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
                                pktName, humanize.IBytes(uint64(size)),
                        )
                })
+               SPCheckersWg.Done()
                go func(hsh *[32]byte) { checked <- hsh }(hshValue)
        }
 }
@@ -795,13 +793,20 @@ func (state *SPState) StartWorkers(
                                pingTicker.Stop()
                                return
                        case now := <-deadlineTicker.C:
-                               if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
-                                       now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
-                                       (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
-                                       (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
-                                       state.SetDead()
-                                       conn.Close() // #nosec G104
+                               if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
+                                       now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
+                                       goto Deadlined
+                               }
+                               if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
+                                       goto Deadlined
+                               }
+                               if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
+                                       goto Deadlined
                                }
+                               break
+                       Deadlined:
+                               state.SetDead()
+                               conn.Close() // #nosec G104
                        case now := <-pingTicker.C:
                                if now.After(state.TxLastSeen.Add(PingTimeout)) {
                                        state.wg.Add(1)
@@ -1133,8 +1138,9 @@ func (state *SPState) Wait() {
        state.wg.Wait()
        close(state.payloads)
        close(state.pings)
-       state.dirUnlock()
        state.Duration = time.Now().Sub(state.started)
+       SPCheckersWg.Wait()
+       state.dirUnlock()
        state.RxSpeed = state.RxBytes
        state.TxSpeed = state.TxBytes
        rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
@@ -1145,6 +1151,9 @@ func (state *SPState) Wait() {
        if txDuration > 0 {
                state.TxSpeed = state.TxBytes / txDuration
        }
+       for _, s := range state.fds {
+               s.fd.Close()
+       }
        for pktName := range state.progressBars {
                ProgressKill(pktName)
        }