nonceBucketN int32
// Timers
- Timeout time.Duration `json:"-"`
- Established time.Time
- LastPing time.Time
- LastSent time.Time
- willSentCycle time.Time
+ Timeout time.Duration `json:"-"`
+ Established time.Time
+ LastPing time.Time
// Receiver
BusyR sync.Mutex `json:"-"`
tagT *[TagSize]byte
keyAuthT *[SSize]byte
frameT []byte
- now time.Time
}
func (p *Peer) String() string {
log.Println("Padded data packet size", len(data)+1, "is bigger than MTU", p.MTU, p)
return
}
- p.now = time.Now()
p.BusyT.Lock()
// Zero size is a heartbeat packet
SliceZero(p.bufT)
if len(data) == 0 {
- // If this heartbeat is necessary
- if !p.LastSent.Add(p.Timeout).Before(p.now) {
- p.BusyT.Unlock()
- return
- }
p.bufT[S20BS+0] = PadByte
p.HeartbeatSent++
} else {
out = append(p.tagT[:], p.frameT...)
}
p.FramesOut++
-
- if p.CPRCycle != time.Duration(0) {
- p.willSentCycle = p.LastSent.Add(p.CPRCycle)
- if p.willSentCycle.After(p.now) {
- time.Sleep(p.willSentCycle.Sub(p.now))
- p.now = p.willSentCycle
- }
- }
-
- p.LastSent = p.now
p.Conn.Write(out)
p.BusyT.Unlock()
}
p.BusyR.Unlock()
return true
}
+
+func PeerTapProcessor(peer *Peer, tap *TAP, terminator chan struct{}) {
+ var data []byte
+ var now time.Time
+ lastSent := time.Now()
+ heartbeat := time.NewTicker(peer.Timeout)
+ if peer.CPRCycle == time.Duration(0) {
+ RawProcessor:
+ for {
+ select {
+ case <-terminator:
+ break RawProcessor
+ case <-heartbeat.C:
+ now = time.Now()
+ if lastSent.Add(peer.Timeout).Before(now) {
+ peer.EthProcess(nil)
+ lastSent = now
+ }
+ case data = <-tap.Sink:
+ peer.EthProcess(data)
+ lastSent = time.Now()
+ }
+ }
+ } else {
+ CPRProcessor:
+ for {
+ data = nil
+ select {
+ case <-terminator:
+ break CPRProcessor
+ case data = <-tap.Sink:
+ peer.EthProcess(data)
+ default:
+ }
+ if data == nil {
+ peer.EthProcess(nil)
+ }
+ time.Sleep(peer.CPRCycle)
+ }
+ }
+ close(terminator)
+ peer.Zero()
+ heartbeat.Stop()
+}