]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/sp.go
Debug friendly if conditions
[nncp.git] / src / sp.go
index f2c707f1029bd15bd8585bcb974b3da017097cf7..2dde928eaeecff5a84841cb0d8889a1a8306793c 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -233,8 +233,10 @@ type SPState struct {
        onlyPkts       map[[32]byte]bool
        writeSPBuf     bytes.Buffer
        fds            map[string]FdAndFullSize
+       fdsLock        sync.RWMutex
        fileHashers    map[string]*HasherAndOffset
        checkerQueues  SPCheckerQueues
+       progressBars   map[string]struct{}
        sync.RWMutex
 }
 
@@ -440,6 +442,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error {
        state.pings = make(chan struct{})
        state.infosTheir = make(map[[32]byte]*SPInfo)
        state.infosOurSeen = make(map[[32]byte]uint8)
+       state.progressBars = make(map[string]struct{})
        state.started = started
        state.rxLock = rxLock
        state.txLock = txLock
@@ -557,6 +560,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        state.pings = make(chan struct{})
        state.infosOurSeen = make(map[[32]byte]uint8)
        state.infosTheir = make(map[[32]byte]*SPInfo)
+       state.progressBars = make(map[string]struct{})
        state.started = started
        state.xxOnly = xxOnly
 
@@ -669,11 +673,12 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
 }
 
 func (state *SPState) closeFd(pth string) {
-       s, exists := state.fds[pth]
-       delete(state.fds, pth)
-       if exists {
+       state.fdsLock.Lock()
+       if s, exists := state.fds[pth]; exists {
+               delete(state.fds, pth)
                s.fd.Close()
        }
+       state.fdsLock.Unlock()
 }
 
 func (state *SPState) StartWorkers(
@@ -729,7 +734,7 @@ func (state *SPState) StartWorkers(
                        for _, payload := range infosPayloads[1:] {
                                state.Ctx.LogD(
                                        "sp-queue-remaining",
-                                       append(les, LE{"Size", len(payload)}),
+                                       append(les, LE{"Size", int64(len(payload))}),
                                        func(les LEs) string {
                                                return fmt.Sprintf(
                                                        "SP with %s (nice %s): queuing remaining payload (%s)",
@@ -752,7 +757,7 @@ func (state *SPState) StartWorkers(
                        humanize.IBytes(uint64(len(payload))),
                )
        }
-       state.Ctx.LogD("sp-process", append(les, LE{"Size", len(payload)}), logMsg)
+       state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
        replies, err := state.ProcessSP(payload)
        if err != nil {
                state.Ctx.LogE("sp-process", les, err, logMsg)
@@ -763,7 +768,7 @@ func (state *SPState) StartWorkers(
                for _, reply := range replies {
                        state.Ctx.LogD(
                                "sp-queue-reply",
-                               append(les, LE{"Size", len(reply)}),
+                               append(les, LE{"Size", int64(len(reply))}),
                                func(les LEs) string {
                                        return fmt.Sprintf(
                                                "SP with %s (nice %s): queuing reply (%s)",
@@ -790,13 +795,20 @@ func (state *SPState) StartWorkers(
                                pingTicker.Stop()
                                return
                        case now := <-deadlineTicker.C:
-                               if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
-                                       now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
-                                       (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
-                                       (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
-                                       state.SetDead()
-                                       conn.Close() // #nosec G104
+                               if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
+                                       now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
+                                       goto Deadlined
                                }
+                               if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
+                                       goto Deadlined
+                               }
+                               if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
+                                       goto Deadlined
+                               }
+                               break
+                       Deadlined:
+                               state.SetDead()
+                               conn.Close() // #nosec G104
                        case now := <-pingTicker.C:
                                if now.After(state.TxLastSeen.Add(PingTimeout)) {
                                        state.wg.Add(1)
@@ -828,7 +840,7 @@ func (state *SPState) StartWorkers(
                                        ) {
                                                state.Ctx.LogD(
                                                        "sp-queue-info",
-                                                       append(les, LE{"Size", len(payload)}),
+                                                       append(les, LE{"Size", int64(len(payload))}),
                                                        func(les LEs) string {
                                                                return fmt.Sprintf(
                                                                        "SP with %s (nice %s): queuing new info (%s)",
@@ -869,7 +881,7 @@ func (state *SPState) StartWorkers(
                        case payload = <-state.payloads:
                                state.Ctx.LogD(
                                        "sp-got-payload",
-                                       append(les, LE{"Size", len(payload)}),
+                                       append(les, LE{"Size", int64(len(payload))}),
                                        func(les LEs) string {
                                                return fmt.Sprintf(
                                                        "SP with %s (nice %s): got payload (%s)",
@@ -914,7 +926,9 @@ func (state *SPState) StartWorkers(
                                        string(TTx),
                                        Base32Codec.EncodeToString(freq.Hash[:]),
                                )
+                               state.fdsLock.RLock()
                                fdAndFullSize, exists := state.fds[pth]
+                               state.fdsLock.RUnlock()
                                if !exists {
                                        fd, err := os.Open(pth)
                                        if err != nil {
@@ -931,7 +945,9 @@ func (state *SPState) StartWorkers(
                                                return
                                        }
                                        fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
+                                       state.fdsLock.Lock()
                                        state.fds[pth] = fdAndFullSize
+                                       state.fdsLock.Unlock()
                                }
                                fd := fdAndFullSize.fd
                                fullSize := fdAndFullSize.fullSize
@@ -955,16 +971,18 @@ func (state *SPState) StartWorkers(
                                                return
                                        }
                                        buf = buf[:n]
-                                       state.Ctx.LogD(
-                                               "sp-file-read",
-                                               append(lesp, LE{"Size", n}),
-                                               func(les LEs) string {
-                                                       return fmt.Sprintf(
-                                                               "%s: read %s",
-                                                               logMsg(les), humanize.IBytes(uint64(n)),
-                                                       )
-                                               },
+                                       lesp = append(
+                                               les,
+                                               LE{"XX", string(TTx)},
+                                               LE{"Pkt", pktName},
+                                               LE{"Size", int64(n)},
                                        )
+                                       state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "%s: read %s",
+                                                       logMsg(les), humanize.IBytes(uint64(n)),
+                                               )
+                                       })
                                }
                                state.closeFd(pth)
                                payload = MarshalSP(SPTypeFile, SPFile{
@@ -973,8 +991,15 @@ func (state *SPState) StartWorkers(
                                        Payload: buf,
                                })
                                ourSize := freq.Offset + uint64(len(buf))
-                               lesp = append(lesp, LE{"Size", int64(ourSize)}, LE{"FullSize", fullSize})
+                               lesp = append(
+                                       les,
+                                       LE{"XX", string(TTx)},
+                                       LE{"Pkt", pktName},
+                                       LE{"Size", int64(ourSize)},
+                                       LE{"FullSize", fullSize},
+                               )
                                if state.Ctx.ShowPrgrs {
+                                       state.progressBars[pktName] = struct{}{}
                                        Progress("Tx", lesp)
                                }
                                state.Lock()
@@ -988,6 +1013,9 @@ func (state *SPState) StartWorkers(
                                                } else {
                                                        state.queueTheir = state.queueTheir[:0]
                                                }
+                                               if state.Ctx.ShowPrgrs {
+                                                       delete(state.progressBars, pktName)
+                                               }
                                        } else {
                                                state.queueTheir[0].freq.Offset += uint64(len(buf))
                                        }
@@ -1005,7 +1033,7 @@ func (state *SPState) StartWorkers(
                                        humanize.IBytes(uint64(len(payload))),
                                )
                        }
-                       state.Ctx.LogD("sp-sending", append(les, LE{"Size", len(payload)}), logMsg)
+                       state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
                        conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
                        if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
                                state.Ctx.LogE("sp-sending", les, err, logMsg)
@@ -1053,7 +1081,7 @@ func (state *SPState) StartWorkers(
                        }
                        state.Ctx.LogD(
                                "sp-recv-got",
-                               append(les, LE{"Size", len(payload)}),
+                               append(les, LE{"Size", int64(len(payload))}),
                                func(les LEs) string { return logMsg(les) + ": got" },
                        )
                        payload, err = state.csTheir.Decrypt(nil, nil, payload)
@@ -1065,7 +1093,7 @@ func (state *SPState) StartWorkers(
                        }
                        state.Ctx.LogD(
                                "sp-recv-process",
-                               append(les, LE{"Size", len(payload)}),
+                               append(les, LE{"Size", int64(len(payload))}),
                                func(les LEs) string {
                                        return logMsg(les) + ": processing"
                                },
@@ -1082,7 +1110,7 @@ func (state *SPState) StartWorkers(
                                for _, reply := range replies {
                                        state.Ctx.LogD(
                                                "sp-recv-reply",
-                                               append(les[:len(les)-1], LE{"Size", len(reply)}),
+                                               append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
                                                func(les LEs) string {
                                                        return fmt.Sprintf(
                                                                "SP with %s (nice %s): queuing reply (%s)",
@@ -1124,6 +1152,9 @@ func (state *SPState) Wait() {
        if txDuration > 0 {
                state.TxSpeed = state.TxBytes / txDuration
        }
+       for pktName := range state.progressBars {
+               ProgressKill(pktName)
+       }
 }
 
 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
@@ -1326,7 +1357,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                lesp,
                                LE{"XX", string(TRx)},
                                LE{"Pkt", pktName},
-                               LE{"Size", len(file.Payload)},
+                               LE{"Size", int64(len(file.Payload))},
                        )
                        logMsg := func(les LEs) string {
                                return fmt.Sprintf(
@@ -1344,7 +1375,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
                                return logMsg(les) + ": opening part"
                        })
+                       state.fdsLock.RLock()
                        fdAndFullSize, exists := state.fds[filePathPart]
+                       state.fdsLock.RUnlock()
                        var fd *os.File
                        if exists {
                                fd = fdAndFullSize.fd
@@ -1360,7 +1393,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        })
                                        return nil, err
                                }
+                               state.fdsLock.Lock()
                                state.fds[filePathPart] = FdAndFullSize{fd: fd}
+                               state.fdsLock.Unlock()
                                if file.Offset == 0 {
                                        h, err := blake2b.New256(nil)
                                        if err != nil {
@@ -1421,11 +1456,15 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        }
                        lesp = append(lesp, LE{"FullSize", fullsize})
                        if state.Ctx.ShowPrgrs {
+                               state.progressBars[pktName] = struct{}{}
                                Progress("Rx", lesp)
                        }
                        if fullsize != ourSize {
                                continue
                        }
+                       if state.Ctx.ShowPrgrs {
+                               delete(state.progressBars, pktName)
+                       }
                        logMsg = func(les LEs) string {
                                return fmt.Sprintf(
                                        "Got packet %s %d%% (%s / %s)",