]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/sp.go
Wait for background checksummers completion
[nncp.git] / src / sp.go
index e6e4900a47052ceb81098d3e8d0f453889c48e7a..00eaac6b50d58ee0dc8665fa04d2a0a54b6c913b 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -65,7 +65,8 @@ var (
        DefaultDeadline = 10 * time.Second
        PingTimeout     = time.Minute
 
-       spCheckers = make(map[NodeId]*SPCheckerQueues)
+       spCheckers   = make(map[NodeId]*SPCheckerQueues)
+       SPCheckersWg sync.WaitGroup
 )
 
 type FdAndFullSize struct {
@@ -233,8 +234,10 @@ type SPState struct {
        onlyPkts       map[[32]byte]bool
        writeSPBuf     bytes.Buffer
        fds            map[string]FdAndFullSize
+       fdsLock        sync.RWMutex
        fileHashers    map[string]*HasherAndOffset
        checkerQueues  SPCheckerQueues
+       progressBars   map[string]struct{}
        sync.RWMutex
 }
 
@@ -256,11 +259,6 @@ func (state *SPState) SetDead() {
                for range state.pings {
                }
        }()
-       go func() {
-               for _, s := range state.fds {
-                       s.fd.Close()
-               }
-       }()
 }
 
 func (state *SPState) NotAlive() bool {
@@ -285,6 +283,7 @@ func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
                        {"Node", nodeId},
                        {"Pkt", pktName},
                }
+               SPCheckersWg.Add(1)
                ctx.LogD("sp-checker", les, func(les LEs) string {
                        return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
                })
@@ -305,6 +304,7 @@ func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
                                pktName, humanize.IBytes(uint64(size)),
                        )
                })
+               SPCheckersWg.Done()
                go func(hsh *[32]byte) { checked <- hsh }(hshValue)
        }
 }
@@ -440,6 +440,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error {
        state.pings = make(chan struct{})
        state.infosTheir = make(map[[32]byte]*SPInfo)
        state.infosOurSeen = make(map[[32]byte]uint8)
+       state.progressBars = make(map[string]struct{})
        state.started = started
        state.rxLock = rxLock
        state.txLock = txLock
@@ -557,6 +558,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        state.pings = make(chan struct{})
        state.infosOurSeen = make(map[[32]byte]uint8)
        state.infosTheir = make(map[[32]byte]*SPInfo)
+       state.progressBars = make(map[string]struct{})
        state.started = started
        state.xxOnly = xxOnly
 
@@ -669,11 +671,12 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
 }
 
 func (state *SPState) closeFd(pth string) {
-       s, exists := state.fds[pth]
-       delete(state.fds, pth)
-       if exists {
+       state.fdsLock.Lock()
+       if s, exists := state.fds[pth]; exists {
+               delete(state.fds, pth)
                s.fd.Close()
        }
+       state.fdsLock.Unlock()
 }
 
 func (state *SPState) StartWorkers(
@@ -790,13 +793,20 @@ func (state *SPState) StartWorkers(
                                pingTicker.Stop()
                                return
                        case now := <-deadlineTicker.C:
-                               if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
-                                       now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
-                                       (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
-                                       (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
-                                       state.SetDead()
-                                       conn.Close() // #nosec G104
+                               if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
+                                       now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
+                                       goto Deadlined
+                               }
+                               if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
+                                       goto Deadlined
                                }
+                               if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
+                                       goto Deadlined
+                               }
+                               break
+                       Deadlined:
+                               state.SetDead()
+                               conn.Close() // #nosec G104
                        case now := <-pingTicker.C:
                                if now.After(state.TxLastSeen.Add(PingTimeout)) {
                                        state.wg.Add(1)
@@ -914,7 +924,9 @@ func (state *SPState) StartWorkers(
                                        string(TTx),
                                        Base32Codec.EncodeToString(freq.Hash[:]),
                                )
+                               state.fdsLock.RLock()
                                fdAndFullSize, exists := state.fds[pth]
+                               state.fdsLock.RUnlock()
                                if !exists {
                                        fd, err := os.Open(pth)
                                        if err != nil {
@@ -931,7 +943,9 @@ func (state *SPState) StartWorkers(
                                                return
                                        }
                                        fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
+                                       state.fdsLock.Lock()
                                        state.fds[pth] = fdAndFullSize
+                                       state.fdsLock.Unlock()
                                }
                                fd := fdAndFullSize.fd
                                fullSize := fdAndFullSize.fullSize
@@ -983,6 +997,7 @@ func (state *SPState) StartWorkers(
                                        LE{"FullSize", fullSize},
                                )
                                if state.Ctx.ShowPrgrs {
+                                       state.progressBars[pktName] = struct{}{}
                                        Progress("Tx", lesp)
                                }
                                state.Lock()
@@ -996,6 +1011,9 @@ func (state *SPState) StartWorkers(
                                                } else {
                                                        state.queueTheir = state.queueTheir[:0]
                                                }
+                                               if state.Ctx.ShowPrgrs {
+                                                       delete(state.progressBars, pktName)
+                                               }
                                        } else {
                                                state.queueTheir[0].freq.Offset += uint64(len(buf))
                                        }
@@ -1120,8 +1138,9 @@ func (state *SPState) Wait() {
        state.wg.Wait()
        close(state.payloads)
        close(state.pings)
-       state.dirUnlock()
        state.Duration = time.Now().Sub(state.started)
+       SPCheckersWg.Wait()
+       state.dirUnlock()
        state.RxSpeed = state.RxBytes
        state.TxSpeed = state.TxBytes
        rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
@@ -1132,6 +1151,12 @@ func (state *SPState) Wait() {
        if txDuration > 0 {
                state.TxSpeed = state.TxBytes / txDuration
        }
+       for _, s := range state.fds {
+               s.fd.Close()
+       }
+       for pktName := range state.progressBars {
+               ProgressKill(pktName)
+       }
 }
 
 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
@@ -1352,7 +1377,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
                                return logMsg(les) + ": opening part"
                        })
+                       state.fdsLock.RLock()
                        fdAndFullSize, exists := state.fds[filePathPart]
+                       state.fdsLock.RUnlock()
                        var fd *os.File
                        if exists {
                                fd = fdAndFullSize.fd
@@ -1368,7 +1395,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        })
                                        return nil, err
                                }
+                               state.fdsLock.Lock()
                                state.fds[filePathPart] = FdAndFullSize{fd: fd}
+                               state.fdsLock.Unlock()
                                if file.Offset == 0 {
                                        h, err := blake2b.New256(nil)
                                        if err != nil {
@@ -1429,11 +1458,15 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        }
                        lesp = append(lesp, LE{"FullSize", fullsize})
                        if state.Ctx.ShowPrgrs {
+                               state.progressBars[pktName] = struct{}{}
                                Progress("Rx", lesp)
                        }
                        if fullsize != ourSize {
                                continue
                        }
+                       if state.Ctx.ShowPrgrs {
+                               delete(state.progressBars, pktName)
+                       }
                        logMsg = func(les LEs) string {
                                return fmt.Sprintf(
                                        "Got packet %s %d%% (%s / %s)",