]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/sp.go
Wait for background checksummers completion
[nncp.git] / src / sp.go
index f2c707f1029bd15bd8585bcb974b3da017097cf7..00eaac6b50d58ee0dc8665fa04d2a0a54b6c913b 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -65,7 +65,8 @@ var (
        DefaultDeadline = 10 * time.Second
        PingTimeout     = time.Minute
 
-       spCheckers = make(map[NodeId]*SPCheckerQueues)
+       spCheckers   = make(map[NodeId]*SPCheckerQueues)
+       SPCheckersWg sync.WaitGroup
 )
 
 type FdAndFullSize struct {
@@ -233,8 +234,10 @@ type SPState struct {
        onlyPkts       map[[32]byte]bool
        writeSPBuf     bytes.Buffer
        fds            map[string]FdAndFullSize
+       fdsLock        sync.RWMutex
        fileHashers    map[string]*HasherAndOffset
        checkerQueues  SPCheckerQueues
+       progressBars   map[string]struct{}
        sync.RWMutex
 }
 
@@ -256,11 +259,6 @@ func (state *SPState) SetDead() {
                for range state.pings {
                }
        }()
-       go func() {
-               for _, s := range state.fds {
-                       s.fd.Close()
-               }
-       }()
 }
 
 func (state *SPState) NotAlive() bool {
@@ -285,6 +283,7 @@ func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
                        {"Node", nodeId},
                        {"Pkt", pktName},
                }
+               SPCheckersWg.Add(1)
                ctx.LogD("sp-checker", les, func(les LEs) string {
                        return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
                })
@@ -305,6 +304,7 @@ func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
                                pktName, humanize.IBytes(uint64(size)),
                        )
                })
+               SPCheckersWg.Done()
                go func(hsh *[32]byte) { checked <- hsh }(hshValue)
        }
 }
@@ -440,6 +440,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error {
        state.pings = make(chan struct{})
        state.infosTheir = make(map[[32]byte]*SPInfo)
        state.infosOurSeen = make(map[[32]byte]uint8)
+       state.progressBars = make(map[string]struct{})
        state.started = started
        state.rxLock = rxLock
        state.txLock = txLock
@@ -557,6 +558,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        state.pings = make(chan struct{})
        state.infosOurSeen = make(map[[32]byte]uint8)
        state.infosTheir = make(map[[32]byte]*SPInfo)
+       state.progressBars = make(map[string]struct{})
        state.started = started
        state.xxOnly = xxOnly
 
@@ -669,11 +671,12 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
 }
 
 func (state *SPState) closeFd(pth string) {
-       s, exists := state.fds[pth]
-       delete(state.fds, pth)
-       if exists {
+       state.fdsLock.Lock()
+       if s, exists := state.fds[pth]; exists {
+               delete(state.fds, pth)
                s.fd.Close()
        }
+       state.fdsLock.Unlock()
 }
 
 func (state *SPState) StartWorkers(
@@ -729,7 +732,7 @@ func (state *SPState) StartWorkers(
                        for _, payload := range infosPayloads[1:] {
                                state.Ctx.LogD(
                                        "sp-queue-remaining",
-                                       append(les, LE{"Size", len(payload)}),
+                                       append(les, LE{"Size", int64(len(payload))}),
                                        func(les LEs) string {
                                                return fmt.Sprintf(
                                                        "SP with %s (nice %s): queuing remaining payload (%s)",
@@ -752,7 +755,7 @@ func (state *SPState) StartWorkers(
                        humanize.IBytes(uint64(len(payload))),
                )
        }
-       state.Ctx.LogD("sp-process", append(les, LE{"Size", len(payload)}), logMsg)
+       state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
        replies, err := state.ProcessSP(payload)
        if err != nil {
                state.Ctx.LogE("sp-process", les, err, logMsg)
@@ -763,7 +766,7 @@ func (state *SPState) StartWorkers(
                for _, reply := range replies {
                        state.Ctx.LogD(
                                "sp-queue-reply",
-                               append(les, LE{"Size", len(reply)}),
+                               append(les, LE{"Size", int64(len(reply))}),
                                func(les LEs) string {
                                        return fmt.Sprintf(
                                                "SP with %s (nice %s): queuing reply (%s)",
@@ -790,13 +793,20 @@ func (state *SPState) StartWorkers(
                                pingTicker.Stop()
                                return
                        case now := <-deadlineTicker.C:
-                               if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
-                                       now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
-                                       (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
-                                       (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
-                                       state.SetDead()
-                                       conn.Close() // #nosec G104
+                               if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
+                                       now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
+                                       goto Deadlined
+                               }
+                               if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
+                                       goto Deadlined
                                }
+                               if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
+                                       goto Deadlined
+                               }
+                               break
+                       Deadlined:
+                               state.SetDead()
+                               conn.Close() // #nosec G104
                        case now := <-pingTicker.C:
                                if now.After(state.TxLastSeen.Add(PingTimeout)) {
                                        state.wg.Add(1)
@@ -828,7 +838,7 @@ func (state *SPState) StartWorkers(
                                        ) {
                                                state.Ctx.LogD(
                                                        "sp-queue-info",
-                                                       append(les, LE{"Size", len(payload)}),
+                                                       append(les, LE{"Size", int64(len(payload))}),
                                                        func(les LEs) string {
                                                                return fmt.Sprintf(
                                                                        "SP with %s (nice %s): queuing new info (%s)",
@@ -869,7 +879,7 @@ func (state *SPState) StartWorkers(
                        case payload = <-state.payloads:
                                state.Ctx.LogD(
                                        "sp-got-payload",
-                                       append(les, LE{"Size", len(payload)}),
+                                       append(les, LE{"Size", int64(len(payload))}),
                                        func(les LEs) string {
                                                return fmt.Sprintf(
                                                        "SP with %s (nice %s): got payload (%s)",
@@ -914,7 +924,9 @@ func (state *SPState) StartWorkers(
                                        string(TTx),
                                        Base32Codec.EncodeToString(freq.Hash[:]),
                                )
+                               state.fdsLock.RLock()
                                fdAndFullSize, exists := state.fds[pth]
+                               state.fdsLock.RUnlock()
                                if !exists {
                                        fd, err := os.Open(pth)
                                        if err != nil {
@@ -931,7 +943,9 @@ func (state *SPState) StartWorkers(
                                                return
                                        }
                                        fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
+                                       state.fdsLock.Lock()
                                        state.fds[pth] = fdAndFullSize
+                                       state.fdsLock.Unlock()
                                }
                                fd := fdAndFullSize.fd
                                fullSize := fdAndFullSize.fullSize
@@ -955,16 +969,18 @@ func (state *SPState) StartWorkers(
                                                return
                                        }
                                        buf = buf[:n]
-                                       state.Ctx.LogD(
-                                               "sp-file-read",
-                                               append(lesp, LE{"Size", n}),
-                                               func(les LEs) string {
-                                                       return fmt.Sprintf(
-                                                               "%s: read %s",
-                                                               logMsg(les), humanize.IBytes(uint64(n)),
-                                                       )
-                                               },
+                                       lesp = append(
+                                               les,
+                                               LE{"XX", string(TTx)},
+                                               LE{"Pkt", pktName},
+                                               LE{"Size", int64(n)},
                                        )
+                                       state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "%s: read %s",
+                                                       logMsg(les), humanize.IBytes(uint64(n)),
+                                               )
+                                       })
                                }
                                state.closeFd(pth)
                                payload = MarshalSP(SPTypeFile, SPFile{
@@ -973,8 +989,15 @@ func (state *SPState) StartWorkers(
                                        Payload: buf,
                                })
                                ourSize := freq.Offset + uint64(len(buf))
-                               lesp = append(lesp, LE{"Size", int64(ourSize)}, LE{"FullSize", fullSize})
+                               lesp = append(
+                                       les,
+                                       LE{"XX", string(TTx)},
+                                       LE{"Pkt", pktName},
+                                       LE{"Size", int64(ourSize)},
+                                       LE{"FullSize", fullSize},
+                               )
                                if state.Ctx.ShowPrgrs {
+                                       state.progressBars[pktName] = struct{}{}
                                        Progress("Tx", lesp)
                                }
                                state.Lock()
@@ -988,6 +1011,9 @@ func (state *SPState) StartWorkers(
                                                } else {
                                                        state.queueTheir = state.queueTheir[:0]
                                                }
+                                               if state.Ctx.ShowPrgrs {
+                                                       delete(state.progressBars, pktName)
+                                               }
                                        } else {
                                                state.queueTheir[0].freq.Offset += uint64(len(buf))
                                        }
@@ -1005,7 +1031,7 @@ func (state *SPState) StartWorkers(
                                        humanize.IBytes(uint64(len(payload))),
                                )
                        }
-                       state.Ctx.LogD("sp-sending", append(les, LE{"Size", len(payload)}), logMsg)
+                       state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
                        conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
                        if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
                                state.Ctx.LogE("sp-sending", les, err, logMsg)
@@ -1053,7 +1079,7 @@ func (state *SPState) StartWorkers(
                        }
                        state.Ctx.LogD(
                                "sp-recv-got",
-                               append(les, LE{"Size", len(payload)}),
+                               append(les, LE{"Size", int64(len(payload))}),
                                func(les LEs) string { return logMsg(les) + ": got" },
                        )
                        payload, err = state.csTheir.Decrypt(nil, nil, payload)
@@ -1065,7 +1091,7 @@ func (state *SPState) StartWorkers(
                        }
                        state.Ctx.LogD(
                                "sp-recv-process",
-                               append(les, LE{"Size", len(payload)}),
+                               append(les, LE{"Size", int64(len(payload))}),
                                func(les LEs) string {
                                        return logMsg(les) + ": processing"
                                },
@@ -1082,7 +1108,7 @@ func (state *SPState) StartWorkers(
                                for _, reply := range replies {
                                        state.Ctx.LogD(
                                                "sp-recv-reply",
-                                               append(les[:len(les)-1], LE{"Size", len(reply)}),
+                                               append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
                                                func(les LEs) string {
                                                        return fmt.Sprintf(
                                                                "SP with %s (nice %s): queuing reply (%s)",
@@ -1112,8 +1138,9 @@ func (state *SPState) Wait() {
        state.wg.Wait()
        close(state.payloads)
        close(state.pings)
-       state.dirUnlock()
        state.Duration = time.Now().Sub(state.started)
+       SPCheckersWg.Wait()
+       state.dirUnlock()
        state.RxSpeed = state.RxBytes
        state.TxSpeed = state.TxBytes
        rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
@@ -1124,6 +1151,12 @@ func (state *SPState) Wait() {
        if txDuration > 0 {
                state.TxSpeed = state.TxBytes / txDuration
        }
+       for _, s := range state.fds {
+               s.fd.Close()
+       }
+       for pktName := range state.progressBars {
+               ProgressKill(pktName)
+       }
 }
 
 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
@@ -1326,7 +1359,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                lesp,
                                LE{"XX", string(TRx)},
                                LE{"Pkt", pktName},
-                               LE{"Size", len(file.Payload)},
+                               LE{"Size", int64(len(file.Payload))},
                        )
                        logMsg := func(les LEs) string {
                                return fmt.Sprintf(
@@ -1344,7 +1377,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
                                return logMsg(les) + ": opening part"
                        })
+                       state.fdsLock.RLock()
                        fdAndFullSize, exists := state.fds[filePathPart]
+                       state.fdsLock.RUnlock()
                        var fd *os.File
                        if exists {
                                fd = fdAndFullSize.fd
@@ -1360,7 +1395,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        })
                                        return nil, err
                                }
+                               state.fdsLock.Lock()
                                state.fds[filePathPart] = FdAndFullSize{fd: fd}
+                               state.fdsLock.Unlock()
                                if file.Offset == 0 {
                                        h, err := blake2b.New256(nil)
                                        if err != nil {
@@ -1421,11 +1458,15 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        }
                        lesp = append(lesp, LE{"FullSize", fullsize})
                        if state.Ctx.ShowPrgrs {
+                               state.progressBars[pktName] = struct{}{}
                                Progress("Rx", lesp)
                        }
                        if fullsize != ourSize {
                                continue
                        }
+                       if state.Ctx.ShowPrgrs {
+                               delete(state.progressBars, pktName)
+                       }
                        logMsg = func(les LEs) string {
                                return fmt.Sprintf(
                                        "Got packet %s %d%% (%s / %s)",