]> Cypherpunks.ru repositories - nncp.git/commitdiff
Keep downloading file opened
authorSergey Matveev <stargrave@stargrave.org>
Sat, 20 Feb 2021 09:02:21 +0000 (12:02 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Sat, 20 Feb 2021 19:37:04 +0000 (22:37 +0300)
doc/news.ru.texi
doc/news.texi
src/sp.go

index f4df62e64d9f8befb2172db3950d4984b6c9516b..bc42c6e54f5de99de0a7a631ac387277bb9a7182 100644 (file)
 допустимого количества открытых файлов, если у вас было много пакетов в
 spool директории.
 
+@item
+Оптимизация: не закрывать файловый дескриптор файла который мы качаем.
+Прежде каждый его кусочек приводил к дорогим open/close вызовам.
+
 @end itemize
 
 @node Релиз 6.0.0
index 68661dbb3c2b8ac3a322a2ef3c0a9ac68118160b..565f79e0af01e45fe54e3bdf6246792e1cdee7f6 100644 (file)
@@ -12,6 +12,10 @@ Optimization: most commands do not keep opened file descriptors now.
 Previously you can exceed maximal number of opened files if you have got
 many packets in the spool directory.
 
+@item
+Optimization: do not close file descriptor of the file we download
+online. Previously each chunk lead to expensive open/close calls.
+
 @end itemize
 
 @node Release 6.0.0
index b5bad2d5dcd21a16b909c44a8ab29addf6d8a3a2..9410a2d905eb106a6cb2cbb53a7dca8b5895f4c5 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -59,6 +59,11 @@ var (
        spCheckerToken chan struct{}
 )
 
+type FdAndFullSize struct {
+       fd       *os.File
+       fullSize int64
+}
+
 type SPType uint8
 
 const (
@@ -214,6 +219,7 @@ type SPState struct {
        listOnly       bool
        onlyPkts       map[[32]byte]bool
        writeSPBuf     bytes.Buffer
+       fds            map[string]FdAndFullSize
        sync.RWMutex
 }
 
@@ -529,12 +535,21 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        return err
 }
 
+func (state *SPState) closeFd(pth string) {
+       s, exists := state.fds[pth]
+       delete(state.fds, pth)
+       if exists {
+               s.fd.Close()
+       }
+}
+
 func (state *SPState) StartWorkers(
        conn ConnDeadlined,
        infosPayloads [][]byte,
        payload []byte,
 ) error {
        les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
+       state.fds = make(map[string]FdAndFullSize)
        state.isDead = make(chan struct{})
        if state.maxOnlineTime > 0 {
                state.mustFinishAt = state.started.Add(state.maxOnlineTime)
@@ -682,22 +697,29 @@ func (state *SPState) StartWorkers(
                                        {"Size", int64(freq.Offset)},
                                }...)
                                state.Ctx.LogD("sp-file", lesp, "queueing")
-                               fd, err := os.Open(filepath.Join(
+                               pth := filepath.Join(
                                        state.Ctx.Spool,
                                        state.Node.Id.String(),
                                        string(TTx),
                                        Base32Codec.EncodeToString(freq.Hash[:]),
-                               ))
-                               if err != nil {
-                                       state.Ctx.LogE("sp-file", lesp, err, "")
-                                       return
-                               }
-                               fi, err := fd.Stat()
-                               if err != nil {
-                                       state.Ctx.LogE("sp-file", lesp, err, "")
-                                       return
+                               )
+                               fdAndFullSize, exists := state.fds[pth]
+                               if !exists {
+                                       fd, err := os.Open(pth)
+                                       if err != nil {
+                                               state.Ctx.LogE("sp-file", lesp, err, "")
+                                               return
+                                       }
+                                       fi, err := fd.Stat()
+                                       if err != nil {
+                                               state.Ctx.LogE("sp-file", lesp, err, "")
+                                               return
+                                       }
+                                       fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
+                                       state.fds[pth] = fdAndFullSize
                                }
-                               fullSize := fi.Size()
+                               fd := fdAndFullSize.fd
+                               fullSize := fdAndFullSize.fullSize
                                var buf []byte
                                if freq.Offset < uint64(fullSize) {
                                        state.Ctx.LogD("sp-file", lesp, "seeking")
@@ -714,7 +736,7 @@ func (state *SPState) StartWorkers(
                                        buf = buf[:n]
                                        state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
                                }
-                               fd.Close() // #nosec G104
+                               state.closeFd(pth)
                                payload = MarshalSP(SPTypeFile, SPFile{
                                        Hash:    freq.Hash,
                                        Offset:  freq.Offset,
@@ -816,6 +838,9 @@ func (state *SPState) StartWorkers(
                state.wg.Done()
                state.SetDead()
                conn.Close() // #nosec G104
+               for _, s := range state.fds {
+                       s.fd.Close()
+               }
        }()
 
        return nil
@@ -944,27 +969,34 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                string(TRx),
                        )
                        filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
+                       filePathPart := filePath + PartSuffix
                        state.Ctx.LogD("sp-file", lesp, "opening part")
-                       fd, err := os.OpenFile(
-                               filePath+PartSuffix,
-                               os.O_RDWR|os.O_CREATE,
-                               os.FileMode(0666),
-                       )
-                       if err != nil {
-                               state.Ctx.LogE("sp-file", lesp, err, "")
-                               return nil, err
+                       fdAndFullSize, exists := state.fds[filePathPart]
+                       var fd *os.File
+                       if exists {
+                               fd = fdAndFullSize.fd
+                       } else {
+                               fd, err = os.OpenFile(
+                                       filePathPart,
+                                       os.O_RDWR|os.O_CREATE,
+                                       os.FileMode(0666),
+                               )
+                               if err != nil {
+                                       state.Ctx.LogE("sp-file", lesp, err, "")
+                                       return nil, err
+                               }
+                               state.fds[filePathPart] = FdAndFullSize{fd: fd}
                        }
                        state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
                        if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
                                state.Ctx.LogE("sp-file", lesp, err, "")
-                               fd.Close() // #nosec G104
+                               state.closeFd(filePathPart)
                                return nil, err
                        }
                        state.Ctx.LogD("sp-file", lesp, "writing")
-                       _, err = fd.Write(file.Payload)
-                       if err != nil {
+                       if _, err = fd.Write(file.Payload); err != nil {
                                state.Ctx.LogE("sp-file", lesp, err, "")
-                               fd.Close() // #nosec G104
+                               state.closeFd(filePathPart)
                                return nil, err
                        }
                        ourSize := int64(file.Offset + uint64(len(file.Payload)))
@@ -981,7 +1013,6 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                Progress("Rx", lesp)
                        }
                        if fullsize != ourSize {
-                               fd.Close() // #nosec G104
                                continue
                        }
                        <-spCheckerToken
@@ -991,19 +1022,19 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                }()
                                if err := fd.Sync(); err != nil {
                                        state.Ctx.LogE("sp-file", lesp, err, "sync")
-                                       fd.Close() // #nosec G104
+                                       state.closeFd(filePathPart)
                                        return
                                }
                                state.wg.Add(1)
                                defer state.wg.Done()
                                if _, err = fd.Seek(0, io.SeekStart); err != nil {
-                                       fd.Close() // #nosec G104
+                                       state.closeFd(filePathPart)
                                        state.Ctx.LogE("sp-file", lesp, err, "")
                                        return
                                }
                                state.Ctx.LogD("sp-file", lesp, "checking")
                                gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs)
-                               fd.Close() // #nosec G104
+                               state.closeFd(filePathPart)
                                if err != nil || !gut {
                                        state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
                                        return