X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fsp.go;h=eca9544945b1e0d734d6b21d692cce85dc68f372;hb=2570c230757df9e463ea072d9110564b53031ce3;hp=1bb8f75d6e628b4b294f0b931dbdaf6d7af6b33c;hpb=4859da5e7e24cb8ba262d8e5d793b9070c2d00b6;p=nncp.git diff --git a/src/sp.go b/src/sp.go index 1bb8f75..eca9544 100644 --- a/src/sp.go +++ b/src/sp.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2021 Sergey Matveev +Copyright (C) 2016-2022 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -23,9 +23,11 @@ import ( "errors" "fmt" "io" + "log" "os" "path/filepath" "sort" + "strconv" "sync" "time" @@ -38,6 +40,7 @@ const ( MaxSPSize = 1<<16 - 256 PartSuffix = ".part" SPHeadOverhead = 4 + CfgDeadline = "NNCPDEADLINE" ) type MTHAndOffset struct { @@ -131,6 +134,14 @@ type ConnDeadlined interface { } func init() { + if v := os.Getenv(CfgDeadline); v != "" { + i, err := strconv.Atoi(v) + if err != nil { + log.Fatalln("Can not convert", CfgDeadline, "to integer:", err) + } + DefaultDeadline = time.Duration(i) * time.Second + } + var buf bytes.Buffer spHead := SPHead{Type: SPTypeHalt} if _, err := xdr.Marshal(&buf, spHead); err != nil { @@ -438,7 +449,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error { NicenessFmt(state.Nice), ) }) - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) if err = state.WriteSP(conn, buf, false); err != nil { state.Ctx.LogE("sp-startI", les, err, func(les LEs) string { return fmt.Sprintf( @@ -457,7 +468,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error { NicenessFmt(state.Nice), ) }) - conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) if buf, err = state.ReadSP(conn); err != nil { state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { return fmt.Sprintf( @@ -537,7 +548,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } les := LEs{{"Nice", int(state.Nice)}} state.Ctx.LogD("sp-startR", les, logMsg) - conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) if buf, err = state.ReadSP(conn); err != nil { state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err @@ -549,6 +560,9 @@ func (state *SPState) StartR(conn ConnDeadlined) error { var node *Node for _, n := range state.Ctx.Neigh { + if n.NoisePub == nil { + continue + } if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 { node = n break @@ -611,7 +625,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.dirUnlock() return err } - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) if err = state.WriteSP(conn, buf, false); err != nil { state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string { return fmt.Sprintf( @@ -752,7 +766,7 @@ func (state *SPState) StartWorkers( break Deadlined: state.SetDead() - conn.Close() // #nosec G104 + conn.Close() case now := <-pingTicker.C: if now.After(state.TxLastSeen.Add(PingTimeout)) { state.wg.Add(1) @@ -769,14 +783,21 @@ func (state *SPState) StartWorkers( if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { state.wg.Add(1) go func() { - ticker := time.NewTicker(time.Second) + dw, err := state.Ctx.NewDirWatcher( + filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)), + time.Second, + ) + if err != nil { + state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg) + log.Fatalln(err) + } for { select { case <-state.isDead: + dw.Close() state.wg.Done() - ticker.Stop() return - case <-ticker.C: + case <-dw.C: for _, payload := range state.Ctx.infosOur( state.Node.Id, state.Nice, @@ -806,6 +827,7 @@ func (state *SPState) StartWorkers( defer conn.Close() defer state.SetDead() defer state.wg.Done() + buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) for { if state.NotAlive() { return @@ -874,6 +896,9 @@ func (state *SPState) StartWorkers( fdAndFullSize, exists := state.fds[pth] state.fdsLock.RUnlock() if !exists { + state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string { + return logMsg(les) + ": opening" + }) fd, err := os.Open(pth) if err != nil { state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string { @@ -895,7 +920,8 @@ func (state *SPState) StartWorkers( } fd := fdAndFullSize.fd fullSize := fdAndFullSize.fullSize - var buf []byte + lesp = append(lesp, LE{"FullSize", fullSize}) + var bufRead []byte if freq.Offset < uint64(fullSize) { state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string { return logMsg(les) + ": seeking" @@ -906,7 +932,6 @@ func (state *SPState) StartWorkers( }) return } - buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string { @@ -914,12 +939,13 @@ func (state *SPState) StartWorkers( }) return } - buf = buf[:n] + bufRead = buf[:n] lesp = append( les, LE{"XX", string(TTx)}, LE{"Pkt", pktName}, LE{"Size", int64(n)}, + LE{"FullSize", fullSize}, ) state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string { return fmt.Sprintf( @@ -927,14 +953,15 @@ func (state *SPState) StartWorkers( logMsg(les), humanize.IBytes(uint64(n)), ) }) + } else { + state.closeFd(pth) } - state.closeFd(pth) payload = MarshalSP(SPTypeFile, SPFile{ Hash: freq.Hash, Offset: freq.Offset, - Payload: buf, + Payload: bufRead, }) - ourSize := freq.Offset + uint64(len(buf)) + ourSize := freq.Offset + uint64(len(bufRead)) lesp = append( les, LE{"XX", string(TTx)}, @@ -946,27 +973,29 @@ func (state *SPState) StartWorkers( state.progressBars[pktName] = struct{}{} Progress("Tx", lesp) } + if ourSize == uint64(fullSize) { + state.closeFd(pth) + state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string { + return logMsg(les) + ": finished" + }) + if state.Ctx.ShowPrgrs { + delete(state.progressBars, pktName) + } + } state.Lock() - if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash { + for i, q := range state.queueTheir { + if *q.freq.Hash != *freq.Hash { + continue + } if ourSize == uint64(fullSize) { - state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string { - return logMsg(les) + ": finished" - }) - if len(state.queueTheir) > 1 { - state.queueTheir = state.queueTheir[1:] - } else { - state.queueTheir = state.queueTheir[:0] - } - if state.Ctx.ShowPrgrs { - delete(state.progressBars, pktName) - } + state.queueTheir = append( + state.queueTheir[:i], + state.queueTheir[i+1:]..., + ) } else { - state.queueTheir[0].freq.Offset += uint64(len(buf)) + q.freq.Offset = ourSize } - } else { - state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string { - return logMsg(les) + ": queue disappeared" - }) + break } state.Unlock() } @@ -978,7 +1007,7 @@ func (state *SPState) StartWorkers( ) } state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg) - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) ct, err := state.csOur.Encrypt(nil, nil, payload) if err != nil { state.Ctx.LogE("sp-encrypting", les, err, logMsg) @@ -1005,7 +1034,7 @@ func (state *SPState) StartWorkers( ) } state.Ctx.LogD("sp-recv-wait", les, logMsg) - conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) payload, err := state.ReadSP(conn) if err != nil { if err == io.EOF { @@ -1079,13 +1108,13 @@ func (state *SPState) StartWorkers( state.SetDead() state.wg.Done() state.SetDead() - conn.Close() // #nosec G104 + conn.Close() }() return nil } -func (state *SPState) Wait() { +func (state *SPState) Wait() bool { state.wg.Wait() close(state.payloads) close(state.pings) @@ -1101,12 +1130,15 @@ func (state *SPState) Wait() { if txDuration > 0 { state.TxSpeed = state.TxBytes / txDuration } + nothingLeft := len(state.queueTheir) == 0 for _, s := range state.fds { + nothingLeft = false s.fd.Close() } for pktName := range state.progressBars { ProgressKill(pktName) } + return nothingLeft } func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { @@ -1244,7 +1276,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } continue } - if _, err = os.Stat(pktPath + SeenSuffix); err == nil { + if _, err = os.Stat(filepath.Join( + state.Ctx.Spool, state.Node.Id.String(), string(TRx), + SeenDir, Base32Codec.EncodeToString(info.Hash[:]), + )); err == nil { state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string { return logMsg(les) + ": already seen" }) @@ -1319,9 +1354,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } fullsize := int64(0) state.RLock() - infoTheir, ok := state.infosTheir[*file.Hash] + infoTheir := state.infosTheir[*file.Hash] state.RUnlock() - if !ok { + if infoTheir == nil { state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string { return logMsg(les) + ": unknown file" }) @@ -1429,17 +1464,19 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { humanize.IBytes(uint64(fullsize)), ) } - err = fd.Sync() - if err != nil { - state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string { - return logMsg(les) + ": syncing" - }) - state.closeFd(filePathPart) - continue + if !NoSync { + err = fd.Sync() + if err != nil { + state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string { + return logMsg(les) + ": syncing" + }) + state.closeFd(filePathPart) + continue + } } if hasherAndOffset != nil { delete(state.fileHashers, filePath) - if hasherAndOffset.mth.PrependSize() == 0 { + if hasherAndOffset.mth.PreaddSize() == 0 { if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 { state.Ctx.LogE( "sp-file-bad-checksum", lesp, @@ -1515,16 +1552,17 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Lock() delete(state.infosTheir, *file.Hash) state.Unlock() - if hasherAndOffset != nil { - go func() { - spCheckerTasks <- SPCheckerTask{ - nodeId: state.Node.Id, - hsh: file.Hash, - mth: hasherAndOffset.mth, - done: state.payloads, - } - }() - } + go func() { + t := SPCheckerTask{ + nodeId: state.Node.Id, + hsh: file.Hash, + done: state.payloads, + } + if hasherAndOffset != nil { + t.mth = hasherAndOffset.mth + } + spCheckerTasks <- t + }() case SPTypeDone: lesp := append(les, LE{"Type", "done"}) @@ -1564,7 +1602,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { return fmt.Sprintf("Packet %s is sent", pktName) }) if state.Ctx.HdrUsage { - os.Remove(pth + HdrSuffix) + os.Remove(JobPath2Hdr(pth)) } } else { state.Ctx.LogE("sp-done", lesp, err, logMsg)