From 4859da5e7e24cb8ba262d8e5d793b9070c2d00b6 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Thu, 8 Jul 2021 19:52:26 +0300 Subject: [PATCH] Sequential MTH optimization --- doc/news.ru.texi | 5 + doc/news.texi | 5 + src/check.go | 4 +- src/cmd/nncp-hash/main.go | 14 +- src/mth.go | 260 ++++++++++++++++++++++++++++++++------ src/mth_test.go | 52 ++++++-- src/sp.go | 6 +- 7 files changed, 287 insertions(+), 59 deletions(-) diff --git a/doc/news.ru.texi b/doc/news.ru.texi index 4f87e64..eeaca1e 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -16,6 +16,11 @@ Не отправлять multicast пакет оригинатору сообщения, очевидно точно видящего свой собственный пакет. +@item +Намного меньшее потребление памяти во время MTH хэширования когда +смещение равно нулю: когда пакет не является докачиванием, а например +проверяется @command{nncp-check} командой. + @end itemize @node Релиз 7.1.1 diff --git a/doc/news.texi b/doc/news.texi index ba822d2..adf538b 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -18,6 +18,11 @@ route, then do not ignore, but use it. Do not relay multicast packet to area message's originator, that obviously has seen its own packet. +@item +Much less memory usage during MTH hashing when offset is zero: when +packet is not resumed, but for example checked with @command{nncp-check} +command. + @end itemize @node Release 7_1_1 diff --git a/src/check.go b/src/check.go index 4f0a081..7a3d5b8 100644 --- a/src/check.go +++ b/src/check.go @@ -79,7 +79,7 @@ func (ctx *Ctx) Check(nodeId *NodeId) bool { return !(ctx.checkXxIsBad(nodeId, TRx) || ctx.checkXxIsBad(nodeId, TTx)) } -func (ctx *Ctx) CheckNoCK(nodeId *NodeId, hshValue *[MTHSize]byte, mth *MTH) (int64, error) { +func (ctx *Ctx) CheckNoCK(nodeId *NodeId, hshValue *[MTHSize]byte, mth MTH) (int64, error) { dirToSync := filepath.Join(ctx.Spool, nodeId.String(), string(TRx)) pktName := Base32Codec.EncodeToString(hshValue[:]) pktPath := filepath.Join(dirToSync, pktName) @@ -103,7 +103,7 @@ func (ctx *Ctx) CheckNoCK(nodeId *NodeId, hshValue *[MTHSize]byte, mth *MTH) (in if mth == nil { gut, err = Check(fd, size, hshValue[:], les, ctx.ShowPrgrs) } else { - mth.PktName = pktName + mth.SetPktName(pktName) if _, err = mth.PrependFrom(bufio.NewReaderSize(fd, MTHSize)); err != nil { return 0, err } diff --git a/src/cmd/nncp-hash/main.go b/src/cmd/nncp-hash/main.go index ba07232..8b90ab9 100644 --- a/src/cmd/nncp-hash/main.go +++ b/src/cmd/nncp-hash/main.go @@ -42,6 +42,7 @@ func main() { var ( fn = flag.String("file", "", "Read the file instead of stdin") seek = flag.Uint64("seek", 0, "Seek the file, hash, rewind, hash remaining") + forceFat = flag.Bool("force-fat", false, "Force MTHFat implementation usage") showPrgrs = flag.Bool("progress", false, "Progress showing") debug = flag.Bool("debug", false, "Print MTH steps calculations") version = flag.Bool("version", false, "Print version information") @@ -75,15 +76,20 @@ func main() { } size = fi.Size() } - mth := nncp.MTHNew(size, int64(*seek)) + var mth nncp.MTH + if *forceFat { + mth = nncp.MTHFatNew(size, int64(*seek)) + } else { + mth = nncp.MTHNew(size, int64(*seek)) + } var debugger sync.WaitGroup if *debug { fmt.Println("Leaf BLAKE3 key:", hex.EncodeToString(nncp.MTHLeafKey[:])) fmt.Println("Node BLAKE3 key:", hex.EncodeToString(nncp.MTHNodeKey[:])) - mth.Events = make(chan nncp.MTHEvent) + events := mth.Events() debugger.Add(1) go func() { - for e := range mth.Events { + for e := range events { var t string switch e.Type { case nncp.MTHEventAppend: @@ -121,7 +127,7 @@ func main() { log.Fatalln(err) } if *showPrgrs { - mth.PktName = *fn + mth.SetPktName(*fn) } if _, err = mth.PrependFrom(bufio.NewReaderSize(fd, nncp.MTHBlockSize)); err != nil { log.Fatalln(err) diff --git a/src/mth.go b/src/mth.go index 6cb1f36..a766925 100644 --- a/src/mth.go +++ b/src/mth.go @@ -20,6 +20,7 @@ package nncp import ( "bytes" "errors" + "hash" "io" "lukechampine.com/blake3" @@ -50,21 +51,29 @@ type MTHEvent struct { Hsh []byte } -type MTH struct { +type MTH interface { + hash.Hash + PrependFrom(r io.Reader) (int, error) + SetPktName(n string) + PrependSize() int64 + Events() chan MTHEvent +} + +type MTHFat struct { size int64 - PrependSize int64 + prependSize int64 skip int64 skipped bool hasher *blake3.Hasher hashes [][MTHSize]byte buf *bytes.Buffer finished bool - Events chan MTHEvent - PktName string + events chan MTHEvent + pktName string } -func MTHNew(size, offset int64) *MTH { - mth := MTH{ +func MTHFatNew(size, offset int64) MTH { + mth := MTHFat{ hasher: blake3.New(MTHSize, MTHLeafKey[:]), buf: bytes.NewBuffer(make([]byte, 0, 2*MTHBlockSize)), } @@ -86,19 +95,28 @@ func MTHNew(size, offset int64) *MTH { skip = size - offset } mth.size = size - mth.PrependSize = prependSize + mth.prependSize = prependSize mth.skip = skip mth.hashes = make([][MTHSize]byte, prepends, 1+size/MTHBlockSize) return &mth } -func (mth *MTH) Reset() { panic("not implemented") } +func (mth *MTHFat) Events() chan MTHEvent { + mth.events = make(chan MTHEvent) + return mth.events +} + +func (mth *MTHFat) SetPktName(pktName string) { mth.pktName = pktName } + +func (mth *MTHFat) PrependSize() int64 { return mth.prependSize } -func (mth *MTH) Size() int { return MTHSize } +func (mth *MTHFat) Reset() { panic("not implemented") } -func (mth *MTH) BlockSize() int { return MTHBlockSize } +func (mth *MTHFat) Size() int { return MTHSize } -func (mth *MTH) Write(data []byte) (int, error) { +func (mth *MTHFat) BlockSize() int { return MTHBlockSize } + +func (mth *MTHFat) Write(data []byte) (int, error) { if mth.finished { return 0, errors.New("already Sum()ed") } @@ -118,8 +136,8 @@ func (mth *MTH) Write(data []byte) (int, error) { mth.hasher.Sum(h[:0]) mth.hasher.Reset() mth.hashes = append(mth.hashes, *h) - if mth.Events != nil { - mth.Events <- MTHEvent{ + if mth.events != nil { + mth.events <- MTHEvent{ MTHEventAppend, 0, len(mth.hashes) - 1, mth.hashes[len(mth.hashes)-1][:], @@ -129,19 +147,19 @@ func (mth *MTH) Write(data []byte) (int, error) { return n, err } -func (mth *MTH) PrependFrom(r io.Reader) (int, error) { +func (mth *MTHFat) PrependFrom(r io.Reader) (int, error) { if mth.finished { return 0, errors.New("already Sum()ed") } var err error buf := make([]byte, MTHBlockSize) var i, n, read int - fullsize := mth.PrependSize - les := LEs{{"Pkt", mth.PktName}, {"FullSize", fullsize}, {"Size", 0}} - for mth.PrependSize >= MTHBlockSize { + fullsize := mth.prependSize + les := LEs{{"Pkt", mth.pktName}, {"FullSize", fullsize}, {"Size", 0}} + for mth.prependSize >= MTHBlockSize { n, err = io.ReadFull(r, buf) read += n - mth.PrependSize -= MTHBlockSize + mth.prependSize -= MTHBlockSize if err != nil { return read, err } @@ -150,30 +168,30 @@ func (mth *MTH) PrependFrom(r io.Reader) (int, error) { } mth.hasher.Sum(mth.hashes[i][:0]) mth.hasher.Reset() - if mth.Events != nil { - mth.Events <- MTHEvent{MTHEventPrepend, 0, i, mth.hashes[i][:]} + if mth.events != nil { + mth.events <- MTHEvent{MTHEventPrepend, 0, i, mth.hashes[i][:]} } - if mth.PktName != "" { + if mth.pktName != "" { les[len(les)-1].V = int64(read) Progress("check", les) } i++ } - if mth.PrependSize > 0 { - n, err = io.ReadFull(r, buf[:mth.PrependSize]) + if mth.prependSize > 0 { + n, err = io.ReadFull(r, buf[:mth.prependSize]) read += n if err != nil { return read, err } - if _, err = mth.hasher.Write(buf[:mth.PrependSize]); err != nil { + if _, err = mth.hasher.Write(buf[:mth.prependSize]); err != nil { panic(err) } mth.hasher.Sum(mth.hashes[i][:0]) mth.hasher.Reset() - if mth.Events != nil { - mth.Events <- MTHEvent{MTHEventPrepend, 0, i, mth.hashes[i][:]} + if mth.events != nil { + mth.events <- MTHEvent{MTHEventPrepend, 0, i, mth.hashes[i][:]} } - if mth.PktName != "" { + if mth.pktName != "" { les[len(les)-1].V = fullsize Progress("check", les) } @@ -181,7 +199,7 @@ func (mth *MTH) PrependFrom(r io.Reader) (int, error) { return read, nil } -func (mth *MTH) Sum(b []byte) []byte { +func (mth *MTHFat) Sum(b []byte) []byte { if mth.finished { return append(b, mth.hashes[0][:]...) } @@ -194,8 +212,8 @@ func (mth *MTH) Sum(b []byte) []byte { mth.hasher.Sum(h[:0]) mth.hasher.Reset() mth.hashes = append(mth.hashes, *h) - if mth.Events != nil { - mth.Events <- MTHEvent{ + if mth.events != nil { + mth.events <- MTHEvent{ MTHEventAppend, 0, len(mth.hashes) - 1, mth.hashes[len(mth.hashes)-1][:], @@ -211,14 +229,14 @@ func (mth *MTH) Sum(b []byte) []byte { mth.hasher.Sum(h[:0]) mth.hasher.Reset() mth.hashes = append(mth.hashes, *h) - if mth.Events != nil { - mth.Events <- MTHEvent{MTHEventAppend, 0, 0, mth.hashes[0][:]} + if mth.events != nil { + mth.events <- MTHEvent{MTHEventAppend, 0, 0, mth.hashes[0][:]} } fallthrough case 1: mth.hashes = append(mth.hashes, mth.hashes[0]) - if mth.Events != nil { - mth.Events <- MTHEvent{MTHEventAppend, 0, 1, mth.hashes[1][:]} + if mth.events != nil { + mth.events <- MTHEvent{MTHEventAppend, 0, 1, mth.hashes[1][:]} } } mth.hasher = blake3.New(MTHSize, MTHNodeKey[:]) @@ -237,8 +255,8 @@ func (mth *MTH) Sum(b []byte) []byte { mth.hasher.Sum(h[:0]) mth.hasher.Reset() hashesUp = append(hashesUp, *h) - if mth.Events != nil { - mth.Events <- MTHEvent{ + if mth.events != nil { + mth.events <- MTHEvent{ MTHEventFold, level, len(hashesUp) - 1, hashesUp[len(hashesUp)-1][:], @@ -247,8 +265,8 @@ func (mth *MTH) Sum(b []byte) []byte { } if len(mth.hashes)%2 == 1 { hashesUp = append(hashesUp, mth.hashes[len(mth.hashes)-1]) - if mth.Events != nil { - mth.Events <- MTHEvent{ + if mth.events != nil { + mth.events <- MTHEvent{ MTHEventAppend, level, len(hashesUp) - 1, hashesUp[len(hashesUp)-1][:], @@ -259,8 +277,170 @@ func (mth *MTH) Sum(b []byte) []byte { level++ } mth.finished = true - if mth.Events != nil { - close(mth.Events) + if mth.events != nil { + close(mth.events) } return append(b, mth.hashes[0][:]...) } + +type MTHSeqEnt struct { + l int + h [MTHSize]byte +} + +type MTHSeq struct { + hasherLeaf *blake3.Hasher + hasherNode *blake3.Hasher + hashes []MTHSeqEnt + buf *bytes.Buffer + events chan MTHEvent + ctrs []int + finished bool +} + +func MTHSeqNew() *MTHSeq { + mth := MTHSeq{ + hasherLeaf: blake3.New(MTHSize, MTHLeafKey[:]), + hasherNode: blake3.New(MTHSize, MTHNodeKey[:]), + buf: bytes.NewBuffer(make([]byte, 0, 2*MTHBlockSize)), + ctrs: make([]int, 1, 2), + } + return &mth +} + +func (mth *MTHSeq) Reset() { panic("not implemented") } + +func (mth *MTHSeq) Size() int { return MTHSize } + +func (mth *MTHSeq) BlockSize() int { return MTHBlockSize } + +func (mth *MTHSeq) PrependFrom(r io.Reader) (int, error) { + panic("must not reach that code") +} + +func (mth *MTHSeq) Events() chan MTHEvent { + mth.events = make(chan MTHEvent) + return mth.events +} + +func (mth *MTHSeq) SetPktName(pktName string) {} + +func (mth *MTHSeq) PrependSize() int64 { return 0 } + +func (mth *MTHSeq) leafAdd() { + ent := MTHSeqEnt{l: 0} + mth.hasherLeaf.Sum(ent.h[:0]) + mth.hasherLeaf.Reset() + mth.hashes = append(mth.hashes, ent) + if mth.events != nil { + mth.events <- MTHEvent{ + MTHEventAppend, 0, mth.ctrs[0], + mth.hashes[len(mth.hashes)-1].h[:], + } + } + mth.ctrs[0]++ +} + +func (mth *MTHSeq) incr(l int) { + if len(mth.ctrs) <= l { + mth.ctrs = append(mth.ctrs, 0) + } else { + mth.ctrs[l]++ + } +} + +func (mth *MTHSeq) fold() { + for len(mth.hashes) >= 2 { + if mth.hashes[len(mth.hashes)-2].l != mth.hashes[len(mth.hashes)-1].l { + break + } + if _, err := mth.hasherNode.Write(mth.hashes[len(mth.hashes)-2].h[:]); err != nil { + panic(err) + } + if _, err := mth.hasherNode.Write(mth.hashes[len(mth.hashes)-1].h[:]); err != nil { + panic(err) + } + mth.hashes = mth.hashes[:len(mth.hashes)-1] + end := &mth.hashes[len(mth.hashes)-1] + end.l++ + mth.incr(end.l) + mth.hasherNode.Sum(end.h[:0]) + mth.hasherNode.Reset() + if mth.events != nil { + mth.events <- MTHEvent{MTHEventFold, end.l, mth.ctrs[end.l], end.h[:]} + } + } +} + +func (mth *MTHSeq) Write(data []byte) (int, error) { + if mth.finished { + return 0, errors.New("already Sum()ed") + } + n, err := mth.buf.Write(data) + if err != nil { + return n, err + } + for mth.buf.Len() >= MTHBlockSize { + if _, err = mth.hasherLeaf.Write(mth.buf.Next(MTHBlockSize)); err != nil { + return n, err + } + mth.leafAdd() + mth.fold() + } + return n, err +} + +func (mth *MTHSeq) Sum(b []byte) []byte { + if mth.finished { + return append(b, mth.hashes[0].h[:]...) + } + if mth.buf.Len() > 0 { + if _, err := mth.hasherLeaf.Write(mth.buf.Next(MTHBlockSize)); err != nil { + panic(err) + } + mth.leafAdd() + mth.fold() + } + switch mth.ctrs[0] { + case 0: + if _, err := mth.hasherLeaf.Write(nil); err != nil { + panic(err) + } + mth.leafAdd() + fallthrough + case 1: + mth.hashes = append(mth.hashes, mth.hashes[0]) + mth.ctrs[0]++ + if mth.events != nil { + mth.events <- MTHEvent{ + MTHEventAppend, 0, mth.ctrs[0], + mth.hashes[len(mth.hashes)-1].h[:], + } + } + mth.fold() + } + for len(mth.hashes) >= 2 { + l := mth.hashes[len(mth.hashes)-2].l + mth.incr(l) + mth.hashes[len(mth.hashes)-1].l = l + if mth.events != nil { + mth.events <- MTHEvent{ + MTHEventAppend, l, mth.ctrs[l], + mth.hashes[len(mth.hashes)-1].h[:], + } + } + mth.fold() + } + mth.finished = true + if mth.events != nil { + close(mth.events) + } + return append(b, mth.hashes[0].h[:]...) +} + +func MTHNew(size, offset int64) MTH { + if offset == 0 { + return MTHSeqNew() + } + return MTHFatNew(size, offset) +} diff --git a/src/mth_test.go b/src/mth_test.go index 2d36f07..a4737c0 100644 --- a/src/mth_test.go +++ b/src/mth_test.go @@ -26,7 +26,7 @@ import ( "lukechampine.com/blake3" ) -func TestMTHSymmetric(t *testing.T) { +func TestMTHFatSymmetric(t *testing.T) { xof := blake3.New(32, nil).XOF() f := func(size uint32, offset uint32) bool { size %= 2 * 1024 * 1024 @@ -36,13 +36,13 @@ func TestMTHSymmetric(t *testing.T) { } offset = offset % size - mth := MTHNew(int64(size), 0) + mth := MTHFatNew(int64(size), 0) if _, err := io.Copy(mth, bytes.NewReader(data)); err != nil { panic(err) } hsh0 := mth.Sum(nil) - mth = MTHNew(int64(size), int64(offset)) + mth = MTHFatNew(int64(size), int64(offset)) if _, err := io.Copy(mth, bytes.NewReader(data[int(offset):])); err != nil { panic(err) } @@ -53,14 +53,14 @@ func TestMTHSymmetric(t *testing.T) { return false } - mth = MTHNew(0, 0) + mth = MTHFatNew(0, 0) mth.Write(data) if bytes.Compare(hsh0, mth.Sum(nil)) != 0 { return false } data = append(data, 0) - mth = MTHNew(int64(size)+1, 0) + mth = MTHFatNew(int64(size)+1, 0) if _, err := io.Copy(mth, bytes.NewReader(data)); err != nil { panic(err) } @@ -69,7 +69,7 @@ func TestMTHSymmetric(t *testing.T) { return false } - mth = MTHNew(int64(size)+1, int64(offset)) + mth = MTHFatNew(int64(size)+1, int64(offset)) if _, err := io.Copy(mth, bytes.NewReader(data[int(offset):])); err != nil { panic(err) } @@ -80,7 +80,7 @@ func TestMTHSymmetric(t *testing.T) { return false } - mth = MTHNew(0, 0) + mth = MTHFatNew(0, 0) mth.Write(data) if bytes.Compare(hsh00, mth.Sum(nil)) != 0 { return false @@ -93,10 +93,42 @@ func TestMTHSymmetric(t *testing.T) { } } +func TestMTHSeqAndFatEqual(t *testing.T) { + xof := blake3.New(32, nil).XOF() + f := func(size uint32, offset uint32) bool { + size %= 10 * 1024 * 1024 + data := make([]byte, int(size), int(size)+1) + if _, err := io.ReadFull(xof, data); err != nil { + panic(err) + } + fat := MTHFatNew(int64(size), 0) + if _, err := io.Copy(fat, bytes.NewReader(data)); err != nil { + panic(err) + } + hshFat := fat.Sum(nil) + seq := MTHSeqNew() + if _, err := io.Copy(seq, bytes.NewReader(data)); err != nil { + panic(err) + } + return bytes.Compare(hshFat, seq.Sum(nil)) == 0 + } + if err := quick.Check(f, nil); err != nil { + t.Error(err) + } +} + func TestMTHNull(t *testing.T) { - mth := MTHNew(0, 0) - if _, err := mth.Write(nil); err != nil { + fat := MTHFatNew(0, 0) + if _, err := fat.Write(nil); err != nil { t.Error(err) } - mth.Sum(nil) + hshFat := fat.Sum(nil) + + seq := MTHSeqNew() + if _, err := seq.Write(nil); err != nil { + t.Error(err) + } + if bytes.Compare(hshFat, seq.Sum(nil)) != 0 { + t.FailNow() + } } diff --git a/src/sp.go b/src/sp.go index a1f51eb..1bb8f75 100644 --- a/src/sp.go +++ b/src/sp.go @@ -41,14 +41,14 @@ const ( ) type MTHAndOffset struct { - mth *MTH + mth MTH offset uint64 } type SPCheckerTask struct { nodeId *NodeId hsh *[MTHSize]byte - mth *MTH + mth MTH done chan []byte } @@ -1439,7 +1439,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } if hasherAndOffset != nil { delete(state.fileHashers, filePath) - if hasherAndOffset.mth.PrependSize == 0 { + if hasherAndOffset.mth.PrependSize() == 0 { if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 { state.Ctx.LogE( "sp-file-bad-checksum", lesp, -- 2.44.0