X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fsp.go;h=4db358742a17b1f584aa102ac853a1c6386c6d13;hb=0367cce2741e1ce6a89a49fd5c4e9df6005c9744;hp=00eaac6b50d58ee0dc8665fa04d2a0a54b6c913b;hpb=ab288c7da7b7d9896a422bfe9a9fb91e00316c54;p=nncp.git diff --git a/src/sp.go b/src/sp.go index 00eaac6..4db3587 100644 --- a/src/sp.go +++ b/src/sp.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2021 Sergey Matveev +Copyright (C) 2016-2022 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -22,8 +22,8 @@ import ( "crypto/subtle" "errors" "fmt" - "hash" "io" + "log" "os" "path/filepath" "sort" @@ -33,7 +33,6 @@ import ( xdr "github.com/davecgh/go-xdr/xdr2" "github.com/dustin/go-humanize" "github.com/flynn/noise" - "golang.org/x/crypto/blake2b" ) const ( @@ -42,14 +41,19 @@ const ( SPHeadOverhead = 4 ) -type SPCheckerQueues struct { - appeared chan *[32]byte - checked chan *[32]byte +type MTHAndOffset struct { + mth MTH + offset uint64 } -var ( - MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1} +type SPCheckerTask struct { + nodeId *NodeId + hsh *[MTHSize]byte + mth MTH + done chan []byte +} +var ( SPInfoOverhead int SPFreqOverhead int SPFileOverhead int @@ -65,8 +69,9 @@ var ( DefaultDeadline = 10 * time.Second PingTimeout = time.Minute - spCheckers = make(map[NodeId]*SPCheckerQueues) - SPCheckersWg sync.WaitGroup + spCheckerTasks chan SPCheckerTask + SPCheckerWg sync.WaitGroup + spCheckerOnce sync.Once ) type FdAndFullSize struct { @@ -74,11 +79,6 @@ type FdAndFullSize struct { fullSize int64 } -type HasherAndOffset struct { - h hash.Hash - offset uint64 -} - type SPType uint8 const ( @@ -97,22 +97,22 @@ type SPHead struct { type SPInfo struct { Nice uint8 Size uint64 - Hash *[32]byte + Hash *[MTHSize]byte } type SPFreq struct { - Hash *[32]byte + Hash *[MTHSize]byte Offset uint64 } type SPFile struct { - Hash *[32]byte + Hash *[MTHSize]byte Offset uint64 Payload []byte } type SPDone struct { - Hash *[32]byte + Hash *[MTHSize]byte } type SPRaw struct { @@ -149,25 +149,26 @@ func init() { copy(SPPingMarshalized, buf.Bytes()) buf.Reset() - spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)} + spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)} if _, err := xdr.Marshal(&buf, spInfo); err != nil { panic(err) } SPInfoOverhead = buf.Len() buf.Reset() - spFreq := SPFreq{Hash: new([32]byte), Offset: 123} + spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123} if _, err := xdr.Marshal(&buf, spFreq); err != nil { panic(err) } SPFreqOverhead = buf.Len() buf.Reset() - spFile := SPFile{Hash: new([32]byte), Offset: 123} + spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123} if _, err := xdr.Marshal(&buf, spFile); err != nil { panic(err) } SPFileOverhead = buf.Len() + spCheckerTasks = make(chan SPCheckerTask) } func MarshalSP(typ SPType, sp interface{}) []byte { @@ -209,8 +210,8 @@ type SPState struct { csTheir *noise.CipherState payloads chan []byte pings chan struct{} - infosTheir map[[32]byte]*SPInfo - infosOurSeen map[[32]byte]uint8 + infosTheir map[[MTHSize]byte]*SPInfo + infosOurSeen map[[MTHSize]byte]uint8 queueTheir []*FreqWithNice wg sync.WaitGroup RxBytes int64 @@ -231,12 +232,11 @@ type SPState struct { txRate int isDead chan struct{} listOnly bool - onlyPkts map[[32]byte]bool + onlyPkts map[[MTHSize]byte]bool writeSPBuf bytes.Buffer fds map[string]FdAndFullSize fdsLock sync.RWMutex - fileHashers map[string]*HasherAndOffset - checkerQueues SPCheckerQueues + fileHashers map[string]*MTHAndOffset progressBars map[string]struct{} sync.RWMutex } @@ -275,44 +275,10 @@ func (state *SPState) dirUnlock() { state.Ctx.UnlockDir(state.txLock) } -func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) { - for hshValue := range appeared { - pktName := Base32Codec.EncodeToString(hshValue[:]) - les := LEs{ - {"XX", string(TRx)}, - {"Node", nodeId}, - {"Pkt", pktName}, - } - SPCheckersWg.Add(1) - ctx.LogD("sp-checker", les, func(les LEs) string { - return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName) - }) - size, err := ctx.CheckNoCK(nodeId, hshValue) - les = append(les, LE{"Size", size}) - if err != nil { - ctx.LogE("sp-checker", les, err, func(les LEs) string { - return fmt.Sprintf( - "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName, - humanize.IBytes(uint64(size)), - ) - }) - continue - } - ctx.LogI("sp-checker-done", les, func(les LEs) string { - return fmt.Sprintf( - "Packet %s is retreived (%s)", - pktName, humanize.IBytes(uint64(size)), - ) - }) - SPCheckersWg.Done() - go func(hsh *[32]byte) { checked <- hsh }(hshValue) - } -} - func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error { state.writeSPBuf.Reset() n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ - Magic: MagicNNCPLv1, + Magic: MagicNNCPSv1.B, Payload: payload, }) if err != nil { @@ -340,13 +306,13 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { } state.RxLastSeen = time.Now() state.RxBytes += int64(n) - if sp.Magic != MagicNNCPLv1 { + if sp.Magic != MagicNNCPSv1.B { return nil, BadMagic } return sp.Payload, nil } -func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte { +func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte { var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { @@ -438,8 +404,8 @@ func (state *SPState) StartI(conn ConnDeadlined) error { state.hs = hs state.payloads = make(chan []byte) state.pings = make(chan struct{}) - state.infosTheir = make(map[[32]byte]*SPInfo) - state.infosOurSeen = make(map[[32]byte]uint8) + state.infosTheir = make(map[[MTHSize]byte]*SPInfo) + state.infosOurSeen = make(map[[MTHSize]byte]uint8) state.progressBars = make(map[string]struct{}) state.started = started state.rxLock = rxLock @@ -473,7 +439,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error { NicenessFmt(state.Nice), ) }) - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) if err = state.WriteSP(conn, buf, false); err != nil { state.Ctx.LogE("sp-startI", les, err, func(les LEs) string { return fmt.Sprintf( @@ -492,7 +458,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error { NicenessFmt(state.Nice), ) }) - conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) if buf, err = state.ReadSP(conn); err != nil { state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { return fmt.Sprintf( @@ -556,8 +522,8 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.hs = hs state.payloads = make(chan []byte) state.pings = make(chan struct{}) - state.infosOurSeen = make(map[[32]byte]uint8) - state.infosTheir = make(map[[32]byte]*SPInfo) + state.infosOurSeen = make(map[[MTHSize]byte]uint8) + state.infosTheir = make(map[[MTHSize]byte]*SPInfo) state.progressBars = make(map[string]struct{}) state.started = started state.xxOnly = xxOnly @@ -572,7 +538,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } les := LEs{{"Nice", int(state.Nice)}} state.Ctx.LogD("sp-startR", les, logMsg) - conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) if buf, err = state.ReadSP(conn); err != nil { state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err @@ -584,6 +550,9 @@ func (state *SPState) StartR(conn ConnDeadlined) error { var node *Node for _, n := range state.Ctx.Neigh { + if n.NoisePub == nil { + continue + } if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 { node = n break @@ -646,7 +615,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.dirUnlock() return err } - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) if err = state.WriteSP(conn, buf, false); err != nil { state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string { return fmt.Sprintf( @@ -686,40 +655,21 @@ func (state *SPState) StartWorkers( ) error { les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}} state.fds = make(map[string]FdAndFullSize) - state.fileHashers = make(map[string]*HasherAndOffset) + state.fileHashers = make(map[string]*MTHAndOffset) state.isDead = make(chan struct{}) if state.maxOnlineTime > 0 { state.mustFinishAt = state.started.Add(state.maxOnlineTime) } - - // Checker if !state.NoCK { - queues := spCheckers[*state.Node.Id] - if queues == nil { - queues = &SPCheckerQueues{ - appeared: make(chan *[32]byte), - checked: make(chan *[32]byte), - } - spCheckers[*state.Node.Id] = queues - go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked) - } - state.checkerQueues = *queues + spCheckerOnce.Do(func() { go SPChecker(state.Ctx) }) go func() { for job := range state.Ctx.JobsNoCK(state.Node.Id) { if job.PktEnc.Nice <= state.Nice { - state.checkerQueues.appeared <- job.HshValue - } - } - }() - state.wg.Add(1) - go func() { - defer state.wg.Done() - for { - select { - case <-state.isDead: - return - case hsh := <-state.checkerQueues.checked: - state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh}) + spCheckerTasks <- SPCheckerTask{ + nodeId: state.Node.Id, + hsh: job.HshValue, + done: state.payloads, + } } } }() @@ -806,7 +756,7 @@ func (state *SPState) StartWorkers( break Deadlined: state.SetDead() - conn.Close() // #nosec G104 + conn.Close() case now := <-pingTicker.C: if now.After(state.TxLastSeen.Add(PingTimeout)) { state.wg.Add(1) @@ -823,14 +773,21 @@ func (state *SPState) StartWorkers( if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { state.wg.Add(1) go func() { - ticker := time.NewTicker(time.Second) + dw, err := state.Ctx.NewDirWatcher( + filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)), + time.Second, + ) + if err != nil { + state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg) + log.Fatalln(err) + } for { select { case <-state.isDead: + dw.Close() state.wg.Done() - ticker.Stop() return - case <-ticker.C: + case <-dw.C: for _, payload := range state.Ctx.infosOur( state.Node.Id, state.Nice, @@ -860,6 +817,7 @@ func (state *SPState) StartWorkers( defer conn.Close() defer state.SetDead() defer state.wg.Done() + buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) for { if state.NotAlive() { return @@ -928,6 +886,9 @@ func (state *SPState) StartWorkers( fdAndFullSize, exists := state.fds[pth] state.fdsLock.RUnlock() if !exists { + state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string { + return logMsg(les) + ": opening" + }) fd, err := os.Open(pth) if err != nil { state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string { @@ -949,7 +910,8 @@ func (state *SPState) StartWorkers( } fd := fdAndFullSize.fd fullSize := fdAndFullSize.fullSize - var buf []byte + lesp = append(lesp, LE{"FullSize", fullSize}) + var bufRead []byte if freq.Offset < uint64(fullSize) { state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string { return logMsg(les) + ": seeking" @@ -960,7 +922,6 @@ func (state *SPState) StartWorkers( }) return } - buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string { @@ -968,12 +929,13 @@ func (state *SPState) StartWorkers( }) return } - buf = buf[:n] + bufRead = buf[:n] lesp = append( les, LE{"XX", string(TTx)}, LE{"Pkt", pktName}, LE{"Size", int64(n)}, + LE{"FullSize", fullSize}, ) state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string { return fmt.Sprintf( @@ -981,14 +943,15 @@ func (state *SPState) StartWorkers( logMsg(les), humanize.IBytes(uint64(n)), ) }) + } else { + state.closeFd(pth) } - state.closeFd(pth) payload = MarshalSP(SPTypeFile, SPFile{ Hash: freq.Hash, Offset: freq.Offset, - Payload: buf, + Payload: bufRead, }) - ourSize := freq.Offset + uint64(len(buf)) + ourSize := freq.Offset + uint64(len(bufRead)) lesp = append( les, LE{"XX", string(TTx)}, @@ -1000,27 +963,29 @@ func (state *SPState) StartWorkers( state.progressBars[pktName] = struct{}{} Progress("Tx", lesp) } + if ourSize == uint64(fullSize) { + state.closeFd(pth) + state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string { + return logMsg(les) + ": finished" + }) + if state.Ctx.ShowPrgrs { + delete(state.progressBars, pktName) + } + } state.Lock() - if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash { + for i, q := range state.queueTheir { + if *q.freq.Hash != *freq.Hash { + continue + } if ourSize == uint64(fullSize) { - state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string { - return logMsg(les) + ": finished" - }) - if len(state.queueTheir) > 1 { - state.queueTheir = state.queueTheir[1:] - } else { - state.queueTheir = state.queueTheir[:0] - } - if state.Ctx.ShowPrgrs { - delete(state.progressBars, pktName) - } + state.queueTheir = append( + state.queueTheir[:i], + state.queueTheir[i+1:]..., + ) } else { - state.queueTheir[0].freq.Offset += uint64(len(buf)) + q.freq.Offset = ourSize } - } else { - state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string { - return logMsg(les) + ": queue disappeared" - }) + break } state.Unlock() } @@ -1032,8 +997,13 @@ func (state *SPState) StartWorkers( ) } state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg) - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 - if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil { + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) + ct, err := state.csOur.Encrypt(nil, nil, payload) + if err != nil { + state.Ctx.LogE("sp-encrypting", les, err, logMsg) + return + } + if err := state.WriteSP(conn, ct, ping); err != nil { state.Ctx.LogE("sp-sending", les, err, logMsg) return } @@ -1054,7 +1024,7 @@ func (state *SPState) StartWorkers( ) } state.Ctx.LogD("sp-recv-wait", les, logMsg) - conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) payload, err := state.ReadSP(conn) if err != nil { if err == io.EOF { @@ -1128,7 +1098,7 @@ func (state *SPState) StartWorkers( state.SetDead() state.wg.Done() state.SetDead() - conn.Close() // #nosec G104 + conn.Close() }() return nil @@ -1139,7 +1109,6 @@ func (state *SPState) Wait() { close(state.payloads) close(state.pings) state.Duration = time.Now().Sub(state.started) - SPCheckersWg.Wait() state.dirUnlock() state.RxSpeed = state.RxBytes state.TxSpeed = state.TxBytes @@ -1294,7 +1263,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } continue } - if _, err = os.Stat(pktPath + SeenSuffix); err == nil { + if _, err = os.Stat(filepath.Join( + state.Ctx.Spool, state.Node.Id.String(), string(TRx), + SeenDir, Base32Codec.EncodeToString(info.Hash[:]), + )); err == nil { state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string { return logMsg(les) + ": already seen" }) @@ -1367,6 +1339,18 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { pktName, humanize.IBytes(uint64(len(file.Payload))), ) } + fullsize := int64(0) + state.RLock() + infoTheir := state.infosTheir[*file.Hash] + state.RUnlock() + if infoTheir == nil { + state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": unknown file" + }) + continue + } + fullsize = int64(infoTheir.Size) + lesp = append(lesp, LE{"FullSize", fullsize}) dirToSync := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), @@ -1380,6 +1364,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.fdsLock.RLock() fdAndFullSize, exists := state.fds[filePathPart] state.fdsLock.RUnlock() + hasherAndOffset := state.fileHashers[filePath] var fd *os.File if exists { fd = fdAndFullSize.fd @@ -1398,12 +1383,12 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.fdsLock.Lock() state.fds[filePathPart] = FdAndFullSize{fd: fd} state.fdsLock.Unlock() - if file.Offset == 0 { - h, err := blake2b.New256(nil) - if err != nil { - panic(err) + if !state.NoCK { + hasherAndOffset = &MTHAndOffset{ + mth: MTHNew(fullsize, int64(file.Offset)), + offset: file.Offset, } - state.fileHashers[filePath] = &HasherAndOffset{h: h} + state.fileHashers[filePath] = hasherAndOffset } } state.Ctx.LogD( @@ -1429,34 +1414,25 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.closeFd(filePathPart) return nil, err } - hasherAndOffset, hasherExists := state.fileHashers[filePath] - if hasherExists { + if hasherAndOffset != nil { if hasherAndOffset.offset == file.Offset { - if _, err = hasherAndOffset.h.Write(file.Payload); err != nil { + if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil { panic(err) } hasherAndOffset.offset += uint64(len(file.Payload)) } else { - state.Ctx.LogD( - "sp-file-offset-differs", lesp, + state.Ctx.LogE( + "sp-file-offset-differs", lesp, errors.New("offset differs"), func(les LEs) string { - return logMsg(les) + ": offset differs, deleting hasher" + return logMsg(les) + ": deleting hasher" }, ) delete(state.fileHashers, filePath) - hasherExists = false + hasherAndOffset = nil } } ourSize := int64(file.Offset + uint64(len(file.Payload))) - lesp[len(lesp)-1].V = ourSize - fullsize := int64(0) - state.RLock() - infoTheir, ok := state.infosTheir[*file.Hash] - state.RUnlock() - if ok { - fullsize = int64(infoTheir.Size) - } - lesp = append(lesp, LE{"FullSize", fullsize}) + lesp[len(lesp)-2].V = ourSize if state.Ctx.ShowPrgrs { state.progressBars[pktName] = struct{}{} Progress("Rx", lesp) @@ -1483,59 +1459,64 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.closeFd(filePathPart) continue } - if hasherExists { - if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 { - state.Ctx.LogE( - "sp-file-bad-checksum", lesp, - errors.New("checksum mismatch"), - logMsg, - ) - continue - } - if err = os.Rename(filePathPart, filePath); err != nil { - state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { - return logMsg(les) + ": renaming" - }) - continue - } - if err = DirSync(dirToSync); err != nil { - state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { - return logMsg(les) + ": dirsyncing" - }) - continue - } - state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string { - return logMsg(les) + ": done" - }) - state.wg.Add(1) - go func() { - state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) - state.wg.Done() - }() - state.Lock() - delete(state.infosTheir, *file.Hash) - state.Unlock() - if !state.Ctx.HdrUsage { - state.closeFd(filePathPart) - continue - } - if _, err = fd.Seek(0, io.SeekStart); err != nil { - state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { - return logMsg(les) + ": seeking" + if hasherAndOffset != nil { + delete(state.fileHashers, filePath) + if hasherAndOffset.mth.PreaddSize() == 0 { + if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 { + state.Ctx.LogE( + "sp-file-bad-checksum", lesp, + errors.New("checksum mismatch"), + logMsg, + ) + state.closeFd(filePathPart) + continue + } + if err = os.Rename(filePathPart, filePath); err != nil { + state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { + return logMsg(les) + ": renaming" + }) + state.closeFd(filePathPart) + continue + } + if err = DirSync(dirToSync); err != nil { + state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { + return logMsg(les) + ": dirsyncing" + }) + state.closeFd(filePathPart) + continue + } + state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string { + return logMsg(les) + ": done" }) + state.wg.Add(1) + go func() { + state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) + state.wg.Done() + }() + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + if !state.Ctx.HdrUsage { + continue + } + if _, err = fd.Seek(0, io.SeekStart); err != nil { + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) + state.closeFd(filePathPart) + continue + } + _, pktEncRaw, err := state.Ctx.HdrRead(fd) state.closeFd(filePathPart) + if err != nil { + state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string { + return logMsg(les) + ": HdrReading" + }) + continue + } + state.Ctx.HdrWrite(pktEncRaw, filePath) continue } - _, pktEncRaw, err := state.Ctx.HdrRead(fd) - state.closeFd(filePathPart) - if err != nil { - state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string { - return logMsg(les) + ": HdrReading" - }) - continue - } - state.Ctx.HdrWrite(pktEncRaw, filePath) - continue } state.closeFd(filePathPart) if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil { @@ -1556,9 +1537,17 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Lock() delete(state.infosTheir, *file.Hash) state.Unlock() - if !state.NoCK { - state.checkerQueues.appeared <- file.Hash - } + go func() { + t := SPCheckerTask{ + nodeId: state.Node.Id, + hsh: file.Hash, + done: state.payloads, + } + if hasherAndOffset != nil { + t.mth = hasherAndOffset.mth + } + spCheckerTasks <- t + }() case SPTypeDone: lesp := append(les, LE{"Type", "done"}) @@ -1598,7 +1587,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { return fmt.Sprintf("Packet %s is sent", pktName) }) if state.Ctx.HdrUsage { - os.Remove(pth + HdrSuffix) + os.Remove(JobPath2Hdr(pth)) } } else { state.Ctx.LogE("sp-done", lesp, err, logMsg) @@ -1701,3 +1690,41 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } return payloadsSplit(replies), nil } + +func SPChecker(ctx *Ctx) { + for t := range spCheckerTasks { + pktName := Base32Codec.EncodeToString(t.hsh[:]) + les := LEs{ + {"XX", string(TRx)}, + {"Node", t.nodeId}, + {"Pkt", pktName}, + } + SPCheckerWg.Add(1) + ctx.LogD("sp-checker", les, func(les LEs) string { + return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName) + }) + size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth) + les = append(les, LE{"Size", size}) + if err != nil { + ctx.LogE("sp-checker", les, err, func(les LEs) string { + return fmt.Sprintf( + "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName, + humanize.IBytes(uint64(size)), + ) + }) + SPCheckerWg.Done() + continue + } + ctx.LogI("sp-checker-done", les, func(les LEs) string { + return fmt.Sprintf( + "Packet %s is retreived (%s)", + pktName, humanize.IBytes(uint64(size)), + ) + }) + SPCheckerWg.Done() + go func(t SPCheckerTask) { + defer func() { recover() }() + t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh}) + }(t) + } +}