From 8d123212f301a6e79275607bcd4da0036b894193 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sun, 10 Jun 2018 13:30:35 +0300 Subject: [PATCH] Queue higher priority packets first in SP --- doc/news.ru.texi | 4 +++ doc/news.texi | 3 +++ doc/sp.texi | 5 ++-- src/cypherpunks.ru/nncp/sp.go | 46 ++++++++++++++++++++++++++--------- 4 files changed, 44 insertions(+), 14 deletions(-) diff --git a/doc/news.ru.texi b/doc/news.ru.texi index 5aebb84..dd20336 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -9,6 +9,10 @@ проверяют существование @file{.seen} файла и расценивают его как то, что файл уже был скачан. Возможно передача данных была осуществлена сторонним способом и удалённая сторона должна быть оповещена об этом. +@item +Если более высокоприоритетный пакет попадает в спул, то +@command{nncp-daemon} добавит его в очередь отправки первым, прерывая +низкоприоритетные передачи. @end itemize @node Релиз 3.2 diff --git a/doc/news.texi b/doc/news.texi index 123a1ca..d69e02e 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -11,6 +11,9 @@ See also this page @ref{Новости, on russian}. if @file{.seen} exists and treat it like file was already downloaded. Possibly it was transferred out-of-bound and remote side needs to be notifier about that. +@item +If higher priority packet is spooled, then @command{nncp-daemon} will +queue its sending first, interrupting lower priority transmissions. @end itemize @node Release 3.2 diff --git a/doc/sp.texi b/doc/sp.texi index 41ebf45..239b87f 100644 --- a/doc/sp.texi +++ b/doc/sp.texi @@ -169,8 +169,9 @@ payloads, then send all of remaining in the transport stage. corresponding offset. @end itemize -@item When @emph{FREQ} packet received, append it to current sending -queue. Sending queue contains files with offsets that are needed to be +@item When @emph{FREQ} packet received, insert it to current sending +queue with niceness level sort: higher priority packets will be sent +first. Sending queue contains files with offsets that are needed to be sent. @item While sending queue is not empty, send @emph{FILE} packets. diff --git a/src/cypherpunks.ru/nncp/sp.go b/src/cypherpunks.ru/nncp/sp.go index f6b07ce..86cd96c 100644 --- a/src/cypherpunks.ru/nncp/sp.go +++ b/src/cypherpunks.ru/nncp/sp.go @@ -99,6 +99,11 @@ type SPRaw struct { Payload []byte } +type FreqWithNice struct { + freq *SPFreq + nice uint8 +} + func init() { var buf bytes.Buffer spHead := SPHead{Type: SPTypeHalt} @@ -169,8 +174,8 @@ type SPState struct { csTheir *noise.CipherState payloads chan []byte infosTheir map[[32]byte]*SPInfo - infosOurSeen map[[32]byte]struct{} - queueTheir []*SPFreq + infosOurSeen map[[32]byte]uint8 + queueTheir []*FreqWithNice wg sync.WaitGroup RxBytes int64 RxLastSeen time.Time @@ -226,7 +231,7 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { return sp.Payload, nil } -func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte { +func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte { var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { @@ -243,7 +248,7 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{} Size: uint64(job.Size), Hash: job.HshValue, }) - (*seen)[*job.HshValue] = struct{}{} + (*seen)[*job.HshValue] = job.PktEnc.Nice } sort.Sort(ByNice(infos)) var payloads [][]byte @@ -310,7 +315,7 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx, nice: nice, payloads: make(chan []byte), infosTheir: make(map[[32]byte]*SPInfo), - infosOurSeen: make(map[[32]byte]struct{}), + infosOurSeen: make(map[[32]byte]uint8), started: started, rxLock: rxLock, txLock: txLock, @@ -388,7 +393,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error hs: hs, nice: nice, payloads: make(chan []byte), - infosOurSeen: make(map[[32]byte]struct{}), + infosOurSeen: make(map[[32]byte]uint8), infosTheir: make(map[[32]byte]*SPInfo), started: started, xxOnly: xxOnly, @@ -563,7 +568,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa time.Sleep(100 * time.Millisecond) continue } - freq := state.queueTheir[0] + freq := state.queueTheir[0].freq state.RUnlock() sdsp := SdsAdd(sds, SDS{ "xx": string(TTx), @@ -618,7 +623,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10) state.ctx.LogP("sp-file", sdsp, "") state.Lock() - if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash { + if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash { if ourSize == fullSize { state.ctx.LogD("sp-file", sdsp, "finished") if len(state.queueTheir) > 1 { @@ -627,7 +632,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa state.queueTheir = state.queueTheir[:0] } } else { - state.queueTheir[0].Offset += uint64(len(buf)) + state.queueTheir[0].freq.Offset += uint64(len(buf)) } } else { state.ctx.LogD("sp-file", sdsp, "queue disappeared") @@ -923,9 +928,26 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { "hash": ToBase32(freq.Hash[:]), "offset": strconv.FormatInt(int64(freq.Offset), 10), }), "queueing") - state.Lock() - state.queueTheir = append(state.queueTheir, &freq) - state.Unlock() + nice, exists := state.infosOurSeen[*freq.Hash] + if exists { + state.Lock() + insertIdx := 0 + var freqWithNice *FreqWithNice + for insertIdx, freqWithNice = range state.queueTheir { + if freqWithNice.nice > nice { + break + } + } + state.queueTheir = append(state.queueTheir, nil) + copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:]) + state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice} + state.Unlock() + } else { + state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{ + "hash": ToBase32(freq.Hash[:]), + "offset": strconv.FormatInt(int64(freq.Offset), 10), + }), "unknown") + } case SPTypeHalt: sdsp := SdsAdd(sds, SDS{"type": "halt"}) state.ctx.LogD("sp-process", sdsp, "") -- 2.44.0