From fe84fe8198d77c4dca3bd16fd95b88e199904ad8 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sat, 21 Dec 2019 20:24:29 +0300 Subject: [PATCH] PINGs --- doc/cmds.texi | 6 +- doc/news.ru.texi | 5 ++ doc/news.texi | 4 + doc/sp.texi | 22 +++++- doc/sp.txt | 4 +- src/cmd/nncp-daemon/main.go | 5 ++ src/sp.go | 142 ++++++++++++++++++++++-------------- 7 files changed, 126 insertions(+), 62 deletions(-) diff --git a/doc/cmds.texi b/doc/cmds.texi index 2606b6e..c4c6d01 100644 --- a/doc/cmds.texi +++ b/doc/cmds.texi @@ -250,10 +250,12 @@ can handle. @option{-bind} option specifies @option{addr:port} it must bind to and listen. It could be run as @command{inetd} service, by specifying -@option{-inetd} option. Example inetd-entry: +@option{-inetd} option. Pay attention that because it uses stdin/stdout, +it can not effectively work with IO timeouts and connection closing can +propagate up to 5 minutes in practice. Example inetd-entry: @verbatim -uucp stream tcp6 nowait nncpuser /usr/local/bin/nncp-daemon nncp-daemon -inetd +uucp stream tcp6 nowait nncpuser /usr/local/bin/nncp-daemon nncp-daemon -quiet -inetd @end verbatim @node nncp-exec diff --git a/doc/news.ru.texi b/doc/news.ru.texi index 845d5ea..3ac7b2f 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -22,6 +22,11 @@ SP протокол порождает меньше вызовов записе Проверять @option{onlinedeadline} и @option{maxonlinetime} ежесекундно, независимо от чтения из сокета (раз в 10 секунд в худшем случае). +@item +Раз в минуту, если нет более никакого другого трафика, посылаются PING +пакеты в SP-соединении. Это позволит быстрее понимать что соединение +более не работоспособно. + @end itemize @node Релиз 5.2.1 diff --git a/doc/news.texi b/doc/news.texi index 0eab225..fe27bb1 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -24,6 +24,10 @@ packets. Check @option{onlinedeadline} and @option{maxonlinetime} options every second, independently from socket reads (up to 10 seconds). 
+@item +Once per minute, if no other traffic exists, PING packets are sent in +SP-connection. That allows faster detection of a non-working connection. + @end itemize @node Release 5.2.1 diff --git a/doc/sp.texi b/doc/sp.texi index d6be2d9..ef00f63 100644 --- a/doc/sp.texi +++ b/doc/sp.texi @@ -61,6 +61,15 @@ just an unsigned integer telling what body structure follows. +------+ @end verbatim +@item PING + Dummy packet only used for determining workability of the connection. + +@verbatim ++------+ +| PING | ++------+ +@end verbatim + @item INFO Information about the file we have for transmission. @@ -195,13 +204,20 @@ then run background integrity checker on it. If check succeeds, then delete @file{.part} suffix from file's name and send @emph{DONE} packet. @item When @emph{DONE} packet received, delete corresponding file. + @item When @emph{HALT} packet received, empty file sending queue. @item Each second, node checks: are there any new @emph{tx} packets appeared and queues corresponding @emph{INFO} packets. -@item If no packets are sent and received during @ref{CfgOnlineDeadline, -onlinedeadline} duration, then close the connection. There is no -explicit indication that session is over. +@item Each minute, if no packets were sent, node sends @emph{PING} +packet. + +@item If no non-PING packets are sent and received during +@ref{CfgOnlineDeadline, onlinedeadline} duration, then close the +connection. There is no explicit indication that session is over. + +@item If no packets are received during two minutes (two PING timeouts), +then close the connection. @end enumerate diff --git a/doc/sp.txt b/doc/sp.txt index 461c95a..b86ee68 100644 --- a/doc/sp.txt +++ b/doc/sp.txt @@ -13,7 +13,7 @@ Initiator -> Responder : [e, es, s, ss], INFO..., HALT... Initiator <- Responder : [e, ee, se], INFO..., HALT... Initiator -> Responder : INFO..., FREQ..., DONE... Initiator <- Responder : INFO..., FREQ..., DONE... -Initiator -> Responder : FILE..., INFO..., DONE... 
-Initiator <- Responder : FILE..., INFO..., DONE... +Initiator -> Responder : FILE..., INFO..., DONE..., PING +Initiator <- Responder : FILE..., INFO..., DONE..., PING @enduml diff --git a/src/cmd/nncp-daemon/main.go b/src/cmd/nncp-daemon/main.go index 42a0217..2d52fcf 100644 --- a/src/cmd/nncp-daemon/main.go +++ b/src/cmd/nncp-daemon/main.go @@ -59,6 +59,10 @@ func (c InetdConn) SetWriteDeadline(t time.Time) error { } func (c InetdConn) Close() error { + if err := c.r.Close(); err != nil { + c.w.Close() + return err + } return c.w.Close() } @@ -139,6 +143,7 @@ func main() { os.Stderr.Close() conn := &InetdConn{os.Stdin, os.Stdout} performSP(ctx, conn, nice) + conn.Close() return } diff --git a/src/sp.go b/src/sp.go index 6388bef..1d4e1c6 100644 --- a/src/sp.go +++ b/src/sp.go @@ -46,6 +46,7 @@ var ( SPFreqOverhead int SPFileOverhead int SPHaltMarshalized []byte + SPPingMarshalized []byte NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite( noise.DH25519, @@ -54,6 +55,7 @@ var ( ) DefaultDeadline = 10 * time.Second + PingTimeout = time.Minute spWorkersGroup sync.WaitGroup ) @@ -66,6 +68,7 @@ const ( SPTypeFile SPType = iota SPTypeDone SPType = iota SPTypeHalt SPType = iota + SPTypePing SPType = iota ) type SPHead struct { @@ -119,6 +122,14 @@ func init() { copy(SPHaltMarshalized, buf.Bytes()) buf.Reset() + spHead = SPHead{Type: SPTypePing} + if _, err := xdr.Marshal(&buf, spHead); err != nil { + panic(err) + } + SPPingMarshalized = make([]byte, SPHeadOverhead) + copy(SPPingMarshalized, buf.Bytes()) + buf.Reset() + spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)} if _, err := xdr.Marshal(&buf, spInfo); err != nil { panic(err) @@ -177,14 +188,17 @@ type SPState struct { csOur *noise.CipherState csTheir *noise.CipherState payloads chan []byte + pings chan struct{} infosTheir map[[32]byte]*SPInfo infosOurSeen map[[32]byte]uint8 queueTheir []*FreqWithNice wg sync.WaitGroup RxBytes int64 RxLastSeen time.Time + RxLastNonPing time.Time TxBytes int64 
TxLastSeen time.Time + TxLastNonPing time.Time started time.Time mustFinishAt time.Time Duration time.Duration @@ -206,11 +220,9 @@ func (state *SPState) SetDead() { state.Lock() defer state.Unlock() select { - case _, ok := <-state.isDead: - if !ok { - // Already closed channel, dead - return - } + case <-state.isDead: + // Already closed channel, dead + return default: } close(state.isDead) @@ -218,14 +230,16 @@ func (state *SPState) SetDead() { for _ = range state.payloads { } }() + go func() { + for _ = range state.pings { + } + }() } func (state *SPState) NotAlive() bool { select { - case _, ok := <-state.isDead: - if !ok { - return true - } + case <-state.isDead: + return true default: } return false @@ -236,7 +250,7 @@ func (state *SPState) dirUnlock() { state.Ctx.UnlockDir(state.txLock) } -func (state *SPState) WriteSP(dst io.Writer, payload []byte) error { +func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error { state.writeSPBuf.Reset() n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ Magic: MagicNNCPLv1, @@ -248,6 +262,9 @@ func (state *SPState) WriteSP(dst io.Writer, payload []byte) error { if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil { state.TxLastSeen = time.Now() state.TxBytes += int64(n) + if !ping { + state.TxLastNonPing = state.TxLastSeen + } } return err } @@ -347,6 +364,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error { } state.hs = hs state.payloads = make(chan []byte) + state.pings = make(chan struct{}) state.infosTheir = make(map[[32]byte]*SPInfo) state.infosOurSeen = make(map[[32]byte]uint8) state.started = started @@ -376,7 +394,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error { sds := SDS{"node": nodeId, "nice": int(state.Nice)} state.Ctx.LogD("sp-start", sds, "sending first message") conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) - if err = state.WriteSP(conn, buf); err != nil { + if err = state.WriteSP(conn, buf, false); err != nil { state.Ctx.LogE("sp-start", sds, err, 
"") state.dirUnlock() return err @@ -421,6 +439,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { xxOnly := TRxTx("") state.hs = hs state.payloads = make(chan []byte) + state.pings = make(chan struct{}) state.infosOurSeen = make(map[[32]byte]uint8) state.infosTheir = make(map[[32]byte]*SPInfo) state.started = started @@ -497,7 +516,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { return err } conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) - if err = state.WriteSP(conn, buf); err != nil { + if err = state.WriteSP(conn, buf, false); err != nil { state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err @@ -561,25 +580,34 @@ func (state *SPState) StartWorkers( state.wg.Done() }() - // Deadline checker + // Periodic jobs state.wg.Add(1) go func() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - defer state.wg.Done() + deadlineTicker := time.NewTicker(time.Second) + pingTicker := time.NewTicker(PingTimeout) for { select { - case _, ok := <-state.isDead: - if !ok { - return - } - case now := <-ticker.C: - if (now.Sub(state.RxLastSeen) >= state.onlineDeadline && - now.Sub(state.TxLastSeen) >= state.onlineDeadline) || - (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) { + case <-state.isDead: + state.wg.Done() + deadlineTicker.Stop() + pingTicker.Stop() + return + case now := <-deadlineTicker.C: + if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline && + now.Sub(state.TxLastNonPing) >= state.onlineDeadline) || + (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) || + (now.Sub(state.RxLastSeen) >= 2*PingTimeout) { state.SetDead() conn.Close() - return + } + case now := <-pingTicker.C: + if now.After(state.TxLastSeen.Add(PingTimeout)) { + state.wg.Add(1) + go func() { + state.pings <- struct{}{} + state.wg.Done() + state.Ctx.LogD("HERE", SDS{}, "PING GOROUTINE QUIT") + }() } } } @@ -592,12 +620,10 @@ func (state *SPState) StartWorkers( ticker := time.NewTicker(time.Second) for { select { - 
case _, ok := <-state.isDead: - if !ok { - state.wg.Done() - ticker.Stop() - return - } + case <-state.isDead: + state.wg.Done() + ticker.Stop() + return case <-ticker.C: for _, payload := range state.Ctx.infosOur( state.Node.Id, @@ -619,12 +645,20 @@ func (state *SPState) StartWorkers( // Sender state.wg.Add(1) go func() { + defer conn.Close() + defer state.SetDead() + defer state.wg.Done() for { if state.NotAlive() { - break + return } var payload []byte + var ping bool select { + case <-state.pings: + state.Ctx.LogD("sp-xmit", sds, "got ping") + payload = SPPingMarshalized + ping = true case payload = <-state.payloads: state.Ctx.LogD( "sp-xmit", @@ -632,8 +666,6 @@ func (state *SPState) StartWorkers( "got payload", ) default: - } - if payload == nil { state.RLock() if len(state.queueTheir) == 0 { state.RUnlock() @@ -642,11 +674,9 @@ func (state *SPState) StartWorkers( } freq := state.queueTheir[0].freq state.RUnlock() - if state.txRate > 0 { time.Sleep(time.Second / time.Duration(state.txRate)) } - sdsp := SdsAdd(sds, SDS{ "xx": string(TTx), "pkt": ToBase32(freq.Hash[:]), @@ -661,12 +691,12 @@ func (state *SPState) StartWorkers( )) if err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - break + return } fi, err := fd.Stat() if err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - break + return } fullSize := fi.Size() var buf []byte @@ -674,13 +704,13 @@ func (state *SPState) StartWorkers( state.Ctx.LogD("sp-file", sdsp, "seeking") if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - break + return } buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - break + return } buf = buf[:n] state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read") @@ -714,19 +744,13 @@ func (state *SPState) StartWorkers( } state.Unlock() } - state.Ctx.LogD( - "sp-xmit", - SdsAdd(sds, SDS{"size": len(payload)}), - "sending", - ) - 
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil { + state.Ctx.LogD("sp-xmit", SdsAdd(sds, SDS{"size": len(payload)}), "sending") + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) + if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil { state.Ctx.LogE("sp-xmit", sds, err, "") - break + return } } - state.SetDead() - state.wg.Done() }() // Receiver @@ -792,6 +816,8 @@ func (state *SPState) StartWorkers( } state.SetDead() state.wg.Done() + state.SetDead() + conn.Close() }() return nil @@ -800,6 +826,7 @@ func (state *SPState) StartWorkers( func (state *SPState) Wait() { state.wg.Wait() close(state.payloads) + close(state.pings) state.dirUnlock() state.Duration = time.Now().Sub(state.started) state.RxSpeed = state.RxBytes @@ -828,6 +855,13 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { return nil, err } switch head.Type { + case SPTypeHalt: + state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "") + state.Lock() + state.queueTheir = nil + state.Unlock() + case SPTypePing: + state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "ping"}), "") case SPTypeInfo: infosGot = true sdsp := SdsAdd(sds, SDS{"type": "info"}) @@ -1046,11 +1080,6 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } else { state.Ctx.LogD("sp-process", sdsp, "unknown") } - case SPTypeHalt: - state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "") - state.Lock() - state.queueTheir = nil - state.Unlock() default: state.Ctx.LogE( "sp-process", @@ -1060,6 +1089,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { ) return nil, BadPktType } + if head.Type != SPTypePing { + state.RxLastNonPing = state.RxLastSeen + } } if infosGot { var pkts int -- 2.44.0