]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/sp.go
Fixed possible data race
[nncp.git] / src / sp.go
index f2c707f1029bd15bd8585bcb974b3da017097cf7..6d31375e137ff3a044d0bb5c1ad8ea4c481b4dd0 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -233,6 +233,7 @@ type SPState struct {
        onlyPkts       map[[32]byte]bool
        writeSPBuf     bytes.Buffer
        fds            map[string]FdAndFullSize
+       fdsLock        sync.RWMutex
        fileHashers    map[string]*HasherAndOffset
        checkerQueues  SPCheckerQueues
        sync.RWMutex
@@ -669,11 +670,12 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
 }
 
 func (state *SPState) closeFd(pth string) {
-       s, exists := state.fds[pth]
-       delete(state.fds, pth)
-       if exists {
+       state.fdsLock.Lock()
+       if s, exists := state.fds[pth]; exists {
+               delete(state.fds, pth)
                s.fd.Close()
        }
+       state.fdsLock.Unlock()
 }
 
 func (state *SPState) StartWorkers(
@@ -729,7 +731,7 @@ func (state *SPState) StartWorkers(
                        for _, payload := range infosPayloads[1:] {
                                state.Ctx.LogD(
                                        "sp-queue-remaining",
-                                       append(les, LE{"Size", len(payload)}),
+                                       append(les, LE{"Size", int64(len(payload))}),
                                        func(les LEs) string {
                                                return fmt.Sprintf(
                                                        "SP with %s (nice %s): queuing remaining payload (%s)",
@@ -752,7 +754,7 @@ func (state *SPState) StartWorkers(
                        humanize.IBytes(uint64(len(payload))),
                )
        }
-       state.Ctx.LogD("sp-process", append(les, LE{"Size", len(payload)}), logMsg)
+       state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
        replies, err := state.ProcessSP(payload)
        if err != nil {
                state.Ctx.LogE("sp-process", les, err, logMsg)
@@ -763,7 +765,7 @@ func (state *SPState) StartWorkers(
                for _, reply := range replies {
                        state.Ctx.LogD(
                                "sp-queue-reply",
-                               append(les, LE{"Size", len(reply)}),
+                               append(les, LE{"Size", int64(len(reply))}),
                                func(les LEs) string {
                                        return fmt.Sprintf(
                                                "SP with %s (nice %s): queuing reply (%s)",
@@ -828,7 +830,7 @@ func (state *SPState) StartWorkers(
                                        ) {
                                                state.Ctx.LogD(
                                                        "sp-queue-info",
-                                                       append(les, LE{"Size", len(payload)}),
+                                                       append(les, LE{"Size", int64(len(payload))}),
                                                        func(les LEs) string {
                                                                return fmt.Sprintf(
                                                                        "SP with %s (nice %s): queuing new info (%s)",
@@ -869,7 +871,7 @@ func (state *SPState) StartWorkers(
                        case payload = <-state.payloads:
                                state.Ctx.LogD(
                                        "sp-got-payload",
-                                       append(les, LE{"Size", len(payload)}),
+                                       append(les, LE{"Size", int64(len(payload))}),
                                        func(les LEs) string {
                                                return fmt.Sprintf(
                                                        "SP with %s (nice %s): got payload (%s)",
@@ -914,7 +916,9 @@ func (state *SPState) StartWorkers(
                                        string(TTx),
                                        Base32Codec.EncodeToString(freq.Hash[:]),
                                )
+                               state.fdsLock.RLock()
                                fdAndFullSize, exists := state.fds[pth]
+                               state.fdsLock.RUnlock()
                                if !exists {
                                        fd, err := os.Open(pth)
                                        if err != nil {
@@ -931,7 +935,9 @@ func (state *SPState) StartWorkers(
                                                return
                                        }
                                        fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
+                                       state.fdsLock.Lock()
                                        state.fds[pth] = fdAndFullSize
+                                       state.fdsLock.Unlock()
                                }
                                fd := fdAndFullSize.fd
                                fullSize := fdAndFullSize.fullSize
@@ -955,16 +961,18 @@ func (state *SPState) StartWorkers(
                                                return
                                        }
                                        buf = buf[:n]
-                                       state.Ctx.LogD(
-                                               "sp-file-read",
-                                               append(lesp, LE{"Size", n}),
-                                               func(les LEs) string {
-                                                       return fmt.Sprintf(
-                                                               "%s: read %s",
-                                                               logMsg(les), humanize.IBytes(uint64(n)),
-                                                       )
-                                               },
+                                       lesp = append(
+                                               les,
+                                               LE{"XX", string(TTx)},
+                                               LE{"Pkt", pktName},
+                                               LE{"Size", int64(n)},
                                        )
+                                       state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
+                                               return fmt.Sprintf(
+                                                       "%s: read %s",
+                                                       logMsg(les), humanize.IBytes(uint64(n)),
+                                               )
+                                       })
                                }
                                state.closeFd(pth)
                                payload = MarshalSP(SPTypeFile, SPFile{
@@ -973,7 +981,13 @@ func (state *SPState) StartWorkers(
                                        Payload: buf,
                                })
                                ourSize := freq.Offset + uint64(len(buf))
-                               lesp = append(lesp, LE{"Size", int64(ourSize)}, LE{"FullSize", fullSize})
+                               lesp = append(
+                                       les,
+                                       LE{"XX", string(TTx)},
+                                       LE{"Pkt", pktName},
+                                       LE{"Size", int64(ourSize)},
+                                       LE{"FullSize", fullSize},
+                               )
                                if state.Ctx.ShowPrgrs {
                                        Progress("Tx", lesp)
                                }
@@ -1005,7 +1019,7 @@ func (state *SPState) StartWorkers(
                                        humanize.IBytes(uint64(len(payload))),
                                )
                        }
-                       state.Ctx.LogD("sp-sending", append(les, LE{"Size", len(payload)}), logMsg)
+                       state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
                        conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
                        if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
                                state.Ctx.LogE("sp-sending", les, err, logMsg)
@@ -1053,7 +1067,7 @@ func (state *SPState) StartWorkers(
                        }
                        state.Ctx.LogD(
                                "sp-recv-got",
-                               append(les, LE{"Size", len(payload)}),
+                               append(les, LE{"Size", int64(len(payload))}),
                                func(les LEs) string { return logMsg(les) + ": got" },
                        )
                        payload, err = state.csTheir.Decrypt(nil, nil, payload)
@@ -1065,7 +1079,7 @@ func (state *SPState) StartWorkers(
                        }
                        state.Ctx.LogD(
                                "sp-recv-process",
-                               append(les, LE{"Size", len(payload)}),
+                               append(les, LE{"Size", int64(len(payload))}),
                                func(les LEs) string {
                                        return logMsg(les) + ": processing"
                                },
@@ -1082,7 +1096,7 @@ func (state *SPState) StartWorkers(
                                for _, reply := range replies {
                                        state.Ctx.LogD(
                                                "sp-recv-reply",
-                                               append(les[:len(les)-1], LE{"Size", len(reply)}),
+                                               append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
                                                func(les LEs) string {
                                                        return fmt.Sprintf(
                                                                "SP with %s (nice %s): queuing reply (%s)",
@@ -1326,7 +1340,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                lesp,
                                LE{"XX", string(TRx)},
                                LE{"Pkt", pktName},
-                               LE{"Size", len(file.Payload)},
+                               LE{"Size", int64(len(file.Payload))},
                        )
                        logMsg := func(les LEs) string {
                                return fmt.Sprintf(
@@ -1344,7 +1358,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
                                return logMsg(les) + ": opening part"
                        })
+                       state.fdsLock.RLock()
                        fdAndFullSize, exists := state.fds[filePathPart]
+                       state.fdsLock.RUnlock()
                        var fd *os.File
                        if exists {
                                fd = fdAndFullSize.fd
@@ -1360,7 +1376,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        })
                                        return nil, err
                                }
+                               state.fdsLock.Lock()
                                state.fds[filePathPart] = FdAndFullSize{fd: fd}
+                               state.fdsLock.Unlock()
                                if file.Offset == 0 {
                                        h, err := blake2b.New256(nil)
                                        if err != nil {