"bytes"
"crypto/subtle"
"errors"
+ "hash"
"io"
"os"
"path/filepath"
xdr "github.com/davecgh/go-xdr/xdr2"
"github.com/flynn/noise"
+ "golang.org/x/crypto/blake2b"
)
const (
SPHeadOverhead = 4
)
+type SPCheckerQueues struct {
+ appeared chan *[32]byte
+ checked chan *[32]byte
+}
+
var (
MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
DefaultDeadline = 10 * time.Second
PingTimeout = time.Minute
- spCheckerToken chan struct{}
+ spCheckers = make(map[NodeId]*SPCheckerQueues)
)
type FdAndFullSize struct {
fullSize int64
}
+type HasherAndOffset struct {
+ h hash.Hash
+ offset uint64
+}
+
type SPType uint8
const (
panic(err)
}
SPFileOverhead = buf.Len()
- spCheckerToken = make(chan struct{}, 1)
- spCheckerToken <- struct{}{}
}
func MarshalSP(typ SPType, sp interface{}) []byte {
Ctx *Ctx
Node *Node
Nice uint8
+ NoCK bool
onlineDeadline time.Duration
maxOnlineTime time.Duration
hs *noise.HandshakeState
onlyPkts map[[32]byte]bool
writeSPBuf bytes.Buffer
fds map[string]FdAndFullSize
+ fileHashers map[string]*HasherAndOffset
+ checkerQueues SPCheckerQueues
sync.RWMutex
}
for range state.pings {
}
}()
+ go func() {
+ for _, s := range state.fds {
+ s.fd.Close()
+ }
+ }()
}
func (state *SPState) NotAlive() bool {
state.Ctx.UnlockDir(state.txLock)
}
+func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
+ for hshValue := range appeared {
+ les := LEs{
+ {"XX", string(TRx)},
+ {"Node", nodeId},
+ {"Pkt", Base32Codec.EncodeToString(hshValue[:])},
+ }
+ ctx.LogD("sp-checker", les, "checking")
+ size, err := ctx.CheckNoCK(nodeId, hshValue)
+ les = append(les, LE{"Size", size})
+ if err != nil {
+ ctx.LogE("sp-checker", les, err, "")
+ continue
+ }
+ ctx.LogI("sp-done", les, "")
+ go func(hsh *[32]byte) { checked <- hsh }(hshValue)
+ }
+}
+
func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
state.writeSPBuf.Reset()
n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
state.infosTheir = make(map[[32]byte]*SPInfo)
state.started = started
state.xxOnly = xxOnly
+
var buf []byte
var payload []byte
state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
) error {
les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
state.fds = make(map[string]FdAndFullSize)
+ state.fileHashers = make(map[string]*HasherAndOffset)
state.isDead = make(chan struct{})
if state.maxOnlineTime > 0 {
state.mustFinishAt = state.started.Add(state.maxOnlineTime)
}
+ // Checker
+ if !state.NoCK {
+ queues := spCheckers[*state.Node.Id]
+ if queues == nil {
+ queues = &SPCheckerQueues{
+ appeared: make(chan *[32]byte),
+ checked: make(chan *[32]byte),
+ }
+ spCheckers[*state.Node.Id] = queues
+ go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
+ }
+ state.checkerQueues = *queues
+ go func() {
+ for job := range state.Ctx.JobsNoCK(state.Node.Id) {
+ if job.PktEnc.Nice <= state.Nice {
+ state.checkerQueues.appeared <- job.HshValue
+ }
+ }
+ }()
+ state.wg.Add(1)
+ go func() {
+ defer state.wg.Done()
+ for {
+ select {
+ case <-state.isDead:
+ return
+ case hsh := <-state.checkerQueues.checked:
+ state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
+ }
+ }
+ }()
+ }
+
// Remaining handshake payload sending
if len(infosPayloads) > 1 {
state.wg.Add(1)
state.wg.Done()
state.SetDead()
conn.Close() // #nosec G104
- for _, s := range state.fds {
- s.fd.Close()
- }
}()
return nil
state.Lock()
state.queueTheir = nil
state.Unlock()
+
case SPTypePing:
state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
+
case SPTypeInfo:
infosGot = true
lesp := append(les, LE{"Type", "info"})
}
continue
}
+ if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
+ state.Ctx.LogI("sp-info", lesp, "still non checksummed")
+ continue
+ }
fi, err := os.Stat(pktPath + PartSuffix)
var offset int64
if err == nil {
SPFreq{info.Hash, uint64(offset)},
))
}
+
case SPTypeFile:
lesp := append(les, LE{"Type", "file"})
state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
return nil, err
}
state.fds[filePathPart] = FdAndFullSize{fd: fd}
+ if file.Offset == 0 {
+ h, err := blake2b.New256(nil)
+ if err != nil {
+ panic(err)
+ }
+ state.fileHashers[filePath] = &HasherAndOffset{h: h}
+ }
}
state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
state.closeFd(filePathPart)
return nil, err
}
+ hasherAndOffset, hasherExists := state.fileHashers[filePath]
+ if hasherExists {
+ if hasherAndOffset.offset == file.Offset {
+ if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
+ panic(err)
+ }
+ hasherAndOffset.offset += uint64(len(file.Payload))
+ } else {
+ state.Ctx.LogE(
+ "sp-file", lesp,
+ errors.New("offset differs"),
+ "deleting hasher",
+ )
+ delete(state.fileHashers, filePath)
+ hasherExists = false
+ }
+ }
ourSize := int64(file.Offset + uint64(len(file.Payload)))
lesp[len(lesp)-1].V = ourSize
fullsize := int64(0)
if fullsize != ourSize {
continue
}
- <-spCheckerToken
- go func() {
- defer func() {
- spCheckerToken <- struct{}{}
- }()
- if err := fd.Sync(); err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "sync")
- state.closeFd(filePathPart)
- return
- }
- state.wg.Add(1)
- defer state.wg.Done()
- if _, err = fd.Seek(0, io.SeekStart); err != nil {
- state.closeFd(filePathPart)
- state.Ctx.LogE("sp-file", lesp, err, "")
- return
- }
- state.Ctx.LogD("sp-file", lesp, "checking")
- gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs)
+ err = fd.Sync()
+ if err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "sync")
state.closeFd(filePathPart)
- if err != nil || !gut {
+ continue
+ }
+ if hasherExists {
+ if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
- return
+ continue
}
- state.Ctx.LogI("sp-done", lesp, "")
- if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
+ if err = os.Rename(filePathPart, filePath); err != nil {
state.Ctx.LogE("sp-file", lesp, err, "rename")
- return
+ continue
}
if err = DirSync(dirToSync); err != nil {
state.Ctx.LogE("sp-file", lesp, err, "sync")
- return
+ continue
}
- state.Lock()
- delete(state.infosTheir, *file.Hash)
- state.Unlock()
+ state.Ctx.LogI("sp-file", lesp, "done")
state.wg.Add(1)
go func() {
state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
state.wg.Done()
}()
- }()
+ state.Lock()
+ delete(state.infosTheir, *file.Hash)
+ state.Unlock()
+ if !state.Ctx.HdrUsage {
+ state.closeFd(filePathPart)
+ continue
+ }
+ if _, err = fd.Seek(0, io.SeekStart); err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "seek")
+ state.closeFd(filePathPart)
+ continue
+ }
+ _, pktEncRaw, err := state.Ctx.HdrRead(fd)
+ state.closeFd(filePathPart)
+ if err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "HdrRead")
+ continue
+ }
+ state.Ctx.HdrWrite(pktEncRaw, filePath)
+ continue
+ }
+ state.closeFd(filePathPart)
+ if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "rename")
+ continue
+ }
+ if err = DirSync(dirToSync); err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "sync")
+ continue
+ }
+ state.Ctx.LogI("sp-file", lesp, "downloaded")
+ state.Lock()
+ delete(state.infosTheir, *file.Hash)
+ state.Unlock()
+ if !state.NoCK {
+ state.checkerQueues.appeared <- file.Hash
+ }
+
case SPTypeDone:
lesp := append(les, LE{"Type", "done"})
state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
return nil, err
}
lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
+ lesp = append(lesp, LE{"XX", string(TTx)})
state.Ctx.LogD("sp-done", lesp, "removing")
- err := os.Remove(filepath.Join(
+ pth := filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
string(TTx),
Base32Codec.EncodeToString(done.Hash[:]),
- ))
- lesp = append(lesp, LE{"XX", string(TTx)})
- if err == nil {
+ )
+ if err = os.Remove(pth); err == nil {
state.Ctx.LogI("sp-done", lesp, "")
+ if state.Ctx.HdrUsage {
+ os.Remove(pth + HdrSuffix)
+ }
} else {
state.Ctx.LogE("sp-done", lesp, err, "")
}
+
case SPTypeFreq:
lesp := append(les, LE{"Type", "freq"})
state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
} else {
state.Ctx.LogD("sp-process", lesp, "unknown")
}
+
default:
state.Ctx.LogE(
"sp-process",