]> Cypherpunks.ru repositories - nncp.git/commitdiff
Do not keep files opened
authorSergey Matveev <stargrave@stargrave.org>
Fri, 19 Feb 2021 15:59:57 +0000 (18:59 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Sat, 20 Feb 2021 19:37:03 +0000 (22:37 +0300)
13 files changed:
doc/news.ru.texi
doc/news.texi
src/check.go
src/cmd/nncp-bundle/main.go
src/cmd/nncp-caller/main.go
src/cmd/nncp-stat/main.go
src/cmd/nncp-xfer/main.go
src/jobs.go
src/nncp.go
src/sp.go
src/toss.go
src/toss_test.go
src/tx_test.go

index d758060dc1442eae13adf5bced5027008eb86554..f4df62e64d9f8befb2172db3950d4984b6c9516b 100644 (file)
@@ -1,6 +1,18 @@
 @node Новости
 @section Новости
 
+@node Релиз 6.1.0
+@subsection Релиз 6.1.0
+@itemize
+
+@item
+Оптимизация: большинство команд теперь не держат открытыми файловые
+дескрипторы. Прежде вы легко могли выйти за пределы максимально
+допустимого количества открытых файлов, если у вас было много пакетов в
+spool директории.
+
+@end itemize
+
 @node Релиз 6.0.0
 @subsection Релиз 6.0.0
 @itemize
index 866fb531d435c1ed6e49e7d44e3df39e61f3f3af..68661dbb3c2b8ac3a322a2ef3c0a9ac68118160b 100644 (file)
@@ -3,11 +3,23 @@
 
 See also this page @ref{Новости, on russian}.
 
+@node Release 6.1.0
+@section Release 6.1.0
+@itemize
+
+@item
+Optimization: most commands do not keep opened file descriptors now.
+Previously you can exceed maximal number of opened files if you have got
+many packets in the spool directory.
+
+@end itemize
+
 @node Release 6.0.0
 @section Release 6.0.0
 @itemize
 
-@item Log uses human readable and easy machine parseable
+@item
+Log uses human readable and easy machine parseable
 @url{https://www.gnu.org/software/recutils/, recfile} format for the
 records, instead of structured RFC 3339 lines. Old logs are not readable
 by @command{nncp-log} anymore.
index 731e86937c55cb7b2840bbad3d2a1e67cca17398..b2ea671325d0a027b956e05286a0bda4dd36d6f3 100644 (file)
@@ -23,6 +23,7 @@ import (
        "errors"
        "io"
        "log"
+       "os"
 
        "golang.org/x/crypto/blake2b"
 )
@@ -47,8 +48,13 @@ func (ctx *Ctx) checkXxIsBad(nodeId *NodeId, xx TRxTx) bool {
                        {"Pkt", Base32Codec.EncodeToString(job.HshValue[:])},
                        {"FullSize", job.Size},
                }
-               gut, err := Check(job.Fd, job.HshValue[:], les, ctx.ShowPrgrs)
-               job.Fd.Close() // #nosec G104
+               fd, err := os.Open(job.Path)
+               if err != nil {
+                       ctx.LogE("check", les, err, "")
+                       return true
+               }
+               gut, err := Check(fd, job.HshValue[:], les, ctx.ShowPrgrs)
+               fd.Close() // #nosec G104
                if err != nil {
                        ctx.LogE("check", les, err, "")
                        return true
index 0dbe3ba34a45b9ac1e3ab97562891cab77385937..106f820c26f28b4f9e33c723d36be6ff777de692 100644 (file)
@@ -125,13 +125,16 @@ func main() {
                                {K: "Pkt", V: "dummy"},
                        }
                        for job := range ctx.Jobs(&nodeId, nncp.TTx) {
-                               pktName = filepath.Base(job.Fd.Name())
+                               pktName = filepath.Base(job.Path)
                                les[len(les)-1].V = pktName
                                if job.PktEnc.Nice > nice {
                                        ctx.LogD("nncp-bundle", les, "too nice")
-                                       job.Fd.Close() // #nosec G104
                                        continue
                                }
+                               fd, err := os.Open(job.Path)
+                               if err != nil {
+                                       log.Fatalln("Error during opening:", err)
+                               }
                                if err = tarWr.WriteHeader(&tar.Header{
                                        Format:   tar.FormatUSTAR,
                                        Name:     nncp.NNCPBundlePrefix,
@@ -155,7 +158,7 @@ func main() {
                                        log.Fatalln("Error writing tar header:", err)
                                }
                                if _, err = nncp.CopyProgressed(
-                                       tarWr, job.Fd, "Tx",
+                                       tarWr, bufio.NewReader(fd), "Tx",
                                        append(les, nncp.LEs{
                                                {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])},
                                                {K: "FullSize", V: job.Size},
@@ -164,7 +167,9 @@ func main() {
                                ); err != nil {
                                        log.Fatalln("Error during copying to tar:", err)
                                }
-                               job.Fd.Close() // #nosec G104
+                               if err = fd.Close(); err != nil {
+                                       log.Fatalln("Error during closing:", err)
+                               }
                                if err = tarWr.Flush(); err != nil {
                                        log.Fatalln("Error during tar flushing:", err)
                                }
@@ -172,7 +177,7 @@ func main() {
                                        log.Fatalln("Error during stdout flushing:", err)
                                }
                                if *doDelete {
-                                       if err = os.Remove(job.Fd.Name()); err != nil {
+                                       if err = os.Remove(job.Path); err != nil {
                                                log.Fatalln("Error during deletion:", err)
                                        }
                                }
index b702b498038ca4eeb04a4e01b29e372ea70e83f4..245c6183689a075c3e596a5b5ca1bc43c5f604da 100644 (file)
@@ -138,7 +138,6 @@ func main() {
                                                        ctx.LogD("caller", les, "checking tx existence")
                                                        txExists := false
                                                        for job := range ctx.Jobs(node.Id, nncp.TTx) {
-                                                               job.Fd.Close()
                                                                if job.PktEnc.Nice > call.Nice {
                                                                        continue
                                                                }
index 04aaed24c0d5eef77a0ea0bfb5665a28c2b48898..994f83db6a8a1fdb66373f61e7d5fc9a20e90d2b 100644 (file)
@@ -99,7 +99,6 @@ func main() {
                rxNums := make(map[uint8]int)
                rxBytes := make(map[uint8]int64)
                for job := range ctx.Jobs(node.Id, nncp.TRx) {
-                       job.Fd.Close() // #nosec G104
                        if *showPkt {
                                jobPrint(nncp.TRx, job)
                        }
@@ -109,7 +108,6 @@ func main() {
                txNums := make(map[uint8]int)
                txBytes := make(map[uint8]int64)
                for job := range ctx.Jobs(node.Id, nncp.TTx) {
-                       job.Fd.Close() // #nosec G104
                        if *showPkt {
                                jobPrint(nncp.TTx, job)
                        }
index 06db800fc7588e427d9190be4e4de89af12b2d47..b9754674d172246e029378162f1f2fcf5a578ae2 100644 (file)
@@ -315,39 +315,42 @@ Tx:
                }
                les = les[:len(les)-1]
                for job := range ctx.Jobs(&nodeId, nncp.TTx) {
-                       pktName := filepath.Base(job.Fd.Name())
+                       pktName := filepath.Base(job.Path)
                        les := append(les, nncp.LE{K: "Pkt", V: pktName})
                        if job.PktEnc.Nice > nice {
                                ctx.LogD("nncp-xfer", les, "too nice")
-                               job.Fd.Close() // #nosec G104
                                continue
                        }
                        if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) {
                                ctx.LogD("nncp-xfer", les, "already exists")
-                               job.Fd.Close() // #nosec G104
                                continue
                        }
                        if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenSuffix)); err == nil || !os.IsNotExist(err) {
                                ctx.LogD("nncp-xfer", les, "already exists")
-                               job.Fd.Close() // #nosec G104
                                continue
                        }
                        tmp, err := nncp.TempFile(dstPath, "xfer")
                        if err != nil {
                                ctx.LogE("nncp-xfer", les, err, "mktemp")
-                               job.Fd.Close() // #nosec G104
                                isBad = true
                                break
                        }
                        les = append(les, nncp.LE{K: "Tmp", V: tmp.Name()})
                        ctx.LogD("nncp-xfer", les, "created")
+                       fd, err := os.Open(job.Path)
+                       if err != nil {
+                               ctx.LogE("nncp-xfer", les, err, "open")
+                               tmp.Close() // #nosec G104
+                               isBad = true
+                               continue
+                       }
                        bufW := bufio.NewWriter(tmp)
                        copied, err := nncp.CopyProgressed(
-                               bufW, bufio.NewReader(job.Fd), "Tx",
+                               bufW, bufio.NewReader(fd), "Tx",
                                append(les, nncp.LE{K: "FullSize", V: job.Size}),
                                ctx.ShowPrgrs,
                        )
-                       job.Fd.Close() // #nosec G104
+                       fd.Close() // #nosec G104
                        if err != nil {
                                ctx.LogE("nncp-xfer", les, err, "copy")
                                tmp.Close() // #nosec G104
@@ -383,7 +386,7 @@ Tx:
                        les = les[:len(les)-1]
                        ctx.LogI("nncp-xfer", append(les, nncp.LE{K: "Size", V: copied}), "")
                        if !*keep {
-                               if err = os.Remove(job.Fd.Name()); err != nil {
+                               if err = os.Remove(job.Path); err != nil {
                                        ctx.LogE("nncp-xfer", les, err, "remove")
                                        isBad = true
                                }
index 8fe45642d4e8e33998fb89950163973c3ced5f20..3dc6bd431e3641439ae6d5f2c3b00df0f9052670 100644 (file)
@@ -18,7 +18,6 @@ along with this program.  If not, see <http://www.gnu.org/licenses/>.
 package nncp
 
 import (
-       "io"
        "os"
        "path/filepath"
 
@@ -34,7 +33,7 @@ const (
 
 type Job struct {
        PktEnc   *PktEnc
-       Fd       *os.File
+       Path     string
        Size     int64
        HshValue *[32]byte
 }
@@ -58,17 +57,15 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
                        if err != nil {
                                continue
                        }
-                       fd, err := os.Open(filepath.Join(rxPath, fi.Name()))
+                       pth := filepath.Join(rxPath, fi.Name())
+                       fd, err := os.Open(pth)
                        if err != nil {
                                continue
                        }
                        var pktEnc PktEnc
-                       if _, err = xdr.Unmarshal(fd, &pktEnc); err != nil || pktEnc.Magic != MagicNNCPEv4 {
-                               fd.Close() // #nosec G104
-                               continue
-                       }
-                       if _, err = fd.Seek(0, io.SeekStart); err != nil {
-                               fd.Close() // #nosec G104
+                       _, err = xdr.Unmarshal(fd, &pktEnc)
+                       fd.Close()
+                       if err != nil || pktEnc.Magic != MagicNNCPEv4 {
                                continue
                        }
                        ctx.LogD("jobs", LEs{
@@ -80,7 +77,7 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
                        }, "taken")
                        job := Job{
                                PktEnc:   &pktEnc,
-                               Fd:       fd,
+                               Path:     pth,
                                Size:     fi.Size(),
                                HshValue: new([32]byte),
                        }
index 954dad18932992fa51489635c9bd27ddcf7ac9ee..5e44d5b1cbafaa9dba221f57006484c557720524 100644 (file)
@@ -38,7 +38,7 @@ along with this program.  If not, see <http://www.gnu.org/licenses/>.`
 )
 
 var (
-       Version string = "6.0.0"
+       Version string = "6.1.0"
 
        Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding)
 )
index 3c6d2d4bfbfc1f7535f883829e30a7203dd57da8..b5bad2d5dcd21a16b909c44a8ab29addf6d8a3a2 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -292,7 +292,6 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [
        var infos []*SPInfo
        var totalSize int64
        for job := range ctx.Jobs(nodeId, TTx) {
-               job.Fd.Close() // #nosec G104
                if job.PktEnc.Nice > nice {
                        continue
                }
index cc49784b335ffb7bfdecf1dd395b17fac68c5ca1..9916cc6f11596bf2c47646fdd0d0ca0c59a7715d 100644 (file)
@@ -83,23 +83,24 @@ func (ctx *Ctx) Toss(
        }
        defer decompressor.Close()
        for job := range ctx.Jobs(nodeId, TRx) {
-               pktName := filepath.Base(job.Fd.Name())
+               pktName := filepath.Base(job.Path)
                les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}}
                if job.PktEnc.Nice > nice {
                        ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice")
-                       job.Fd.Close() // #nosec G104
                        continue
                }
+               fd, err := os.Open(job.Path)
+               if err != nil {
+                       ctx.LogE("rx", les, err, "open")
+                       isBad = true
+                       continue
+               }
+
                pipeR, pipeW := io.Pipe()
                go func(job Job) error {
                        pipeWB := bufio.NewWriter(pipeW)
-                       _, _, err := PktEncRead(
-                               ctx.Self,
-                               ctx.Neigh,
-                               bufio.NewReader(job.Fd),
-                               pipeWB,
-                       )
-                       job.Fd.Close() // #nosec G104
+                       _, _, err := PktEncRead(ctx.Self, ctx.Neigh, bufio.NewReader(fd), pipeWB)
+                       fd.Close() // #nosec G104
                        if err != nil {
                                return pipeW.CloseWithError(err)
                        }
@@ -109,7 +110,6 @@ func (ctx *Ctx) Toss(
                        return pipeW.Close()
                }(job)
                var pkt Pkt
-               var err error
                var pktSize int64
                var pktSizeBlocks int64
                if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
@@ -197,11 +197,11 @@ func (ctx *Ctx) Toss(
                        ctx.LogI("rx", les, "")
                        if !dryRun {
                                if doSeen {
-                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                       if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
                                                fd.Close() // #nosec G104
                                        }
                                }
-                               if err = os.Remove(job.Fd.Name()); err != nil {
+                               if err = os.Remove(job.Path); err != nil {
                                        ctx.LogE("rx", les, err, "remove")
                                        isBad = true
                                }
@@ -293,11 +293,11 @@ func (ctx *Ctx) Toss(
                        ctx.LogI("rx", les, "")
                        if !dryRun {
                                if doSeen {
-                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                       if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
                                                fd.Close() // #nosec G104
                                        }
                                }
-                               if err = os.Remove(job.Fd.Name()); err != nil {
+                               if err = os.Remove(job.Path); err != nil {
                                        ctx.LogE("rx", les, err, "remove")
                                        isBad = true
                                }
@@ -362,11 +362,11 @@ func (ctx *Ctx) Toss(
                        ctx.LogI("rx", les, "")
                        if !dryRun {
                                if doSeen {
-                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                       if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
                                                fd.Close() // #nosec G104
                                        }
                                }
-                               if err = os.Remove(job.Fd.Name()); err != nil {
+                               if err = os.Remove(job.Path); err != nil {
                                        ctx.LogE("rx", les, err, "remove")
                                        isBad = true
                                }
@@ -408,11 +408,11 @@ func (ctx *Ctx) Toss(
                        ctx.LogI("rx", les, "")
                        if !dryRun {
                                if doSeen {
-                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                       if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
                                                fd.Close() // #nosec G104
                                        }
                                }
-                               if err = os.Remove(job.Fd.Name()); err != nil {
+                               if err = os.Remove(job.Path); err != nil {
                                        ctx.LogE("rx", les, err, "remove")
                                        isBad = true
                                }
index 8a16dd94512f184c87e60aef65e88f30c0f629be..34113829d98fa493580f340affab973e5a78b065 100644 (file)
@@ -383,7 +383,12 @@ func TestTossFreq(t *testing.T) {
                }
                for job := range ctx.Jobs(ctx.Self.Id, TTx) {
                        var buf bytes.Buffer
-                       _, _, err := PktEncRead(ctx.Self, ctx.Neigh, job.Fd, &buf)
+                       fd, err := os.Open(job.Path)
+                       if err != nil {
+                               t.Error(err)
+                               return false
+                       }
+                       _, _, err = PktEncRead(ctx.Self, ctx.Neigh, fd, &buf)
                        if err != nil {
                                t.Error(err)
                                return false
index ab0c1c6c8f3a6c6f49ee24772a40aee3dcabba10..7e07a0524a1d0a5837e7ce9033c2ea229d18e4e4 100644 (file)
@@ -98,9 +98,13 @@ func TestTx(t *testing.T) {
                        return false
                }
                txJob := sentJobs[0]
-               defer txJob.Fd.Close()
+               fd, err := os.Open(txJob.Path)
+               if err != nil {
+                       panic(err)
+               }
+               defer fd.Close()
                var bufR bytes.Buffer
-               if _, err = io.Copy(&bufR, txJob.Fd); err != nil {
+               if _, err = io.Copy(&bufR, fd); err != nil {
                        panic(err)
                }
                var bufW bytes.Buffer