]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/sp.go
Raise copyright years
[nncp.git] / src / sp.go
index 6d31375e137ff3a044d0bb5c1ad8ea4c481b4dd0..4db358742a17b1f584aa102ac853a1c6386c6d13 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -1,6 +1,6 @@
 /*
 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
 
 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
@@ -22,8 +22,8 @@ import (
        "crypto/subtle"
        "errors"
        "fmt"
-       "hash"
        "io"
+       "log"
        "os"
        "path/filepath"
        "sort"
@@ -33,7 +33,6 @@ import (
        xdr "github.com/davecgh/go-xdr/xdr2"
        "github.com/dustin/go-humanize"
        "github.com/flynn/noise"
-       "golang.org/x/crypto/blake2b"
 )
 
 const (
@@ -42,14 +41,19 @@ const (
        SPHeadOverhead = 4
 )
 
-type SPCheckerQueues struct {
-       appeared chan *[32]byte
-       checked  chan *[32]byte
+type MTHAndOffset struct {
+       mth    MTH
+       offset uint64
 }
 
-var (
-       MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
+type SPCheckerTask struct {
+       nodeId *NodeId
+       hsh    *[MTHSize]byte
+       mth    MTH
+       done   chan []byte
+}
 
+var (
        SPInfoOverhead    int
        SPFreqOverhead    int
        SPFileOverhead    int
@@ -65,7 +69,9 @@ var (
        DefaultDeadline = 10 * time.Second
        PingTimeout     = time.Minute
 
-       spCheckers = make(map[NodeId]*SPCheckerQueues)
+       spCheckerTasks chan SPCheckerTask
+       SPCheckerWg    sync.WaitGroup
+       spCheckerOnce  sync.Once
 )
 
 type FdAndFullSize struct {
@@ -73,11 +79,6 @@ type FdAndFullSize struct {
        fullSize int64
 }
 
-type HasherAndOffset struct {
-       h      hash.Hash
-       offset uint64
-}
-
 type SPType uint8
 
 const (
@@ -96,22 +97,22 @@ type SPHead struct {
 type SPInfo struct {
        Nice uint8
        Size uint64
-       Hash *[32]byte
+       Hash *[MTHSize]byte
 }
 
 type SPFreq struct {
-       Hash   *[32]byte
+       Hash   *[MTHSize]byte
        Offset uint64
 }
 
 type SPFile struct {
-       Hash    *[32]byte
+       Hash    *[MTHSize]byte
        Offset  uint64
        Payload []byte
 }
 
 type SPDone struct {
-       Hash *[32]byte
+       Hash *[MTHSize]byte
 }
 
 type SPRaw struct {
@@ -148,25 +149,26 @@ func init() {
        copy(SPPingMarshalized, buf.Bytes())
        buf.Reset()
 
-       spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
+       spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
        if _, err := xdr.Marshal(&buf, spInfo); err != nil {
                panic(err)
        }
        SPInfoOverhead = buf.Len()
        buf.Reset()
 
-       spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
+       spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
        if _, err := xdr.Marshal(&buf, spFreq); err != nil {
                panic(err)
        }
        SPFreqOverhead = buf.Len()
        buf.Reset()
 
-       spFile := SPFile{Hash: new([32]byte), Offset: 123}
+       spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
        if _, err := xdr.Marshal(&buf, spFile); err != nil {
                panic(err)
        }
        SPFileOverhead = buf.Len()
+       spCheckerTasks = make(chan SPCheckerTask)
 }
 
 func MarshalSP(typ SPType, sp interface{}) []byte {
@@ -208,8 +210,8 @@ type SPState struct {
        csTheir        *noise.CipherState
        payloads       chan []byte
        pings          chan struct{}
-       infosTheir     map[[32]byte]*SPInfo
-       infosOurSeen   map[[32]byte]uint8
+       infosTheir     map[[MTHSize]byte]*SPInfo
+       infosOurSeen   map[[MTHSize]byte]uint8
        queueTheir     []*FreqWithNice
        wg             sync.WaitGroup
        RxBytes        int64
@@ -230,12 +232,12 @@ type SPState struct {
        txRate         int
        isDead         chan struct{}
        listOnly       bool
-       onlyPkts       map[[32]byte]bool
+       onlyPkts       map[[MTHSize]byte]bool
        writeSPBuf     bytes.Buffer
        fds            map[string]FdAndFullSize
        fdsLock        sync.RWMutex
-       fileHashers    map[string]*HasherAndOffset
-       checkerQueues  SPCheckerQueues
+       fileHashers    map[string]*MTHAndOffset
+       progressBars   map[string]struct{}
        sync.RWMutex
 }
 
@@ -257,11 +259,6 @@ func (state *SPState) SetDead() {
                for range state.pings {
                }
        }()
-       go func() {
-               for _, s := range state.fds {
-                       s.fd.Close()
-               }
-       }()
 }
 
 func (state *SPState) NotAlive() bool {
@@ -278,42 +275,10 @@ func (state *SPState) dirUnlock() {
        state.Ctx.UnlockDir(state.txLock)
 }
 
-func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
-       for hshValue := range appeared {
-               pktName := Base32Codec.EncodeToString(hshValue[:])
-               les := LEs{
-                       {"XX", string(TRx)},
-                       {"Node", nodeId},
-                       {"Pkt", pktName},
-               }
-               ctx.LogD("sp-checker", les, func(les LEs) string {
-                       return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
-               })
-               size, err := ctx.CheckNoCK(nodeId, hshValue)
-               les = append(les, LE{"Size", size})
-               if err != nil {
-                       ctx.LogE("sp-checker", les, err, func(les LEs) string {
-                               return fmt.Sprintf(
-                                       "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName,
-                                       humanize.IBytes(uint64(size)),
-                               )
-                       })
-                       continue
-               }
-               ctx.LogI("sp-checker-done", les, func(les LEs) string {
-                       return fmt.Sprintf(
-                               "Packet %s is retreived (%s)",
-                               pktName, humanize.IBytes(uint64(size)),
-                       )
-               })
-               go func(hsh *[32]byte) { checked <- hsh }(hshValue)
-       }
-}
-
 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
        state.writeSPBuf.Reset()
        n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
-               Magic:   MagicNNCPLv1,
+               Magic:   MagicNNCPSv1.B,
                Payload: payload,
        })
        if err != nil {
@@ -341,13 +306,13 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
        }
        state.RxLastSeen = time.Now()
        state.RxBytes += int64(n)
-       if sp.Magic != MagicNNCPLv1 {
+       if sp.Magic != MagicNNCPSv1.B {
                return nil, BadMagic
        }
        return sp.Payload, nil
 }
 
-func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
+func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
        var infos []*SPInfo
        var totalSize int64
        for job := range ctx.Jobs(nodeId, TTx) {
@@ -439,8 +404,9 @@ func (state *SPState) StartI(conn ConnDeadlined) error {
        state.hs = hs
        state.payloads = make(chan []byte)
        state.pings = make(chan struct{})
-       state.infosTheir = make(map[[32]byte]*SPInfo)
-       state.infosOurSeen = make(map[[32]byte]uint8)
+       state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
+       state.infosOurSeen = make(map[[MTHSize]byte]uint8)
+       state.progressBars = make(map[string]struct{})
        state.started = started
        state.rxLock = rxLock
        state.txLock = txLock
@@ -473,7 +439,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error {
                        NicenessFmt(state.Nice),
                )
        })
-       conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+       conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
        if err = state.WriteSP(conn, buf, false); err != nil {
                state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
                        return fmt.Sprintf(
@@ -492,7 +458,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error {
                        NicenessFmt(state.Nice),
                )
        })
-       conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+       conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
        if buf, err = state.ReadSP(conn); err != nil {
                state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
                        return fmt.Sprintf(
@@ -556,8 +522,9 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        state.hs = hs
        state.payloads = make(chan []byte)
        state.pings = make(chan struct{})
-       state.infosOurSeen = make(map[[32]byte]uint8)
-       state.infosTheir = make(map[[32]byte]*SPInfo)
+       state.infosOurSeen = make(map[[MTHSize]byte]uint8)
+       state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
+       state.progressBars = make(map[string]struct{})
        state.started = started
        state.xxOnly = xxOnly
 
@@ -571,7 +538,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        }
        les := LEs{{"Nice", int(state.Nice)}}
        state.Ctx.LogD("sp-startR", les, logMsg)
-       conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+       conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
        if buf, err = state.ReadSP(conn); err != nil {
                state.Ctx.LogE("sp-startR-read", les, err, logMsg)
                return err
@@ -583,6 +550,9 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
 
        var node *Node
        for _, n := range state.Ctx.Neigh {
+               if n.NoisePub == nil {
+                       continue
+               }
                if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
                        node = n
                        break
@@ -645,7 +615,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
                state.dirUnlock()
                return err
        }
-       conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+       conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
        if err = state.WriteSP(conn, buf, false); err != nil {
                state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
                        return fmt.Sprintf(
@@ -685,40 +655,21 @@ func (state *SPState) StartWorkers(
 ) error {
        les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
        state.fds = make(map[string]FdAndFullSize)
-       state.fileHashers = make(map[string]*HasherAndOffset)
+       state.fileHashers = make(map[string]*MTHAndOffset)
        state.isDead = make(chan struct{})
        if state.maxOnlineTime > 0 {
                state.mustFinishAt = state.started.Add(state.maxOnlineTime)
        }
-
-       // Checker
        if !state.NoCK {
-               queues := spCheckers[*state.Node.Id]
-               if queues == nil {
-                       queues = &SPCheckerQueues{
-                               appeared: make(chan *[32]byte),
-                               checked:  make(chan *[32]byte),
-                       }
-                       spCheckers[*state.Node.Id] = queues
-                       go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
-               }
-               state.checkerQueues = *queues
+               spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
                go func() {
                        for job := range state.Ctx.JobsNoCK(state.Node.Id) {
                                if job.PktEnc.Nice <= state.Nice {
-                                       state.checkerQueues.appeared <- job.HshValue
-                               }
-                       }
-               }()
-               state.wg.Add(1)
-               go func() {
-                       defer state.wg.Done()
-                       for {
-                               select {
-                               case <-state.isDead:
-                                       return
-                               case hsh := <-state.checkerQueues.checked:
-                                       state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
+                                       spCheckerTasks <- SPCheckerTask{
+                                               nodeId: state.Node.Id,
+                                               hsh:    job.HshValue,
+                                               done:   state.payloads,
+                                       }
                                }
                        }
                }()
@@ -792,13 +743,20 @@ func (state *SPState) StartWorkers(
                                pingTicker.Stop()
                                return
                        case now := <-deadlineTicker.C:
-                               if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
-                                       now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
-                                       (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
-                                       (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
-                                       state.SetDead()
-                                       conn.Close() // #nosec G104
+                               if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
+                                       now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
+                                       goto Deadlined
+                               }
+                               if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
+                                       goto Deadlined
                                }
+                               if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
+                                       goto Deadlined
+                               }
+                               break
+                       Deadlined:
+                               state.SetDead()
+                               conn.Close()
                        case now := <-pingTicker.C:
                                if now.After(state.TxLastSeen.Add(PingTimeout)) {
                                        state.wg.Add(1)
@@ -815,14 +773,21 @@ func (state *SPState) StartWorkers(
        if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
                state.wg.Add(1)
                go func() {
-                       ticker := time.NewTicker(time.Second)
+                       dw, err := state.Ctx.NewDirWatcher(
+                               filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)),
+                               time.Second,
+                       )
+                       if err != nil {
+                               state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg)
+                               log.Fatalln(err)
+                       }
                        for {
                                select {
                                case <-state.isDead:
+                                       dw.Close()
                                        state.wg.Done()
-                                       ticker.Stop()
                                        return
-                               case <-ticker.C:
+                               case <-dw.C:
                                        for _, payload := range state.Ctx.infosOur(
                                                state.Node.Id,
                                                state.Nice,
@@ -852,6 +817,7 @@ func (state *SPState) StartWorkers(
                defer conn.Close()
                defer state.SetDead()
                defer state.wg.Done()
+               buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
                for {
                        if state.NotAlive() {
                                return
@@ -920,6 +886,9 @@ func (state *SPState) StartWorkers(
                                fdAndFullSize, exists := state.fds[pth]
                                state.fdsLock.RUnlock()
                                if !exists {
+                                       state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string {
+                                               return logMsg(les) + ": opening"
+                                       })
                                        fd, err := os.Open(pth)
                                        if err != nil {
                                                state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
@@ -941,7 +910,8 @@ func (state *SPState) StartWorkers(
                                }
                                fd := fdAndFullSize.fd
                                fullSize := fdAndFullSize.fullSize
-                               var buf []byte
+                               lesp = append(lesp, LE{"FullSize", fullSize})
+                               var bufRead []byte
                                if freq.Offset < uint64(fullSize) {
                                        state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
                                                return logMsg(les) + ": seeking"
@@ -952,7 +922,6 @@ func (state *SPState) StartWorkers(
                                                })
                                                return
                                        }
-                                       buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
                                        n, err := fd.Read(buf)
                                        if err != nil {
                                                state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
@@ -960,12 +929,13 @@ func (state *SPState) StartWorkers(
                                                })
                                                return
                                        }
-                                       buf = buf[:n]
+                                       bufRead = buf[:n]
                                        lesp = append(
                                                les,
                                                LE{"XX", string(TTx)},
                                                LE{"Pkt", pktName},
                                                LE{"Size", int64(n)},
+                                               LE{"FullSize", fullSize},
                                        )
                                        state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
                                                return fmt.Sprintf(
@@ -973,14 +943,15 @@ func (state *SPState) StartWorkers(
                                                        logMsg(les), humanize.IBytes(uint64(n)),
                                                )
                                        })
+                               } else {
+                                       state.closeFd(pth)
                                }
-                               state.closeFd(pth)
                                payload = MarshalSP(SPTypeFile, SPFile{
                                        Hash:    freq.Hash,
                                        Offset:  freq.Offset,
-                                       Payload: buf,
+                                       Payload: bufRead,
                                })
-                               ourSize := freq.Offset + uint64(len(buf))
+                               ourSize := freq.Offset + uint64(len(bufRead))
                                lesp = append(
                                        les,
                                        LE{"XX", string(TTx)},
@@ -989,26 +960,32 @@ func (state *SPState) StartWorkers(
                                        LE{"FullSize", fullSize},
                                )
                                if state.Ctx.ShowPrgrs {
+                                       state.progressBars[pktName] = struct{}{}
                                        Progress("Tx", lesp)
                                }
+                               if ourSize == uint64(fullSize) {
+                                       state.closeFd(pth)
+                                       state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
+                                               return logMsg(les) + ": finished"
+                                       })
+                                       if state.Ctx.ShowPrgrs {
+                                               delete(state.progressBars, pktName)
+                                       }
+                               }
                                state.Lock()
-                               if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
+                               for i, q := range state.queueTheir {
+                                       if *q.freq.Hash != *freq.Hash {
+                                               continue
+                                       }
                                        if ourSize == uint64(fullSize) {
-                                               state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
-                                                       return logMsg(les) + ": finished"
-                                               })
-                                               if len(state.queueTheir) > 1 {
-                                                       state.queueTheir = state.queueTheir[1:]
-                                               } else {
-                                                       state.queueTheir = state.queueTheir[:0]
-                                               }
+                                               state.queueTheir = append(
+                                                       state.queueTheir[:i],
+                                                       state.queueTheir[i+1:]...,
+                                               )
                                        } else {
-                                               state.queueTheir[0].freq.Offset += uint64(len(buf))
+                                               q.freq.Offset = ourSize
                                        }
-                               } else {
-                                       state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
-                                               return logMsg(les) + ": queue disappeared"
-                                       })
+                                       break
                                }
                                state.Unlock()
                        }
@@ -1020,8 +997,13 @@ func (state *SPState) StartWorkers(
                                )
                        }
                        state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
-                       conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
-                       if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
+                       conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
+                       ct, err := state.csOur.Encrypt(nil, nil, payload)
+                       if err != nil {
+                               state.Ctx.LogE("sp-encrypting", les, err, logMsg)
+                               return
+                       }
+                       if err := state.WriteSP(conn, ct, ping); err != nil {
                                state.Ctx.LogE("sp-sending", les, err, logMsg)
                                return
                        }
@@ -1042,7 +1024,7 @@ func (state *SPState) StartWorkers(
                                )
                        }
                        state.Ctx.LogD("sp-recv-wait", les, logMsg)
-                       conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+                       conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
                        payload, err := state.ReadSP(conn)
                        if err != nil {
                                if err == io.EOF {
@@ -1116,7 +1098,7 @@ func (state *SPState) StartWorkers(
                state.SetDead()
                state.wg.Done()
                state.SetDead()
-               conn.Close() // #nosec G104
+               conn.Close()
        }()
 
        return nil
@@ -1126,8 +1108,8 @@ func (state *SPState) Wait() {
        state.wg.Wait()
        close(state.payloads)
        close(state.pings)
-       state.dirUnlock()
        state.Duration = time.Now().Sub(state.started)
+       state.dirUnlock()
        state.RxSpeed = state.RxBytes
        state.TxSpeed = state.TxBytes
        rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
@@ -1138,6 +1120,12 @@ func (state *SPState) Wait() {
        if txDuration > 0 {
                state.TxSpeed = state.TxBytes / txDuration
        }
+       for _, s := range state.fds {
+               s.fd.Close()
+       }
+       for pktName := range state.progressBars {
+               ProgressKill(pktName)
+       }
 }
 
 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
@@ -1275,7 +1263,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                }
                                continue
                        }
-                       if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
+                       if _, err = os.Stat(filepath.Join(
+                               state.Ctx.Spool, state.Node.Id.String(), string(TRx),
+                               SeenDir, Base32Codec.EncodeToString(info.Hash[:]),
+                       )); err == nil {
                                state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
                                        return logMsg(les) + ": already seen"
                                })
@@ -1348,6 +1339,18 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        pktName, humanize.IBytes(uint64(len(file.Payload))),
                                )
                        }
+                       fullsize := int64(0)
+                       state.RLock()
+                       infoTheir := state.infosTheir[*file.Hash]
+                       state.RUnlock()
+                       if infoTheir == nil {
+                               state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
+                                       return logMsg(les) + ": unknown file"
+                               })
+                               continue
+                       }
+                       fullsize = int64(infoTheir.Size)
+                       lesp = append(lesp, LE{"FullSize", fullsize})
                        dirToSync := filepath.Join(
                                state.Ctx.Spool,
                                state.Node.Id.String(),
@@ -1361,6 +1364,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.fdsLock.RLock()
                        fdAndFullSize, exists := state.fds[filePathPart]
                        state.fdsLock.RUnlock()
+                       hasherAndOffset := state.fileHashers[filePath]
                        var fd *os.File
                        if exists {
                                fd = fdAndFullSize.fd
@@ -1379,12 +1383,12 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                state.fdsLock.Lock()
                                state.fds[filePathPart] = FdAndFullSize{fd: fd}
                                state.fdsLock.Unlock()
-                               if file.Offset == 0 {
-                                       h, err := blake2b.New256(nil)
-                                       if err != nil {
-                                               panic(err)
+                               if !state.NoCK {
+                                       hasherAndOffset = &MTHAndOffset{
+                                               mth:    MTHNew(fullsize, int64(file.Offset)),
+                                               offset: file.Offset,
                                        }
-                                       state.fileHashers[filePath] = &HasherAndOffset{h: h}
+                                       state.fileHashers[filePath] = hasherAndOffset
                                }
                        }
                        state.Ctx.LogD(
@@ -1410,40 +1414,35 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                state.closeFd(filePathPart)
                                return nil, err
                        }
-                       hasherAndOffset, hasherExists := state.fileHashers[filePath]
-                       if hasherExists {
+                       if hasherAndOffset != nil {
                                if hasherAndOffset.offset == file.Offset {
-                                       if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
+                                       if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
                                                panic(err)
                                        }
                                        hasherAndOffset.offset += uint64(len(file.Payload))
                                } else {
-                                       state.Ctx.LogD(
-                                               "sp-file-offset-differs", lesp,
+                                       state.Ctx.LogE(
+                                               "sp-file-offset-differs", lesp, errors.New("offset differs"),
                                                func(les LEs) string {
-                                                       return logMsg(les) + ": offset differs, deleting hasher"
+                                                       return logMsg(les) + ": deleting hasher"
                                                },
                                        )
                                        delete(state.fileHashers, filePath)
-                                       hasherExists = false
+                                       hasherAndOffset = nil
                                }
                        }
                        ourSize := int64(file.Offset + uint64(len(file.Payload)))
-                       lesp[len(lesp)-1].V = ourSize
-                       fullsize := int64(0)
-                       state.RLock()
-                       infoTheir, ok := state.infosTheir[*file.Hash]
-                       state.RUnlock()
-                       if ok {
-                               fullsize = int64(infoTheir.Size)
-                       }
-                       lesp = append(lesp, LE{"FullSize", fullsize})
+                       lesp[len(lesp)-2].V = ourSize
                        if state.Ctx.ShowPrgrs {
+                               state.progressBars[pktName] = struct{}{}
                                Progress("Rx", lesp)
                        }
                        if fullsize != ourSize {
                                continue
                        }
+                       if state.Ctx.ShowPrgrs {
+                               delete(state.progressBars, pktName)
+                       }
                        logMsg = func(les LEs) string {
                                return fmt.Sprintf(
                                        "Got packet %s %d%% (%s / %s)",
@@ -1460,59 +1459,64 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                state.closeFd(filePathPart)
                                continue
                        }
-                       if hasherExists {
-                               if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
-                                       state.Ctx.LogE(
-                                               "sp-file-bad-checksum", lesp,
-                                               errors.New("checksum mismatch"),
-                                               logMsg,
-                                       )
-                                       continue
-                               }
-                               if err = os.Rename(filePathPart, filePath); err != nil {
-                                       state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
-                                               return logMsg(les) + ": renaming"
-                                       })
-                                       continue
-                               }
-                               if err = DirSync(dirToSync); err != nil {
-                                       state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
-                                               return logMsg(les) + ": dirsyncing"
-                                       })
-                                       continue
-                               }
-                               state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
-                                       return logMsg(les) + ": done"
-                               })
-                               state.wg.Add(1)
-                               go func() {
-                                       state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
-                                       state.wg.Done()
-                               }()
-                               state.Lock()
-                               delete(state.infosTheir, *file.Hash)
-                               state.Unlock()
-                               if !state.Ctx.HdrUsage {
-                                       state.closeFd(filePathPart)
-                                       continue
-                               }
-                               if _, err = fd.Seek(0, io.SeekStart); err != nil {
-                                       state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
-                                               return logMsg(les) + ": seeking"
+                       if hasherAndOffset != nil {
+                               delete(state.fileHashers, filePath)
+                               if hasherAndOffset.mth.PreaddSize() == 0 {
+                                       if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
+                                               state.Ctx.LogE(
+                                                       "sp-file-bad-checksum", lesp,
+                                                       errors.New("checksum mismatch"),
+                                                       logMsg,
+                                               )
+                                               state.closeFd(filePathPart)
+                                               continue
+                                       }
+                                       if err = os.Rename(filePathPart, filePath); err != nil {
+                                               state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
+                                                       return logMsg(les) + ": renaming"
+                                               })
+                                               state.closeFd(filePathPart)
+                                               continue
+                                       }
+                                       if err = DirSync(dirToSync); err != nil {
+                                               state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
+                                                       return logMsg(les) + ": dirsyncing"
+                                               })
+                                               state.closeFd(filePathPart)
+                                               continue
+                                       }
+                                       state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
+                                               return logMsg(les) + ": done"
                                        })
+                                       state.wg.Add(1)
+                                       go func() {
+                                               state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
+                                               state.wg.Done()
+                                       }()
+                                       state.Lock()
+                                       delete(state.infosTheir, *file.Hash)
+                                       state.Unlock()
+                                       if !state.Ctx.HdrUsage {
+                                               continue
+                                       }
+                                       if _, err = fd.Seek(0, io.SeekStart); err != nil {
+                                               state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
+                                                       return logMsg(les) + ": seeking"
+                                               })
+                                               state.closeFd(filePathPart)
+                                               continue
+                                       }
+                                       _, pktEncRaw, err := state.Ctx.HdrRead(fd)
                                        state.closeFd(filePathPart)
+                                       if err != nil {
+                                               state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
+                                                       return logMsg(les) + ": HdrReading"
+                                               })
+                                               continue
+                                       }
+                                       state.Ctx.HdrWrite(pktEncRaw, filePath)
                                        continue
                                }
-                               _, pktEncRaw, err := state.Ctx.HdrRead(fd)
-                               state.closeFd(filePathPart)
-                               if err != nil {
-                                       state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
-                                               return logMsg(les) + ": HdrReading"
-                                       })
-                                       continue
-                               }
-                               state.Ctx.HdrWrite(pktEncRaw, filePath)
-                               continue
                        }
                        state.closeFd(filePathPart)
                        if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
@@ -1533,9 +1537,17 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Lock()
                        delete(state.infosTheir, *file.Hash)
                        state.Unlock()
-                       if !state.NoCK {
-                               state.checkerQueues.appeared <- file.Hash
-                       }
+                       go func() {
+                               t := SPCheckerTask{
+                                       nodeId: state.Node.Id,
+                                       hsh:    file.Hash,
+                                       done:   state.payloads,
+                               }
+                               if hasherAndOffset != nil {
+                                       t.mth = hasherAndOffset.mth
+                               }
+                               spCheckerTasks <- t
+                       }()
 
                case SPTypeDone:
                        lesp := append(les, LE{"Type", "done"})
@@ -1575,7 +1587,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        return fmt.Sprintf("Packet %s is sent", pktName)
                                })
                                if state.Ctx.HdrUsage {
-                                       os.Remove(pth + HdrSuffix)
+                                       os.Remove(JobPath2Hdr(pth))
                                }
                        } else {
                                state.Ctx.LogE("sp-done", lesp, err, logMsg)
@@ -1678,3 +1690,41 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
        }
        return payloadsSplit(replies), nil
 }
+
+func SPChecker(ctx *Ctx) {
+       for t := range spCheckerTasks {
+               pktName := Base32Codec.EncodeToString(t.hsh[:])
+               les := LEs{
+                       {"XX", string(TRx)},
+                       {"Node", t.nodeId},
+                       {"Pkt", pktName},
+               }
+               SPCheckerWg.Add(1)
+               ctx.LogD("sp-checker", les, func(les LEs) string {
+                       return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
+               })
+               size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
+               les = append(les, LE{"Size", size})
+               if err != nil {
+                       ctx.LogE("sp-checker", les, err, func(les LEs) string {
+                               return fmt.Sprintf(
+                                       "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
+                                       humanize.IBytes(uint64(size)),
+                               )
+                       })
+                       SPCheckerWg.Done()
+                       continue
+               }
+               ctx.LogI("sp-checker-done", les, func(les LEs) string {
+                       return fmt.Sprintf(
+                               "Packet %s is retreived (%s)",
+                               pktName, humanize.IBytes(uint64(size)),
+                       )
+               })
+               SPCheckerWg.Done()
+               go func(t SPCheckerTask) {
+                       defer func() { recover() }()
+                       t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
+               }(t)
+       }
+}