/*
NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2018 Sergey Matveev <stargrave@stargrave.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
noise.CipherChaChaPoly,
noise.HashBLAKE2b,
)
+
+ spWorkersGroup sync.WaitGroup
)
type SPType uint8
Payload []byte
}
+type FreqWithNice struct {
+ freq *SPFreq
+ nice uint8
+}
+
func init() {
var buf bytes.Buffer
spHead := SPHead{Type: SPTypeHalt}
csTheir *noise.CipherState
payloads chan []byte
infosTheir map[[32]byte]*SPInfo
- infosOurSeen map[[32]byte]struct{}
- queueTheir []*SPFreq
+ infosOurSeen map[[32]byte]uint8
+ queueTheir []*FreqWithNice
wg sync.WaitGroup
RxBytes int64
RxLastSeen time.Time
return sp.Payload, nil
}
-func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte {
+func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
var infos []*SPInfo
var totalSize int64
for job := range ctx.Jobs(nodeId, TTx) {
Size: uint64(job.Size),
Hash: job.HshValue,
})
- (*seen)[*job.HshValue] = struct{}{}
+ (*seen)[*job.HshValue] = job.PktEnc.Nice
}
sort.Sort(ByNice(infos))
var payloads [][]byte
},
PeerStatic: node.NoisePub[:],
}
+ hs, err := noise.NewHandshakeState(conf)
+ if err != nil {
+ return nil, err
+ }
state := SPState{
ctx: ctx,
- hs: noise.NewHandshakeState(conf),
+ hs: hs,
Node: node,
onlineDeadline: onlineDeadline,
maxOnlineTime: maxOnlineTime,
nice: nice,
payloads: make(chan []byte),
infosTheir: make(map[[32]byte]*SPInfo),
- infosOurSeen: make(map[[32]byte]struct{}),
+ infosOurSeen: make(map[[32]byte]uint8),
started: started,
rxLock: rxLock,
txLock: txLock,
var buf []byte
var payload []byte
- buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
+ buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
+ if err != nil {
+ state.dirUnlock()
+ return nil, err
+ }
sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
ctx.LogD("sp-start", sds, "sending first message")
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
Public: ctx.Self.NoisePub[:],
},
}
+ hs, err := noise.NewHandshakeState(conf)
+ if err != nil {
+ return nil, err
+ }
state := SPState{
ctx: ctx,
- hs: noise.NewHandshakeState(conf),
+ hs: hs,
nice: nice,
payloads: make(chan []byte),
- infosOurSeen: make(map[[32]byte]struct{}),
+ infosOurSeen: make(map[[32]byte]uint8),
infosTheir: make(map[[32]byte]*SPInfo),
started: started,
xxOnly: xxOnly,
}
var buf []byte
var payload []byte
- var err error
ctx.LogD(
"sp-start",
SDS{"nice": strconv.Itoa(int(nice))},
}
ctx.LogD("sp-start", sds, "sending first message")
- buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
+ buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
+ if err != nil {
+ state.dirUnlock()
+ return nil, err
+ }
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
if err = state.WriteSP(conn, buf); err != nil {
ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
time.Sleep(100 * time.Millisecond)
continue
}
- freq := state.queueTheir[0]
+ freq := state.queueTheir[0].freq
state.RUnlock()
sdsp := SdsAdd(sds, SDS{
"xx": string(TTx),
sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
state.ctx.LogP("sp-file", sdsp, "")
state.Lock()
- if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash {
+ if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
if ourSize == fullSize {
state.ctx.LogD("sp-file", sdsp, "finished")
if len(state.queueTheir) > 1 {
state.queueTheir = state.queueTheir[:0]
}
} else {
- state.queueTheir[0].Offset += uint64(len(buf))
+ state.queueTheir[0].freq.Offset += uint64(len(buf))
}
} else {
state.ctx.LogD("sp-file", sdsp, "queue disappeared")
state.infosTheir[*info.Hash] = &info
state.Unlock()
state.ctx.LogD("sp-process", sdsp, "stating part")
- if _, err = os.Stat(filepath.Join(
+ pktPath := filepath.Join(
state.ctx.Spool,
state.Node.Id.String(),
string(TRx),
ToBase32(info.Hash[:]),
- )); err == nil {
+ )
+ if _, err = os.Stat(pktPath); err == nil {
state.ctx.LogD("sp-process", sdsp, "already done")
replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
continue
}
- fi, err := os.Stat(filepath.Join(
- state.ctx.Spool,
- state.Node.Id.String(),
- string(TRx),
- ToBase32(info.Hash[:])+PartSuffix,
- ))
+ if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
+ state.ctx.LogD("sp-process", sdsp, "already seen")
+ replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
+ continue
+ }
+ fi, err := os.Stat(pktPath + PartSuffix)
var offset int64
if err == nil {
offset = fi.Size()
continue
}
state.RUnlock()
+ spWorkersGroup.Wait()
+ spWorkersGroup.Add(1)
go func() {
if err := fd.Sync(); err != nil {
state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
state.Lock()
delete(state.infosTheir, *file.Hash)
state.Unlock()
+ spWorkersGroup.Done()
go func() {
state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
}()
"hash": ToBase32(freq.Hash[:]),
"offset": strconv.FormatInt(int64(freq.Offset), 10),
}), "queueing")
- state.Lock()
- state.queueTheir = append(state.queueTheir, &freq)
- state.Unlock()
+ nice, exists := state.infosOurSeen[*freq.Hash]
+ if exists {
+ state.Lock()
+ insertIdx := 0
+ var freqWithNice *FreqWithNice
+ for insertIdx, freqWithNice = range state.queueTheir {
+ if freqWithNice.nice > nice {
+ break
+ }
+ }
+ state.queueTheir = append(state.queueTheir, nil)
+ copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
+ state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
+ state.Unlock()
+ } else {
+ state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{
+ "hash": ToBase32(freq.Hash[:]),
+ "offset": strconv.FormatInt(int64(freq.Offset), 10),
+ }), "unknown")
+ }
case SPTypeHalt:
sdsp := SdsAdd(sds, SDS{"type": "halt"})
state.ctx.LogD("sp-process", sdsp, "")