)
const (
- MaxSPSize = 2<<16 - 256
- PartSuffix = ".part"
- DeadlineDuration = 10
+ MaxSPSize = 1<<16 - 256
+ PartSuffix = ".part"
+ DefaultDeadline = 10
)
var (
noise.CipherChaChaPoly,
noise.HashBLAKE2b,
)
+
+ spWorkersGroup sync.WaitGroup
)
type SPType uint8
}
type SPState struct {
- ctx *Ctx
- NodeId *NodeId
- nice uint8
- hs *noise.HandshakeState
- csOur *noise.CipherState
- csTheir *noise.CipherState
- payloads chan []byte
- infosTheir map[[32]byte]*SPInfo
- queueTheir []*SPFreq
- wg sync.WaitGroup
- RxBytes int64
- RxLastSeen time.Time
- TxBytes int64
- TxLastSeen time.Time
- started time.Time
- Duration time.Duration
- RxSpeed int64
- TxSpeed int64
- rxLock *os.File
- txLock *os.File
- xxOnly *TRxTx
+ ctx *Ctx
+ Node *Node
+ onlineDeadline uint
+ maxOnlineTime uint
+ nice uint8
+ hs *noise.HandshakeState
+ csOur *noise.CipherState
+ csTheir *noise.CipherState
+ payloads chan []byte
+ infosTheir map[[32]byte]*SPInfo
+ infosOurSeen map[[32]byte]struct{}
+ queueTheir []*SPFreq
+ wg sync.WaitGroup
+ RxBytes int64
+ RxLastSeen time.Time
+ TxBytes int64
+ TxLastSeen time.Time
+ started time.Time
+ Duration time.Duration
+ RxSpeed int64
+ TxSpeed int64
+ rxLock *os.File
+ txLock *os.File
+ xxOnly TRxTx
+ isDead bool
sync.RWMutex
}
-func (state *SPState) isDead() bool {
+func (state *SPState) NotAlive() bool {
+ if state.isDead {
+ return true
+ }
now := time.Now()
- return now.Sub(state.RxLastSeen).Seconds() >= DeadlineDuration && now.Sub(state.TxLastSeen).Seconds() >= DeadlineDuration
+ if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) {
+ return true
+ }
+ return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline
}
func (state *SPState) dirUnlock() {
func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
var sp SPRaw
- n, err := xdr.UnmarshalLimited(src, &sp, 2<<17)
+ n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
if err != nil {
return nil, err
}
return sp.Payload, nil
}
-func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte {
+func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte {
var infos []*SPInfo
var totalSize int64
for job := range ctx.Jobs(nodeId, TTx) {
if job.PktEnc.Nice > nice {
continue
}
+ if _, known := (*seen)[*job.HshValue]; known {
+ continue
+ }
totalSize += job.Size
infos = append(infos, &SPInfo{
Nice: job.PktEnc.Nice,
Size: uint64(job.Size),
Hash: job.HshValue,
})
+ (*seen)[*job.HshValue] = struct{}{}
}
sort.Sort(ByNice(infos))
var payloads [][]byte
"size": strconv.FormatInt(int64(info.Size), 10),
}, "")
}
- ctx.LogI("sp-infos", SDS{
- "xx": string(TTx),
- "node": nodeId,
- "pkts": strconv.Itoa(len(payloads)),
- "size": strconv.FormatInt(totalSize, 10),
- }, "")
+ if totalSize > 0 {
+ ctx.LogI("sp-infos", SDS{
+ "xx": string(TTx),
+ "node": nodeId,
+ "pkts": strconv.Itoa(len(payloads)),
+ "size": strconv.FormatInt(totalSize, 10),
+ }, "")
+ }
return payloadsSplit(payloads)
}
-func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*SPState, error) {
+func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx, onlineDeadline, maxOnlineTime uint) (*SPState, error) {
err := ctx.ensureRxDir(nodeId)
if err != nil {
return nil, err
}
var rxLock *os.File
- if xxOnly != nil && *xxOnly == TRx {
+ if xxOnly == "" || xxOnly == TRx {
rxLock, err = ctx.LockDir(nodeId, TRx)
if err != nil {
return nil, err
}
}
var txLock *os.File
- if xxOnly != nil && *xxOnly == TTx {
+ if xxOnly == "" || xxOnly == TTx {
txLock, err = ctx.LockDir(nodeId, TTx)
if err != nil {
return nil, err
}
}
started := time.Now()
+ node := ctx.Neigh[*nodeId]
conf := noise.Config{
CipherSuite: NoiseCipherSuite,
Pattern: noise.HandshakeIK,
Private: ctx.Self.NoisePrv[:],
Public: ctx.Self.NoisePub[:],
},
- PeerStatic: ctx.Neigh[*nodeId].NoisePub[:],
+ PeerStatic: node.NoisePub[:],
}
state := SPState{
- ctx: ctx,
- hs: noise.NewHandshakeState(conf),
- NodeId: nodeId,
- nice: nice,
- payloads: make(chan []byte),
- infosTheir: make(map[[32]byte]*SPInfo),
- started: started,
- rxLock: rxLock,
- txLock: txLock,
- xxOnly: xxOnly,
+ ctx: ctx,
+ hs: noise.NewHandshakeState(conf),
+ Node: node,
+ onlineDeadline: onlineDeadline,
+ maxOnlineTime: maxOnlineTime,
+ nice: nice,
+ payloads: make(chan []byte),
+ infosTheir: make(map[[32]byte]*SPInfo),
+ infosOurSeen: make(map[[32]byte]struct{}),
+ started: started,
+ rxLock: rxLock,
+ txLock: txLock,
+ xxOnly: xxOnly,
}
var infosPayloads [][]byte
- if xxOnly == nil || *xxOnly != TTx {
- infosPayloads = ctx.infosOur(nodeId, nice)
+ if xxOnly == "" || xxOnly == TTx {
+ infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen)
}
var firstPayload []byte
if len(infosPayloads) > 0 {
buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
ctx.LogD("sp-start", sds, "sending first message")
- conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
+ conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
if err = state.WriteSP(conn, buf); err != nil {
ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
return nil, err
}
ctx.LogD("sp-start", sds, "waiting for first message")
- conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
+ conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
if buf, err = state.ReadSP(conn); err != nil {
ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
return &state, err
}
-func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, error) {
+func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error) {
started := time.Now()
conf := noise.Config{
CipherSuite: NoiseCipherSuite,
},
}
state := SPState{
- ctx: ctx,
- hs: noise.NewHandshakeState(conf),
- nice: nice,
- payloads: make(chan []byte),
- infosTheir: make(map[[32]byte]*SPInfo),
- started: started,
- xxOnly: xxOnly,
+ ctx: ctx,
+ hs: noise.NewHandshakeState(conf),
+ nice: nice,
+ payloads: make(chan []byte),
+ infosOurSeen: make(map[[32]byte]struct{}),
+ infosTheir: make(map[[32]byte]*SPInfo),
+ started: started,
+ xxOnly: xxOnly,
}
var buf []byte
var payload []byte
SDS{"nice": strconv.Itoa(int(nice))},
"waiting for first message",
)
- conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
+ conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
if buf, err = state.ReadSP(conn); err != nil {
ctx.LogE("sp-start", SDS{"err": err}, "")
return nil, err
return nil, err
}
- var nodeId *NodeId
- for _, node := range ctx.Neigh {
+ var node *Node
+ for _, node = range ctx.Neigh {
if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 {
- nodeId = node.Id
break
}
}
- if nodeId == nil {
+ if node == nil {
peerId := ToBase32(state.hs.PeerStatic())
ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown")
return nil, errors.New("Unknown peer: " + peerId)
}
- state.NodeId = nodeId
- sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
+ state.Node = node
+ state.onlineDeadline = node.OnlineDeadline
+ state.maxOnlineTime = node.MaxOnlineTime
+ sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(nice))}
- if ctx.ensureRxDir(nodeId); err != nil {
+ if ctx.ensureRxDir(node.Id); err != nil {
return nil, err
}
var rxLock *os.File
- if xxOnly != nil && *xxOnly == TRx {
- rxLock, err = ctx.LockDir(nodeId, TRx)
+ if xxOnly == "" || xxOnly == TRx {
+ rxLock, err = ctx.LockDir(node.Id, TRx)
if err != nil {
return nil, err
}
}
state.rxLock = rxLock
var txLock *os.File
- if xxOnly != nil && *xxOnly == TTx {
- txLock, err = ctx.LockDir(nodeId, TTx)
+ if xxOnly == "" || xxOnly == TTx {
+ txLock, err = ctx.LockDir(node.Id, TTx)
if err != nil {
return nil, err
}
state.txLock = txLock
var infosPayloads [][]byte
- if xxOnly == nil || *xxOnly != TTx {
- infosPayloads = ctx.infosOur(nodeId, nice)
+ if xxOnly == "" || xxOnly == TTx {
+ infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen)
}
var firstPayload []byte
if len(infosPayloads) > 0 {
ctx.LogD("sp-start", sds, "sending first message")
buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
- conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
+ conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
if err = state.WriteSP(conn, buf); err != nil {
ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
}
func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error {
- sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))}
+ sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
if len(infosPayloads) > 1 {
go func() {
for _, payload := range infosPayloads[1:] {
state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
return err
}
+
go func() {
for _, reply := range replies {
state.ctx.LogD(
state.payloads <- reply
}
}()
+
+ if state.xxOnly == "" || state.xxOnly == TTx {
+ go func() {
+ for range time.Tick(time.Second) {
+ for _, payload := range state.ctx.infosOur(
+ state.Node.Id,
+ state.nice,
+ &state.infosOurSeen,
+ ) {
+ state.ctx.LogD(
+ "sp-work",
+ SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
+ "queuing new info",
+ )
+ state.payloads <- payload
+ }
+ }
+ }()
+ }
+
state.wg.Add(1)
go func() {
- defer state.wg.Done()
+ defer func() {
+ state.isDead = true
+ state.wg.Done()
+ }()
for {
- if state.isDead() {
+ if state.NotAlive() {
return
}
var payload []byte
state.ctx.LogD("sp-file", sdsp, "queueing")
fd, err := os.Open(filepath.Join(
state.ctx.Spool,
- state.NodeId.String(),
+ state.Node.Id.String(),
string(TTx),
ToBase32(freq.Hash[:]),
))
var buf []byte
if freq.Offset < fullSize {
state.ctx.LogD("sp-file", sdsp, "seeking")
- if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
+ if _, err = fd.Seek(int64(freq.Offset), 0); err != nil {
state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
break
}
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"sending",
)
- conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
+ conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "")
break
}
}
}()
+
state.wg.Add(1)
go func() {
- defer state.wg.Done()
+ defer func() {
+ state.isDead = true
+ state.wg.Done()
+ }()
for {
- if state.isDead() {
+ if state.NotAlive() {
return
}
state.ctx.LogD("sp-recv", sds, "waiting for payload")
- conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
+ conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
payload, err := state.ReadSP(conn)
if err != nil {
unmarshalErr := err.(*xdr.UnmarshalError)
netErr, ok := unmarshalErr.Err.(net.Error)
- if (ok && netErr.Timeout()) || unmarshalErr.ErrorCode == xdr.ErrIO {
+ if ok && netErr.Timeout() {
continue
- } else {
- state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
+ }
+ if unmarshalErr.ErrorCode == xdr.ErrIO {
break
}
+ state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
+ break
}
state.ctx.LogD(
"sp-recv",
}()
}
}()
+
return nil
}
}
func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
- sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))}
+ sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
r := bytes.NewReader(payload)
var err error
var replies [][]byte
continue
}
state.ctx.LogD("sp-process", sdsp, "received")
- if state.xxOnly != nil && *state.xxOnly == TTx {
+ if state.xxOnly == TTx {
continue
}
state.Lock()
state.ctx.LogD("sp-process", sdsp, "stating part")
if _, err = os.Stat(filepath.Join(
state.ctx.Spool,
- state.NodeId.String(),
+ state.Node.Id.String(),
string(TRx),
ToBase32(info.Hash[:]),
)); err == nil {
}
fi, err := os.Stat(filepath.Join(
state.ctx.Spool,
- state.NodeId.String(),
+ state.Node.Id.String(),
string(TRx),
ToBase32(info.Hash[:])+PartSuffix,
))
})
filePath := filepath.Join(
state.ctx.Spool,
- state.NodeId.String(),
+ state.Node.Id.String(),
string(TRx),
ToBase32(file.Hash[:]),
)
SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
"seeking",
)
- if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
+ if _, err = fd.Seek(int64(file.Offset), 0); err != nil {
state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
fd.Close()
return nil, err
return nil, err
}
ourSize := uint64(file.Offset) + uint64(len(file.Payload))
+ state.RLock()
sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
state.ctx.LogP("sp-file", sdsp, "")
if state.infosTheir[*file.Hash].Size != ourSize {
+ state.RUnlock()
fd.Close()
continue
}
+ state.RUnlock()
+ spWorkersGroup.Wait()
+ spWorkersGroup.Add(1)
go func() {
if err := fd.Sync(); err != nil {
state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
}
state.wg.Add(1)
defer state.wg.Done()
- fd.Seek(0, io.SeekStart)
+ fd.Seek(0, 0)
state.ctx.LogD("sp-file", sdsp, "checking")
gut, err := Check(fd, file.Hash[:])
fd.Close()
}
state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
os.Rename(filePath+PartSuffix, filePath)
+ state.Lock()
+ delete(state.infosTheir, *file.Hash)
+ state.Unlock()
+ spWorkersGroup.Done()
go func() {
state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
}()
state.ctx.LogD("sp-done", sdsp, "removing")
err := os.Remove(filepath.Join(
state.ctx.Spool,
- state.NodeId.String(),
+ state.Node.Id.String(),
string(TTx),
ToBase32(done.Hash[:]),
))
if infosGot {
var pkts int
var size uint64
+ state.RLock()
for _, info := range state.infosTheir {
pkts++
size += info.Size
}
+ state.RUnlock()
state.ctx.LogI("sp-infos", SDS{
"xx": string(TRx),
- "node": state.NodeId,
+ "node": state.Node.Id,
"pkts": strconv.Itoa(pkts),
"size": strconv.FormatInt(int64(size), 10),
}, "")