From: Sergey Matveev Date: Sat, 20 Feb 2021 14:39:53 +0000 (+0300) Subject: Hash non-resumed files immediately X-Git-Tag: v6.1.0^2~2 X-Git-Url: http://www.git.cypherpunks.ru/?p=nncp.git;a=commitdiff_plain;h=4a936a26e5a220b0b3538c0719566ddf33ffbf7f Hash non-resumed files immediately --- diff --git a/doc/news.ru.texi b/doc/news.ru.texi index 1c1a307..34d39d8 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -20,6 +20,10 @@ spool директории. суффиксом (non-checksummed), ожидая пока либо @command{nncp-check}, либо online демоны не выполнят проверку целостности. +@item +Оптимизация: для файлов, скачивание которых не было продолжено, сразу же +вычисляет контрольная сумма, пропуская промежуточный @file{.nock} шаг. + @end itemize @node Релиз 6.0.0 diff --git a/doc/news.texi b/doc/news.texi index c6ee9b9..eb4fbbe 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -21,6 +21,10 @@ Online downloaded files are saved with @file{.nock} (non-checksummed) suffix, waiting either for @command{nncp-check}, or online daemons to perform integrity check. +@item +Optimization: files, that are not resumed, are checksummed immediately +during the online download, skipping @file{.nock}-intermediate step. + @end itemize @node Release 6.0.0 diff --git a/src/sp.go b/src/sp.go index 26c73e4..3ff47ef 100644 --- a/src/sp.go +++ b/src/sp.go @@ -21,6 +21,7 @@ import ( "bytes" "crypto/subtle" "errors" + "hash" "io" "os" "path/filepath" @@ -30,6 +31,7 @@ import ( xdr "github.com/davecgh/go-xdr/xdr2" "github.com/flynn/noise" + "golang.org/x/crypto/blake2b" ) const ( @@ -62,6 +64,11 @@ type FdAndFullSize struct { fullSize int64 } +type HasherAndOffset struct { + h hash.Hash + offset uint64 +} + type SPType uint8 const ( @@ -217,6 +224,7 @@ type SPState struct { onlyPkts map[[32]byte]bool writeSPBuf bytes.Buffer fds map[string]FdAndFullSize + fileHashers map[string]*HasherAndOffset checkerJobs chan *[32]byte sync.RWMutex } @@ -596,6 +604,7 @@ func (state *SPState) StartWorkers( ) error { les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}} state.fds = make(map[string]FdAndFullSize) + state.fileHashers = make(map[string]*HasherAndOffset) state.isDead = make(chan struct{}) if state.maxOnlineTime > 0 { state.mustFinishAt = state.started.Add(state.maxOnlineTime) @@ -937,8 +946,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Lock() state.queueTheir = nil state.Unlock() + case SPTypePing: state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "") + case SPTypeInfo: infosGot = true lesp := append(les, LE{"Type", "info"}) @@ -1005,6 +1016,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { SPFreq{info.Hash, uint64(offset)}, )) } + case SPTypeFile: lesp := append(les, LE{"Type", "file"}) state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") @@ -1041,6 +1053,13 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { return nil, err } state.fds[filePathPart] = FdAndFullSize{fd: fd} + if file.Offset == 0 { + h, err := blake2b.New256(nil) + if err != nil { + panic(err) + } + state.fileHashers[filePath] = &HasherAndOffset{h: h} + } } state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking") if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { @@ -1054,6 +1073,23 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.closeFd(filePathPart) return nil, err } + hasherAndOffset, hasherExists := state.fileHashers[filePath] + if hasherExists { + if hasherAndOffset.offset == file.Offset { + if _, err = hasherAndOffset.h.Write(file.Payload); err != nil { + panic(err) + } + hasherAndOffset.offset += uint64(len(file.Payload)) + } else { + state.Ctx.LogE( + "sp-file", lesp, + errors.New("offset differs"), + "deleting hasher", + ) + delete(state.fileHashers, filePath) + hasherExists = false + } + } ourSize := int64(file.Offset + uint64(len(file.Payload))) lesp[len(lesp)-1].V = ourSize fullsize := int64(0) @@ -1076,6 +1112,30 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogE("sp-file", lesp, err, "sync") continue } + if hasherExists { + if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 { + state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "") + continue + } + if err = os.Rename(filePathPart, filePath); err != nil { + state.Ctx.LogE("sp-file", lesp, err, "rename") + continue + } + if err = DirSync(dirToSync); err != nil { + state.Ctx.LogE("sp-file", lesp, err, "sync") + continue + } + state.Ctx.LogI("sp-file", lesp, "done") + state.wg.Add(1) + go func() { + state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) + state.wg.Done() + }() + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + continue + } if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil { state.Ctx.LogE("sp-file", lesp, err, "rename") continue @@ -1091,6 +1151,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { if !state.NoCK { state.checkerJobs <- file.Hash } + case SPTypeDone: lesp := append(les, LE{"Type", "done"}) state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") @@ -1113,6 +1174,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } else { state.Ctx.LogE("sp-done", lesp, err, "") } + case SPTypeFreq: lesp := append(les, LE{"Type", "freq"}) state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") @@ -1145,6 +1207,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } else { state.Ctx.LogD("sp-process", lesp, "unknown") } + default: state.Ctx.LogE( "sp-process",