From ce54ce6d5524ca0da2bfd542100178ff870a609c Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Fri, 19 Feb 2021 18:59:57 +0300 Subject: [PATCH] Do not keep files opened --- doc/news.ru.texi | 12 ++++++++++++ doc/news.texi | 14 +++++++++++++- src/check.go | 10 ++++++++-- src/cmd/nncp-bundle/main.go | 15 ++++++++++----- src/cmd/nncp-caller/main.go | 1 - src/cmd/nncp-stat/main.go | 2 -- src/cmd/nncp-xfer/main.go | 19 +++++++++++-------- src/jobs.go | 17 +++++++---------- src/nncp.go | 2 +- src/sp.go | 1 - src/toss.go | 36 ++++++++++++++++++------------------ src/toss_test.go | 7 ++++++- src/tx_test.go | 8 ++++++-- 13 files changed, 92 insertions(+), 52 deletions(-) diff --git a/doc/news.ru.texi b/doc/news.ru.texi index d758060..f4df62e 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -1,6 +1,18 @@ @node Новости @section Новости +@node Релиз 6.1.0 +@subsection Релиз 6.1.0 +@itemize + +@item +Оптимизация: большинство команд теперь не держат открытыми файловые +дескрипторы. Прежде вы легко могли выйти за пределы максимально +допустимого количества открытых файлов, если у вас было много пакетов в +spool директории. + +@end itemize + @node Релиз 6.0.0 @subsection Релиз 6.0.0 @itemize diff --git a/doc/news.texi b/doc/news.texi index 866fb53..68661db 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -3,11 +3,23 @@ See also this page @ref{Новости, on russian}. +@node Release 6.1.0 +@section Release 6.1.0 +@itemize + +@item +Optimization: most commands do not keep opened file descriptors now. +Previously you can exceed maximal number of opened files if you have got +many packets in the spool directory. + +@end itemize + @node Release 6.0.0 @section Release 6.0.0 @itemize -@item Log uses human readable and easy machine parseable +@item +Log uses human readable and easy machine parseable @url{https://www.gnu.org/software/recutils/, recfile} format for the records, instead of structured RFC 3339 lines. Old logs are not readable by @command{nncp-log} anymore. diff --git a/src/check.go b/src/check.go index 731e869..b2ea671 100644 --- a/src/check.go +++ b/src/check.go @@ -23,6 +23,7 @@ import ( "errors" "io" "log" + "os" "golang.org/x/crypto/blake2b" ) @@ -47,8 +48,13 @@ func (ctx *Ctx) checkXxIsBad(nodeId *NodeId, xx TRxTx) bool { {"Pkt", Base32Codec.EncodeToString(job.HshValue[:])}, {"FullSize", job.Size}, } - gut, err := Check(job.Fd, job.HshValue[:], les, ctx.ShowPrgrs) - job.Fd.Close() // #nosec G104 + fd, err := os.Open(job.Path) + if err != nil { + ctx.LogE("check", les, err, "") + return true + } + gut, err := Check(fd, job.HshValue[:], les, ctx.ShowPrgrs) + fd.Close() // #nosec G104 if err != nil { ctx.LogE("check", les, err, "") return true diff --git a/src/cmd/nncp-bundle/main.go b/src/cmd/nncp-bundle/main.go index 0dbe3ba..106f820 100644 --- a/src/cmd/nncp-bundle/main.go +++ b/src/cmd/nncp-bundle/main.go @@ -125,13 +125,16 @@ func main() { {K: "Pkt", V: "dummy"}, } for job := range ctx.Jobs(&nodeId, nncp.TTx) { - pktName = filepath.Base(job.Fd.Name()) + pktName = filepath.Base(job.Path) les[len(les)-1].V = pktName if job.PktEnc.Nice > nice { ctx.LogD("nncp-bundle", les, "too nice") - job.Fd.Close() // #nosec G104 continue } + fd, err := os.Open(job.Path) + if err != nil { + log.Fatalln("Error during opening:", err) + } if err = tarWr.WriteHeader(&tar.Header{ Format: tar.FormatUSTAR, Name: nncp.NNCPBundlePrefix, @@ -155,7 +158,7 @@ func main() { log.Fatalln("Error writing tar header:", err) } if _, err = nncp.CopyProgressed( - tarWr, job.Fd, "Tx", + tarWr, bufio.NewReader(fd), "Tx", append(les, nncp.LEs{ {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])}, {K: "FullSize", V: job.Size}, @@ -164,7 +167,9 @@ func main() { ); err != nil { log.Fatalln("Error during copying to tar:", err) } - job.Fd.Close() // #nosec G104 + if err = fd.Close(); err != nil { + log.Fatalln("Error during closing:", err) + } if err = tarWr.Flush(); err != nil { log.Fatalln("Error during tar flushing:", err) } @@ -172,7 +177,7 @@ func main() { log.Fatalln("Error during stdout flushing:", err) } if *doDelete { - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { log.Fatalln("Error during deletion:", err) } } diff --git a/src/cmd/nncp-caller/main.go b/src/cmd/nncp-caller/main.go index b702b49..245c618 100644 --- a/src/cmd/nncp-caller/main.go +++ b/src/cmd/nncp-caller/main.go @@ -138,7 +138,6 @@ func main() { ctx.LogD("caller", les, "checking tx existence") txExists := false for job := range ctx.Jobs(node.Id, nncp.TTx) { - job.Fd.Close() if job.PktEnc.Nice > call.Nice { continue } diff --git a/src/cmd/nncp-stat/main.go b/src/cmd/nncp-stat/main.go index 04aaed2..994f83d 100644 --- a/src/cmd/nncp-stat/main.go +++ b/src/cmd/nncp-stat/main.go @@ -99,7 +99,6 @@ func main() { rxNums := make(map[uint8]int) rxBytes := make(map[uint8]int64) for job := range ctx.Jobs(node.Id, nncp.TRx) { - job.Fd.Close() // #nosec G104 if *showPkt { jobPrint(nncp.TRx, job) } @@ -109,7 +108,6 @@ func main() { txNums := make(map[uint8]int) txBytes := make(map[uint8]int64) for job := range ctx.Jobs(node.Id, nncp.TTx) { - job.Fd.Close() // #nosec G104 if *showPkt { jobPrint(nncp.TTx, job) } diff --git a/src/cmd/nncp-xfer/main.go b/src/cmd/nncp-xfer/main.go index 06db800..b975467 100644 --- a/src/cmd/nncp-xfer/main.go +++ b/src/cmd/nncp-xfer/main.go @@ -315,39 +315,42 @@ Tx: } les = les[:len(les)-1] for job := range ctx.Jobs(&nodeId, nncp.TTx) { - pktName := filepath.Base(job.Fd.Name()) + pktName := filepath.Base(job.Path) les := append(les, nncp.LE{K: "Pkt", V: pktName}) if job.PktEnc.Nice > nice { ctx.LogD("nncp-xfer", les, "too nice") - job.Fd.Close() // #nosec G104 continue } if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) { ctx.LogD("nncp-xfer", les, "already exists") - job.Fd.Close() // #nosec G104 continue } if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenSuffix)); err == nil || !os.IsNotExist(err) { ctx.LogD("nncp-xfer", les, "already exists") - job.Fd.Close() // #nosec G104 continue } tmp, err := nncp.TempFile(dstPath, "xfer") if err != nil { ctx.LogE("nncp-xfer", les, err, "mktemp") - job.Fd.Close() // #nosec G104 isBad = true break } les = append(les, nncp.LE{K: "Tmp", V: tmp.Name()}) ctx.LogD("nncp-xfer", les, "created") + fd, err := os.Open(job.Path) + if err != nil { + ctx.LogE("nncp-xfer", les, err, "open") + tmp.Close() // #nosec G104 + isBad = true + continue + } bufW := bufio.NewWriter(tmp) copied, err := nncp.CopyProgressed( - bufW, bufio.NewReader(job.Fd), "Tx", + bufW, bufio.NewReader(fd), "Tx", append(les, nncp.LE{K: "FullSize", V: job.Size}), ctx.ShowPrgrs, ) - job.Fd.Close() // #nosec G104 + fd.Close() // #nosec G104 if err != nil { ctx.LogE("nncp-xfer", les, err, "copy") tmp.Close() // #nosec G104 @@ -383,7 +386,7 @@ Tx: les = les[:len(les)-1] ctx.LogI("nncp-xfer", append(les, nncp.LE{K: "Size", V: copied}), "") if !*keep { - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("nncp-xfer", les, err, "remove") isBad = true } diff --git a/src/jobs.go b/src/jobs.go index 8fe4564..3dc6bd4 100644 --- a/src/jobs.go +++ b/src/jobs.go @@ -18,7 +18,6 @@ along with this program. If not, see . package nncp import ( - "io" "os" "path/filepath" @@ -34,7 +33,7 @@ const ( type Job struct { PktEnc *PktEnc - Fd *os.File + Path string Size int64 HshValue *[32]byte } @@ -58,17 +57,15 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { if err != nil { continue } - fd, err := os.Open(filepath.Join(rxPath, fi.Name())) + pth := filepath.Join(rxPath, fi.Name()) + fd, err := os.Open(pth) if err != nil { continue } var pktEnc PktEnc - if _, err = xdr.Unmarshal(fd, &pktEnc); err != nil || pktEnc.Magic != MagicNNCPEv4 { - fd.Close() // #nosec G104 - continue - } - if _, err = fd.Seek(0, io.SeekStart); err != nil { - fd.Close() // #nosec G104 + _, err = xdr.Unmarshal(fd, &pktEnc) + fd.Close() + if err != nil || pktEnc.Magic != MagicNNCPEv4 { continue } ctx.LogD("jobs", LEs{ @@ -80,7 +77,7 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { }, "taken") job := Job{ PktEnc: &pktEnc, - Fd: fd, + Path: pth, Size: fi.Size(), HshValue: new([32]byte), } diff --git a/src/nncp.go b/src/nncp.go index 954dad1..5e44d5b 100644 --- a/src/nncp.go +++ b/src/nncp.go @@ -38,7 +38,7 @@ along with this program. If not, see .` ) var ( - Version string = "6.0.0" + Version string = "6.1.0" Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding) ) diff --git a/src/sp.go b/src/sp.go index 3c6d2d4..b5bad2d 100644 --- a/src/sp.go +++ b/src/sp.go @@ -292,7 +292,6 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { - job.Fd.Close() // #nosec G104 if job.PktEnc.Nice > nice { continue } diff --git a/src/toss.go b/src/toss.go index cc49784..9916cc6 100644 --- a/src/toss.go +++ b/src/toss.go @@ -83,23 +83,24 @@ func (ctx *Ctx) Toss( } defer decompressor.Close() for job := range ctx.Jobs(nodeId, TRx) { - pktName := filepath.Base(job.Fd.Name()) + pktName := filepath.Base(job.Path) les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}} if job.PktEnc.Nice > nice { ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice") - job.Fd.Close() // #nosec G104 continue } + fd, err := os.Open(job.Path) + if err != nil { + ctx.LogE("rx", les, err, "open") + isBad = true + continue + } + pipeR, pipeW := io.Pipe() go func(job Job) error { pipeWB := bufio.NewWriter(pipeW) - _, _, err := PktEncRead( - ctx.Self, - ctx.Neigh, - bufio.NewReader(job.Fd), - pipeWB, - ) - job.Fd.Close() // #nosec G104 + _, _, err := PktEncRead(ctx.Self, ctx.Neigh, bufio.NewReader(fd), pipeWB) + fd.Close() // #nosec G104 if err != nil { return pipeW.CloseWithError(err) } @@ -109,7 +110,6 @@ func (ctx *Ctx) Toss( return pipeW.Close() }(job) var pkt Pkt - var err error var pktSize int64 var pktSizeBlocks int64 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil { @@ -197,11 +197,11 @@ func (ctx *Ctx) Toss( ctx.LogI("rx", les, "") if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true } @@ -293,11 +293,11 @@ func (ctx *Ctx) Toss( ctx.LogI("rx", les, "") if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true } @@ -362,11 +362,11 @@ func (ctx *Ctx) Toss( ctx.LogI("rx", les, "") if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true } @@ -408,11 +408,11 @@ func (ctx *Ctx) Toss( ctx.LogI("rx", les, "") if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true } diff --git a/src/toss_test.go b/src/toss_test.go index 8a16dd9..3411382 100644 --- a/src/toss_test.go +++ b/src/toss_test.go @@ -383,7 +383,12 @@ func TestTossFreq(t *testing.T) { } for job := range ctx.Jobs(ctx.Self.Id, TTx) { var buf bytes.Buffer - _, _, err := PktEncRead(ctx.Self, ctx.Neigh, job.Fd, &buf) + fd, err := os.Open(job.Path) + if err != nil { + t.Error(err) + return false + } + _, _, err = PktEncRead(ctx.Self, ctx.Neigh, fd, &buf) if err != nil { t.Error(err) return false diff --git a/src/tx_test.go b/src/tx_test.go index ab0c1c6..7e07a05 100644 --- a/src/tx_test.go +++ b/src/tx_test.go @@ -98,9 +98,13 @@ func TestTx(t *testing.T) { return false } txJob := sentJobs[0] - defer txJob.Fd.Close() + fd, err := os.Open(txJob.Path) + if err != nil { + panic(err) + } + defer fd.Close() var bufR bytes.Buffer - if _, err = io.Copy(&bufR, txJob.Fd); err != nil { + if _, err = io.Copy(&bufR, fd); err != nil { panic(err) } var bufW bytes.Buffer -- 2.44.0