X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fsp.go;h=4db358742a17b1f584aa102ac853a1c6386c6d13;hb=0367cce2741e1ce6a89a49fd5c4e9df6005c9744;hp=66fa0375797d2d53fb1cacea0d6143bcf7b0096e;hpb=9fbc648a2821d9a6e0de6352d9adb40f98f1723a;p=nncp.git diff --git a/src/sp.go b/src/sp.go index 66fa037..4db3587 100644 --- a/src/sp.go +++ b/src/sp.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2021 Sergey Matveev +Copyright (C) 2016-2022 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -23,6 +23,7 @@ import ( "errors" "fmt" "io" + "log" "os" "path/filepath" "sort" @@ -438,7 +439,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error { NicenessFmt(state.Nice), ) }) - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) if err = state.WriteSP(conn, buf, false); err != nil { state.Ctx.LogE("sp-startI", les, err, func(les LEs) string { return fmt.Sprintf( @@ -457,7 +458,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error { NicenessFmt(state.Nice), ) }) - conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) if buf, err = state.ReadSP(conn); err != nil { state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { return fmt.Sprintf( @@ -537,7 +538,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } les := LEs{{"Nice", int(state.Nice)}} state.Ctx.LogD("sp-startR", les, logMsg) - conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) if buf, err = state.ReadSP(conn); err != nil { state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err @@ -549,6 +550,9 @@ func (state *SPState) StartR(conn ConnDeadlined) error { var node *Node for _, n := range state.Ctx.Neigh { + if n.NoisePub == nil { + continue + } if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 { node = n break @@ -611,7 +615,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.dirUnlock() return err } - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) if err = state.WriteSP(conn, buf, false); err != nil { state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string { return fmt.Sprintf( @@ -752,7 +756,7 @@ func (state *SPState) StartWorkers( break Deadlined: state.SetDead() - conn.Close() // #nosec G104 + conn.Close() case now := <-pingTicker.C: if now.After(state.TxLastSeen.Add(PingTimeout)) { state.wg.Add(1) @@ -769,14 +773,21 @@ func (state *SPState) StartWorkers( if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { state.wg.Add(1) go func() { - ticker := time.NewTicker(time.Second) + dw, err := state.Ctx.NewDirWatcher( + filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)), + time.Second, + ) + if err != nil { + state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg) + log.Fatalln(err) + } for { select { case <-state.isDead: + dw.Close() state.wg.Done() - ticker.Stop() return - case <-ticker.C: + case <-dw.C: for _, payload := range state.Ctx.infosOur( state.Node.Id, state.Nice, @@ -986,7 +997,7 @@ func (state *SPState) StartWorkers( ) } state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg) - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) ct, err := state.csOur.Encrypt(nil, nil, payload) if err != nil { state.Ctx.LogE("sp-encrypting", les, err, logMsg) @@ -1013,7 +1024,7 @@ func (state *SPState) StartWorkers( ) } state.Ctx.LogD("sp-recv-wait", les, logMsg) - conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) payload, err := state.ReadSP(conn) if err != nil { if err == io.EOF { @@ -1087,7 +1098,7 @@ func (state *SPState) StartWorkers( state.SetDead() state.wg.Done() state.SetDead() - conn.Close() // #nosec G104 + conn.Close() }() return nil @@ -1252,7 +1263,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } continue } - if _, err = os.Stat(pktPath + SeenSuffix); err == nil { + if _, err = os.Stat(filepath.Join( + state.Ctx.Spool, state.Node.Id.String(), string(TRx), + SeenDir, Base32Codec.EncodeToString(info.Hash[:]), + )); err == nil { state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string { return logMsg(les) + ": already seen" }) @@ -1573,7 +1587,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { return fmt.Sprintf("Packet %s is sent", pktName) }) if state.Ctx.HdrUsage { - os.Remove(pth + HdrSuffix) + os.Remove(JobPath2Hdr(pth)) } } else { state.Ctx.LogE("sp-done", lesp, err, logMsg)