DefaultDeadline = 10 * time.Second
PingTimeout = time.Minute
- spCheckers = make(map[NodeId]*SPCheckerQueues)
+ spCheckers = make(map[NodeId]*SPCheckerQueues)
+ SPCheckersWg sync.WaitGroup
)
type FdAndFullSize struct {
onlyPkts map[[32]byte]bool
writeSPBuf bytes.Buffer
fds map[string]FdAndFullSize
+ fdsLock sync.RWMutex
fileHashers map[string]*HasherAndOffset
checkerQueues SPCheckerQueues
+ progressBars map[string]struct{}
sync.RWMutex
}
for range state.pings {
}
}()
- go func() {
- for _, s := range state.fds {
- s.fd.Close()
- }
- }()
}
func (state *SPState) NotAlive() bool {
{"Node", nodeId},
{"Pkt", pktName},
}
+ SPCheckersWg.Add(1)
ctx.LogD("sp-checker", les, func(les LEs) string {
return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
})
pktName, humanize.IBytes(uint64(size)),
)
})
+ SPCheckersWg.Done()
go func(hsh *[32]byte) { checked <- hsh }(hshValue)
}
}
state.pings = make(chan struct{})
state.infosTheir = make(map[[32]byte]*SPInfo)
state.infosOurSeen = make(map[[32]byte]uint8)
+ state.progressBars = make(map[string]struct{})
state.started = started
state.rxLock = rxLock
state.txLock = txLock
state.pings = make(chan struct{})
state.infosOurSeen = make(map[[32]byte]uint8)
state.infosTheir = make(map[[32]byte]*SPInfo)
+ state.progressBars = make(map[string]struct{})
state.started = started
state.xxOnly = xxOnly
}
func (state *SPState) closeFd(pth string) {
- s, exists := state.fds[pth]
- delete(state.fds, pth)
- if exists {
+ state.fdsLock.Lock()
+ if s, exists := state.fds[pth]; exists {
+ delete(state.fds, pth)
s.fd.Close()
}
+ state.fdsLock.Unlock()
}
func (state *SPState) StartWorkers(
for _, payload := range infosPayloads[1:] {
state.Ctx.LogD(
"sp-queue-remaining",
- append(les, LE{"Size", len(payload)}),
+ append(les, LE{"Size", int64(len(payload))}),
func(les LEs) string {
return fmt.Sprintf(
"SP with %s (nice %s): queuing remaining payload (%s)",
humanize.IBytes(uint64(len(payload))),
)
}
- state.Ctx.LogD("sp-process", append(les, LE{"Size", len(payload)}), logMsg)
+ state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
replies, err := state.ProcessSP(payload)
if err != nil {
state.Ctx.LogE("sp-process", les, err, logMsg)
for _, reply := range replies {
state.Ctx.LogD(
"sp-queue-reply",
- append(les, LE{"Size", len(reply)}),
+ append(les, LE{"Size", int64(len(reply))}),
func(les LEs) string {
return fmt.Sprintf(
"SP with %s (nice %s): queuing reply (%s)",
pingTicker.Stop()
return
case now := <-deadlineTicker.C:
- if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
- now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
- (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
- (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
- state.SetDead()
- conn.Close() // #nosec G104
+ if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
+ now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
+ goto Deadlined
+ }
+ if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
+ goto Deadlined
}
+ if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
+ goto Deadlined
+ }
+ break
+ Deadlined:
+ state.SetDead()
+ conn.Close() // #nosec G104
case now := <-pingTicker.C:
if now.After(state.TxLastSeen.Add(PingTimeout)) {
state.wg.Add(1)
) {
state.Ctx.LogD(
"sp-queue-info",
- append(les, LE{"Size", len(payload)}),
+ append(les, LE{"Size", int64(len(payload))}),
func(les LEs) string {
return fmt.Sprintf(
"SP with %s (nice %s): queuing new info (%s)",
case payload = <-state.payloads:
state.Ctx.LogD(
"sp-got-payload",
- append(les, LE{"Size", len(payload)}),
+ append(les, LE{"Size", int64(len(payload))}),
func(les LEs) string {
return fmt.Sprintf(
"SP with %s (nice %s): got payload (%s)",
string(TTx),
Base32Codec.EncodeToString(freq.Hash[:]),
)
+ state.fdsLock.RLock()
fdAndFullSize, exists := state.fds[pth]
+ state.fdsLock.RUnlock()
if !exists {
fd, err := os.Open(pth)
if err != nil {
return
}
fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
+ state.fdsLock.Lock()
state.fds[pth] = fdAndFullSize
+ state.fdsLock.Unlock()
}
fd := fdAndFullSize.fd
fullSize := fdAndFullSize.fullSize
return
}
buf = buf[:n]
- state.Ctx.LogD(
- "sp-file-read",
- append(lesp, LE{"Size", n}),
- func(les LEs) string {
- return fmt.Sprintf(
- "%s: read %s",
- logMsg(les), humanize.IBytes(uint64(n)),
- )
- },
+ lesp = append(
+ les,
+ LE{"XX", string(TTx)},
+ LE{"Pkt", pktName},
+ LE{"Size", int64(n)},
)
+ state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
+ return fmt.Sprintf(
+ "%s: read %s",
+ logMsg(les), humanize.IBytes(uint64(n)),
+ )
+ })
}
state.closeFd(pth)
payload = MarshalSP(SPTypeFile, SPFile{
Payload: buf,
})
ourSize := freq.Offset + uint64(len(buf))
- lesp = append(lesp, LE{"Size", int64(ourSize)}, LE{"FullSize", fullSize})
+ lesp = append(
+ les,
+ LE{"XX", string(TTx)},
+ LE{"Pkt", pktName},
+ LE{"Size", int64(ourSize)},
+ LE{"FullSize", fullSize},
+ )
if state.Ctx.ShowPrgrs {
+ state.progressBars[pktName] = struct{}{}
Progress("Tx", lesp)
}
state.Lock()
} else {
state.queueTheir = state.queueTheir[:0]
}
+ if state.Ctx.ShowPrgrs {
+ delete(state.progressBars, pktName)
+ }
} else {
state.queueTheir[0].freq.Offset += uint64(len(buf))
}
humanize.IBytes(uint64(len(payload))),
)
}
- state.Ctx.LogD("sp-sending", append(les, LE{"Size", len(payload)}), logMsg)
+ state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
state.Ctx.LogE("sp-sending", les, err, logMsg)
}
state.Ctx.LogD(
"sp-recv-got",
- append(les, LE{"Size", len(payload)}),
+ append(les, LE{"Size", int64(len(payload))}),
func(les LEs) string { return logMsg(les) + ": got" },
)
payload, err = state.csTheir.Decrypt(nil, nil, payload)
}
state.Ctx.LogD(
"sp-recv-process",
- append(les, LE{"Size", len(payload)}),
+ append(les, LE{"Size", int64(len(payload))}),
func(les LEs) string {
return logMsg(les) + ": processing"
},
for _, reply := range replies {
state.Ctx.LogD(
"sp-recv-reply",
- append(les[:len(les)-1], LE{"Size", len(reply)}),
+ append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
func(les LEs) string {
return fmt.Sprintf(
"SP with %s (nice %s): queuing reply (%s)",
state.wg.Wait()
close(state.payloads)
close(state.pings)
- state.dirUnlock()
state.Duration = time.Now().Sub(state.started)
+ SPCheckersWg.Wait()
+ state.dirUnlock()
state.RxSpeed = state.RxBytes
state.TxSpeed = state.TxBytes
rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
if txDuration > 0 {
state.TxSpeed = state.TxBytes / txDuration
}
+ for _, s := range state.fds {
+ s.fd.Close()
+ }
+ for pktName := range state.progressBars {
+ ProgressKill(pktName)
+ }
}
func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
lesp,
LE{"XX", string(TRx)},
LE{"Pkt", pktName},
- LE{"Size", len(file.Payload)},
+ LE{"Size", int64(len(file.Payload))},
)
logMsg := func(les LEs) string {
return fmt.Sprintf(
state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
return logMsg(les) + ": opening part"
})
+ state.fdsLock.RLock()
fdAndFullSize, exists := state.fds[filePathPart]
+ state.fdsLock.RUnlock()
var fd *os.File
if exists {
fd = fdAndFullSize.fd
})
return nil, err
}
+ state.fdsLock.Lock()
state.fds[filePathPart] = FdAndFullSize{fd: fd}
+ state.fdsLock.Unlock()
if file.Offset == 0 {
h, err := blake2b.New256(nil)
if err != nil {
}
lesp = append(lesp, LE{"FullSize", fullsize})
if state.Ctx.ShowPrgrs {
+ state.progressBars[pktName] = struct{}{}
Progress("Rx", lesp)
}
if fullsize != ourSize {
continue
}
+ if state.Ctx.ShowPrgrs {
+ delete(state.progressBars, pktName)
+ }
logMsg = func(les LEs) string {
return fmt.Sprintf(
"Got packet %s %d%% (%s / %s)",