onlyPkts map[[32]byte]bool
writeSPBuf bytes.Buffer
fds map[string]FdAndFullSize
+ fdsLock sync.RWMutex
fileHashers map[string]*HasherAndOffset
checkerQueues SPCheckerQueues
sync.RWMutex
}
func (state *SPState) closeFd(pth string) {
- s, exists := state.fds[pth]
- delete(state.fds, pth)
- if exists {
+ state.fdsLock.Lock()
+ if s, exists := state.fds[pth]; exists {
+ delete(state.fds, pth)
s.fd.Close()
}
+ state.fdsLock.Unlock()
}
func (state *SPState) StartWorkers(
for _, payload := range infosPayloads[1:] {
state.Ctx.LogD(
"sp-queue-remaining",
- append(les, LE{"Size", len(payload)}),
+ append(les, LE{"Size", int64(len(payload))}),
func(les LEs) string {
return fmt.Sprintf(
"SP with %s (nice %s): queuing remaining payload (%s)",
humanize.IBytes(uint64(len(payload))),
)
}
- state.Ctx.LogD("sp-process", append(les, LE{"Size", len(payload)}), logMsg)
+ state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
replies, err := state.ProcessSP(payload)
if err != nil {
state.Ctx.LogE("sp-process", les, err, logMsg)
for _, reply := range replies {
state.Ctx.LogD(
"sp-queue-reply",
- append(les, LE{"Size", len(reply)}),
+ append(les, LE{"Size", int64(len(reply))}),
func(les LEs) string {
return fmt.Sprintf(
"SP with %s (nice %s): queuing reply (%s)",
) {
state.Ctx.LogD(
"sp-queue-info",
- append(les, LE{"Size", len(payload)}),
+ append(les, LE{"Size", int64(len(payload))}),
func(les LEs) string {
return fmt.Sprintf(
"SP with %s (nice %s): queuing new info (%s)",
case payload = <-state.payloads:
state.Ctx.LogD(
"sp-got-payload",
- append(les, LE{"Size", len(payload)}),
+ append(les, LE{"Size", int64(len(payload))}),
func(les LEs) string {
return fmt.Sprintf(
"SP with %s (nice %s): got payload (%s)",
string(TTx),
Base32Codec.EncodeToString(freq.Hash[:]),
)
+ state.fdsLock.RLock()
fdAndFullSize, exists := state.fds[pth]
+ state.fdsLock.RUnlock()
if !exists {
fd, err := os.Open(pth)
if err != nil {
return
}
fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
+ state.fdsLock.Lock()
state.fds[pth] = fdAndFullSize
+ state.fdsLock.Unlock()
}
fd := fdAndFullSize.fd
fullSize := fdAndFullSize.fullSize
return
}
buf = buf[:n]
- state.Ctx.LogD(
- "sp-file-read",
- append(lesp, LE{"Size", n}),
- func(les LEs) string {
- return fmt.Sprintf(
- "%s: read %s",
- logMsg(les), humanize.IBytes(uint64(n)),
- )
- },
+ lesp = append(
+ les,
+ LE{"XX", string(TTx)},
+ LE{"Pkt", pktName},
+ LE{"Size", int64(n)},
)
+ state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
+ return fmt.Sprintf(
+ "%s: read %s",
+ logMsg(les), humanize.IBytes(uint64(n)),
+ )
+ })
}
state.closeFd(pth)
payload = MarshalSP(SPTypeFile, SPFile{
Payload: buf,
})
ourSize := freq.Offset + uint64(len(buf))
- lesp = append(lesp, LE{"Size", int64(ourSize)}, LE{"FullSize", fullSize})
+ lesp = append(
+ les,
+ LE{"XX", string(TTx)},
+ LE{"Pkt", pktName},
+ LE{"Size", int64(ourSize)},
+ LE{"FullSize", fullSize},
+ )
if state.Ctx.ShowPrgrs {
Progress("Tx", lesp)
}
humanize.IBytes(uint64(len(payload))),
)
}
- state.Ctx.LogD("sp-sending", append(les, LE{"Size", len(payload)}), logMsg)
+ state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
state.Ctx.LogE("sp-sending", les, err, logMsg)
}
state.Ctx.LogD(
"sp-recv-got",
- append(les, LE{"Size", len(payload)}),
+ append(les, LE{"Size", int64(len(payload))}),
func(les LEs) string { return logMsg(les) + ": got" },
)
payload, err = state.csTheir.Decrypt(nil, nil, payload)
}
state.Ctx.LogD(
"sp-recv-process",
- append(les, LE{"Size", len(payload)}),
+ append(les, LE{"Size", int64(len(payload))}),
func(les LEs) string {
return logMsg(les) + ": processing"
},
for _, reply := range replies {
state.Ctx.LogD(
"sp-recv-reply",
- append(les[:len(les)-1], LE{"Size", len(reply)}),
+ append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
func(les LEs) string {
return fmt.Sprintf(
"SP with %s (nice %s): queuing reply (%s)",
lesp,
LE{"XX", string(TRx)},
LE{"Pkt", pktName},
- LE{"Size", len(file.Payload)},
+ LE{"Size", int64(len(file.Payload))},
)
logMsg := func(les LEs) string {
return fmt.Sprintf(
state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
return logMsg(les) + ": opening part"
})
+ state.fdsLock.RLock()
fdAndFullSize, exists := state.fds[filePathPart]
+ state.fdsLock.RUnlock()
var fd *os.File
if exists {
fd = fdAndFullSize.fd
})
return nil, err
}
+ state.fdsLock.Lock()
state.fds[filePathPart] = FdAndFullSize{fd: fd}
+ state.fdsLock.Unlock()
if file.Offset == 0 {
h, err := blake2b.New256(nil)
if err != nil {