)
const (
- MaxLLPSize = 2<<15 - 256
- PartSuffix = ".part"
+ MaxLLPSize = 2<<15 - 256
+ PartSuffix = ".part"
+ DeadlineDuration = 10
)
var (
noise.CipherChaChaPoly,
noise.HashBLAKE2b,
)
-
- DeadlineDuration time.Duration = 10 * time.Second
)
type LLPType uint8
payloads chan []byte
infosTheir map[[32]byte]*LLPInfo
queueTheir []*LLPFreq
- isDead bool
wg sync.WaitGroup
RxBytes int64
RxLastSeen time.Time
sync.RWMutex
}
+func (state *LLPState) isDead() bool {
+ now := time.Now()
+ return now.Sub(state.RxLastSeen).Seconds() >= DeadlineDuration && now.Sub(state.TxLastSeen).Seconds() >= DeadlineDuration
+}
+
func (state *LLPState) dirUnlock() {
state.ctx.UnlockDir(state.rxLock)
state.ctx.UnlockDir(state.txLock)
buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
ctx.LogD("llp-start", sds, "sending first message")
- conn.SetWriteDeadline(time.Now().Add(DeadlineDuration))
+ conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
if err = state.WriteLLP(conn, buf); err != nil {
ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
return nil, err
}
ctx.LogD("llp-start", sds, "waiting for first message")
- conn.SetReadDeadline(time.Now().Add(DeadlineDuration))
+ conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
if buf, err = state.ReadLLP(conn); err != nil {
ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
SDS{"nice": strconv.Itoa(int(nice))},
"waiting for first message",
)
- conn.SetReadDeadline(time.Now().Add(DeadlineDuration))
+ conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
if buf, err = state.ReadLLP(conn); err != nil {
ctx.LogE("llp-start", SDS{"err": err}, "")
return nil, err
ctx.LogD("llp-start", sds, "sending first message")
buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
- conn.SetWriteDeadline(time.Now().Add(DeadlineDuration))
+ conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
if err = state.WriteLLP(conn, buf); err != nil {
ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
go func() {
defer state.wg.Done()
for {
- if state.isDead {
+ if state.isDead() {
return
}
var payload []byte
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"sending",
)
- conn.SetWriteDeadline(time.Now().Add(DeadlineDuration))
+ conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
if err := state.WriteLLP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
state.ctx.LogE("llp-xmit", SdsAdd(sds, SDS{"err": err}), "")
break
}
}
- state.isDead = true
}()
state.wg.Add(1)
go func() {
defer state.wg.Done()
for {
- if state.isDead {
+ if state.isDead() {
return
}
state.ctx.LogD("llp-recv", sds, "waiting for payload")
- conn.SetReadDeadline(time.Now().Add(DeadlineDuration))
+ conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
payload, err := state.ReadLLP(conn)
if err != nil {
unmarshalErr := err.(*xdr.UnmarshalError)
netErr, ok := unmarshalErr.Err.(net.Error)
- if !((ok && netErr.Timeout()) || unmarshalErr.ErrorCode == xdr.ErrIO) {
+ if (ok && netErr.Timeout()) || unmarshalErr.ErrorCode == xdr.ErrIO {
+ continue
+ } else {
state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
+ break
}
- break
}
state.ctx.LogD(
"llp-recv",
}
}()
}
- state.isDead = true
}()
return nil
}
state.infosTheir[*info.Hash] = &info
state.Unlock()
state.ctx.LogD("llp-process", sdsp, "stating part")
+ if _, err = os.Stat(filepath.Join(
+ state.ctx.Spool,
+ state.NodeId.String(),
+ string(TRx),
+ ToBase32(info.Hash[:]),
+ )); err == nil {
+ state.ctx.LogD("llp-process", sdsp, "already done")
+ replies = append(replies, MarshalLLP(LLPTypeDone, LLPDone{info.Hash}))
+ continue
+ }
fi, err := os.Stat(filepath.Join(
state.ctx.Spool,
state.NodeId.String(),
fd.Close()
return
}
+ state.wg.Add(1)
+ defer state.wg.Done()
fd.Seek(0, 0)
state.ctx.LogD("llp-file", sdsp, "checking")
gut, err := Check(fd, file.Hash[:])