X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fsp.go;h=eca9544945b1e0d734d6b21d692cce85dc68f372;hb=HEAD;hp=3de629b59b5108b9c2d49bae226f9041cb8ccbe9;hpb=54b0f8a0e20847d666dd445bd92c282fd9ab5dec;p=nncp.git diff --git a/src/sp.go b/src/sp.go index 3de629b..3bef9c3 100644 --- a/src/sp.go +++ b/src/sp.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2021 Sergey Matveev +Copyright (C) 2016-2023 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -23,9 +23,11 @@ import ( "errors" "fmt" "io" + "log" "os" "path/filepath" "sort" + "strconv" "sync" "time" @@ -38,6 +40,7 @@ const ( MaxSPSize = 1<<16 - 256 PartSuffix = ".part" SPHeadOverhead = 4 + CfgDeadline = "NNCPDEADLINE" ) type MTHAndOffset struct { @@ -131,6 +134,14 @@ type ConnDeadlined interface { } func init() { + if v := os.Getenv(CfgDeadline); v != "" { + i, err := strconv.Atoi(v) + if err != nil { + log.Fatalln("Can not convert", CfgDeadline, "to integer:", err) + } + DefaultDeadline = time.Duration(i) * time.Second + } + var buf bytes.Buffer spHead := SPHead{Type: SPTypeHalt} if _, err := xdr.Marshal(&buf, spHead); err != nil { @@ -276,21 +287,22 @@ func (state *SPState) dirUnlock() { func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error { state.writeSPBuf.Reset() - n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ + if _, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ Magic: MagicNNCPSv1.B, Payload: payload, - }) + }); err != nil { + return err + } + n, err := dst.Write(state.writeSPBuf.Bytes()) if err != nil { return err } - if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil { - state.TxLastSeen = time.Now() - state.TxBytes += int64(n) - if !ping { - state.TxLastNonPing = state.TxLastSeen - } + state.TxLastSeen = time.Now() + state.TxBytes += int64(n) + if !ping { + state.TxLastNonPing = state.TxLastSeen } - return err + return nil } func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { @@ -772,14 +784,21 @@ func (state *SPState) StartWorkers( if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { state.wg.Add(1) go func() { - ticker := time.NewTicker(time.Second) + dw, err := state.Ctx.NewDirWatcher( + filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)), + time.Second, + ) + if err != nil { + state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg) + log.Fatalln(err) + } for { select { case <-state.isDead: + dw.Close() state.wg.Done() - ticker.Stop() return - case <-ticker.C: + case <-dw.C: for _, payload := range state.Ctx.infosOur( state.Node.Id, state.Nice, @@ -1096,11 +1115,11 @@ func (state *SPState) StartWorkers( return nil } -func (state *SPState) Wait() { +func (state *SPState) Wait() bool { state.wg.Wait() close(state.payloads) close(state.pings) - state.Duration = time.Now().Sub(state.started) + state.Duration = time.Since(state.started) state.dirUnlock() state.RxSpeed = state.RxBytes state.TxSpeed = state.TxBytes @@ -1112,12 +1131,15 @@ func (state *SPState) Wait() { if txDuration > 0 { state.TxSpeed = state.TxBytes / txDuration } + nothingLeft := len(state.queueTheir) == 0 for _, s := range state.fds { + nothingLeft = false s.fd.Close() } for pktName := range state.progressBars { ProgressKill(pktName) } + return nothingLeft } func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { @@ -1255,7 +1277,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } continue } - if _, err = os.Stat(pktPath + SeenSuffix); err == nil { + if _, err = os.Stat(filepath.Join( + state.Ctx.Spool, state.Node.Id.String(), string(TRx), + SeenDir, Base32Codec.EncodeToString(info.Hash[:]), + )); err == nil { state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string { return logMsg(les) + ": already seen" }) @@ -1440,18 +1465,20 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { humanize.IBytes(uint64(fullsize)), ) } - err = fd.Sync() - if err != nil { - state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string { - return logMsg(les) + ": syncing" - }) - state.closeFd(filePathPart) - continue + if !NoSync { + err = fd.Sync() + if err != nil { + state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string { + return logMsg(les) + ": syncing" + }) + state.closeFd(filePathPart) + continue + } } if hasherAndOffset != nil { delete(state.fileHashers, filePath) if hasherAndOffset.mth.PreaddSize() == 0 { - if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 { + if !bytes.Equal(hasherAndOffset.mth.Sum(nil), file.Hash[:]) { state.Ctx.LogE( "sp-file-bad-checksum", lesp, errors.New("checksum mismatch"), @@ -1576,7 +1603,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { return fmt.Sprintf("Packet %s is sent", pktName) }) if state.Ctx.HdrUsage { - os.Remove(pth + HdrSuffix) + os.Remove(JobPath2Hdr(pth)) } } else { state.Ctx.LogE("sp-done", lesp, err, logMsg)