PartSuffix = ".part"
DefaultDeadline = 10
- SPHeadOverhead = 4
+ SPHeadOverhead = 4
)
var (
xxOnly TRxTx
rxRate int
txRate int
- isDead bool
+ isDead chan struct{}
listOnly bool
onlyPkts map[[32]byte]bool
sync.RWMutex
}
+func (state *SPState) SetDead() {
+ state.Lock()
+ defer state.Unlock()
+ select {
+ case _, ok := <-state.isDead:
+ if !ok {
+ // Already closed channel, dead
+ return
+ }
+ default:
+ }
+ close(state.isDead)
+ go func() {
+ for _ = range state.payloads {
+ }
+ }()
+}
+
func (state *SPState) NotAlive() bool {
- if state.isDead {
- return true
+ select {
+ case _, ok := <-state.isDead:
+ if !ok {
+ return true
+ }
+ default:
}
now := time.Now()
if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) {
func (state *SPState) StartWorkers(
conn ConnDeadlined,
infosPayloads [][]byte,
- payload []byte) error {
+ payload []byte,
+) error {
+ state.isDead = make(chan struct{})
sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
+
if len(infosPayloads) > 1 {
+ state.wg.Add(1)
go func() {
for _, payload := range infosPayloads[1:] {
state.Ctx.LogD(
)
state.payloads <- payload
}
+ state.wg.Done()
}()
}
state.Ctx.LogD(
return err
}
+ state.wg.Add(1)
go func() {
for _, reply := range replies {
state.Ctx.LogD(
)
state.payloads <- reply
}
+ state.wg.Done()
}()
if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
+ state.wg.Add(1)
go func() {
- for range time.Tick(time.Second) {
- if state.NotAlive() {
- return
- }
- for _, payload := range state.Ctx.infosOur(
- state.Node.Id,
- state.Nice,
- &state.infosOurSeen,
- ) {
- state.Ctx.LogD(
- "sp-work",
- SdsAdd(sds, SDS{"size": len(payload)}),
- "queuing new info",
- )
- state.payloads <- payload
+ ticker := time.NewTicker(time.Second)
+ for {
+ select {
+ case _, ok := <-state.isDead:
+ if !ok {
+ state.wg.Done()
+ ticker.Stop()
+ return
+ }
+ case <-ticker.C:
+ for _, payload := range state.Ctx.infosOur(
+ state.Node.Id,
+ state.Nice,
+ &state.infosOurSeen,
+ ) {
+ state.Ctx.LogD(
+ "sp-work",
+ SdsAdd(sds, SDS{"size": len(payload)}),
+ "queuing new info",
+ )
+ state.payloads <- payload
+ }
}
}
}()
state.wg.Add(1)
go func() {
- defer func() {
- state.isDead = true
- state.wg.Done()
- }()
for {
if state.NotAlive() {
- return
+ break
}
var payload []byte
select {
break
}
}
+ state.SetDead()
+ state.wg.Done()
}()
state.wg.Add(1)
go func() {
- defer func() {
- state.isDead = true
- state.wg.Done()
- }()
for {
if state.NotAlive() {
- return
+ break
}
state.Ctx.LogD("sp-recv", sds, "waiting for payload")
conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
state.Ctx.LogE("sp-recv", sds, err, "")
break
}
+ state.wg.Add(1)
go func() {
for _, reply := range replies {
state.Ctx.LogD(
)
state.payloads <- reply
}
+ state.wg.Done()
}()
if state.rxRate > 0 {
time.Sleep(time.Second / time.Duration(state.rxRate))
}
}
+ state.SetDead()
+ state.wg.Done()
}()
return nil
func (state *SPState) Wait() {
state.wg.Wait()
+ close(state.payloads)
state.dirUnlock()
state.Duration = time.Now().Sub(state.started)
state.RxSpeed = state.RxBytes
delete(state.infosTheir, *file.Hash)
state.Unlock()
spWorkersGroup.Done()
+ state.wg.Add(1)
go func() {
state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
+ state.wg.Done()
}()
}()
case SPTypeDone: