"bytes"
"crypto/subtle"
"errors"
+ "hash"
"io"
"os"
"path/filepath"
xdr "github.com/davecgh/go-xdr/xdr2"
"github.com/flynn/noise"
+ "golang.org/x/crypto/blake2b"
)
const (
fullSize int64
}
+type HasherAndOffset struct {
+ h hash.Hash
+ offset uint64
+}
+
type SPType uint8
const (
onlyPkts map[[32]byte]bool
writeSPBuf bytes.Buffer
fds map[string]FdAndFullSize
+ fileHashers map[string]*HasherAndOffset
checkerJobs chan *[32]byte
sync.RWMutex
}
) error {
les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
state.fds = make(map[string]FdAndFullSize)
+ state.fileHashers = make(map[string]*HasherAndOffset)
state.isDead = make(chan struct{})
if state.maxOnlineTime > 0 {
state.mustFinishAt = state.started.Add(state.maxOnlineTime)
state.Lock()
state.queueTheir = nil
state.Unlock()
+
case SPTypePing:
state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
+
case SPTypeInfo:
infosGot = true
lesp := append(les, LE{"Type", "info"})
SPFreq{info.Hash, uint64(offset)},
))
}
+
case SPTypeFile:
lesp := append(les, LE{"Type", "file"})
state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
return nil, err
}
state.fds[filePathPart] = FdAndFullSize{fd: fd}
+ if file.Offset == 0 {
+ h, err := blake2b.New256(nil)
+ if err != nil {
+ panic(err)
+ }
+ state.fileHashers[filePath] = &HasherAndOffset{h: h}
+ }
}
state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
state.closeFd(filePathPart)
return nil, err
}
+ hasherAndOffset, hasherExists := state.fileHashers[filePath]
+ if hasherExists {
+ if hasherAndOffset.offset == file.Offset {
+ if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
+ panic(err)
+ }
+ hasherAndOffset.offset += uint64(len(file.Payload))
+ } else {
+ state.Ctx.LogE(
+ "sp-file", lesp,
+ errors.New("offset differs"),
+ "deleting hasher",
+ )
+ delete(state.fileHashers, filePath)
+ hasherExists = false
+ }
+ }
ourSize := int64(file.Offset + uint64(len(file.Payload)))
lesp[len(lesp)-1].V = ourSize
fullsize := int64(0)
state.Ctx.LogE("sp-file", lesp, err, "sync")
continue
}
+ if hasherExists {
+ if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
+ state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
+ continue
+ }
+ if err = os.Rename(filePathPart, filePath); err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "rename")
+ continue
+ }
+ if err = DirSync(dirToSync); err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "sync")
+ continue
+ }
+ state.Ctx.LogI("sp-file", lesp, "done")
+ state.wg.Add(1)
+ go func() {
+ state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
+ state.wg.Done()
+ }()
+ state.Lock()
+ delete(state.infosTheir, *file.Hash)
+ state.Unlock()
+ continue
+ }
if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
state.Ctx.LogE("sp-file", lesp, err, "rename")
continue
if !state.NoCK {
state.checkerJobs <- file.Hash
}
+
case SPTypeDone:
lesp := append(les, LE{"Type", "done"})
state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
} else {
state.Ctx.LogE("sp-done", lesp, err, "")
}
+
case SPTypeFreq:
lesp := append(les, LE{"Type", "freq"})
state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
} else {
state.Ctx.LogD("sp-process", lesp, "unknown")
}
+
default:
state.Ctx.LogE(
"sp-process",