From: Sergey Matveev Date: Mon, 9 May 2016 09:58:57 +0000 (+0300) Subject: Refactor and simplify CPR work code X-Git-Tag: 5.8^2~3 X-Git-Url: http://www.git.cypherpunks.ru/?p=govpn.git;a=commitdiff_plain;h=ce2d12cc15b31a2a1157123f47e58e7857436783 Refactor and simplify CPR work code --- diff --git a/src/cypherpunks.ru/govpn/cmd/govpn-client/tcp.go b/src/cypherpunks.ru/govpn/cmd/govpn-client/tcp.go index 38a8cb5..dc0eb60 100644 --- a/src/cypherpunks.ru/govpn/cmd/govpn-client/tcp.go +++ b/src/cypherpunks.ru/govpn/cmd/govpn-client/tcp.go @@ -88,23 +88,7 @@ HandshakeCycle: } hs.Zero() terminator = make(chan struct{}) - go func() { - heartbeat := time.NewTicker(peer.Timeout) - var data []byte - Processor: - for { - select { - case <-heartbeat.C: - peer.EthProcess(nil) - case <-terminator: - break Processor - case data = <-tap.Sink: - peer.EthProcess(data) - } - } - heartbeat.Stop() - peer.Zero() - }() + go govpn.PeerTapProcessor(peer, tap, terminator) break HandshakeCycle } if hs != nil { diff --git a/src/cypherpunks.ru/govpn/cmd/govpn-client/udp.go b/src/cypherpunks.ru/govpn/cmd/govpn-client/udp.go index abbbf07..96dbfba 100644 --- a/src/cypherpunks.ru/govpn/cmd/govpn-client/udp.go +++ b/src/cypherpunks.ru/govpn/cmd/govpn-client/udp.go @@ -94,23 +94,7 @@ MainCycle: } hs.Zero() terminator = make(chan struct{}) - go func() { - heartbeat := time.NewTicker(peer.Timeout) - var data []byte - Processor: - for { - select { - case <-heartbeat.C: - peer.EthProcess(nil) - case <-terminator: - break Processor - case data = <-tap.Sink: - peer.EthProcess(data) - } - } - heartbeat.Stop() - peer.Zero() - }() + go govpn.PeerTapProcessor(peer, tap, terminator) } if terminator != nil { terminator <- struct{}{} diff --git a/src/cypherpunks.ru/govpn/cmd/govpn-server/common.go b/src/cypherpunks.ru/govpn/cmd/govpn-server/common.go index 691e51c..7c2dd41 100644 --- a/src/cypherpunks.ru/govpn/cmd/govpn-server/common.go +++ b/src/cypherpunks.ru/govpn/cmd/govpn-server/common.go @@ -21,7 +21,6 @@ package main import ( "bytes" "sync" - "time" "cypherpunks.ru/govpn" ) @@ -46,25 +45,6 @@ var ( kpLock sync.RWMutex ) -func peerReady(ps PeerState) { - var data []byte - heartbeat := time.NewTicker(ps.peer.Timeout) -Processor: - for { - select { - case <-heartbeat.C: - ps.peer.EthProcess(nil) - case <-ps.terminator: - break Processor - case data = <-ps.tap.Sink: - ps.peer.EthProcess(data) - } - } - close(ps.terminator) - ps.peer.Zero() - heartbeat.Stop() -} - func callUp(peerId *govpn.PeerId, remoteAddr string) (string, error) { ifaceName := confs[*peerId].Iface if confs[*peerId].Up != "" { diff --git a/src/cypherpunks.ru/govpn/cmd/govpn-server/tcp.go b/src/cypherpunks.ru/govpn/cmd/govpn-server/tcp.go index 66b9345..7a8f738 100644 --- a/src/cypherpunks.ru/govpn/cmd/govpn-server/tcp.go +++ b/src/cypherpunks.ru/govpn/cmd/govpn-server/tcp.go @@ -108,7 +108,7 @@ func handleTCP(conn net.Conn) { tap: tap, terminator: make(chan struct{}), } - go peerReady(*ps) + go govpn.PeerTapProcessor(ps.peer, ps.tap, ps.terminator) peersByIdLock.Lock() kpLock.Lock() delete(peers, addrPrev) @@ -143,7 +143,7 @@ func handleTCP(conn net.Conn) { tap: tap, terminator: make(chan struct{}, 1), } - go peerReady(*ps) + go govpn.PeerTapProcessor(ps.peer, ps.tap, ps.terminator) peersLock.Lock() peersByIdLock.Lock() kpLock.Lock() diff --git a/src/cypherpunks.ru/govpn/cmd/govpn-server/udp.go b/src/cypherpunks.ru/govpn/cmd/govpn-server/udp.go index 374b4dd..98993d3 100644 --- a/src/cypherpunks.ru/govpn/cmd/govpn-server/udp.go +++ b/src/cypherpunks.ru/govpn/cmd/govpn-server/udp.go @@ -121,7 +121,7 @@ func startUDP() { terminator: make(chan struct{}), } go func(ps PeerState) { - peerReady(ps) + govpn.PeerTapProcessor(ps.peer, ps.tap, ps.terminator) <-udpBufs <-udpBufs }(*ps) @@ -159,7 +159,7 @@ func startUDP() { terminator: make(chan struct{}), } go func(ps PeerState) { - peerReady(ps) + govpn.PeerTapProcessor(ps.peer, ps.tap, ps.terminator) <-udpBufs <-udpBufs }(*ps) diff --git a/src/cypherpunks.ru/govpn/peer.go b/src/cypherpunks.ru/govpn/peer.go index 5160bd4..9312b64 100644 --- a/src/cypherpunks.ru/govpn/peer.go +++ b/src/cypherpunks.ru/govpn/peer.go @@ -102,11 +102,9 @@ type Peer struct { nonceBucketN int32 // Timers - Timeout time.Duration `json:"-"` - Established time.Time - LastPing time.Time - LastSent time.Time - willSentCycle time.Time + Timeout time.Duration `json:"-"` + Established time.Time + LastPing time.Time // Receiver BusyR sync.Mutex `json:"-"` @@ -121,7 +119,6 @@ type Peer struct { tagT *[TagSize]byte keyAuthT *[SSize]byte frameT []byte - now time.Time } func (p *Peer) String() string { @@ -224,17 +221,11 @@ func (p *Peer) EthProcess(data []byte) { log.Println("Padded data packet size", len(data)+1, "is bigger than MTU", p.MTU, p) return } - p.now = time.Now() p.BusyT.Lock() // Zero size is a heartbeat packet SliceZero(p.bufT) if len(data) == 0 { - // If this heartbeat is necessary - if !p.LastSent.Add(p.Timeout).Before(p.now) { - p.BusyT.Unlock() - return - } p.bufT[S20BS+0] = PadByte p.HeartbeatSent++ } else { @@ -283,16 +274,6 @@ func (p *Peer) EthProcess(data []byte) { out = append(p.tagT[:], p.frameT...) } p.FramesOut++ - - if p.CPRCycle != time.Duration(0) { - p.willSentCycle = p.LastSent.Add(p.CPRCycle) - if p.willSentCycle.After(p.now) { - time.Sleep(p.willSentCycle.Sub(p.now)) - p.now = p.willSentCycle - } - } - - p.LastSent = p.now p.Conn.Write(out) p.BusyT.Unlock() } @@ -402,3 +383,47 @@ func (p *Peer) PktProcess(data []byte, tap io.Writer, reorderable bool) bool { p.BusyR.Unlock() return true } + +func PeerTapProcessor(peer *Peer, tap *TAP, terminator chan struct{}) { + var data []byte + var now time.Time + lastSent := time.Now() + heartbeat := time.NewTicker(peer.Timeout) + if peer.CPRCycle == time.Duration(0) { + RawProcessor: + for { + select { + case <-terminator: + break RawProcessor + case <-heartbeat.C: + now = time.Now() + if lastSent.Add(peer.Timeout).Before(now) { + peer.EthProcess(nil) + lastSent = now + } + case data = <-tap.Sink: + peer.EthProcess(data) + lastSent = time.Now() + } + } + } else { + CPRProcessor: + for { + data = nil + select { + case <-terminator: + break CPRProcessor + case data = <-tap.Sink: + peer.EthProcess(data) + default: + } + if data == nil { + peer.EthProcess(nil) + } + time.Sleep(peer.CPRCycle) + } + } + close(terminator) + peer.Zero() + heartbeat.Stop() +}