/*
NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
"crypto/subtle"
"errors"
"fmt"
- "hash"
"io"
+ "log"
"os"
"path/filepath"
"sort"
+ "strconv"
"sync"
"time"
xdr "github.com/davecgh/go-xdr/xdr2"
"github.com/dustin/go-humanize"
"github.com/flynn/noise"
- "golang.org/x/crypto/blake2b"
)
const (
MaxSPSize = 1<<16 - 256
PartSuffix = ".part"
SPHeadOverhead = 4
+ CfgDeadline = "NNCPDEADLINE"
)
-type SPCheckerQueues struct {
- appeared chan *[32]byte
- checked chan *[32]byte
+type MTHAndOffset struct {
+ mth MTH
+ offset uint64
}
-var (
- MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
+type SPCheckerTask struct {
+ nodeId *NodeId
+ hsh *[MTHSize]byte
+ mth MTH
+ done chan []byte
+}
+var (
SPInfoOverhead int
SPFreqOverhead int
SPFileOverhead int
DefaultDeadline = 10 * time.Second
PingTimeout = time.Minute
- spCheckers = make(map[NodeId]*SPCheckerQueues)
+ spCheckerTasks chan SPCheckerTask
+ SPCheckerWg sync.WaitGroup
+ spCheckerOnce sync.Once
)
type FdAndFullSize struct {
fullSize int64
}
-type HasherAndOffset struct {
- h hash.Hash
- offset uint64
-}
-
type SPType uint8
const (
type SPInfo struct {
Nice uint8
Size uint64
- Hash *[32]byte
+ Hash *[MTHSize]byte
}
type SPFreq struct {
- Hash *[32]byte
+ Hash *[MTHSize]byte
Offset uint64
}
type SPFile struct {
- Hash *[32]byte
+ Hash *[MTHSize]byte
Offset uint64
Payload []byte
}
type SPDone struct {
- Hash *[32]byte
+ Hash *[MTHSize]byte
}
type SPRaw struct {
}
func init() {
+ if v := os.Getenv(CfgDeadline); v != "" {
+ i, err := strconv.Atoi(v)
+ if err != nil {
+ log.Fatalln("Can not convert", CfgDeadline, "to integer:", err)
+ }
+ DefaultDeadline = time.Duration(i) * time.Second
+ }
+
var buf bytes.Buffer
spHead := SPHead{Type: SPTypeHalt}
if _, err := xdr.Marshal(&buf, spHead); err != nil {
copy(SPPingMarshalized, buf.Bytes())
buf.Reset()
- spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
+ spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
if _, err := xdr.Marshal(&buf, spInfo); err != nil {
panic(err)
}
SPInfoOverhead = buf.Len()
buf.Reset()
- spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
+ spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
if _, err := xdr.Marshal(&buf, spFreq); err != nil {
panic(err)
}
SPFreqOverhead = buf.Len()
buf.Reset()
- spFile := SPFile{Hash: new([32]byte), Offset: 123}
+ spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
if _, err := xdr.Marshal(&buf, spFile); err != nil {
panic(err)
}
SPFileOverhead = buf.Len()
+ spCheckerTasks = make(chan SPCheckerTask)
}
func MarshalSP(typ SPType, sp interface{}) []byte {
csTheir *noise.CipherState
payloads chan []byte
pings chan struct{}
- infosTheir map[[32]byte]*SPInfo
- infosOurSeen map[[32]byte]uint8
+ infosTheir map[[MTHSize]byte]*SPInfo
+ infosOurSeen map[[MTHSize]byte]uint8
queueTheir []*FreqWithNice
wg sync.WaitGroup
RxBytes int64
txRate int
isDead chan struct{}
listOnly bool
- onlyPkts map[[32]byte]bool
+ onlyPkts map[[MTHSize]byte]bool
writeSPBuf bytes.Buffer
fds map[string]FdAndFullSize
fdsLock sync.RWMutex
- fileHashers map[string]*HasherAndOffset
- checkerQueues SPCheckerQueues
+ fileHashers map[string]*MTHAndOffset
+ progressBars map[string]struct{}
sync.RWMutex
}
for range state.pings {
}
}()
- go func() {
- for _, s := range state.fds {
- s.fd.Close()
- }
- }()
}
func (state *SPState) NotAlive() bool {
state.Ctx.UnlockDir(state.txLock)
}
-func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
- for hshValue := range appeared {
- pktName := Base32Codec.EncodeToString(hshValue[:])
- les := LEs{
- {"XX", string(TRx)},
- {"Node", nodeId},
- {"Pkt", pktName},
- }
- ctx.LogD("sp-checker", les, func(les LEs) string {
- return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
- })
- size, err := ctx.CheckNoCK(nodeId, hshValue)
- les = append(les, LE{"Size", size})
- if err != nil {
- ctx.LogE("sp-checker", les, err, func(les LEs) string {
- return fmt.Sprintf(
- "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName,
- humanize.IBytes(uint64(size)),
- )
- })
- continue
- }
- ctx.LogI("sp-checker-done", les, func(les LEs) string {
- return fmt.Sprintf(
- "Packet %s is retreived (%s)",
- pktName, humanize.IBytes(uint64(size)),
- )
- })
- go func(hsh *[32]byte) { checked <- hsh }(hshValue)
- }
-}
-
func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
state.writeSPBuf.Reset()
n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
- Magic: MagicNNCPLv1,
+ Magic: MagicNNCPSv1.B,
Payload: payload,
})
if err != nil {
}
state.RxLastSeen = time.Now()
state.RxBytes += int64(n)
- if sp.Magic != MagicNNCPLv1 {
+ if sp.Magic != MagicNNCPSv1.B {
return nil, BadMagic
}
return sp.Payload, nil
}
-func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
+func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
var infos []*SPInfo
var totalSize int64
for job := range ctx.Jobs(nodeId, TTx) {
state.hs = hs
state.payloads = make(chan []byte)
state.pings = make(chan struct{})
- state.infosTheir = make(map[[32]byte]*SPInfo)
- state.infosOurSeen = make(map[[32]byte]uint8)
+ state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
+ state.infosOurSeen = make(map[[MTHSize]byte]uint8)
+ state.progressBars = make(map[string]struct{})
state.started = started
state.rxLock = rxLock
state.txLock = txLock
NicenessFmt(state.Nice),
)
})
- conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+ conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
if err = state.WriteSP(conn, buf, false); err != nil {
state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
return fmt.Sprintf(
NicenessFmt(state.Nice),
)
})
- conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+ conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
if buf, err = state.ReadSP(conn); err != nil {
state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
return fmt.Sprintf(
state.hs = hs
state.payloads = make(chan []byte)
state.pings = make(chan struct{})
- state.infosOurSeen = make(map[[32]byte]uint8)
- state.infosTheir = make(map[[32]byte]*SPInfo)
+ state.infosOurSeen = make(map[[MTHSize]byte]uint8)
+ state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
+ state.progressBars = make(map[string]struct{})
state.started = started
state.xxOnly = xxOnly
}
les := LEs{{"Nice", int(state.Nice)}}
state.Ctx.LogD("sp-startR", les, logMsg)
- conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+ conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
if buf, err = state.ReadSP(conn); err != nil {
state.Ctx.LogE("sp-startR-read", les, err, logMsg)
return err
var node *Node
for _, n := range state.Ctx.Neigh {
+ if n.NoisePub == nil {
+ continue
+ }
if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
node = n
break
state.dirUnlock()
return err
}
- conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+ conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
if err = state.WriteSP(conn, buf, false); err != nil {
state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
return fmt.Sprintf(
) error {
les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
state.fds = make(map[string]FdAndFullSize)
- state.fileHashers = make(map[string]*HasherAndOffset)
+ state.fileHashers = make(map[string]*MTHAndOffset)
state.isDead = make(chan struct{})
if state.maxOnlineTime > 0 {
state.mustFinishAt = state.started.Add(state.maxOnlineTime)
}
-
- // Checker
if !state.NoCK {
- queues := spCheckers[*state.Node.Id]
- if queues == nil {
- queues = &SPCheckerQueues{
- appeared: make(chan *[32]byte),
- checked: make(chan *[32]byte),
- }
- spCheckers[*state.Node.Id] = queues
- go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
- }
- state.checkerQueues = *queues
+ spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
go func() {
for job := range state.Ctx.JobsNoCK(state.Node.Id) {
if job.PktEnc.Nice <= state.Nice {
- state.checkerQueues.appeared <- job.HshValue
- }
- }
- }()
- state.wg.Add(1)
- go func() {
- defer state.wg.Done()
- for {
- select {
- case <-state.isDead:
- return
- case hsh := <-state.checkerQueues.checked:
- state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
+ spCheckerTasks <- SPCheckerTask{
+ nodeId: state.Node.Id,
+ hsh: job.HshValue,
+ done: state.payloads,
+ }
}
}
}()
pingTicker.Stop()
return
case now := <-deadlineTicker.C:
- if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
- now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
- (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
- (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
- state.SetDead()
- conn.Close() // #nosec G104
+ if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
+ now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
+ goto Deadlined
+ }
+ if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
+ goto Deadlined
}
+ if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
+ goto Deadlined
+ }
+ break
+ Deadlined:
+ state.SetDead()
+ conn.Close()
case now := <-pingTicker.C:
if now.After(state.TxLastSeen.Add(PingTimeout)) {
state.wg.Add(1)
if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
state.wg.Add(1)
go func() {
- ticker := time.NewTicker(time.Second)
+ dw, err := state.Ctx.NewDirWatcher(
+ filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)),
+ time.Second,
+ )
+ if err != nil {
+ state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg)
+ log.Fatalln(err)
+ }
for {
select {
case <-state.isDead:
+ dw.Close()
state.wg.Done()
- ticker.Stop()
return
- case <-ticker.C:
+ case <-dw.C:
for _, payload := range state.Ctx.infosOur(
state.Node.Id,
state.Nice,
defer conn.Close()
defer state.SetDead()
defer state.wg.Done()
+ buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
for {
if state.NotAlive() {
return
fdAndFullSize, exists := state.fds[pth]
state.fdsLock.RUnlock()
if !exists {
+ state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string {
+ return logMsg(les) + ": opening"
+ })
fd, err := os.Open(pth)
if err != nil {
state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
}
fd := fdAndFullSize.fd
fullSize := fdAndFullSize.fullSize
- var buf []byte
+ lesp = append(lesp, LE{"FullSize", fullSize})
+ var bufRead []byte
if freq.Offset < uint64(fullSize) {
state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
return logMsg(les) + ": seeking"
})
return
}
- buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
n, err := fd.Read(buf)
if err != nil {
state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
})
return
}
- buf = buf[:n]
+ bufRead = buf[:n]
lesp = append(
les,
LE{"XX", string(TTx)},
LE{"Pkt", pktName},
LE{"Size", int64(n)},
+ LE{"FullSize", fullSize},
)
state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
return fmt.Sprintf(
logMsg(les), humanize.IBytes(uint64(n)),
)
})
+ } else {
+ state.closeFd(pth)
}
- state.closeFd(pth)
payload = MarshalSP(SPTypeFile, SPFile{
Hash: freq.Hash,
Offset: freq.Offset,
- Payload: buf,
+ Payload: bufRead,
})
- ourSize := freq.Offset + uint64(len(buf))
+ ourSize := freq.Offset + uint64(len(bufRead))
lesp = append(
les,
LE{"XX", string(TTx)},
LE{"FullSize", fullSize},
)
if state.Ctx.ShowPrgrs {
+ state.progressBars[pktName] = struct{}{}
Progress("Tx", lesp)
}
+ if ourSize == uint64(fullSize) {
+ state.closeFd(pth)
+ state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
+ return logMsg(les) + ": finished"
+ })
+ if state.Ctx.ShowPrgrs {
+ delete(state.progressBars, pktName)
+ }
+ }
state.Lock()
- if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
+ for i, q := range state.queueTheir {
+ if *q.freq.Hash != *freq.Hash {
+ continue
+ }
if ourSize == uint64(fullSize) {
- state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
- return logMsg(les) + ": finished"
- })
- if len(state.queueTheir) > 1 {
- state.queueTheir = state.queueTheir[1:]
- } else {
- state.queueTheir = state.queueTheir[:0]
- }
+ state.queueTheir = append(
+ state.queueTheir[:i],
+ state.queueTheir[i+1:]...,
+ )
} else {
- state.queueTheir[0].freq.Offset += uint64(len(buf))
+ q.freq.Offset = ourSize
}
- } else {
- state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
- return logMsg(les) + ": queue disappeared"
- })
+ break
}
state.Unlock()
}
)
}
state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
- conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
- if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
+ conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
+ ct, err := state.csOur.Encrypt(nil, nil, payload)
+ if err != nil {
+ state.Ctx.LogE("sp-encrypting", les, err, logMsg)
+ return
+ }
+ if err := state.WriteSP(conn, ct, ping); err != nil {
state.Ctx.LogE("sp-sending", les, err, logMsg)
return
}
)
}
state.Ctx.LogD("sp-recv-wait", les, logMsg)
- conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+ conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
payload, err := state.ReadSP(conn)
if err != nil {
if err == io.EOF {
state.SetDead()
state.wg.Done()
state.SetDead()
- conn.Close() // #nosec G104
+ conn.Close()
}()
return nil
state.wg.Wait()
close(state.payloads)
close(state.pings)
- state.dirUnlock()
state.Duration = time.Now().Sub(state.started)
+ state.dirUnlock()
state.RxSpeed = state.RxBytes
state.TxSpeed = state.TxBytes
rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
if txDuration > 0 {
state.TxSpeed = state.TxBytes / txDuration
}
+ for _, s := range state.fds {
+ s.fd.Close()
+ }
+ for pktName := range state.progressBars {
+ ProgressKill(pktName)
+ }
}
func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
}
continue
}
- if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
+ if _, err = os.Stat(filepath.Join(
+ state.Ctx.Spool, state.Node.Id.String(), string(TRx),
+ SeenDir, Base32Codec.EncodeToString(info.Hash[:]),
+ )); err == nil {
state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
return logMsg(les) + ": already seen"
})
pktName, humanize.IBytes(uint64(len(file.Payload))),
)
}
+ fullsize := int64(0)
+ state.RLock()
+ infoTheir := state.infosTheir[*file.Hash]
+ state.RUnlock()
+ if infoTheir == nil {
+ state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": unknown file"
+ })
+ continue
+ }
+ fullsize = int64(infoTheir.Size)
+ lesp = append(lesp, LE{"FullSize", fullsize})
dirToSync := filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
state.fdsLock.RLock()
fdAndFullSize, exists := state.fds[filePathPart]
state.fdsLock.RUnlock()
+ hasherAndOffset := state.fileHashers[filePath]
var fd *os.File
if exists {
fd = fdAndFullSize.fd
state.fdsLock.Lock()
state.fds[filePathPart] = FdAndFullSize{fd: fd}
state.fdsLock.Unlock()
- if file.Offset == 0 {
- h, err := blake2b.New256(nil)
- if err != nil {
- panic(err)
+ if !state.NoCK {
+ hasherAndOffset = &MTHAndOffset{
+ mth: MTHNew(fullsize, int64(file.Offset)),
+ offset: file.Offset,
}
- state.fileHashers[filePath] = &HasherAndOffset{h: h}
+ state.fileHashers[filePath] = hasherAndOffset
}
}
state.Ctx.LogD(
state.closeFd(filePathPart)
return nil, err
}
- hasherAndOffset, hasherExists := state.fileHashers[filePath]
- if hasherExists {
+ if hasherAndOffset != nil {
if hasherAndOffset.offset == file.Offset {
- if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
+ if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
panic(err)
}
hasherAndOffset.offset += uint64(len(file.Payload))
} else {
- state.Ctx.LogD(
- "sp-file-offset-differs", lesp,
+ state.Ctx.LogE(
+ "sp-file-offset-differs", lesp, errors.New("offset differs"),
func(les LEs) string {
- return logMsg(les) + ": offset differs, deleting hasher"
+ return logMsg(les) + ": deleting hasher"
},
)
delete(state.fileHashers, filePath)
- hasherExists = false
+ hasherAndOffset = nil
}
}
ourSize := int64(file.Offset + uint64(len(file.Payload)))
- lesp[len(lesp)-1].V = ourSize
- fullsize := int64(0)
- state.RLock()
- infoTheir, ok := state.infosTheir[*file.Hash]
- state.RUnlock()
- if ok {
- fullsize = int64(infoTheir.Size)
- }
- lesp = append(lesp, LE{"FullSize", fullsize})
+ lesp[len(lesp)-2].V = ourSize
if state.Ctx.ShowPrgrs {
+ state.progressBars[pktName] = struct{}{}
Progress("Rx", lesp)
}
if fullsize != ourSize {
continue
}
+ if state.Ctx.ShowPrgrs {
+ delete(state.progressBars, pktName)
+ }
logMsg = func(les LEs) string {
return fmt.Sprintf(
"Got packet %s %d%% (%s / %s)",
humanize.IBytes(uint64(fullsize)),
)
}
- err = fd.Sync()
- if err != nil {
- state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
- return logMsg(les) + ": syncing"
- })
- state.closeFd(filePathPart)
- continue
- }
- if hasherExists {
- if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
- state.Ctx.LogE(
- "sp-file-bad-checksum", lesp,
- errors.New("checksum mismatch"),
- logMsg,
- )
- continue
- }
- if err = os.Rename(filePathPart, filePath); err != nil {
- state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
- return logMsg(les) + ": renaming"
- })
- continue
- }
- if err = DirSync(dirToSync); err != nil {
- state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
- return logMsg(les) + ": dirsyncing"
+ if !NoSync {
+ err = fd.Sync()
+ if err != nil {
+ state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": syncing"
})
- continue
- }
- state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
- return logMsg(les) + ": done"
- })
- state.wg.Add(1)
- go func() {
- state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
- state.wg.Done()
- }()
- state.Lock()
- delete(state.infosTheir, *file.Hash)
- state.Unlock()
- if !state.Ctx.HdrUsage {
state.closeFd(filePathPart)
continue
}
- if _, err = fd.Seek(0, io.SeekStart); err != nil {
- state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
- return logMsg(les) + ": seeking"
+ }
+ if hasherAndOffset != nil {
+ delete(state.fileHashers, filePath)
+ if hasherAndOffset.mth.PreaddSize() == 0 {
+ if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
+ state.Ctx.LogE(
+ "sp-file-bad-checksum", lesp,
+ errors.New("checksum mismatch"),
+ logMsg,
+ )
+ state.closeFd(filePathPart)
+ continue
+ }
+ if err = os.Rename(filePathPart, filePath); err != nil {
+ state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": renaming"
+ })
+ state.closeFd(filePathPart)
+ continue
+ }
+ if err = DirSync(dirToSync); err != nil {
+ state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": dirsyncing"
+ })
+ state.closeFd(filePathPart)
+ continue
+ }
+ state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
+ return logMsg(les) + ": done"
})
+ state.wg.Add(1)
+ go func() {
+ state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
+ state.wg.Done()
+ }()
+ state.Lock()
+ delete(state.infosTheir, *file.Hash)
+ state.Unlock()
+ if !state.Ctx.HdrUsage {
+ continue
+ }
+ if _, err = fd.Seek(0, io.SeekStart); err != nil {
+ state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": seeking"
+ })
+ state.closeFd(filePathPart)
+ continue
+ }
+ _, pktEncRaw, err := state.Ctx.HdrRead(fd)
state.closeFd(filePathPart)
+ if err != nil {
+ state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": HdrReading"
+ })
+ continue
+ }
+ state.Ctx.HdrWrite(pktEncRaw, filePath)
continue
}
- _, pktEncRaw, err := state.Ctx.HdrRead(fd)
- state.closeFd(filePathPart)
- if err != nil {
- state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
- return logMsg(les) + ": HdrReading"
- })
- continue
- }
- state.Ctx.HdrWrite(pktEncRaw, filePath)
- continue
}
state.closeFd(filePathPart)
if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
state.Lock()
delete(state.infosTheir, *file.Hash)
state.Unlock()
- if !state.NoCK {
- state.checkerQueues.appeared <- file.Hash
- }
+ go func() {
+ t := SPCheckerTask{
+ nodeId: state.Node.Id,
+ hsh: file.Hash,
+ done: state.payloads,
+ }
+ if hasherAndOffset != nil {
+ t.mth = hasherAndOffset.mth
+ }
+ spCheckerTasks <- t
+ }()
case SPTypeDone:
lesp := append(les, LE{"Type", "done"})
return fmt.Sprintf("Packet %s is sent", pktName)
})
if state.Ctx.HdrUsage {
- os.Remove(pth + HdrSuffix)
+ os.Remove(JobPath2Hdr(pth))
}
} else {
state.Ctx.LogE("sp-done", lesp, err, logMsg)
}
return payloadsSplit(replies), nil
}
+
+func SPChecker(ctx *Ctx) {
+ for t := range spCheckerTasks {
+ pktName := Base32Codec.EncodeToString(t.hsh[:])
+ les := LEs{
+ {"XX", string(TRx)},
+ {"Node", t.nodeId},
+ {"Pkt", pktName},
+ }
+ SPCheckerWg.Add(1)
+ ctx.LogD("sp-checker", les, func(les LEs) string {
+ return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
+ })
+ size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
+ les = append(les, LE{"Size", size})
+ if err != nil {
+ ctx.LogE("sp-checker", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
+ humanize.IBytes(uint64(size)),
+ )
+ })
+ SPCheckerWg.Done()
+ continue
+ }
+ ctx.LogI("sp-checker-done", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Packet %s is retreived (%s)",
+ pktName, humanize.IBytes(uint64(size)),
+ )
+ })
+ SPCheckerWg.Done()
+ go func(t SPCheckerTask) {
+ defer func() { recover() }()
+ t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
+ }(t)
+ }
+}