spCheckerToken chan struct{}
)
+type FdAndFullSize struct {
+ fd *os.File
+ fullSize int64
+}
+
type SPType uint8
const (
listOnly bool
onlyPkts map[[32]byte]bool
writeSPBuf bytes.Buffer
+ fds map[string]FdAndFullSize
sync.RWMutex
}
return err
}
+func (state *SPState) closeFd(pth string) {
+ s, exists := state.fds[pth]
+ delete(state.fds, pth)
+ if exists {
+ s.fd.Close()
+ }
+}
+
func (state *SPState) StartWorkers(
conn ConnDeadlined,
infosPayloads [][]byte,
payload []byte,
) error {
les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
+ state.fds = make(map[string]FdAndFullSize)
state.isDead = make(chan struct{})
if state.maxOnlineTime > 0 {
state.mustFinishAt = state.started.Add(state.maxOnlineTime)
{"Size", int64(freq.Offset)},
}...)
state.Ctx.LogD("sp-file", lesp, "queueing")
- fd, err := os.Open(filepath.Join(
+ pth := filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
string(TTx),
Base32Codec.EncodeToString(freq.Hash[:]),
- ))
- if err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
- return
- }
- fi, err := fd.Stat()
- if err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
- return
+ )
+ fdAndFullSize, exists := state.fds[pth]
+ if !exists {
+ fd, err := os.Open(pth)
+ if err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "")
+ return
+ }
+ fi, err := fd.Stat()
+ if err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "")
+ return
+ }
+ fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
+ state.fds[pth] = fdAndFullSize
}
- fullSize := fi.Size()
+ fd := fdAndFullSize.fd
+ fullSize := fdAndFullSize.fullSize
var buf []byte
if freq.Offset < uint64(fullSize) {
state.Ctx.LogD("sp-file", lesp, "seeking")
buf = buf[:n]
state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
}
- fd.Close() // #nosec G104
+ state.closeFd(pth)
payload = MarshalSP(SPTypeFile, SPFile{
Hash: freq.Hash,
Offset: freq.Offset,
state.wg.Done()
state.SetDead()
conn.Close() // #nosec G104
+ for _, s := range state.fds {
+ s.fd.Close()
+ }
}()
return nil
string(TRx),
)
filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
+ filePathPart := filePath + PartSuffix
state.Ctx.LogD("sp-file", lesp, "opening part")
- fd, err := os.OpenFile(
- filePath+PartSuffix,
- os.O_RDWR|os.O_CREATE,
- os.FileMode(0666),
- )
- if err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
- return nil, err
+ fdAndFullSize, exists := state.fds[filePathPart]
+ var fd *os.File
+ if exists {
+ fd = fdAndFullSize.fd
+ } else {
+ fd, err = os.OpenFile(
+ filePathPart,
+ os.O_RDWR|os.O_CREATE,
+ os.FileMode(0666),
+ )
+ if err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "")
+ return nil, err
+ }
+ state.fds[filePathPart] = FdAndFullSize{fd: fd}
}
state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
state.Ctx.LogE("sp-file", lesp, err, "")
- fd.Close() // #nosec G104
+ state.closeFd(filePathPart)
return nil, err
}
state.Ctx.LogD("sp-file", lesp, "writing")
- _, err = fd.Write(file.Payload)
- if err != nil {
+ if _, err = fd.Write(file.Payload); err != nil {
state.Ctx.LogE("sp-file", lesp, err, "")
- fd.Close() // #nosec G104
+ state.closeFd(filePathPart)
return nil, err
}
ourSize := int64(file.Offset + uint64(len(file.Payload)))
Progress("Rx", lesp)
}
if fullsize != ourSize {
- fd.Close() // #nosec G104
continue
}
<-spCheckerToken
}()
if err := fd.Sync(); err != nil {
state.Ctx.LogE("sp-file", lesp, err, "sync")
- fd.Close() // #nosec G104
+ state.closeFd(filePathPart)
return
}
state.wg.Add(1)
defer state.wg.Done()
if _, err = fd.Seek(0, io.SeekStart); err != nil {
- fd.Close() // #nosec G104
+ state.closeFd(filePathPart)
state.Ctx.LogE("sp-file", lesp, err, "")
return
}
state.Ctx.LogD("sp-file", lesp, "checking")
gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs)
- fd.Close() // #nosec G104
+ state.closeFd(filePathPart)
if err != nil || !gut {
state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
return