]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/sp.go
MTH
[nncp.git] / src / sp.go
index 00eaac6b50d58ee0dc8665fa04d2a0a54b6c913b..7049142421509aeedebbb9bd36f58a7b03c3d8d7 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -22,7 +22,6 @@ import (
        "crypto/subtle"
        "errors"
        "fmt"
-       "hash"
        "io"
        "os"
        "path/filepath"
@@ -33,7 +32,6 @@ import (
        xdr "github.com/davecgh/go-xdr/xdr2"
        "github.com/dustin/go-humanize"
        "github.com/flynn/noise"
-       "golang.org/x/crypto/blake2b"
 )
 
 const (
@@ -42,9 +40,16 @@ const (
        SPHeadOverhead = 4
 )
 
-type SPCheckerQueues struct {
-       appeared chan *[32]byte
-       checked  chan *[32]byte
+type MTHAndOffset struct {
+       mth    *MTH
+       offset uint64
+}
+
+type SPCheckerTask struct {
+       nodeId *NodeId
+       hsh    *[MTHSize]byte
+       mth    *MTH
+       done   chan []byte
 }
 
 var (
@@ -65,8 +70,9 @@ var (
        DefaultDeadline = 10 * time.Second
        PingTimeout     = time.Minute
 
-       spCheckers   = make(map[NodeId]*SPCheckerQueues)
-       SPCheckersWg sync.WaitGroup
+       spCheckerTasks chan SPCheckerTask
+       SPCheckerWg    sync.WaitGroup
+       spCheckerOnce  sync.Once
 )
 
 type FdAndFullSize struct {
@@ -74,11 +80,6 @@ type FdAndFullSize struct {
        fullSize int64
 }
 
-type HasherAndOffset struct {
-       h      hash.Hash
-       offset uint64
-}
-
 type SPType uint8
 
 const (
@@ -97,22 +98,22 @@ type SPHead struct {
 type SPInfo struct {
        Nice uint8
        Size uint64
-       Hash *[32]byte
+       Hash *[MTHSize]byte
 }
 
 type SPFreq struct {
-       Hash   *[32]byte
+       Hash   *[MTHSize]byte
        Offset uint64
 }
 
 type SPFile struct {
-       Hash    *[32]byte
+       Hash    *[MTHSize]byte
        Offset  uint64
        Payload []byte
 }
 
 type SPDone struct {
-       Hash *[32]byte
+       Hash *[MTHSize]byte
 }
 
 type SPRaw struct {
@@ -149,25 +150,26 @@ func init() {
        copy(SPPingMarshalized, buf.Bytes())
        buf.Reset()
 
-       spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
+       spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
        if _, err := xdr.Marshal(&buf, spInfo); err != nil {
                panic(err)
        }
        SPInfoOverhead = buf.Len()
        buf.Reset()
 
-       spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
+       spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
        if _, err := xdr.Marshal(&buf, spFreq); err != nil {
                panic(err)
        }
        SPFreqOverhead = buf.Len()
        buf.Reset()
 
-       spFile := SPFile{Hash: new([32]byte), Offset: 123}
+       spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
        if _, err := xdr.Marshal(&buf, spFile); err != nil {
                panic(err)
        }
        SPFileOverhead = buf.Len()
+       spCheckerTasks = make(chan SPCheckerTask)
 }
 
 func MarshalSP(typ SPType, sp interface{}) []byte {
@@ -209,8 +211,8 @@ type SPState struct {
        csTheir        *noise.CipherState
        payloads       chan []byte
        pings          chan struct{}
-       infosTheir     map[[32]byte]*SPInfo
-       infosOurSeen   map[[32]byte]uint8
+       infosTheir     map[[MTHSize]byte]*SPInfo
+       infosOurSeen   map[[MTHSize]byte]uint8
        queueTheir     []*FreqWithNice
        wg             sync.WaitGroup
        RxBytes        int64
@@ -231,12 +233,11 @@ type SPState struct {
        txRate         int
        isDead         chan struct{}
        listOnly       bool
-       onlyPkts       map[[32]byte]bool
+       onlyPkts       map[[MTHSize]byte]bool
        writeSPBuf     bytes.Buffer
        fds            map[string]FdAndFullSize
        fdsLock        sync.RWMutex
-       fileHashers    map[string]*HasherAndOffset
-       checkerQueues  SPCheckerQueues
+       fileHashers    map[string]*MTHAndOffset
        progressBars   map[string]struct{}
        sync.RWMutex
 }
@@ -275,40 +276,6 @@ func (state *SPState) dirUnlock() {
        state.Ctx.UnlockDir(state.txLock)
 }
 
-func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
-       for hshValue := range appeared {
-               pktName := Base32Codec.EncodeToString(hshValue[:])
-               les := LEs{
-                       {"XX", string(TRx)},
-                       {"Node", nodeId},
-                       {"Pkt", pktName},
-               }
-               SPCheckersWg.Add(1)
-               ctx.LogD("sp-checker", les, func(les LEs) string {
-                       return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
-               })
-               size, err := ctx.CheckNoCK(nodeId, hshValue)
-               les = append(les, LE{"Size", size})
-               if err != nil {
-                       ctx.LogE("sp-checker", les, err, func(les LEs) string {
-                               return fmt.Sprintf(
-                                       "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName,
-                                       humanize.IBytes(uint64(size)),
-                               )
-                       })
-                       continue
-               }
-               ctx.LogI("sp-checker-done", les, func(les LEs) string {
-                       return fmt.Sprintf(
-                               "Packet %s is retreived (%s)",
-                               pktName, humanize.IBytes(uint64(size)),
-                       )
-               })
-               SPCheckersWg.Done()
-               go func(hsh *[32]byte) { checked <- hsh }(hshValue)
-       }
-}
-
 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
        state.writeSPBuf.Reset()
        n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
@@ -346,7 +313,7 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
        return sp.Payload, nil
 }
 
-func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
+func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
        var infos []*SPInfo
        var totalSize int64
        for job := range ctx.Jobs(nodeId, TTx) {
@@ -438,8 +405,8 @@ func (state *SPState) StartI(conn ConnDeadlined) error {
        state.hs = hs
        state.payloads = make(chan []byte)
        state.pings = make(chan struct{})
-       state.infosTheir = make(map[[32]byte]*SPInfo)
-       state.infosOurSeen = make(map[[32]byte]uint8)
+       state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
+       state.infosOurSeen = make(map[[MTHSize]byte]uint8)
        state.progressBars = make(map[string]struct{})
        state.started = started
        state.rxLock = rxLock
@@ -556,8 +523,8 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        state.hs = hs
        state.payloads = make(chan []byte)
        state.pings = make(chan struct{})
-       state.infosOurSeen = make(map[[32]byte]uint8)
-       state.infosTheir = make(map[[32]byte]*SPInfo)
+       state.infosOurSeen = make(map[[MTHSize]byte]uint8)
+       state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
        state.progressBars = make(map[string]struct{})
        state.started = started
        state.xxOnly = xxOnly
@@ -686,43 +653,13 @@ func (state *SPState) StartWorkers(
 ) error {
        les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
        state.fds = make(map[string]FdAndFullSize)
-       state.fileHashers = make(map[string]*HasherAndOffset)
+       state.fileHashers = make(map[string]*MTHAndOffset)
        state.isDead = make(chan struct{})
        if state.maxOnlineTime > 0 {
                state.mustFinishAt = state.started.Add(state.maxOnlineTime)
        }
-
-       // Checker
        if !state.NoCK {
-               queues := spCheckers[*state.Node.Id]
-               if queues == nil {
-                       queues = &SPCheckerQueues{
-                               appeared: make(chan *[32]byte),
-                               checked:  make(chan *[32]byte),
-                       }
-                       spCheckers[*state.Node.Id] = queues
-                       go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
-               }
-               state.checkerQueues = *queues
-               go func() {
-                       for job := range state.Ctx.JobsNoCK(state.Node.Id) {
-                               if job.PktEnc.Nice <= state.Nice {
-                                       state.checkerQueues.appeared <- job.HshValue
-                               }
-                       }
-               }()
-               state.wg.Add(1)
-               go func() {
-                       defer state.wg.Done()
-                       for {
-                               select {
-                               case <-state.isDead:
-                                       return
-                               case hsh := <-state.checkerQueues.checked:
-                                       state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
-                               }
-                       }
-               }()
+               spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
        }
 
        // Remaining handshake payload sending
@@ -1033,7 +970,12 @@ func (state *SPState) StartWorkers(
                        }
                        state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
                        conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
-                       if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
+                       ct, err := state.csOur.Encrypt(nil, nil, payload)
+                       if err != nil {
+                               state.Ctx.LogE("sp-encrypting", les, err, logMsg)
+                               return
+                       }
+                       if err := state.WriteSP(conn, ct, ping); err != nil {
                                state.Ctx.LogE("sp-sending", les, err, logMsg)
                                return
                        }
@@ -1139,7 +1081,6 @@ func (state *SPState) Wait() {
        close(state.payloads)
        close(state.pings)
        state.Duration = time.Now().Sub(state.started)
-       SPCheckersWg.Wait()
        state.dirUnlock()
        state.RxSpeed = state.RxBytes
        state.TxSpeed = state.TxBytes
@@ -1367,6 +1308,18 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        pktName, humanize.IBytes(uint64(len(file.Payload))),
                                )
                        }
+                       fullsize := int64(0)
+                       state.RLock()
+                       infoTheir, ok := state.infosTheir[*file.Hash]
+                       state.RUnlock()
+                       if !ok {
+                               state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
+                                       return logMsg(les) + ": unknown file"
+                               })
+                               continue
+                       }
+                       fullsize = int64(infoTheir.Size)
+                       lesp = append(lesp, LE{"FullSize", fullsize})
                        dirToSync := filepath.Join(
                                state.Ctx.Spool,
                                state.Node.Id.String(),
@@ -1380,6 +1333,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.fdsLock.RLock()
                        fdAndFullSize, exists := state.fds[filePathPart]
                        state.fdsLock.RUnlock()
+                       hasherAndOffset := state.fileHashers[filePath]
                        var fd *os.File
                        if exists {
                                fd = fdAndFullSize.fd
@@ -1398,12 +1352,12 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                state.fdsLock.Lock()
                                state.fds[filePathPart] = FdAndFullSize{fd: fd}
                                state.fdsLock.Unlock()
-                               if file.Offset == 0 {
-                                       h, err := blake2b.New256(nil)
-                                       if err != nil {
-                                               panic(err)
+                               if !state.NoCK {
+                                       hasherAndOffset = &MTHAndOffset{
+                                               mth:    MTHNew(fullsize, int64(file.Offset)),
+                                               offset: file.Offset,
                                        }
-                                       state.fileHashers[filePath] = &HasherAndOffset{h: h}
+                                       state.fileHashers[filePath] = hasherAndOffset
                                }
                        }
                        state.Ctx.LogD(
@@ -1429,34 +1383,25 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                state.closeFd(filePathPart)
                                return nil, err
                        }
-                       hasherAndOffset, hasherExists := state.fileHashers[filePath]
-                       if hasherExists {
+                       if hasherAndOffset != nil {
                                if hasherAndOffset.offset == file.Offset {
-                                       if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
+                                       if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
                                                panic(err)
                                        }
                                        hasherAndOffset.offset += uint64(len(file.Payload))
                                } else {
-                                       state.Ctx.LogD(
-                                               "sp-file-offset-differs", lesp,
+                                       state.Ctx.LogE(
+                                               "sp-file-offset-differs", lesp, errors.New("offset differs"),
                                                func(les LEs) string {
-                                                       return logMsg(les) + ": offset differs, deleting hasher"
+                                                       return logMsg(les) + ": deleting hasher"
                                                },
                                        )
                                        delete(state.fileHashers, filePath)
-                                       hasherExists = false
+                                       hasherAndOffset = nil
                                }
                        }
                        ourSize := int64(file.Offset + uint64(len(file.Payload)))
-                       lesp[len(lesp)-1].V = ourSize
-                       fullsize := int64(0)
-                       state.RLock()
-                       infoTheir, ok := state.infosTheir[*file.Hash]
-                       state.RUnlock()
-                       if ok {
-                               fullsize = int64(infoTheir.Size)
-                       }
-                       lesp = append(lesp, LE{"FullSize", fullsize})
+                       lesp[len(lesp)-2].V = ourSize
                        if state.Ctx.ShowPrgrs {
                                state.progressBars[pktName] = struct{}{}
                                Progress("Rx", lesp)
@@ -1483,59 +1428,64 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                state.closeFd(filePathPart)
                                continue
                        }
-                       if hasherExists {
-                               if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
-                                       state.Ctx.LogE(
-                                               "sp-file-bad-checksum", lesp,
-                                               errors.New("checksum mismatch"),
-                                               logMsg,
-                                       )
-                                       continue
-                               }
-                               if err = os.Rename(filePathPart, filePath); err != nil {
-                                       state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
-                                               return logMsg(les) + ": renaming"
-                                       })
-                                       continue
-                               }
-                               if err = DirSync(dirToSync); err != nil {
-                                       state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
-                                               return logMsg(les) + ": dirsyncing"
-                                       })
-                                       continue
-                               }
-                               state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
-                                       return logMsg(les) + ": done"
-                               })
-                               state.wg.Add(1)
-                               go func() {
-                                       state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
-                                       state.wg.Done()
-                               }()
-                               state.Lock()
-                               delete(state.infosTheir, *file.Hash)
-                               state.Unlock()
-                               if !state.Ctx.HdrUsage {
-                                       state.closeFd(filePathPart)
-                                       continue
-                               }
-                               if _, err = fd.Seek(0, io.SeekStart); err != nil {
-                                       state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
-                                               return logMsg(les) + ": seeking"
+                       if hasherAndOffset != nil {
+                               delete(state.fileHashers, filePath)
+                               if hasherAndOffset.mth.PrependSize == 0 {
+                                       if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
+                                               state.Ctx.LogE(
+                                                       "sp-file-bad-checksum", lesp,
+                                                       errors.New("checksum mismatch"),
+                                                       logMsg,
+                                               )
+                                               state.closeFd(filePathPart)
+                                               continue
+                                       }
+                                       if err = os.Rename(filePathPart, filePath); err != nil {
+                                               state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
+                                                       return logMsg(les) + ": renaming"
+                                               })
+                                               state.closeFd(filePathPart)
+                                               continue
+                                       }
+                                       if err = DirSync(dirToSync); err != nil {
+                                               state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
+                                                       return logMsg(les) + ": dirsyncing"
+                                               })
+                                               state.closeFd(filePathPart)
+                                               continue
+                                       }
+                                       state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
+                                               return logMsg(les) + ": done"
                                        })
+                                       state.wg.Add(1)
+                                       go func() {
+                                               state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
+                                               state.wg.Done()
+                                       }()
+                                       state.Lock()
+                                       delete(state.infosTheir, *file.Hash)
+                                       state.Unlock()
+                                       if !state.Ctx.HdrUsage {
+                                               continue
+                                       }
+                                       if _, err = fd.Seek(0, io.SeekStart); err != nil {
+                                               state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
+                                                       return logMsg(les) + ": seeking"
+                                               })
+                                               state.closeFd(filePathPart)
+                                               continue
+                                       }
+                                       _, pktEncRaw, err := state.Ctx.HdrRead(fd)
                                        state.closeFd(filePathPart)
+                                       if err != nil {
+                                               state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
+                                                       return logMsg(les) + ": HdrReading"
+                                               })
+                                               continue
+                                       }
+                                       state.Ctx.HdrWrite(pktEncRaw, filePath)
                                        continue
                                }
-                               _, pktEncRaw, err := state.Ctx.HdrRead(fd)
-                               state.closeFd(filePathPart)
-                               if err != nil {
-                                       state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
-                                               return logMsg(les) + ": HdrReading"
-                                       })
-                                       continue
-                               }
-                               state.Ctx.HdrWrite(pktEncRaw, filePath)
-                               continue
                        }
                        state.closeFd(filePathPart)
                        if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
@@ -1556,8 +1506,15 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Lock()
                        delete(state.infosTheir, *file.Hash)
                        state.Unlock()
-                       if !state.NoCK {
-                               state.checkerQueues.appeared <- file.Hash
+                       if hasherAndOffset != nil {
+                               go func() {
+                                       spCheckerTasks <- SPCheckerTask{
+                                               nodeId: state.Node.Id,
+                                               hsh:    file.Hash,
+                                               mth:    hasherAndOffset.mth,
+                                               done:   state.payloads,
+                                       }
+                               }()
                        }
 
                case SPTypeDone:
@@ -1701,3 +1658,41 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
        }
        return payloadsSplit(replies), nil
 }
+
+func SPChecker(ctx *Ctx) {
+       for t := range spCheckerTasks {
+               pktName := Base32Codec.EncodeToString(t.hsh[:])
+               les := LEs{
+                       {"XX", string(TRx)},
+                       {"Node", t.nodeId},
+                       {"Pkt", pktName},
+               }
+               SPCheckerWg.Add(1)
+               ctx.LogD("sp-checker", les, func(les LEs) string {
+                       return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
+               })
+               size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
+               les = append(les, LE{"Size", size})
+               if err != nil {
+                       ctx.LogE("sp-checker", les, err, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
+                                       humanize.IBytes(uint64(size)),
+                               )
+                       })
+                       SPCheckerWg.Done()
+                       continue
+               }
+               ctx.LogI("sp-checker-done", les, func(les LEs) string {
+                       return fmt.Sprintf(
+                               "Packet %s is retreived (%s)",
+                               pktName, humanize.IBytes(uint64(size)),
+                       )
+               })
+               SPCheckerWg.Done()
+               go func(t SPCheckerTask) {
+                       defer func() { recover() }()
+                       t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
+               }(t)
+       }
+}