defer conn.Close()
defer state.SetDead()
defer state.wg.Done()
+ buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
for {
if state.NotAlive() {
return
fdAndFullSize, exists := state.fds[pth]
state.fdsLock.RUnlock()
if !exists {
+ state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string {
+ return logMsg(les) + ": opening"
+ })
fd, err := os.Open(pth)
if err != nil {
state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
}
fd := fdAndFullSize.fd
fullSize := fdAndFullSize.fullSize
- var buf []byte
+ lesp = append(lesp, LE{"FullSize", fullSize})
+ var bufRead []byte
if freq.Offset < uint64(fullSize) {
state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
return logMsg(les) + ": seeking"
})
return
}
- buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
n, err := fd.Read(buf)
if err != nil {
state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
})
return
}
- buf = buf[:n]
+ bufRead = buf[:n]
lesp = append(
les,
LE{"XX", string(TTx)},
LE{"Pkt", pktName},
LE{"Size", int64(n)},
+ LE{"FullSize", fullSize},
)
state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
return fmt.Sprintf(
logMsg(les), humanize.IBytes(uint64(n)),
)
})
+ } else {
+ state.closeFd(pth)
}
- state.closeFd(pth)
payload = MarshalSP(SPTypeFile, SPFile{
Hash: freq.Hash,
Offset: freq.Offset,
- Payload: buf,
+ Payload: bufRead,
})
- ourSize := freq.Offset + uint64(len(buf))
+ ourSize := freq.Offset + uint64(len(bufRead))
lesp = append(
les,
LE{"XX", string(TTx)},
state.progressBars[pktName] = struct{}{}
Progress("Tx", lesp)
}
+ if ourSize == uint64(fullSize) {
+ state.closeFd(pth)
+ state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
+ return logMsg(les) + ": finished"
+ })
+ if state.Ctx.ShowPrgrs {
+ delete(state.progressBars, pktName)
+ }
+ }
state.Lock()
- if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
+ for i, q := range state.queueTheir {
+ if *q.freq.Hash != *freq.Hash {
+ continue
+ }
if ourSize == uint64(fullSize) {
- state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
- return logMsg(les) + ": finished"
- })
- if len(state.queueTheir) > 1 {
- state.queueTheir = state.queueTheir[1:]
- } else {
- state.queueTheir = state.queueTheir[:0]
- }
- if state.Ctx.ShowPrgrs {
- delete(state.progressBars, pktName)
- }
+ state.queueTheir = append(
+ state.queueTheir[:i],
+ state.queueTheir[i+1:]...,
+ )
} else {
- state.queueTheir[0].freq.Offset += uint64(len(buf))
+ q.freq.Offset = ourSize
}
- } else {
- state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
- return logMsg(les) + ": queue disappeared"
- })
+ break
}
state.Unlock()
}
}
fullsize := int64(0)
state.RLock()
- infoTheir, ok := state.infosTheir[*file.Hash]
+ infoTheir := state.infosTheir[*file.Hash]
state.RUnlock()
- if !ok {
+ if infoTheir == nil {
state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
return logMsg(les) + ": unknown file"
})
}
if hasherAndOffset != nil {
delete(state.fileHashers, filePath)
- if hasherAndOffset.mth.PrependSize() == 0 {
+ if hasherAndOffset.mth.PreaddSize() == 0 {
if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
state.Ctx.LogE(
"sp-file-bad-checksum", lesp,
state.Lock()
delete(state.infosTheir, *file.Hash)
state.Unlock()
- if hasherAndOffset != nil {
- go func() {
- spCheckerTasks <- SPCheckerTask{
- nodeId: state.Node.Id,
- hsh: file.Hash,
- mth: hasherAndOffset.mth,
- done: state.payloads,
- }
- }()
- }
+ go func() {
+ t := SPCheckerTask{
+ nodeId: state.Node.Id,
+ hsh: file.Hash,
+ done: state.payloads,
+ }
+ if hasherAndOffset != nil {
+ t.mth = hasherAndOffset.mth
+ }
+ spCheckerTasks <- t
+ }()
case SPTypeDone:
lesp := append(les, LE{"Type", "done"})