]> Cypherpunks.ru repositories - udpobfs.git/blobdiff - cmd/init/main.go
RWMutex is not that bad
[udpobfs.git] / cmd / init / main.go
index 77b82139b67ad12d1163e9419729589a037043f78c9847fbe2ff84ff3f83604e..3564dbf0ed737d1a4c718e6774742fddbf2ffce3c45b5fbf264fea4c00584b48 100644 (file)
@@ -44,6 +44,7 @@ var (
        TLSConfig  *tls.Config
        LnUDP      *net.UDPConn
        Peers      = make(map[string]chan udpobfs.Buf)
+       PeersM     sync.RWMutex
        Bufs       = sync.Pool{New: func() any { return new([udpobfs.BufLen]byte) }}
 )
 
@@ -83,8 +84,17 @@ func newPeer(localAddr net.Addr, dataInitial []byte) {
        }
        cryptoState := udpobfs.NewCryptoState(seed, true)
        txs := make(chan udpobfs.Buf)
-       rxFinished := make(chan struct{})
+       PeersM.Lock()
+       Peers[localAddr.String()] = txs
+       PeersM.Unlock()
        var rxPkts, txPkts, rxBytes, txBytes int64
+       {
+               txPkts++
+               txBytes += int64(len(dataInitial))
+               tmp := make([]byte, udpobfs.SeqLen+len(dataInitial))
+               connUDP.WriteTo(cryptoState.Tx(tmp, dataInitial), DstAddrUDP)
+       }
+       rxFinished := make(chan struct{})
        go func() {
                var n int
                var err error
@@ -129,6 +139,7 @@ func newPeer(localAddr net.Addr, dataInitial []byte) {
                lastPing := now
                last := now
                var got []byte
+               var ok bool
                for {
                        select {
                        case <-ticker.C:
@@ -142,8 +153,8 @@ func newPeer(localAddr net.Addr, dataInitial []byte) {
                                                cryptoState.Tx(buf[:udpobfs.SeqLen], nil), DstAddrUDP)
                                        lastPing = now
                                }
-                       case tx = <-txs:
-                               if tx.Buf == nil {
+                       case tx, ok = <-txs:
+                               if !ok {
                                        return
                                }
                                got = cryptoState.Tx(buf[:udpobfs.SeqLen+tx.N], (*tx.Buf)[:tx.N])
@@ -156,13 +167,6 @@ func newPeer(localAddr net.Addr, dataInitial []byte) {
                        }
                }
        }()
-       Peers[localAddr.String()] = txs
-       {
-               txPkts++
-               txBytes += int64(len(dataInitial))
-               tmp := make([]byte, udpobfs.SeqLen+len(dataInitial))
-               connUDP.WriteTo(cryptoState.Tx(tmp, dataInitial), DstAddrUDP)
-       }
        go func() {
                defer connUDP.Close()
                ticker := time.NewTicker(udpobfs.LifetimeDuration)
@@ -202,12 +206,10 @@ func newPeer(localAddr net.Addr, dataInitial []byte) {
                "rxBytes", rxBytes,
                "txPkts", txPkts,
                "txBytes", txBytes)
+       PeersM.Lock()
        delete(Peers, localAddr.String())
-       txs <- udpobfs.Buf{Buf: nil}
-       go func() {
-               for range txs {
-               }
-       }()
+       PeersM.Unlock()
+       close(txs)
 }
 
 func main() {
@@ -277,10 +279,12 @@ func main() {
                if n == 0 {
                        continue
                }
+               PeersM.RLock()
                txs = Peers[from.String()]
                if txs != nil {
                        txs <- udpobfs.Buf{Buf: buf, N: n}
                }
+               PeersM.RUnlock()
                if txs == nil {
                        neu := make([]byte, n)
                        copy(neu, (*buf)[:n])