]> Cypherpunks.ru repositories - nncp.git/commitdiff
Hash non-resumed files immediately
authorSergey Matveev <stargrave@stargrave.org>
Sat, 20 Feb 2021 14:39:53 +0000 (17:39 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Sat, 20 Feb 2021 19:37:04 +0000 (22:37 +0300)
doc/news.ru.texi
doc/news.texi
src/sp.go

index 1c1a30785225c14c7cb4c4a289fb88402fe3e731..34d39d834df9926adb4fd7f9402f77594e531f29 100644 (file)
@@ -20,6 +20,10 @@ spool директории.
 суффиксом (non-checksummed), ожидая пока либо @command{nncp-check}, либо
 online демоны не выполнят проверку целостности.
 
+@item
+Оптимизация: для файлов, скачивание которых не было продолжено, сразу же
+вычисляет контрольная сумма, пропуская промежуточный @file{.nock} шаг.
+
 @end itemize
 
 @node Релиз 6.0.0
index c6ee9b95a0b3a37084d6ca227ad31c0745080a67..eb4fbbe7ef82789054ef21ef94fb3625831bc607 100644 (file)
@@ -21,6 +21,10 @@ Online downloaded files are saved with @file{.nock} (non-checksummed)
 suffix, waiting either for @command{nncp-check}, or online daemons to
 perform integrity check.
 
+@item
+Optimization: files, that are not resumed, are checksummed immediately
+during the online download, skipping @file{.nock}-intermediate step.
+
 @end itemize
 
 @node Release 6.0.0
index 26c73e482d5dce4303510ef1f41b9879ba321c1f..3ff47efc9881f1635bd5d28275e5580e723ea330 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -21,6 +21,7 @@ import (
        "bytes"
        "crypto/subtle"
        "errors"
+       "hash"
        "io"
        "os"
        "path/filepath"
@@ -30,6 +31,7 @@ import (
 
        xdr "github.com/davecgh/go-xdr/xdr2"
        "github.com/flynn/noise"
+       "golang.org/x/crypto/blake2b"
 )
 
 const (
@@ -62,6 +64,11 @@ type FdAndFullSize struct {
        fullSize int64
 }
 
+type HasherAndOffset struct {
+       h      hash.Hash
+       offset uint64
+}
+
 type SPType uint8
 
 const (
@@ -217,6 +224,7 @@ type SPState struct {
        onlyPkts       map[[32]byte]bool
        writeSPBuf     bytes.Buffer
        fds            map[string]FdAndFullSize
+       fileHashers    map[string]*HasherAndOffset
        checkerJobs    chan *[32]byte
        sync.RWMutex
 }
@@ -596,6 +604,7 @@ func (state *SPState) StartWorkers(
 ) error {
        les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
        state.fds = make(map[string]FdAndFullSize)
+       state.fileHashers = make(map[string]*HasherAndOffset)
        state.isDead = make(chan struct{})
        if state.maxOnlineTime > 0 {
                state.mustFinishAt = state.started.Add(state.maxOnlineTime)
@@ -937,8 +946,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Lock()
                        state.queueTheir = nil
                        state.Unlock()
+
                case SPTypePing:
                        state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
+
                case SPTypeInfo:
                        infosGot = true
                        lesp := append(les, LE{"Type", "info"})
@@ -1005,6 +1016,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        SPFreq{info.Hash, uint64(offset)},
                                ))
                        }
+
                case SPTypeFile:
                        lesp := append(les, LE{"Type", "file"})
                        state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
@@ -1041,6 +1053,13 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        return nil, err
                                }
                                state.fds[filePathPart] = FdAndFullSize{fd: fd}
+                               if file.Offset == 0 {
+                                       h, err := blake2b.New256(nil)
+                                       if err != nil {
+                                               panic(err)
+                                       }
+                                       state.fileHashers[filePath] = &HasherAndOffset{h: h}
+                               }
                        }
                        state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
                        if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
@@ -1054,6 +1073,23 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                state.closeFd(filePathPart)
                                return nil, err
                        }
+                       hasherAndOffset, hasherExists := state.fileHashers[filePath]
+                       if hasherExists {
+                               if hasherAndOffset.offset == file.Offset {
+                                       if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
+                                               panic(err)
+                                       }
+                                       hasherAndOffset.offset += uint64(len(file.Payload))
+                               } else {
+                                       state.Ctx.LogE(
+                                               "sp-file", lesp,
+                                               errors.New("offset differs"),
+                                               "deleting hasher",
+                                       )
+                                       delete(state.fileHashers, filePath)
+                                       hasherExists = false
+                               }
+                       }
                        ourSize := int64(file.Offset + uint64(len(file.Payload)))
                        lesp[len(lesp)-1].V = ourSize
                        fullsize := int64(0)
@@ -1076,6 +1112,30 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                state.Ctx.LogE("sp-file", lesp, err, "sync")
                                continue
                        }
+                       if hasherExists {
+                               if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
+                                       state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
+                                       continue
+                               }
+                               if err = os.Rename(filePathPart, filePath); err != nil {
+                                       state.Ctx.LogE("sp-file", lesp, err, "rename")
+                                       continue
+                               }
+                               if err = DirSync(dirToSync); err != nil {
+                                       state.Ctx.LogE("sp-file", lesp, err, "sync")
+                                       continue
+                               }
+                               state.Ctx.LogI("sp-file", lesp, "done")
+                               state.wg.Add(1)
+                               go func() {
+                                       state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
+                                       state.wg.Done()
+                               }()
+                               state.Lock()
+                               delete(state.infosTheir, *file.Hash)
+                               state.Unlock()
+                               continue
+                       }
                        if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
                                state.Ctx.LogE("sp-file", lesp, err, "rename")
                                continue
@@ -1091,6 +1151,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        if !state.NoCK {
                                state.checkerJobs <- file.Hash
                        }
+
                case SPTypeDone:
                        lesp := append(les, LE{"Type", "done"})
                        state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
@@ -1113,6 +1174,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        } else {
                                state.Ctx.LogE("sp-done", lesp, err, "")
                        }
+
                case SPTypeFreq:
                        lesp := append(les, LE{"Type", "freq"})
                        state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
@@ -1145,6 +1207,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        } else {
                                state.Ctx.LogD("sp-process", lesp, "unknown")
                        }
+
                default:
                        state.Ctx.LogE(
                                "sp-process",