]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/cypherpunks.ru/nncp/sp.go
Queue higher priority packets first in SP
[nncp.git] / src / cypherpunks.ru / nncp / sp.go
index be845109752a79b2393dfdb8be20b716c5584621..86cd96c833f6db6b7b0ab2dad707d8faddeecde4 100644 (file)
@@ -1,6 +1,6 @@
 /*
 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2018 Sergey Matveev <stargrave@stargrave.org>
 
 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
@@ -55,6 +55,8 @@ var (
                noise.CipherChaChaPoly,
                noise.HashBLAKE2b,
        )
+
+       spWorkersGroup sync.WaitGroup
 )
 
 type SPType uint8
@@ -97,6 +99,11 @@ type SPRaw struct {
        Payload []byte
 }
 
+type FreqWithNice struct {
+       freq *SPFreq
+       nice uint8
+}
+
 func init() {
        var buf bytes.Buffer
        spHead := SPHead{Type: SPTypeHalt}
@@ -167,8 +174,8 @@ type SPState struct {
        csTheir        *noise.CipherState
        payloads       chan []byte
        infosTheir     map[[32]byte]*SPInfo
-       infosOurSeen   map[[32]byte]struct{}
-       queueTheir     []*SPFreq
+       infosOurSeen   map[[32]byte]uint8
+       queueTheir     []*FreqWithNice
        wg             sync.WaitGroup
        RxBytes        int64
        RxLastSeen     time.Time
@@ -224,7 +231,7 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
        return sp.Payload, nil
 }
 
-func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte {
+func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
        var infos []*SPInfo
        var totalSize int64
        for job := range ctx.Jobs(nodeId, TTx) {
@@ -241,7 +248,7 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}
                        Size: uint64(job.Size),
                        Hash: job.HshValue,
                })
-               (*seen)[*job.HshValue] = struct{}{}
+               (*seen)[*job.HshValue] = job.PktEnc.Nice
        }
        sort.Sort(ByNice(infos))
        var payloads [][]byte
@@ -295,16 +302,20 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx,
                },
                PeerStatic: node.NoisePub[:],
        }
+       hs, err := noise.NewHandshakeState(conf)
+       if err != nil {
+               return nil, err
+       }
        state := SPState{
                ctx:            ctx,
-               hs:             noise.NewHandshakeState(conf),
+               hs:             hs,
                Node:           node,
                onlineDeadline: onlineDeadline,
                maxOnlineTime:  maxOnlineTime,
                nice:           nice,
                payloads:       make(chan []byte),
                infosTheir:     make(map[[32]byte]*SPInfo),
-               infosOurSeen:   make(map[[32]byte]struct{}),
+               infosOurSeen:   make(map[[32]byte]uint8),
                started:        started,
                rxLock:         rxLock,
                txLock:         txLock,
@@ -326,7 +337,11 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx,
 
        var buf []byte
        var payload []byte
-       buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
+       buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
+       if err != nil {
+               state.dirUnlock()
+               return nil, err
+       }
        sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
        ctx.LogD("sp-start", sds, "sending first message")
        conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
@@ -369,19 +384,22 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error
                        Public:  ctx.Self.NoisePub[:],
                },
        }
+       hs, err := noise.NewHandshakeState(conf)
+       if err != nil {
+               return nil, err
+       }
        state := SPState{
                ctx:          ctx,
-               hs:           noise.NewHandshakeState(conf),
+               hs:           hs,
                nice:         nice,
                payloads:     make(chan []byte),
-               infosOurSeen: make(map[[32]byte]struct{}),
+               infosOurSeen: make(map[[32]byte]uint8),
                infosTheir:   make(map[[32]byte]*SPInfo),
                started:      started,
                xxOnly:       xxOnly,
        }
        var buf []byte
        var payload []byte
-       var err error
        ctx.LogD(
                "sp-start",
                SDS{"nice": strconv.Itoa(int(nice))},
@@ -447,7 +465,11 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error
        }
 
        ctx.LogD("sp-start", sds, "sending first message")
-       buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
+       buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
+       if err != nil {
+               state.dirUnlock()
+               return nil, err
+       }
        conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
        if err = state.WriteSP(conn, buf); err != nil {
                ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
@@ -546,7 +568,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa
                                        time.Sleep(100 * time.Millisecond)
                                        continue
                                }
-                               freq := state.queueTheir[0]
+                               freq := state.queueTheir[0].freq
                                state.RUnlock()
                                sdsp := SdsAdd(sds, SDS{
                                        "xx":   string(TTx),
@@ -601,7 +623,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa
                                sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
                                state.ctx.LogP("sp-file", sdsp, "")
                                state.Lock()
-                               if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash {
+                               if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
                                        if ourSize == fullSize {
                                                state.ctx.LogD("sp-file", sdsp, "finished")
                                                if len(state.queueTheir) > 1 {
@@ -610,7 +632,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa
                                                        state.queueTheir = state.queueTheir[:0]
                                                }
                                        } else {
-                                               state.queueTheir[0].Offset += uint64(len(buf))
+                                               state.queueTheir[0].freq.Offset += uint64(len(buf))
                                        }
                                } else {
                                        state.ctx.LogD("sp-file", sdsp, "queue disappeared")
@@ -746,22 +768,23 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.infosTheir[*info.Hash] = &info
                        state.Unlock()
                        state.ctx.LogD("sp-process", sdsp, "stating part")
-                       if _, err = os.Stat(filepath.Join(
+                       pktPath := filepath.Join(
                                state.ctx.Spool,
                                state.Node.Id.String(),
                                string(TRx),
                                ToBase32(info.Hash[:]),
-                       )); err == nil {
+                       )
+                       if _, err = os.Stat(pktPath); err == nil {
                                state.ctx.LogD("sp-process", sdsp, "already done")
                                replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
                                continue
                        }
-                       fi, err := os.Stat(filepath.Join(
-                               state.ctx.Spool,
-                               state.Node.Id.String(),
-                               string(TRx),
-                               ToBase32(info.Hash[:])+PartSuffix,
-                       ))
+                       if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
+                               state.ctx.LogD("sp-process", sdsp, "already seen")
+                               replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
+                               continue
+                       }
+                       fi, err := os.Stat(pktPath + PartSuffix)
                        var offset int64
                        if err == nil {
                                offset = fi.Size()
@@ -838,6 +861,8 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                continue
                        }
                        state.RUnlock()
+                       spWorkersGroup.Wait()
+                       spWorkersGroup.Add(1)
                        go func() {
                                if err := fd.Sync(); err != nil {
                                        state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
@@ -859,6 +884,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                state.Lock()
                                delete(state.infosTheir, *file.Hash)
                                state.Unlock()
+                               spWorkersGroup.Done()
                                go func() {
                                        state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
                                }()
@@ -902,9 +928,26 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                "hash":   ToBase32(freq.Hash[:]),
                                "offset": strconv.FormatInt(int64(freq.Offset), 10),
                        }), "queueing")
-                       state.Lock()
-                       state.queueTheir = append(state.queueTheir, &freq)
-                       state.Unlock()
+                       nice, exists := state.infosOurSeen[*freq.Hash]
+                       if exists {
+                               state.Lock()
+                               insertIdx := 0
+                               var freqWithNice *FreqWithNice
+                               for insertIdx, freqWithNice = range state.queueTheir {
+                                       if freqWithNice.nice > nice {
+                                               break
+                                       }
+                               }
+                               state.queueTheir = append(state.queueTheir, nil)
+                               copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
+                               state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
+                               state.Unlock()
+                       } else {
+                               state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{
+                                       "hash":   ToBase32(freq.Hash[:]),
+                                       "offset": strconv.FormatInt(int64(freq.Offset), 10),
+                               }), "unknown")
+                       }
                case SPTypeHalt:
                        sdsp := SdsAdd(sds, SDS{"type": "halt"})
                        state.ctx.LogD("sp-process", sdsp, "")