X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fsp.go;h=94b37ff624893f8560a7d4b0f1f0f3ed827c4eff;hb=4e08a1c97600e0372680e86a651f916c70e89342;hp=a29fb9d6ed7254a6a99c3e39e019d3b609b1c624;hpb=1b3ddd7e64bd2310bbe7475e7722e656e5137eaa;p=nncp.git diff --git a/src/sp.go b/src/sp.go index a29fb9d..94b37ff 100644 --- a/src/sp.go +++ b/src/sp.go @@ -22,7 +22,6 @@ import ( "crypto/subtle" "errors" "io" - "net" "os" "path/filepath" "sort" @@ -34,10 +33,8 @@ import ( ) const ( - MaxSPSize = 1<<16 - 256 - PartSuffix = ".part" - DefaultDeadline = 10 - + MaxSPSize = 1<<16 - 256 + PartSuffix = ".part" SPHeadOverhead = 4 ) @@ -48,6 +45,7 @@ var ( SPFreqOverhead int SPFileOverhead int SPHaltMarshalized []byte + SPPingMarshalized []byte NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite( noise.DH25519, @@ -55,6 +53,9 @@ var ( noise.HashBLAKE2b, ) + DefaultDeadline = 10 * time.Second + PingTimeout = time.Minute + spWorkersGroup sync.WaitGroup ) @@ -66,6 +67,7 @@ const ( SPTypeFile SPType = iota SPTypeDone SPType = iota SPTypeHalt SPType = iota + SPTypePing SPType = iota ) type SPHead struct { @@ -119,6 +121,14 @@ func init() { copy(SPHaltMarshalized, buf.Bytes()) buf.Reset() + spHead = SPHead{Type: SPTypePing} + if _, err := xdr.Marshal(&buf, spHead); err != nil { + panic(err) + } + SPPingMarshalized = make([]byte, SPHeadOverhead) + copy(SPPingMarshalized, buf.Bytes()) + buf.Reset() + spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)} if _, err := xdr.Marshal(&buf, spInfo); err != nil { panic(err) @@ -171,20 +181,23 @@ type SPState struct { Ctx *Ctx Node *Node Nice uint8 - onlineDeadline uint + onlineDeadline time.Duration maxOnlineTime time.Duration hs *noise.HandshakeState csOur *noise.CipherState csTheir *noise.CipherState payloads chan []byte + pings chan struct{} infosTheir map[[32]byte]*SPInfo infosOurSeen map[[32]byte]uint8 queueTheir []*FreqWithNice wg sync.WaitGroup RxBytes int64 RxLastSeen time.Time + RxLastNonPing time.Time TxBytes int64 TxLastSeen time.Time + TxLastNonPing time.Time started time.Time mustFinishAt time.Time Duration time.Duration @@ -206,11 +219,9 @@ func (state *SPState) SetDead() { state.Lock() defer state.Unlock() select { - case _, ok := <-state.isDead: - if !ok { - // Already closed channel, dead - return - } + case <-state.isDead: + // Already closed channel, dead + return default: } close(state.isDead) @@ -218,14 +229,16 @@ func (state *SPState) SetDead() { for _ = range state.payloads { } }() + go func() { + for _ = range state.pings { + } + }() } func (state *SPState) NotAlive() bool { select { - case _, ok := <-state.isDead: - if !ok { - return true - } + case <-state.isDead: + return true default: } return false @@ -236,7 +249,7 @@ func (state *SPState) dirUnlock() { state.Ctx.UnlockDir(state.txLock) } -func (state *SPState) WriteSP(dst io.Writer, payload []byte) error { +func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error { state.writeSPBuf.Reset() n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ Magic: MagicNNCPLv1, @@ -248,6 +261,9 @@ func (state *SPState) WriteSP(dst io.Writer, payload []byte) error { if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil { state.TxLastSeen = time.Now() state.TxBytes += int64(n) + if !ping { + state.TxLastNonPing = state.TxLastSeen + } } return err } @@ -274,7 +290,7 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { - job.Fd.Close() + job.Fd.Close() // #nosec G104 if job.PktEnc.Nice > nice { continue } @@ -295,7 +311,7 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ payloads = append(payloads, MarshalSP(SPTypeInfo, info)) ctx.LogD("sp-info-our", SDS{ "node": nodeId, - "name": ToBase32(info.Hash[:]), + "name": Base32Codec.EncodeToString(info.Hash[:]), "size": info.Size, }, "") } @@ -318,14 +334,14 @@ func (state *SPState) StartI(conn ConnDeadlined) error { } var rxLock *os.File if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) { - rxLock, err = state.Ctx.LockDir(nodeId, TRx) + rxLock, err = state.Ctx.LockDir(nodeId, string(TRx)) if err != nil { return err } } var txLock *os.File if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { - txLock, err = state.Ctx.LockDir(nodeId, TTx) + txLock, err = state.Ctx.LockDir(nodeId, string(TTx)) if err != nil { return err } @@ -347,6 +363,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error { } state.hs = hs state.payloads = make(chan []byte) + state.pings = make(chan struct{}) state.infosTheir = make(map[[32]byte]*SPInfo) state.infosOurSeen = make(map[[32]byte]uint8) state.started = started @@ -375,14 +392,14 @@ func (state *SPState) StartI(conn ConnDeadlined) error { } sds := SDS{"node": nodeId, "nice": int(state.Nice)} state.Ctx.LogD("sp-start", sds, "sending first message") - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err = state.WriteSP(conn, buf); err != nil { + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + if err = state.WriteSP(conn, buf, false); err != nil { state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err } state.Ctx.LogD("sp-start", sds, "waiting for first message") - conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() @@ -421,6 +438,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { xxOnly := TRxTx("") state.hs = hs state.payloads = make(chan []byte) + state.pings = make(chan struct{}) state.infosOurSeen = make(map[[32]byte]uint8) state.infosTheir = make(map[[32]byte]*SPInfo) state.started = started @@ -428,7 +446,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { var buf []byte var payload []byte state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message") - conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { state.Ctx.LogE("sp-start", SDS{}, err, "") return err @@ -446,7 +464,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } } if node == nil { - peerId := ToBase32(state.hs.PeerStatic()) + peerId := Base32Codec.EncodeToString(state.hs.PeerStatic()) state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "") return errors.New("Unknown peer: " + peerId) } @@ -457,12 +475,12 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.maxOnlineTime = node.MaxOnlineTime sds := SDS{"node": node.Id, "nice": int(state.Nice)} - if state.Ctx.ensureRxDir(node.Id); err != nil { + if err = state.Ctx.ensureRxDir(node.Id); err != nil { return err } var rxLock *os.File if xxOnly == "" || xxOnly == TRx { - rxLock, err = state.Ctx.LockDir(node.Id, TRx) + rxLock, err = state.Ctx.LockDir(node.Id, string(TRx)) if err != nil { return err } @@ -470,7 +488,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.rxLock = rxLock var txLock *os.File if xxOnly == "" || xxOnly == TTx { - txLock, err = state.Ctx.LockDir(node.Id, TTx) + txLock, err = state.Ctx.LockDir(node.Id, string(TTx)) if err != nil { return err } @@ -496,8 +514,8 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.dirUnlock() return err } - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err = state.WriteSP(conn, buf); err != nil { + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + if err = state.WriteSP(conn, buf, false); err != nil { state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err @@ -561,25 +579,33 @@ func (state *SPState) StartWorkers( state.wg.Done() }() - // Deadline checker + // Periodic jobs state.wg.Add(1) go func() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - defer state.wg.Done() + deadlineTicker := time.NewTicker(time.Second) + pingTicker := time.NewTicker(PingTimeout) for { select { - case _, ok := <-state.isDead: - if !ok { - return - } - case now := <-ticker.C: - if (uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && - uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline) || - (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) { + case <-state.isDead: + state.wg.Done() + deadlineTicker.Stop() + pingTicker.Stop() + return + case now := <-deadlineTicker.C: + if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline && + now.Sub(state.TxLastNonPing) >= state.onlineDeadline) || + (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) || + (now.Sub(state.RxLastSeen) >= 2*PingTimeout) { state.SetDead() - conn.Close() - return + conn.Close() // #nosec G104 + } + case now := <-pingTicker.C: + if now.After(state.TxLastSeen.Add(PingTimeout)) { + state.wg.Add(1) + go func() { + state.pings <- struct{}{} + state.wg.Done() + }() } } } @@ -592,12 +618,10 @@ func (state *SPState) StartWorkers( ticker := time.NewTicker(time.Second) for { select { - case _, ok := <-state.isDead: - if !ok { - state.wg.Done() - ticker.Stop() - return - } + case <-state.isDead: + state.wg.Done() + ticker.Stop() + return case <-ticker.C: for _, payload := range state.Ctx.infosOur( state.Node.Id, @@ -619,12 +643,20 @@ func (state *SPState) StartWorkers( // Sender state.wg.Add(1) go func() { + defer conn.Close() + defer state.SetDead() + defer state.wg.Done() for { if state.NotAlive() { - break + return } var payload []byte + var ping bool select { + case <-state.pings: + state.Ctx.LogD("sp-xmit", sds, "got ping") + payload = SPPingMarshalized + ping = true case payload = <-state.payloads: state.Ctx.LogD( "sp-xmit", @@ -632,8 +664,6 @@ func (state *SPState) StartWorkers( "got payload", ) default: - } - if payload == nil { state.RLock() if len(state.queueTheir) == 0 { state.RUnlock() @@ -642,14 +672,12 @@ func (state *SPState) StartWorkers( } freq := state.queueTheir[0].freq state.RUnlock() - if state.txRate > 0 { time.Sleep(time.Second / time.Duration(state.txRate)) } - sdsp := SdsAdd(sds, SDS{ "xx": string(TTx), - "pkt": ToBase32(freq.Hash[:]), + "pkt": Base32Codec.EncodeToString(freq.Hash[:]), "size": int64(freq.Offset), }) state.Ctx.LogD("sp-file", sdsp, "queueing") @@ -657,16 +685,16 @@ func (state *SPState) StartWorkers( state.Ctx.Spool, state.Node.Id.String(), string(TTx), - ToBase32(freq.Hash[:]), + Base32Codec.EncodeToString(freq.Hash[:]), )) if err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - break + return } fi, err := fd.Stat() if err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - break + return } fullSize := fi.Size() var buf []byte @@ -674,18 +702,18 @@ func (state *SPState) StartWorkers( state.Ctx.LogD("sp-file", sdsp, "seeking") if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - break + return } buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - break + return } buf = buf[:n] state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read") } - fd.Close() + fd.Close() // #nosec G104 payload = MarshalSP(SPTypeFile, SPFile{ Hash: freq.Hash, Offset: freq.Offset, @@ -714,19 +742,13 @@ func (state *SPState) StartWorkers( } state.Unlock() } - state.Ctx.LogD( - "sp-xmit", - SdsAdd(sds, SDS{"size": len(payload)}), - "sending", - ) - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil { + state.Ctx.LogD("sp-xmit", SdsAdd(sds, SDS{"size": len(payload)}), "sending") + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil { state.Ctx.LogE("sp-xmit", sds, err, "") - break + return } } - state.SetDead() - state.wg.Done() }() // Receiver @@ -737,15 +759,14 @@ func (state *SPState) StartWorkers( break } state.Ctx.LogD("sp-recv", sds, "waiting for payload") - conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 payload, err := state.ReadSP(conn) if err != nil { if err == io.EOF { break } unmarshalErr := err.(*xdr.UnmarshalError) - netErr, ok := unmarshalErr.Err.(net.Error) - if ok && netErr.Timeout() { + if os.IsTimeout(unmarshalErr.Err) { continue } if unmarshalErr.ErrorCode == xdr.ErrIO { @@ -792,6 +813,8 @@ func (state *SPState) StartWorkers( } state.SetDead() state.wg.Done() + state.SetDead() + conn.Close() // #nosec G104 }() return nil @@ -800,6 +823,7 @@ func (state *SPState) StartWorkers( func (state *SPState) Wait() { state.wg.Wait() close(state.payloads) + close(state.pings) state.dirUnlock() state.Duration = time.Now().Sub(state.started) state.RxSpeed = state.RxBytes @@ -827,7 +851,17 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogE("sp-process", sds, err, "") return nil, err } + if head.Type != SPTypePing { + state.RxLastNonPing = state.RxLastSeen + } switch head.Type { + case SPTypeHalt: + state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "") + state.Lock() + state.queueTheir = nil + state.Unlock() + case SPTypePing: + state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "ping"}), "") case SPTypeInfo: infosGot = true sdsp := SdsAdd(sds, SDS{"type": "info"}) @@ -838,7 +872,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { return nil, err } sdsp = SdsAdd(sds, SDS{ - "pkt": ToBase32(info.Hash[:]), + "pkt": Base32Codec.EncodeToString(info.Hash[:]), "size": int64(info.Size), "nice": int(info.Nice), }) @@ -858,7 +892,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.Spool, state.Node.Id.String(), string(TRx), - ToBase32(info.Hash[:]), + Base32Codec.EncodeToString(info.Hash[:]), ) if _, err = os.Stat(pktPath); err == nil { state.Ctx.LogI("sp-info", sdsp, "already done") @@ -903,14 +937,14 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { return nil, err } sdsp["xx"] = string(TRx) - sdsp["pkt"] = ToBase32(file.Hash[:]) + sdsp["pkt"] = Base32Codec.EncodeToString(file.Hash[:]) sdsp["size"] = len(file.Payload) dirToSync := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), ) - filePath := filepath.Join(dirToSync, ToBase32(file.Hash[:])) + filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:])) state.Ctx.LogD("sp-file", sdsp, "opening part") fd, err := os.OpenFile( filePath+PartSuffix, @@ -928,14 +962,14 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { ) if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - fd.Close() + fd.Close() // #nosec G104 return nil, err } state.Ctx.LogD("sp-file", sdsp, "writing") _, err = fd.Write(file.Payload) if err != nil { state.Ctx.LogE("sp-file", sdsp, err, "") - fd.Close() + fd.Close() // #nosec G104 return nil, err } ourSize := int64(file.Offset + uint64(len(file.Payload))) @@ -952,7 +986,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { Progress("Rx", sdsp) } if fullsize != ourSize { - fd.Close() + fd.Close() // #nosec G104 continue } spWorkersGroup.Wait() @@ -960,15 +994,19 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { go func() { if err := fd.Sync(); err != nil { state.Ctx.LogE("sp-file", sdsp, err, "sync") - fd.Close() + fd.Close() // #nosec G104 return } state.wg.Add(1) defer state.wg.Done() - fd.Seek(0, io.SeekStart) + if _, err = fd.Seek(0, io.SeekStart); err != nil { + fd.Close() // #nosec G104 + state.Ctx.LogE("sp-file", sdsp, err, "") + return + } state.Ctx.LogD("sp-file", sdsp, "checking") gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs) - fd.Close() + fd.Close() // #nosec G104 if err != nil || !gut { state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "") return @@ -1000,13 +1038,13 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "") return nil, err } - sdsp["pkt"] = ToBase32(done.Hash[:]) + sdsp["pkt"] = Base32Codec.EncodeToString(done.Hash[:]) state.Ctx.LogD("sp-done", sdsp, "removing") err := os.Remove(filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), - ToBase32(done.Hash[:]), + Base32Codec.EncodeToString(done.Hash[:]), )) sdsp["xx"] = string(TTx) if err == nil { @@ -1022,7 +1060,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogE("sp-process", sdsp, err, "") return nil, err } - sdsp["pkt"] = ToBase32(freq.Hash[:]) + sdsp["pkt"] = Base32Codec.EncodeToString(freq.Hash[:]) sdsp["offset"] = freq.Offset state.Ctx.LogD("sp-process", sdsp, "queueing") nice, exists := state.infosOurSeen[*freq.Hash] @@ -1046,11 +1084,6 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } else { state.Ctx.LogD("sp-process", sdsp, "unknown") } - case SPTypeHalt: - state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "") - state.Lock() - state.queueTheir = nil - state.Unlock() default: state.Ctx.LogE( "sp-process",