]> Cypherpunks.ru repositories - govpn.git/blobdiff - src/cypherpunks.ru/govpn/peer.go
Refactor and simplify CPR work code
[govpn.git] / src / cypherpunks.ru / govpn / peer.go
index 5ea245bbd44a02aa196c66801b3bd22b4c3f19f3..9312b64b69710cef8a3993d7a9ba1b8444811e7e 100644 (file)
@@ -39,7 +39,7 @@ const (
        // S20BS is Salsa20's internal blocksize in bytes
        S20BS = 64
        // Maximal amount of bytes transfered with single key (4 GiB)
-       MaxBytesPerKey int64 = 1 << 32
+       MaxBytesPerKey uint64 = 1 << 32
        // Heartbeat rate, relative to Timeout
        TimeoutHeartbeat = 4
        // Minimal valid packet length
@@ -64,9 +64,22 @@ func newNonceCipher(key *[32]byte) *xtea.Cipher {
 }
 
 type Peer struct {
+       // Statistics (they are at the beginning for correct int64 alignment)
+       BytesIn         uint64
+       BytesOut        uint64
+       BytesPayloadIn  uint64
+       BytesPayloadOut uint64
+       FramesIn        uint64
+       FramesOut       uint64
+       FramesUnauth    uint64
+       FramesDup       uint64
+       HeartbeatRecv   uint64
+       HeartbeatSent   uint64
+
+       // Basic
        Addr string
        Id   *PeerId
-       Conn io.Writer
+       Conn io.Writer `json:"-"`
 
        // Traffic behaviour
        NoiseEnable bool
@@ -89,23 +102,9 @@ type Peer struct {
        nonceBucketN int32
 
        // Timers
-       Timeout       time.Duration `json:"-"`
-       Established   time.Time
-       LastPing      time.Time
-       LastSent      time.Time
-       willSentCycle time.Time
-
-       // Statistics
-       BytesIn         int64
-       BytesOut        int64
-       BytesPayloadIn  int64
-       BytesPayloadOut int64
-       FramesIn        int
-       FramesOut       int
-       FramesUnauth    int
-       FramesDup       int
-       HeartbeatRecv   int
-       HeartbeatSent   int
+       Timeout     time.Duration `json:"-"`
+       Established time.Time
+       LastPing    time.Time
 
        // Receiver
        BusyR    sync.Mutex `json:"-"`
@@ -120,7 +119,6 @@ type Peer struct {
        tagT     *[TagSize]byte
        keyAuthT *[SSize]byte
        frameT   []byte
-       now      time.Time
 }
 
 func (p *Peer) String() string {
@@ -223,17 +221,11 @@ func (p *Peer) EthProcess(data []byte) {
                log.Println("Padded data packet size", len(data)+1, "is bigger than MTU", p.MTU, p)
                return
        }
-       p.now = time.Now()
        p.BusyT.Lock()
 
        // Zero size is a heartbeat packet
        SliceZero(p.bufT)
        if len(data) == 0 {
-               // If this heartbeat is necessary
-               if !p.LastSent.Add(p.Timeout).Before(p.now) {
-                       p.BusyT.Unlock()
-                       return
-               }
                p.bufT[S20BS+0] = PadByte
                p.HeartbeatSent++
        } else {
@@ -241,7 +233,7 @@ func (p *Peer) EthProcess(data []byte) {
                // accept the next one
                copy(p.bufT[S20BS:], data)
                p.bufT[S20BS+len(data)] = PadByte
-               p.BytesPayloadOut += int64(len(data))
+               p.BytesPayloadOut += uint64(len(data))
        }
 
        if p.NoiseEnable && !p.Encless {
@@ -278,20 +270,10 @@ func (p *Peer) EthProcess(data []byte) {
                )
                copy(p.keyAuthT[:], p.bufT[:SSize])
                poly1305.Sum(p.tagT, p.frameT, p.keyAuthT)
-               atomic.AddInt64(&p.BytesOut, int64(len(p.frameT)+TagSize))
+               atomic.AddUint64(&p.BytesOut, uint64(len(p.frameT)+TagSize))
                out = append(p.tagT[:], p.frameT...)
        }
        p.FramesOut++
-
-       if p.CPRCycle != time.Duration(0) {
-               p.willSentCycle = p.LastSent.Add(p.CPRCycle)
-               if p.willSentCycle.After(p.now) {
-                       time.Sleep(p.willSentCycle.Sub(p.now))
-                       p.now = p.willSentCycle
-               }
-       }
-
-       p.LastSent = p.now
        p.Conn.Write(out)
        p.BusyT.Unlock()
 }
@@ -376,7 +358,7 @@ func (p *Peer) PktProcess(data []byte, tap io.Writer, reorderable bool) bool {
        }
 
        p.FramesIn++
-       atomic.AddInt64(&p.BytesIn, int64(len(data)))
+       atomic.AddUint64(&p.BytesIn, uint64(len(data)))
        p.LastPing = time.Now()
        p.pktSizeR = bytes.LastIndexByte(out, PadByte)
        if p.pktSizeR == -1 {
@@ -396,8 +378,52 @@ func (p *Peer) PktProcess(data []byte, tap io.Writer, reorderable bool) bool {
                p.BusyR.Unlock()
                return true
        }
-       p.BytesPayloadIn += int64(p.pktSizeR)
+       p.BytesPayloadIn += uint64(p.pktSizeR)
        tap.Write(out[:p.pktSizeR])
        p.BusyR.Unlock()
        return true
 }
+
+func PeerTapProcessor(peer *Peer, tap *TAP, terminator chan struct{}) {
+       var data []byte
+       var now time.Time
+       lastSent := time.Now()
+       heartbeat := time.NewTicker(peer.Timeout)
+       if peer.CPRCycle == time.Duration(0) {
+       RawProcessor:
+               for {
+                       select {
+                       case <-terminator:
+                               break RawProcessor
+                       case <-heartbeat.C:
+                               now = time.Now()
+                               if lastSent.Add(peer.Timeout).Before(now) {
+                                       peer.EthProcess(nil)
+                                       lastSent = now
+                               }
+                       case data = <-tap.Sink:
+                               peer.EthProcess(data)
+                               lastSent = time.Now()
+                       }
+               }
+       } else {
+       CPRProcessor:
+               for {
+                       data = nil
+                       select {
+                       case <-terminator:
+                               break CPRProcessor
+                       case data = <-tap.Sink:
+                               peer.EthProcess(data)
+                       default:
+                       }
+                       if data == nil {
+                               peer.EthProcess(nil)
+                       }
+                       time.Sleep(peer.CPRCycle)
+               }
+       }
+       close(terminator)
+       peer.Zero()
+       heartbeat.Stop()
+}