From 1b3ddd7e64bd2310bbe7475e7722e656e5137eaa Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Fri, 20 Dec 2019 17:22:50 +0300 Subject: [PATCH] Check aliveness every second --- doc/news.ru.texi | 4 ++++ doc/news.texi | 4 ++++ src/sp.go | 44 ++++++++++++++++++++++++++++++++++++-------- 3 files changed, 44 insertions(+), 8 deletions(-) diff --git a/doc/news.ru.texi b/doc/news.ru.texi index beb54fc..845d5ea 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -18,6 +18,10 @@ SP протокол порождает меньше вызовов записей (соответственно, и TCP пакетов) в сокет. +@item +Проверять @option{onlinedeadline} и @option{maxonlinetime} ежесекундно, +независимо от чтения из сокета (раз в 10 секунд в худшем случае). + @end itemize @node Релиз 5.2.1 diff --git a/doc/news.texi b/doc/news.texi index 3693d70..0eab225 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -20,6 +20,10 @@ Finish all SP protocol related goroutines, less memory leak. SP protocol generates less socket write calls, thus generating less TCP packets. +@item +Check @option{onlinedeadline} and @option{maxonlinetime} options every +second, independently from socket reads (up to 10 seconds). + @end itemize @node Release 5.2.1 diff --git a/src/sp.go b/src/sp.go index db361d7..a29fb9d 100644 --- a/src/sp.go +++ b/src/sp.go @@ -186,6 +186,7 @@ type SPState struct { TxBytes int64 TxLastSeen time.Time started time.Time + mustFinishAt time.Time Duration time.Duration RxSpeed int64 TxSpeed int64 @@ -227,12 +228,7 @@ func (state *SPState) NotAlive() bool { } default: } - now := time.Now() - if state.maxOnlineTime > 0 && state.started.Add(state.maxOnlineTime).Before(now) { - return true - } - return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && - uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline + return false } func (state *SPState) dirUnlock() { @@ -519,9 +515,13 @@ func (state *SPState) StartWorkers( infosPayloads [][]byte, payload []byte, ) error { - state.isDead = make(chan struct{}) sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)} + state.isDead = make(chan struct{}) + if state.maxOnlineTime > 0 { + state.mustFinishAt = state.started.Add(state.maxOnlineTime) + } + // Remaining handshake payload sending if len(infosPayloads) > 1 { state.wg.Add(1) go func() { @@ -536,6 +536,8 @@ func (state *SPState) StartWorkers( state.wg.Done() }() } + + // Processing of first payload and queueing its responses state.Ctx.LogD( "sp-work", SdsAdd(sds, SDS{"size": len(payload)}), @@ -546,7 +548,6 @@ func (state *SPState) StartWorkers( state.Ctx.LogE("sp-work", sds, err, "") return err } - state.wg.Add(1) go func() { for _, reply := range replies { @@ -560,6 +561,31 @@ func (state *SPState) StartWorkers( state.wg.Done() }() + // Deadline checker + state.wg.Add(1) + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + defer state.wg.Done() + for { + select { + case _, ok := <-state.isDead: + if !ok { + return + } + case now := <-ticker.C: + if (uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && + uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline) || + (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) { + state.SetDead() + conn.Close() + return + } + } + } + }() + + // Spool checker and INFOs sender of appearing files if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { state.wg.Add(1) go func() { @@ -590,6 +616,7 @@ func (state *SPState) StartWorkers( }() } + // Sender state.wg.Add(1) go func() { for { @@ -702,6 +729,7 @@ func (state *SPState) StartWorkers( state.wg.Done() }() + // Receiver state.wg.Add(1) go func() { for { -- 2.44.0