X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fsp.go;h=37cf265ead11ac3588fcb66eab69f7641599cc9c;hb=f3d5c49c31c59f699288799353b7bc1ba9211f38;hp=49c4179239a94e037034d1e640fb1f27405c4761;hpb=7d6272831df064204477f797cd8028e60010bb2b;p=nncp.git diff --git a/src/sp.go b/src/sp.go index 49c4179..37cf265 100644 --- a/src/sp.go +++ b/src/sp.go @@ -22,7 +22,6 @@ import ( "crypto/subtle" "errors" "io" - "net" "os" "path/filepath" "sort" @@ -34,11 +33,9 @@ import ( ) const ( - MaxSPSize = 1<<16 - 256 - PartSuffix = ".part" - DefaultDeadline = 10 - - SPHeadOverhead = 4 + MaxSPSize = 1<<16 - 256 + PartSuffix = ".part" + SPHeadOverhead = 4 ) var ( @@ -48,6 +45,7 @@ var ( SPFreqOverhead int SPFileOverhead int SPHaltMarshalized []byte + SPPingMarshalized []byte NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite( noise.DH25519, @@ -55,6 +53,9 @@ var ( noise.HashBLAKE2b, ) + DefaultDeadline = 10 * time.Second + PingTimeout = time.Minute + spWorkersGroup sync.WaitGroup ) @@ -66,6 +67,7 @@ const ( SPTypeFile SPType = iota SPTypeDone SPType = iota SPTypeHalt SPType = iota + SPTypePing SPType = iota ) type SPHead struct { @@ -119,6 +121,14 @@ func init() { copy(SPHaltMarshalized, buf.Bytes()) buf.Reset() + spHead = SPHead{Type: SPTypePing} + if _, err := xdr.Marshal(&buf, spHead); err != nil { + panic(err) + } + SPPingMarshalized = make([]byte, SPHeadOverhead) + copy(SPPingMarshalized, buf.Bytes()) + buf.Reset() + spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)} if _, err := xdr.Marshal(&buf, spInfo); err != nil { panic(err) @@ -142,11 +152,10 @@ func init() { func MarshalSP(typ SPType, sp interface{}) []byte { var buf bytes.Buffer - var err error - if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil { + if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil { panic(err) } - if _, err = xdr.Marshal(&buf, sp); err != nil { + if _, err := xdr.Marshal(&buf, sp); err != nil { panic(err) } return buf.Bytes() @@ -172,21 +181,25 @@ type SPState struct { Ctx *Ctx Node *Node Nice uint8 - onlineDeadline uint - maxOnlineTime uint + onlineDeadline time.Duration + maxOnlineTime time.Duration hs *noise.HandshakeState csOur *noise.CipherState csTheir *noise.CipherState payloads chan []byte + pings chan struct{} infosTheir map[[32]byte]*SPInfo infosOurSeen map[[32]byte]uint8 queueTheir []*FreqWithNice wg sync.WaitGroup RxBytes int64 RxLastSeen time.Time + RxLastNonPing time.Time TxBytes int64 TxLastSeen time.Time + TxLastNonPing time.Time started time.Time + mustFinishAt time.Time Duration time.Duration RxSpeed int64 TxSpeed int64 @@ -195,22 +208,40 @@ type SPState struct { xxOnly TRxTx rxRate int txRate int - isDead bool + isDead chan struct{} listOnly bool onlyPkts map[[32]byte]bool + writeSPBuf bytes.Buffer sync.RWMutex } -func (state *SPState) NotAlive() bool { - if state.isDead { - return true +func (state *SPState) SetDead() { + state.Lock() + defer state.Unlock() + select { + case <-state.isDead: + // Already closed channel, dead + return + default: } - now := time.Now() - if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) { + close(state.isDead) + go func() { + for _ = range state.payloads { + } + }() + go func() { + for _ = range state.pings { + } + }() +} + +func (state *SPState) NotAlive() bool { + select { + case <-state.isDead: return true + default: } - return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && - uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline + return false } func (state *SPState) dirUnlock() { @@ -218,11 +249,21 @@ func (state *SPState) dirUnlock() { state.Ctx.UnlockDir(state.txLock) } -func (state *SPState) WriteSP(dst io.Writer, payload []byte) error { - n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload}) - if err == nil { +func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error { + state.writeSPBuf.Reset() + n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ + Magic: MagicNNCPLv1, + Payload: payload, + }) + if err != nil { + return err + } + if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil { state.TxLastSeen = time.Now() state.TxBytes += int64(n) + if !ping { + state.TxLastNonPing = state.TxLastSeen + } } return err } @@ -293,14 +334,14 @@ func (state *SPState) StartI(conn ConnDeadlined) error { } var rxLock *os.File if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) { - rxLock, err = state.Ctx.LockDir(nodeId, TRx) + rxLock, err = state.Ctx.LockDir(nodeId, string(TRx)) if err != nil { return err } } var txLock *os.File if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { - txLock, err = state.Ctx.LockDir(nodeId, TTx) + txLock, err = state.Ctx.LockDir(nodeId, string(TTx)) if err != nil { return err } @@ -322,6 +363,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error { } state.hs = hs state.payloads = make(chan []byte) + state.pings = make(chan struct{}) state.infosTheir = make(map[[32]byte]*SPInfo) state.infosOurSeen = make(map[[32]byte]uint8) state.started = started @@ -350,14 +392,14 @@ func (state *SPState) StartI(conn ConnDeadlined) error { } sds := SDS{"node": nodeId, "nice": int(state.Nice)} state.Ctx.LogD("sp-start", sds, "sending first message") - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err = state.WriteSP(conn, buf); err != nil { + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) + if err = state.WriteSP(conn, buf, false); err != nil { state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err } state.Ctx.LogD("sp-start", sds, "waiting for first message") - conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) if buf, err = state.ReadSP(conn); err != nil { state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() @@ -396,6 +438,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { xxOnly := TRxTx("") state.hs = hs state.payloads = make(chan []byte) + state.pings = make(chan struct{}) state.infosOurSeen = make(map[[32]byte]uint8) state.infosTheir = make(map[[32]byte]*SPInfo) state.started = started @@ -403,7 +446,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { var buf []byte var payload []byte state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message") - conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) if buf, err = state.ReadSP(conn); err != nil { state.Ctx.LogE("sp-start", SDS{}, err, "") return err @@ -437,7 +480,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } var rxLock *os.File if xxOnly == "" || xxOnly == TRx { - rxLock, err = state.Ctx.LockDir(node.Id, TRx) + rxLock, err = state.Ctx.LockDir(node.Id, string(TRx)) if err != nil { return err } @@ -445,7 +488,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.rxLock = rxLock var txLock *os.File if xxOnly == "" || xxOnly == TTx { - txLock, err = state.Ctx.LockDir(node.Id, TTx) + txLock, err = state.Ctx.LockDir(node.Id, string(TTx)) if err != nil { return err } @@ -471,8 +514,8 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.dirUnlock() return err } - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err = state.WriteSP(conn, buf); err != nil { + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) + if err = state.WriteSP(conn, buf, false); err != nil { state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err @@ -488,9 +531,17 @@ func (state *SPState) StartR(conn ConnDeadlined) error { func (state *SPState) StartWorkers( conn ConnDeadlined, infosPayloads [][]byte, - payload []byte) error { + payload []byte, +) error { sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)} + state.isDead = make(chan struct{}) + if state.maxOnlineTime > 0 { + state.mustFinishAt = state.started.Add(state.maxOnlineTime) + } + + // Remaining handshake payload sending if len(infosPayloads) > 1 { + state.wg.Add(1) go func() { for _, payload := range infosPayloads[1:] { state.Ctx.LogD( @@ -500,8 +551,11 @@ func (state *SPState) StartWorkers( ) state.payloads <- payload } + state.wg.Done() }() } + + // Processing of first payload and queueing its responses state.Ctx.LogD( "sp-work", SdsAdd(sds, SDS{"size": len(payload)}), @@ -512,7 +566,7 @@ func (state *SPState) StartWorkers( state.Ctx.LogE("sp-work", sds, err, "") return err } - + state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( @@ -522,42 +576,87 @@ func (state *SPState) StartWorkers( ) state.payloads <- reply } + state.wg.Done() + }() + + // Periodic jobs + state.wg.Add(1) + go func() { + deadlineTicker := time.NewTicker(time.Second) + pingTicker := time.NewTicker(PingTimeout) + for { + select { + case <-state.isDead: + state.wg.Done() + deadlineTicker.Stop() + pingTicker.Stop() + return + case now := <-deadlineTicker.C: + if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline && + now.Sub(state.TxLastNonPing) >= state.onlineDeadline) || + (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) || + (now.Sub(state.RxLastSeen) >= 2*PingTimeout) { + state.SetDead() + conn.Close() + } + case now := <-pingTicker.C: + if now.After(state.TxLastSeen.Add(PingTimeout)) { + state.wg.Add(1) + go func() { + state.pings <- struct{}{} + state.wg.Done() + }() + } + } + } }() + // Spool checker and INFOs sender of appearing files if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { + state.wg.Add(1) go func() { - for range time.Tick(time.Second) { - if state.NotAlive() { + ticker := time.NewTicker(time.Second) + for { + select { + case <-state.isDead: + state.wg.Done() + ticker.Stop() return - } - for _, payload := range state.Ctx.infosOur( - state.Node.Id, - state.Nice, - &state.infosOurSeen, - ) { - state.Ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": len(payload)}), - "queuing new info", - ) - state.payloads <- payload + case <-ticker.C: + for _, payload := range state.Ctx.infosOur( + state.Node.Id, + state.Nice, + &state.infosOurSeen, + ) { + state.Ctx.LogD( + "sp-work", + SdsAdd(sds, SDS{"size": len(payload)}), + "queuing new info", + ) + state.payloads <- payload + } } } }() } + // Sender state.wg.Add(1) go func() { - defer func() { - state.isDead = true - state.wg.Done() - }() + defer conn.Close() + defer state.SetDead() + defer state.wg.Done() for { if state.NotAlive() { return } var payload []byte + var ping bool select { + case <-state.pings: + state.Ctx.LogD("sp-xmit", sds, "got ping") + payload = SPPingMarshalized + ping = true case payload = <-state.payloads: state.Ctx.LogD( "sp-xmit", @@ -565,22 +664,17 @@ func (state *SPState) StartWorkers( "got payload", ) default: - } - if payload == nil { state.RLock() if len(state.queueTheir) == 0 { - state.Ctx.LogD("sp-xmit", sds, "file queue is empty") state.RUnlock() time.Sleep(100 * time.Millisecond) continue } freq := state.queueTheir[0].freq state.RUnlock() - if state.txRate > 0 { time.Sleep(time.Second / time.Duration(state.txRate)) } - sdsp := SdsAdd(sds, SDS{ "xx": string(TTx), "pkt": ToBase32(freq.Hash[:]), @@ -595,12 +689,12 @@ func (state *SPState) StartWorkers( )) if err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - break + return } fi, err := fd.Stat() if err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - break + return } fullSize := fi.Size() var buf []byte @@ -608,20 +702,16 @@ func (state *SPState) StartWorkers( state.Ctx.LogD("sp-file", sdsp, "seeking") if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - break + return } buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - break + return } buf = buf[:n] - state.Ctx.LogD( - "sp-file", - SdsAdd(sdsp, SDS{"size": n}), - "read", - ) + state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read") } fd.Close() payload = MarshalSP(SPTypeFile, SPFile{ @@ -652,39 +742,31 @@ func (state *SPState) StartWorkers( } state.Unlock() } - state.Ctx.LogD( - "sp-xmit", - SdsAdd(sds, SDS{"size": len(payload)}), - "sending", - ) - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil { + state.Ctx.LogD("sp-xmit", SdsAdd(sds, SDS{"size": len(payload)}), "sending") + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) + if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil { state.Ctx.LogE("sp-xmit", sds, err, "") - break + return } } }() + // Receiver state.wg.Add(1) go func() { - defer func() { - state.isDead = true - state.wg.Done() - }() for { if state.NotAlive() { - return + break } state.Ctx.LogD("sp-recv", sds, "waiting for payload") - conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) payload, err := state.ReadSP(conn) if err != nil { if err == io.EOF { break } unmarshalErr := err.(*xdr.UnmarshalError) - netErr, ok := unmarshalErr.Err.(net.Error) - if ok && netErr.Timeout() { + if os.IsTimeout(unmarshalErr.Err) { continue } if unmarshalErr.ErrorCode == xdr.ErrIO { @@ -713,6 +795,7 @@ func (state *SPState) StartWorkers( state.Ctx.LogE("sp-recv", sds, err, "") break } + state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( @@ -722,11 +805,16 @@ func (state *SPState) StartWorkers( ) state.payloads <- reply } + state.wg.Done() }() if state.rxRate > 0 { time.Sleep(time.Second / time.Duration(state.rxRate)) } } + state.SetDead() + state.wg.Done() + state.SetDead() + conn.Close() }() return nil @@ -734,6 +822,8 @@ func (state *SPState) StartWorkers( func (state *SPState) Wait() { state.wg.Wait() + close(state.payloads) + close(state.pings) state.dirUnlock() state.Duration = time.Now().Sub(state.started) state.RxSpeed = state.RxBytes @@ -761,7 +851,17 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogE("sp-process", sds, err, "") return nil, err } + if head.Type != SPTypePing { + state.RxLastNonPing = state.RxLastSeen + } switch head.Type { + case SPTypeHalt: + state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "") + state.Lock() + state.queueTheir = nil + state.Unlock() + case SPTypePing: + state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "ping"}), "") case SPTypeInfo: infosGot = true sdsp := SdsAdd(sds, SDS{"type": "info"}) @@ -872,19 +972,23 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { fd.Close() return nil, err } - ourSize := file.Offset + uint64(len(file.Payload)) + ourSize := int64(file.Offset + uint64(len(file.Payload))) + sdsp["size"] = ourSize + fullsize := int64(0) state.RLock() - sdsp["size"] = int64(ourSize) - sdsp["fullsize"] = int64(state.infosTheir[*file.Hash].Size) + infoTheir, ok := state.infosTheir[*file.Hash] + state.RUnlock() + if ok { + fullsize = int64(infoTheir.Size) + } + sdsp["fullsize"] = fullsize if state.Ctx.ShowPrgrs { Progress("Rx", sdsp) } - if state.infosTheir[*file.Hash].Size != ourSize { - state.RUnlock() + if fullsize != ourSize { fd.Close() continue } - state.RUnlock() spWorkersGroup.Wait() spWorkersGroup.Add(1) go func() { @@ -916,8 +1020,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { delete(state.infosTheir, *file.Hash) state.Unlock() spWorkersGroup.Done() + state.wg.Add(1) go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) + state.wg.Done() }() }() case SPTypeDone: @@ -974,11 +1080,6 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } else { state.Ctx.LogD("sp-process", sdsp, "unknown") } - case SPTypeHalt: - state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "") - state.Lock() - state.queueTheir = nil - state.Unlock() default: state.Ctx.LogE( "sp-process",