]> Cypherpunks.ru repositories - nncp.git/commitdiff
Queue higher priority packets first in SP
authorSergey Matveev <stargrave@stargrave.org>
Sun, 10 Jun 2018 10:30:35 +0000 (13:30 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Sun, 10 Jun 2018 10:30:35 +0000 (13:30 +0300)
doc/news.ru.texi
doc/news.texi
doc/sp.texi
src/cypherpunks.ru/nncp/sp.go

index 5aebb84d17f13c2e56e4cbcdb7b5f58ef5748203..dd20336f65c32e120142c8b0d2ddc1836047ae28 100644 (file)
@@ -9,6 +9,10 @@
 проверяют существование @file{.seen} файла и расценивают его как то, что
 файл уже был скачан. Возможно передача данных была осуществлена
 сторонним способом и удалённая сторона должна быть оповещена об этом.
+@item
+Если более высокоприоритетный пакет попадает в спул, то
+@command{nncp-daemon} добавит его в очередь отправки первым, прерывая
+низкоприоритетные передачи.
 @end itemize
 
 @node Релиз 3.2
index 123a1ca3f45d6f0d3bce571e71ea543e456b676f..d69e02e03b52388c8db084a0a14c5e1feeba49a4 100644 (file)
@@ -11,6 +11,9 @@ See also this page @ref{Новости, on russian}.
 if @file{.seen} exists and treat it like file was already downloaded.
 Possibly it was transferred out-of-bound and remote side needs to be
 notifier about that.
+@item
+If higher priority packet is spooled, then @command{nncp-daemon} will
+queue its sending first, interrupting lower priority transmissions.
 @end itemize
 
 @node Release 3.2
index 41ebf45121da6e1284563dad454bbe3ad16a7ff8..239b87f1eb6c0d995caa606a88f196a8114e15a6 100644 (file)
@@ -169,8 +169,9 @@ payloads, then send all of remaining in the transport stage.
     corresponding offset.
     @end itemize
 
-@item When @emph{FREQ} packet received, append it to current sending
-queue. Sending queue contains files with offsets that are needed to be
+@item When @emph{FREQ} packet received, insert it to current sending
+queue with niceness level sort: higher priority packets will be sent
+first. Sending queue contains files with offsets that are needed to be
 sent.
 
 @item While sending queue is not empty, send @emph{FILE} packets.
index f6b07ce5ad7dac0ac1f6d7ab04f28bab4ae9a5b8..86cd96c833f6db6b7b0ab2dad707d8faddeecde4 100644 (file)
@@ -99,6 +99,11 @@ type SPRaw struct {
        Payload []byte
 }
 
+type FreqWithNice struct {
+       freq *SPFreq
+       nice uint8
+}
+
 func init() {
        var buf bytes.Buffer
        spHead := SPHead{Type: SPTypeHalt}
@@ -169,8 +174,8 @@ type SPState struct {
        csTheir        *noise.CipherState
        payloads       chan []byte
        infosTheir     map[[32]byte]*SPInfo
-       infosOurSeen   map[[32]byte]struct{}
-       queueTheir     []*SPFreq
+       infosOurSeen   map[[32]byte]uint8
+       queueTheir     []*FreqWithNice
        wg             sync.WaitGroup
        RxBytes        int64
        RxLastSeen     time.Time
@@ -226,7 +231,7 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
        return sp.Payload, nil
 }
 
-func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte {
+func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
        var infos []*SPInfo
        var totalSize int64
        for job := range ctx.Jobs(nodeId, TTx) {
@@ -243,7 +248,7 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}
                        Size: uint64(job.Size),
                        Hash: job.HshValue,
                })
-               (*seen)[*job.HshValue] = struct{}{}
+               (*seen)[*job.HshValue] = job.PktEnc.Nice
        }
        sort.Sort(ByNice(infos))
        var payloads [][]byte
@@ -310,7 +315,7 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx,
                nice:           nice,
                payloads:       make(chan []byte),
                infosTheir:     make(map[[32]byte]*SPInfo),
-               infosOurSeen:   make(map[[32]byte]struct{}),
+               infosOurSeen:   make(map[[32]byte]uint8),
                started:        started,
                rxLock:         rxLock,
                txLock:         txLock,
@@ -388,7 +393,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error
                hs:           hs,
                nice:         nice,
                payloads:     make(chan []byte),
-               infosOurSeen: make(map[[32]byte]struct{}),
+               infosOurSeen: make(map[[32]byte]uint8),
                infosTheir:   make(map[[32]byte]*SPInfo),
                started:      started,
                xxOnly:       xxOnly,
@@ -563,7 +568,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa
                                        time.Sleep(100 * time.Millisecond)
                                        continue
                                }
-                               freq := state.queueTheir[0]
+                               freq := state.queueTheir[0].freq
                                state.RUnlock()
                                sdsp := SdsAdd(sds, SDS{
                                        "xx":   string(TTx),
@@ -618,7 +623,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa
                                sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
                                state.ctx.LogP("sp-file", sdsp, "")
                                state.Lock()
-                               if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash {
+                               if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
                                        if ourSize == fullSize {
                                                state.ctx.LogD("sp-file", sdsp, "finished")
                                                if len(state.queueTheir) > 1 {
@@ -627,7 +632,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa
                                                        state.queueTheir = state.queueTheir[:0]
                                                }
                                        } else {
-                                               state.queueTheir[0].Offset += uint64(len(buf))
+                                               state.queueTheir[0].freq.Offset += uint64(len(buf))
                                        }
                                } else {
                                        state.ctx.LogD("sp-file", sdsp, "queue disappeared")
@@ -923,9 +928,26 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                "hash":   ToBase32(freq.Hash[:]),
                                "offset": strconv.FormatInt(int64(freq.Offset), 10),
                        }), "queueing")
-                       state.Lock()
-                       state.queueTheir = append(state.queueTheir, &freq)
-                       state.Unlock()
+                       nice, exists := state.infosOurSeen[*freq.Hash]
+                       if exists {
+                               state.Lock()
+                               insertIdx := 0
+                               var freqWithNice *FreqWithNice
+                               for insertIdx, freqWithNice = range state.queueTheir {
+                                       if freqWithNice.nice > nice {
+                                               break
+                                       }
+                               }
+                               state.queueTheir = append(state.queueTheir, nil)
+                               copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
+                               state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
+                               state.Unlock()
+                       } else {
+                               state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{
+                                       "hash":   ToBase32(freq.Hash[:]),
+                                       "offset": strconv.FormatInt(int64(freq.Offset), 10),
+                               }), "unknown")
+                       }
                case SPTypeHalt:
                        sdsp := SdsAdd(sds, SDS{"type": "halt"})
                        state.ctx.LogD("sp-process", sdsp, "")