}
hs.Zero()
terminator = make(chan struct{})
- go func() {
- heartbeat := time.NewTicker(peer.Timeout)
- var data []byte
- Processor:
- for {
- select {
- case <-heartbeat.C:
- peer.EthProcess(nil)
- case <-terminator:
- break Processor
- case data = <-tap.Sink:
- peer.EthProcess(data)
- }
- }
- heartbeat.Stop()
- peer.Zero()
- }()
+ go govpn.PeerTapProcessor(peer, tap, terminator)
break HandshakeCycle
}
if hs != nil {
}
hs.Zero()
terminator = make(chan struct{})
- go func() {
- heartbeat := time.NewTicker(peer.Timeout)
- var data []byte
- Processor:
- for {
- select {
- case <-heartbeat.C:
- peer.EthProcess(nil)
- case <-terminator:
- break Processor
- case data = <-tap.Sink:
- peer.EthProcess(data)
- }
- }
- heartbeat.Stop()
- peer.Zero()
- }()
+ go govpn.PeerTapProcessor(peer, tap, terminator)
}
if terminator != nil {
terminator <- struct{}{}
import (
"bytes"
"sync"
- "time"
"cypherpunks.ru/govpn"
)
kpLock sync.RWMutex
)
-func peerReady(ps PeerState) {
- var data []byte
- heartbeat := time.NewTicker(ps.peer.Timeout)
-Processor:
- for {
- select {
- case <-heartbeat.C:
- ps.peer.EthProcess(nil)
- case <-ps.terminator:
- break Processor
- case data = <-ps.tap.Sink:
- ps.peer.EthProcess(data)
- }
- }
- close(ps.terminator)
- ps.peer.Zero()
- heartbeat.Stop()
-}
-
func callUp(peerId *govpn.PeerId, remoteAddr string) (string, error) {
ifaceName := confs[*peerId].Iface
if confs[*peerId].Up != "" {
tap: tap,
terminator: make(chan struct{}),
}
- go peerReady(*ps)
+ go govpn.PeerTapProcessor(ps.peer, ps.tap, ps.terminator)
peersByIdLock.Lock()
kpLock.Lock()
delete(peers, addrPrev)
tap: tap,
terminator: make(chan struct{}, 1),
}
- go peerReady(*ps)
+ go govpn.PeerTapProcessor(ps.peer, ps.tap, ps.terminator)
peersLock.Lock()
peersByIdLock.Lock()
kpLock.Lock()
terminator: make(chan struct{}),
}
go func(ps PeerState) {
- peerReady(ps)
+ govpn.PeerTapProcessor(ps.peer, ps.tap, ps.terminator)
<-udpBufs
<-udpBufs
}(*ps)
terminator: make(chan struct{}),
}
go func(ps PeerState) {
- peerReady(ps)
+ govpn.PeerTapProcessor(ps.peer, ps.tap, ps.terminator)
<-udpBufs
<-udpBufs
}(*ps)
nonceBucketN int32
// Timers
- Timeout time.Duration `json:"-"`
- Established time.Time
- LastPing time.Time
- LastSent time.Time
- willSentCycle time.Time
+ Timeout time.Duration `json:"-"`
+ Established time.Time
+ LastPing time.Time
// Receiver
BusyR sync.Mutex `json:"-"`
tagT *[TagSize]byte
keyAuthT *[SSize]byte
frameT []byte
- now time.Time
}
func (p *Peer) String() string {
log.Println("Padded data packet size", len(data)+1, "is bigger than MTU", p.MTU, p)
return
}
- p.now = time.Now()
p.BusyT.Lock()
// Zero size is a heartbeat packet
SliceZero(p.bufT)
if len(data) == 0 {
- // If this heartbeat is necessary
- if !p.LastSent.Add(p.Timeout).Before(p.now) {
- p.BusyT.Unlock()
- return
- }
p.bufT[S20BS+0] = PadByte
p.HeartbeatSent++
} else {
out = append(p.tagT[:], p.frameT...)
}
p.FramesOut++
-
- if p.CPRCycle != time.Duration(0) {
- p.willSentCycle = p.LastSent.Add(p.CPRCycle)
- if p.willSentCycle.After(p.now) {
- time.Sleep(p.willSentCycle.Sub(p.now))
- p.now = p.willSentCycle
- }
- }
-
- p.LastSent = p.now
p.Conn.Write(out)
p.BusyT.Unlock()
}
p.BusyR.Unlock()
return true
}
+
+func PeerTapProcessor(peer *Peer, tap *TAP, terminator chan struct{}) {
+ var data []byte
+ var now time.Time
+ lastSent := time.Now()
+ heartbeat := time.NewTicker(peer.Timeout)
+ if peer.CPRCycle == time.Duration(0) {
+ RawProcessor:
+ for {
+ select {
+ case <-terminator:
+ break RawProcessor
+ case <-heartbeat.C:
+ now = time.Now()
+ if lastSent.Add(peer.Timeout).Before(now) {
+ peer.EthProcess(nil)
+ lastSent = now
+ }
+ case data = <-tap.Sink:
+ peer.EthProcess(data)
+ lastSent = time.Now()
+ }
+ }
+ } else {
+ CPRProcessor:
+ for {
+ data = nil
+ select {
+ case <-terminator:
+ break CPRProcessor
+ case data = <-tap.Sink:
+ peer.EthProcess(data)
+ default:
+ }
+ if data == nil {
+ peer.EthProcess(nil)
+ }
+ time.Sleep(peer.CPRCycle)
+ }
+ }
+ close(terminator)
+ peer.Zero()
+ heartbeat.Stop()
+}