// S20BS is Salsa20's internal blocksize in bytes
S20BS = 64
// Maximal amount of bytes transfered with single key (4 GiB)
- MaxBytesPerKey int64 = 1 << 32
+ MaxBytesPerKey uint64 = 1 << 32
// Heartbeat rate, relative to Timeout
TimeoutHeartbeat = 4
// Minimal valid packet length
}
type Peer struct {
+ // Statistics (they are at the beginning for correct int64 alignment)
+ BytesIn uint64
+ BytesOut uint64
+ BytesPayloadIn uint64
+ BytesPayloadOut uint64
+ FramesIn uint64
+ FramesOut uint64
+ FramesUnauth uint64
+ FramesDup uint64
+ HeartbeatRecv uint64
+ HeartbeatSent uint64
+
+ // Basic
Addr string
Id *PeerId
- Conn io.Writer
+ Conn io.Writer `json:"-"`
// Traffic behaviour
NoiseEnable bool
nonceBucketN int32
// Timers
- Timeout time.Duration `json:"-"`
- Established time.Time
- LastPing time.Time
- LastSent time.Time
- willSentCycle time.Time
-
- // Statistics
- BytesIn int64
- BytesOut int64
- BytesPayloadIn int64
- BytesPayloadOut int64
- FramesIn int
- FramesOut int
- FramesUnauth int
- FramesDup int
- HeartbeatRecv int
- HeartbeatSent int
+ Timeout time.Duration `json:"-"`
+ Established time.Time
+ LastPing time.Time
// Receiver
BusyR sync.Mutex `json:"-"`
tagT *[TagSize]byte
keyAuthT *[SSize]byte
frameT []byte
- now time.Time
}
func (p *Peer) String() string {
log.Println("Padded data packet size", len(data)+1, "is bigger than MTU", p.MTU, p)
return
}
- p.now = time.Now()
p.BusyT.Lock()
// Zero size is a heartbeat packet
SliceZero(p.bufT)
if len(data) == 0 {
- // If this heartbeat is necessary
- if !p.LastSent.Add(p.Timeout).Before(p.now) {
- p.BusyT.Unlock()
- return
- }
p.bufT[S20BS+0] = PadByte
p.HeartbeatSent++
} else {
// accept the next one
copy(p.bufT[S20BS:], data)
p.bufT[S20BS+len(data)] = PadByte
- p.BytesPayloadOut += int64(len(data))
+ p.BytesPayloadOut += uint64(len(data))
}
if p.NoiseEnable && !p.Encless {
)
copy(p.keyAuthT[:], p.bufT[:SSize])
poly1305.Sum(p.tagT, p.frameT, p.keyAuthT)
- atomic.AddInt64(&p.BytesOut, int64(len(p.frameT)+TagSize))
+ atomic.AddUint64(&p.BytesOut, uint64(len(p.frameT)+TagSize))
out = append(p.tagT[:], p.frameT...)
}
p.FramesOut++
-
- if p.CPRCycle != time.Duration(0) {
- p.willSentCycle = p.LastSent.Add(p.CPRCycle)
- if p.willSentCycle.After(p.now) {
- time.Sleep(p.willSentCycle.Sub(p.now))
- p.now = p.willSentCycle
- }
- }
-
- p.LastSent = p.now
p.Conn.Write(out)
p.BusyT.Unlock()
}
}
p.FramesIn++
- atomic.AddInt64(&p.BytesIn, int64(len(data)))
+ atomic.AddUint64(&p.BytesIn, uint64(len(data)))
p.LastPing = time.Now()
p.pktSizeR = bytes.LastIndexByte(out, PadByte)
if p.pktSizeR == -1 {
p.BusyR.Unlock()
return true
}
- p.BytesPayloadIn += int64(p.pktSizeR)
+ p.BytesPayloadIn += uint64(p.pktSizeR)
tap.Write(out[:p.pktSizeR])
p.BusyR.Unlock()
return true
}
+
+func PeerTapProcessor(peer *Peer, tap *TAP, terminator chan struct{}) {
+ var data []byte
+ var now time.Time
+ lastSent := time.Now()
+ heartbeat := time.NewTicker(peer.Timeout)
+ if peer.CPRCycle == time.Duration(0) {
+ RawProcessor:
+ for {
+ select {
+ case <-terminator:
+ break RawProcessor
+ case <-heartbeat.C:
+ now = time.Now()
+ if lastSent.Add(peer.Timeout).Before(now) {
+ peer.EthProcess(nil)
+ lastSent = now
+ }
+ case data = <-tap.Sink:
+ peer.EthProcess(data)
+ lastSent = time.Now()
+ }
+ }
+ } else {
+ CPRProcessor:
+ for {
+ data = nil
+ select {
+ case <-terminator:
+ break CPRProcessor
+ case data = <-tap.Sink:
+ peer.EthProcess(data)
+ default:
+ }
+ if data == nil {
+ peer.EthProcess(nil)
+ }
+ time.Sleep(peer.CPRCycle)
+ }
+ }
+ close(terminator)
+ peer.Zero()
+ heartbeat.Stop()
+}