X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fsp.go;h=7049142421509aeedebbb9bd36f58a7b03c3d8d7;hb=0fad171c0d79ad583c0faf5427e22d1d62a0a52d;hp=00eaac6b50d58ee0dc8665fa04d2a0a54b6c913b;hpb=116c4cf07862ff127054fca98b1fdac9451fb3a0;p=nncp.git diff --git a/src/sp.go b/src/sp.go index 00eaac6..7049142 100644 --- a/src/sp.go +++ b/src/sp.go @@ -22,7 +22,6 @@ import ( "crypto/subtle" "errors" "fmt" - "hash" "io" "os" "path/filepath" @@ -33,7 +32,6 @@ import ( xdr "github.com/davecgh/go-xdr/xdr2" "github.com/dustin/go-humanize" "github.com/flynn/noise" - "golang.org/x/crypto/blake2b" ) const ( @@ -42,9 +40,16 @@ const ( SPHeadOverhead = 4 ) -type SPCheckerQueues struct { - appeared chan *[32]byte - checked chan *[32]byte +type MTHAndOffset struct { + mth *MTH + offset uint64 +} + +type SPCheckerTask struct { + nodeId *NodeId + hsh *[MTHSize]byte + mth *MTH + done chan []byte } var ( @@ -65,8 +70,9 @@ var ( DefaultDeadline = 10 * time.Second PingTimeout = time.Minute - spCheckers = make(map[NodeId]*SPCheckerQueues) - SPCheckersWg sync.WaitGroup + spCheckerTasks chan SPCheckerTask + SPCheckerWg sync.WaitGroup + spCheckerOnce sync.Once ) type FdAndFullSize struct { @@ -74,11 +80,6 @@ type FdAndFullSize struct { fullSize int64 } -type HasherAndOffset struct { - h hash.Hash - offset uint64 -} - type SPType uint8 const ( @@ -97,22 +98,22 @@ type SPHead struct { type SPInfo struct { Nice uint8 Size uint64 - Hash *[32]byte + Hash *[MTHSize]byte } type SPFreq struct { - Hash *[32]byte + Hash *[MTHSize]byte Offset uint64 } type SPFile struct { - Hash *[32]byte + Hash *[MTHSize]byte Offset uint64 Payload []byte } type SPDone struct { - Hash *[32]byte + Hash *[MTHSize]byte } type SPRaw struct { @@ -149,25 +150,26 @@ func init() { copy(SPPingMarshalized, buf.Bytes()) buf.Reset() - spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)} + spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)} if _, err := xdr.Marshal(&buf, spInfo); err != nil { panic(err) } SPInfoOverhead = buf.Len() buf.Reset() - spFreq := SPFreq{Hash: new([32]byte), Offset: 123} + spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123} if _, err := xdr.Marshal(&buf, spFreq); err != nil { panic(err) } SPFreqOverhead = buf.Len() buf.Reset() - spFile := SPFile{Hash: new([32]byte), Offset: 123} + spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123} if _, err := xdr.Marshal(&buf, spFile); err != nil { panic(err) } SPFileOverhead = buf.Len() + spCheckerTasks = make(chan SPCheckerTask) } func MarshalSP(typ SPType, sp interface{}) []byte { @@ -209,8 +211,8 @@ type SPState struct { csTheir *noise.CipherState payloads chan []byte pings chan struct{} - infosTheir map[[32]byte]*SPInfo - infosOurSeen map[[32]byte]uint8 + infosTheir map[[MTHSize]byte]*SPInfo + infosOurSeen map[[MTHSize]byte]uint8 queueTheir []*FreqWithNice wg sync.WaitGroup RxBytes int64 @@ -231,12 +233,11 @@ type SPState struct { txRate int isDead chan struct{} listOnly bool - onlyPkts map[[32]byte]bool + onlyPkts map[[MTHSize]byte]bool writeSPBuf bytes.Buffer fds map[string]FdAndFullSize fdsLock sync.RWMutex - fileHashers map[string]*HasherAndOffset - checkerQueues SPCheckerQueues + fileHashers map[string]*MTHAndOffset progressBars map[string]struct{} sync.RWMutex } @@ -275,40 +276,6 @@ func (state *SPState) dirUnlock() { state.Ctx.UnlockDir(state.txLock) } -func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) { - for hshValue := range appeared { - pktName := Base32Codec.EncodeToString(hshValue[:]) - les := LEs{ - {"XX", string(TRx)}, - {"Node", nodeId}, - {"Pkt", pktName}, - } - SPCheckersWg.Add(1) - ctx.LogD("sp-checker", les, func(les LEs) string { - return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName) - }) - size, err := ctx.CheckNoCK(nodeId, hshValue) - les = append(les, LE{"Size", size}) - if err != nil { - ctx.LogE("sp-checker", les, err, func(les LEs) string { - return fmt.Sprintf( - "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName, - humanize.IBytes(uint64(size)), - ) - }) - continue - } - ctx.LogI("sp-checker-done", les, func(les LEs) string { - return fmt.Sprintf( - "Packet %s is retreived (%s)", - pktName, humanize.IBytes(uint64(size)), - ) - }) - SPCheckersWg.Done() - go func(hsh *[32]byte) { checked <- hsh }(hshValue) - } -} - func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error { state.writeSPBuf.Reset() n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ @@ -346,7 +313,7 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { return sp.Payload, nil } -func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte { +func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte { var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { @@ -438,8 +405,8 @@ func (state *SPState) StartI(conn ConnDeadlined) error { state.hs = hs state.payloads = make(chan []byte) state.pings = make(chan struct{}) - state.infosTheir = make(map[[32]byte]*SPInfo) - state.infosOurSeen = make(map[[32]byte]uint8) + state.infosTheir = make(map[[MTHSize]byte]*SPInfo) + state.infosOurSeen = make(map[[MTHSize]byte]uint8) state.progressBars = make(map[string]struct{}) state.started = started state.rxLock = rxLock @@ -556,8 +523,8 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.hs = hs state.payloads = make(chan []byte) state.pings = make(chan struct{}) - state.infosOurSeen = make(map[[32]byte]uint8) - state.infosTheir = make(map[[32]byte]*SPInfo) + state.infosOurSeen = make(map[[MTHSize]byte]uint8) + state.infosTheir = make(map[[MTHSize]byte]*SPInfo) state.progressBars = make(map[string]struct{}) state.started = started state.xxOnly = xxOnly @@ -686,43 +653,13 @@ func (state *SPState) StartWorkers( ) error { les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}} state.fds = make(map[string]FdAndFullSize) - state.fileHashers = make(map[string]*HasherAndOffset) + state.fileHashers = make(map[string]*MTHAndOffset) state.isDead = make(chan struct{}) if state.maxOnlineTime > 0 { state.mustFinishAt = state.started.Add(state.maxOnlineTime) } - - // Checker if !state.NoCK { - queues := spCheckers[*state.Node.Id] - if queues == nil { - queues = &SPCheckerQueues{ - appeared: make(chan *[32]byte), - checked: make(chan *[32]byte), - } - spCheckers[*state.Node.Id] = queues - go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked) - } - state.checkerQueues = *queues - go func() { - for job := range state.Ctx.JobsNoCK(state.Node.Id) { - if job.PktEnc.Nice <= state.Nice { - state.checkerQueues.appeared <- job.HshValue - } - } - }() - state.wg.Add(1) - go func() { - defer state.wg.Done() - for { - select { - case <-state.isDead: - return - case hsh := <-state.checkerQueues.checked: - state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh}) - } - } - }() + spCheckerOnce.Do(func() { go SPChecker(state.Ctx) }) } // Remaining handshake payload sending @@ -1033,7 +970,12 @@ func (state *SPState) StartWorkers( } state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg) conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 - if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil { + ct, err := state.csOur.Encrypt(nil, nil, payload) + if err != nil { + state.Ctx.LogE("sp-encrypting", les, err, logMsg) + return + } + if err := state.WriteSP(conn, ct, ping); err != nil { state.Ctx.LogE("sp-sending", les, err, logMsg) return } @@ -1139,7 +1081,6 @@ func (state *SPState) Wait() { close(state.payloads) close(state.pings) state.Duration = time.Now().Sub(state.started) - SPCheckersWg.Wait() state.dirUnlock() state.RxSpeed = state.RxBytes state.TxSpeed = state.TxBytes @@ -1367,6 +1308,18 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { pktName, humanize.IBytes(uint64(len(file.Payload))), ) } + fullsize := int64(0) + state.RLock() + infoTheir, ok := state.infosTheir[*file.Hash] + state.RUnlock() + if !ok { + state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": unknown file" + }) + continue + } + fullsize = int64(infoTheir.Size) + lesp = append(lesp, LE{"FullSize", fullsize}) dirToSync := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), @@ -1380,6 +1333,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.fdsLock.RLock() fdAndFullSize, exists := state.fds[filePathPart] state.fdsLock.RUnlock() + hasherAndOffset := state.fileHashers[filePath] var fd *os.File if exists { fd = fdAndFullSize.fd @@ -1398,12 +1352,12 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.fdsLock.Lock() state.fds[filePathPart] = FdAndFullSize{fd: fd} state.fdsLock.Unlock() - if file.Offset == 0 { - h, err := blake2b.New256(nil) - if err != nil { - panic(err) + if !state.NoCK { + hasherAndOffset = &MTHAndOffset{ + mth: MTHNew(fullsize, int64(file.Offset)), + offset: file.Offset, } - state.fileHashers[filePath] = &HasherAndOffset{h: h} + state.fileHashers[filePath] = hasherAndOffset } } state.Ctx.LogD( @@ -1429,34 +1383,25 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.closeFd(filePathPart) return nil, err } - hasherAndOffset, hasherExists := state.fileHashers[filePath] - if hasherExists { + if hasherAndOffset != nil { if hasherAndOffset.offset == file.Offset { - if _, err = hasherAndOffset.h.Write(file.Payload); err != nil { + if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil { panic(err) } hasherAndOffset.offset += uint64(len(file.Payload)) } else { - state.Ctx.LogD( - "sp-file-offset-differs", lesp, + state.Ctx.LogE( + "sp-file-offset-differs", lesp, errors.New("offset differs"), func(les LEs) string { - return logMsg(les) + ": offset differs, deleting hasher" + return logMsg(les) + ": deleting hasher" }, ) delete(state.fileHashers, filePath) - hasherExists = false + hasherAndOffset = nil } } ourSize := int64(file.Offset + uint64(len(file.Payload))) - lesp[len(lesp)-1].V = ourSize - fullsize := int64(0) - state.RLock() - infoTheir, ok := state.infosTheir[*file.Hash] - state.RUnlock() - if ok { - fullsize = int64(infoTheir.Size) - } - lesp = append(lesp, LE{"FullSize", fullsize}) + lesp[len(lesp)-2].V = ourSize if state.Ctx.ShowPrgrs { state.progressBars[pktName] = struct{}{} Progress("Rx", lesp) @@ -1483,59 +1428,64 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.closeFd(filePathPart) continue } - if hasherExists { - if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 { - state.Ctx.LogE( - "sp-file-bad-checksum", lesp, - errors.New("checksum mismatch"), - logMsg, - ) - continue - } - if err = os.Rename(filePathPart, filePath); err != nil { - state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { - return logMsg(les) + ": renaming" - }) - continue - } - if err = DirSync(dirToSync); err != nil { - state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { - return logMsg(les) + ": dirsyncing" - }) - continue - } - state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string { - return logMsg(les) + ": done" - }) - state.wg.Add(1) - go func() { - state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) - state.wg.Done() - }() - state.Lock() - delete(state.infosTheir, *file.Hash) - state.Unlock() - if !state.Ctx.HdrUsage { - state.closeFd(filePathPart) - continue - } - if _, err = fd.Seek(0, io.SeekStart); err != nil { - state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { - return logMsg(les) + ": seeking" + if hasherAndOffset != nil { + delete(state.fileHashers, filePath) + if hasherAndOffset.mth.PrependSize == 0 { + if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 { + state.Ctx.LogE( + "sp-file-bad-checksum", lesp, + errors.New("checksum mismatch"), + logMsg, + ) + state.closeFd(filePathPart) + continue + } + if err = os.Rename(filePathPart, filePath); err != nil { + state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { + return logMsg(les) + ": renaming" + }) + state.closeFd(filePathPart) + continue + } + if err = DirSync(dirToSync); err != nil { + state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { + return logMsg(les) + ": dirsyncing" + }) + state.closeFd(filePathPart) + continue + } + state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string { + return logMsg(les) + ": done" }) + state.wg.Add(1) + go func() { + state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) + state.wg.Done() + }() + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + if !state.Ctx.HdrUsage { + continue + } + if _, err = fd.Seek(0, io.SeekStart); err != nil { + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) + state.closeFd(filePathPart) + continue + } + _, pktEncRaw, err := state.Ctx.HdrRead(fd) state.closeFd(filePathPart) + if err != nil { + state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string { + return logMsg(les) + ": HdrReading" + }) + continue + } + state.Ctx.HdrWrite(pktEncRaw, filePath) continue } - _, pktEncRaw, err := state.Ctx.HdrRead(fd) - state.closeFd(filePathPart) - if err != nil { - state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string { - return logMsg(les) + ": HdrReading" - }) - continue - } - state.Ctx.HdrWrite(pktEncRaw, filePath) - continue } state.closeFd(filePathPart) if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil { @@ -1556,8 +1506,15 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Lock() delete(state.infosTheir, *file.Hash) state.Unlock() - if !state.NoCK { - state.checkerQueues.appeared <- file.Hash + if hasherAndOffset != nil { + go func() { + spCheckerTasks <- SPCheckerTask{ + nodeId: state.Node.Id, + hsh: file.Hash, + mth: hasherAndOffset.mth, + done: state.payloads, + } + }() } case SPTypeDone: @@ -1701,3 +1658,41 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } return payloadsSplit(replies), nil } + +func SPChecker(ctx *Ctx) { + for t := range spCheckerTasks { + pktName := Base32Codec.EncodeToString(t.hsh[:]) + les := LEs{ + {"XX", string(TRx)}, + {"Node", t.nodeId}, + {"Pkt", pktName}, + } + SPCheckerWg.Add(1) + ctx.LogD("sp-checker", les, func(les LEs) string { + return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName) + }) + size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth) + les = append(les, LE{"Size", size}) + if err != nil { + ctx.LogE("sp-checker", les, err, func(les LEs) string { + return fmt.Sprintf( + "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName, + humanize.IBytes(uint64(size)), + ) + }) + SPCheckerWg.Done() + continue + } + ctx.LogI("sp-checker-done", les, func(les LEs) string { + return fmt.Sprintf( + "Packet %s is retreived (%s)", + pktName, humanize.IBytes(uint64(size)), + ) + }) + SPCheckerWg.Done() + go func(t SPCheckerTask) { + defer func() { recover() }() + t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh}) + }(t) + } +}