From 98a96b86eeac871a1b528e4833dd696864c3a9ae Mon Sep 17 00:00:00 2001
From: Sergey Matveev
Date: Sat, 20 Feb 2021 12:02:21 +0300
Subject: [PATCH] Keep downloading file opened

---
 doc/news.ru.texi |  4 +++
 doc/news.texi    |  4 +++
 src/sp.go        | 87 ++++++++++++++++++++++++++++++++----------------
 3 files changed, 67 insertions(+), 28 deletions(-)

diff --git a/doc/news.ru.texi b/doc/news.ru.texi
index f4df62e..bc42c6e 100644
--- a/doc/news.ru.texi
+++ b/doc/news.ru.texi
@@ -11,6 +11,10 @@
 допустимого количества открытых файлов, если у вас было много пакетов в
 spool директории.
 
+@item
+Оптимизация: не закрывать файловый дескриптор файла, который мы качаем.
+Прежде каждый его кусочек приводил к дорогим open/close вызовам.
+
 @end itemize
 
 @node Релиз 6.0.0
diff --git a/doc/news.texi b/doc/news.texi
index 68661db..565f79e 100644
--- a/doc/news.texi
+++ b/doc/news.texi
@@ -12,6 +12,10 @@ Optimization: most commands do not keep opened file descriptors now.
 Previously you can exceed maximal number of opened files if you have
 got many packets in the spool directory.
 
+@item
+Optimization: do not close the file descriptor of the file we download
+online. Previously each chunk led to expensive open/close calls.
+
 @end itemize
 
 @node Release 6.0.0
diff --git a/src/sp.go b/src/sp.go
index b5bad2d..9410a2d 100644
--- a/src/sp.go
+++ b/src/sp.go
@@ -59,6 +59,11 @@ var (
 	spCheckerToken chan struct{}
 )
 
+type FdAndFullSize struct {
+	fd       *os.File
+	fullSize int64
+}
+
 type SPType uint8
 
 const (
@@ -214,6 +219,7 @@ type SPState struct {
 	listOnly       bool
 	onlyPkts       map[[32]byte]bool
 	writeSPBuf     bytes.Buffer
+	fds            map[string]FdAndFullSize
 	sync.RWMutex
 }
 
@@ -529,12 +535,21 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
 	return err
 }
 
+func (state *SPState) closeFd(pth string) {
+	s, exists := state.fds[pth]
+	delete(state.fds, pth)
+	if exists {
+		s.fd.Close()
+	}
+}
+
 func (state *SPState) StartWorkers(
 	conn ConnDeadlined,
 	infosPayloads [][]byte,
 	payload []byte,
 ) error {
 	les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
+	state.fds = make(map[string]FdAndFullSize)
 	state.isDead = make(chan struct{})
 	if state.maxOnlineTime > 0 {
 		state.mustFinishAt = state.started.Add(state.maxOnlineTime)
@@ -682,22 +697,29 @@ func (state *SPState) StartWorkers(
 				{"Size", int64(freq.Offset)},
 			}...)
 			state.Ctx.LogD("sp-file", lesp, "queueing")
-			fd, err := os.Open(filepath.Join(
+			pth := filepath.Join(
 				state.Ctx.Spool,
 				state.Node.Id.String(),
 				string(TTx),
 				Base32Codec.EncodeToString(freq.Hash[:]),
-			))
-			if err != nil {
-				state.Ctx.LogE("sp-file", lesp, err, "")
-				return
-			}
-			fi, err := fd.Stat()
-			if err != nil {
-				state.Ctx.LogE("sp-file", lesp, err, "")
-				return
+			)
+			fdAndFullSize, exists := state.fds[pth]
+			if !exists {
+				fd, err := os.Open(pth)
+				if err != nil {
+					state.Ctx.LogE("sp-file", lesp, err, "")
+					return
+				}
+				fi, err := fd.Stat()
+				if err != nil {
+					state.Ctx.LogE("sp-file", lesp, err, "")
+					return
+				}
+				fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
+				state.fds[pth] = fdAndFullSize
 			}
-			fullSize := fi.Size()
+			fd := fdAndFullSize.fd
+			fullSize := fdAndFullSize.fullSize
 			var buf []byte
 			if freq.Offset < uint64(fullSize) {
 				state.Ctx.LogD("sp-file", lesp, "seeking")
@@ -714,7 +736,7 @@ func (state *SPState) StartWorkers(
 				buf = buf[:n]
 				state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
 			}
-			fd.Close() // #nosec G104
+			state.closeFd(pth)
 			payload = MarshalSP(SPTypeFile, SPFile{
 				Hash:    freq.Hash,
 				Offset:  freq.Offset,
@@ -816,6 +838,9 @@ func (state *SPState) StartWorkers(
 		state.wg.Done()
 		state.SetDead()
 		conn.Close() // #nosec G104
+		for _, s := range state.fds {
+			s.fd.Close()
+		}
 	}()
 
 	return nil
@@ -944,27 +969,34 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
 			string(TRx),
 		)
 		filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
+		filePathPart := filePath + PartSuffix
 		state.Ctx.LogD("sp-file", lesp, "opening part")
-		fd, err := os.OpenFile(
-			filePath+PartSuffix,
-			os.O_RDWR|os.O_CREATE,
-			os.FileMode(0666),
-		)
-		if err != nil {
-			state.Ctx.LogE("sp-file", lesp, err, "")
-			return nil, err
+		fdAndFullSize, exists := state.fds[filePathPart]
+		var fd *os.File
+		if exists {
+			fd = fdAndFullSize.fd
+		} else {
+			fd, err = os.OpenFile(
+				filePathPart,
+				os.O_RDWR|os.O_CREATE,
+				os.FileMode(0666),
+			)
+			if err != nil {
+				state.Ctx.LogE("sp-file", lesp, err, "")
+				return nil, err
+			}
+			state.fds[filePathPart] = FdAndFullSize{fd: fd}
 		}
 		state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
 		if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
 			state.Ctx.LogE("sp-file", lesp, err, "")
-			fd.Close() // #nosec G104
+			state.closeFd(filePathPart)
 			return nil, err
 		}
 		state.Ctx.LogD("sp-file", lesp, "writing")
-		_, err = fd.Write(file.Payload)
-		if err != nil {
+		if _, err = fd.Write(file.Payload); err != nil {
 			state.Ctx.LogE("sp-file", lesp, err, "")
-			fd.Close() // #nosec G104
+			state.closeFd(filePathPart)
 			return nil, err
 		}
 		ourSize := int64(file.Offset + uint64(len(file.Payload)))
@@ -981,7 +1013,6 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
 			Progress("Rx", lesp)
 		}
 		if fullsize != ourSize {
-			fd.Close() // #nosec G104
 			continue
 		}
 		<-spCheckerToken
@@ -991,19 +1022,19 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
 			}()
 			if err := fd.Sync(); err != nil {
 				state.Ctx.LogE("sp-file", lesp, err, "sync")
-				fd.Close() // #nosec G104
+				state.closeFd(filePathPart)
 				return
 			}
 			state.wg.Add(1)
 			defer state.wg.Done()
 			if _, err = fd.Seek(0, io.SeekStart); err != nil {
-				fd.Close() // #nosec G104
+				state.closeFd(filePathPart)
 				state.Ctx.LogE("sp-file", lesp, err, "")
 				return
 			}
 			state.Ctx.LogD("sp-file", lesp, "checking")
 			gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs)
-			fd.Close() // #nosec G104
+			state.closeFd(filePathPart)
 			if err != nil || !gut {
 				state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
 				return
-- 
2.44.0
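
For context, below is a minimal, standalone Go sketch of the descriptor-caching
pattern this patch applies in sp.go: open the file once, reuse the descriptor
for every chunk, and close it only when the transfer completes or the session
terminates. The FdCache type, its Get/Close/CloseAll methods, and the
"example.part" path are illustrative names only, not NNCP's API; the real patch
stores FdAndFullSize values (descriptor plus cached size) in SPState.fds.

// fdcache.go -- a standalone sketch (not NNCP code) of the descriptor-caching
// pattern: open a file once, reuse the descriptor for every chunk, close it
// when the transfer finishes or the whole session shuts down.
package main

import (
	"fmt"
	"os"
)

// FdCache maps a spool path to its already opened descriptor.
type FdCache struct {
	fds map[string]*os.File
}

func NewFdCache() *FdCache {
	return &FdCache{fds: make(map[string]*os.File)}
}

// Get returns the cached descriptor for pth, opening the file on first use.
func (c *FdCache) Get(pth string) (*os.File, error) {
	if fd, exists := c.fds[pth]; exists {
		return fd, nil
	}
	fd, err := os.OpenFile(pth, os.O_RDWR|os.O_CREATE, 0o666)
	if err != nil {
		return nil, err
	}
	c.fds[pth] = fd
	return fd, nil
}

// Close removes pth from the cache and closes its descriptor, if present.
func (c *FdCache) Close(pth string) {
	if fd, exists := c.fds[pth]; exists {
		delete(c.fds, pth)
		fd.Close()
	}
}

// CloseAll closes every remaining descriptor; run it once at session shutdown.
func (c *FdCache) CloseAll() {
	for pth, fd := range c.fds {
		delete(c.fds, pth)
		fd.Close()
	}
}

func main() {
	cache := NewFdCache()
	defer cache.CloseAll()
	// "example.part" is a made-up name standing in for a .part file in the spool.
	fd, err := cache.Get("example.part")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	fd.WriteString("chunk 1\n") // later chunks reuse the same descriptor
	fd.WriteString("chunk 2\n")
	cache.Close("example.part") // close once the file is fully received
}

The patch follows the same lifecycle: closeFd() drops a single entry when a
file is finished or errors out, and the connection-teardown goroutine walks
state.fds to close whatever is still open when the session dies.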