From 97b64e04266a9abe0e468532d066bb9f36a5f0cf Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sat, 20 Feb 2021 15:01:58 +0300 Subject: [PATCH] Intermediate .nock packets step --- doc/call.texi | 11 +++ doc/cmds.texi | 43 +++++++----- doc/news.ru.texi | 5 ++ doc/news.texi | 5 ++ doc/spool.texi | 39 +++++++---- src/call.go | 3 + src/cfg.go | 4 ++ src/check.go | 33 +++++++++ src/cmd/nncp-call/main.go | 2 + src/cmd/nncp-caller/main.go | 1 + src/cmd/nncp-cfgnew/main.go | 1 + src/cmd/nncp-check/main.go | 19 +++++- src/cmd/nncp-daemon/main.go | 7 +- src/cmd/nncp-rm/main.go | 12 +++- src/cmd/nncp-stat/main.go | 22 +++++- src/humanizer.go | 23 +++---- src/jobs.go | 23 ++++++- src/sp.go | 132 +++++++++++++++++++++++------------- 18 files changed, 284 insertions(+), 101 deletions(-) diff --git a/doc/call.texi b/doc/call.texi index 81ee9ba..fe55de5 100644 --- a/doc/call.texi +++ b/doc/call.texi @@ -31,6 +31,7 @@ calls: [ { cron: "*/5 * * * * * *" when-tx-exists: true + nock: true }, ] @end verbatim @@ -85,4 +86,14 @@ created, or skip any kind of packet processing. @item when-tx-exists Call only if packets for sending exists. +@anchor{CfgNoCK} +@item nock +NoCK (no-checksumming) tells not to do checksumming of received files, +assuming that it will be done for example with @ref{nncp-check} command +later. That can help minimizing time spent online, because HDD won't do +simultaneous reading of the data for checksumming and writing of the +received one, but just sequential writing of the file. Pay attention +that you have to make a call to remote node after checksumming is done, +to send notification about successful packet reception. + @end table diff --git a/doc/cmds.texi b/doc/cmds.texi index 6510ed8..5141b08 100644 --- a/doc/cmds.texi +++ b/doc/cmds.texi @@ -102,6 +102,7 @@ $ nncp-call [options] [-rxrate INT] [-txrate INT] [-autotoss*] + [-nock] NODE[:ADDR] [FORCEADDR] @end example @@ -114,15 +115,17 @@ transfer. If @option{-rx} option is specified then only inbound packets transmission is performed. If @option{-tx} option is specified, then -only outbound transmission is performed. @option{-onlinedeadline} -overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}}. -@option{-maxonlinetime} overrides @ref{CfgMaxOnlineTime, -@emph{maxonlinetime}}. @option{-rxrate}/@option{-txrate} override -@ref{CfgXxRate, rxrate/txrate}. @option{-list} option allows you to list -packets of remote node, without any transmission. +only outbound transmission is performed. -You can specify what packets your want to download, by specifying -@option{-pkts} option with comma-separated list of packets identifiers. +@option{-onlinedeadline} overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}}. +@option{-maxonlinetime} overrides @ref{CfgMaxOnlineTime, @emph{maxonlinetime}}. +@option{-rxrate}/@option{-txrate} override @ref{CfgXxRate, rxrate/txrate}. +Read @ref{CfgNoCK, more} about @option{-nock} option. + +@option{-list} option allows you to list packets of remote node, without +any transmission. You can specify what packets your want to download, by +specifying @option{-pkts} option with comma-separated list of packets +identifiers. Each @option{NODE} can contain several uniquely identified @option{ADDR}esses in @ref{CfgAddrs, configuration} file. If you do @@ -230,13 +233,16 @@ operating system. @section nncp-check @example -$ nncp-check [options] +$ nncp-check [-nock] [options] @end example Perform @ref{Spool, spool} directory integrity check. Read all files that has Base32-encoded filenames and compare it with recalculated -BLAKE2b hash output of their contents. That supplementary command is -not used often in practice, if ever. +BLAKE2b hash output of their contents. + +The most useful mode of operation is with @option{-nock} option, that +checks integrity of @file{.nock} files, renaming them to ordinary +(verified) encrypted packets. @node nncp-cronexpr @section nncp-cronexpr @@ -252,7 +258,9 @@ next time entities. @section nncp-daemon @example -$ nncp-daemon [options] [-maxconn INT] [-bind ADDR] [-inetd] [-autotoss*] +$ nncp-daemon [options] + [-maxconn INT] [-bind ADDR] [-inetd] + [-autotoss*] [-nock] @end example Start listening TCP daemon, wait for incoming connections and run @@ -278,6 +286,8 @@ uucp stream tcp6 nowait nncpuser /usr/local/bin/nncp-daemon nncp-daemon -quiet - during the call. All @option{-autotoss-*} options is the same as in @ref{nncp-toss} command. +Read @ref{CfgNoCK, more} about @option{-nock} option. + @node nncp-exec @section nncp-exec @@ -509,6 +519,7 @@ $ nncp-rm [options] -tmp $ nncp-rm [options] -lock $ nncp-rm [options] -node NODE -part $ nncp-rm [options] -node NODE -seen +$ nncp-rm [options] -node NODE -nock $ nncp-rm [options] -node NODE [-rx] [-tx] $ nncp-rm [options] -node NODE -pkt PKT @end example @@ -529,10 +540,10 @@ Base32 name) will be deleted. This is useful when you see some packet failing to be processed. @item When either @option{-rx} or @option{-tx} options are specified -(maybe both of them), then delete all packets from that given queues. If -@option{-part} is given, then delete only @file{.part}ly downloaded -ones. If @option{-seen} option is specified, then delete only -@file{.seen} files. +(maybe both of them), then delete all packets from that given queues. +@option{-part} option deletes @file{.part}ly downloaded files. +@option{-seen} option deletes @file{.seen} files. @option{-nock} option +deletes non-checksummed (non-verified) @file{.nock} files. @item @option{-dryrun} option just prints what will be deleted. diff --git a/doc/news.ru.texi b/doc/news.ru.texi index bc42c6e..1c1a307 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -15,6 +15,11 @@ spool директории. Оптимизация: не закрывать файловый дескриптор файла который мы качаем. Прежде каждый его кусочек приводил к дорогим open/close вызовам. +@item +Скачиваемые в режиме online файлы теперь сохраняются с @file{.nock} +суффиксом (non-checksummed), ожидая пока либо @command{nncp-check}, либо +online демоны не выполнят проверку целостности. + @end itemize @node Релиз 6.0.0 diff --git a/doc/news.texi b/doc/news.texi index 565f79e..c6ee9b9 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -16,6 +16,11 @@ many packets in the spool directory. Optimization: do not close file descriptor of the file we download online. Previously each chunk lead to expensive open/close calls. +@item +Online downloaded files are saved with @file{.nock} (non-checksummed) +suffix, waiting either for @command{nncp-check}, or online daemons to +perform integrity check. + @end itemize @node Release 6.0.0 diff --git a/doc/spool.texi b/doc/spool.texi index 5e7812d..aea7976 100644 --- a/doc/spool.texi +++ b/doc/spool.texi @@ -12,6 +12,7 @@ spool/2WHB...OABQ/rx/5ZIB...UMKW.part spool/2WHB...OABQ/tx.lock spool/2WHB...OABQ/toss.lock spool/BYRR...CG6Q/rx.lock +spool/BYRR...CG6Q/rx/MLZ6...Q3SQ.nock spool/BYRR...CG6Q/rx/ spool/BYRR...CG6Q/tx.lock spool/BYRR...CG6Q/tx/AQUT...DGNT.seen @@ -20,22 +21,34 @@ spool/BYRR...CG6Q/tx/VCSR...3VXX.seen spool/BYRR...CG6Q/tx/ZI5U...5RRQ @end example -Except for @file{tmp}, all other directories are Base32-encoded node -identifiers (@file{2WHB...OABQ}, @file{BYRR...CG6Q} in our example). -Each node subdirectory has @file{rx} (received, partially received and -currently unprocessed packets) and @file{tx} (for outbound packets) +@itemize + +@item Except for @file{tmp}, all other directories are Base32-encoded +node identifiers (@file{2WHB...OABQ}, @file{BYRR...CG6Q} in our example). + +@item Each node subdirectory has @file{rx} (received, partially received +and currently unprocessed packets) and @file{tx} (for outbound packets) directories. -Each @file{rx}/@file{tx} directory contains one file per encrypted +@item Each @file{rx}/@file{tx} directory contains one file per encrypted packet. Its filename is Base32 encoded BLAKE2b hash of the contents. So -it can be integrity checked at any time. @file{5ZIB...UMKW.part} is -partially received file from @file{2WHB...OABQ} node. @file{tx} -directory can not contain partially written files -- they are moved -atomically from @file{tmp}. +it can be integrity checked at any time. + +@item @file{5ZIB...UMKW.part} is partially received file from +@file{2WHB...OABQ} node. @file{tx} directory can not contain partially +written files -- they are moved atomically from @file{tmp}. -When @ref{nncp-toss} utility is called with @option{-seen} option, it -will create empty @file{XXX.seen} files, telling that some kind of +@item @file{rx} can contain received, but currently integrity unchecked +files with @file{.nock} extension. It is completely the same as an +ordinary encrypted packets, but its integrity after online download was +not done. After successful checksum verification, @file{.nock} extension +is trimmed. + +@item When @ref{nncp-toss} utility is called with @option{-seen} option, +it will create empty @file{XXX.seen} files, telling that some kind of packet was already tossed sometime. -Only one process can work with @file{rx}/@file{tx} directories at once, -so there are corresponding lock files. +@item Only one process can work with @file{rx}/@file{tx} directories at +once, so there are corresponding lock files. + +@end itemize diff --git a/src/call.go b/src/call.go index ef4d07d..2f635d6 100644 --- a/src/call.go +++ b/src/call.go @@ -34,6 +34,7 @@ type Call struct { OnlineDeadline time.Duration MaxOnlineTime time.Duration WhenTxExists bool + NoCK bool AutoToss bool AutoTossDoSeen bool @@ -51,6 +52,7 @@ func (ctx *Ctx) CallNode( rxRate, txRate int, onlineDeadline, maxOnlineTime time.Duration, listOnly bool, + noCK bool, onlyPkts map[[32]byte]bool, ) (isGood bool) { for _, addr := range addrs { @@ -78,6 +80,7 @@ func (ctx *Ctx) CallNode( rxRate: rxRate, txRate: txRate, listOnly: listOnly, + NoCK: noCK, onlyPkts: onlyPkts, } if err = state.StartI(conn); err == nil { diff --git a/src/cfg.go b/src/cfg.go index 9462433..9d94953 100644 --- a/src/cfg.go +++ b/src/cfg.go @@ -82,6 +82,7 @@ type CallJSON struct { OnlineDeadline *uint `json:"onlinedeadline,omitempty"` MaxOnlineTime *uint `json:"maxonlinetime,omitempty"` WhenTxExists *bool `json:"when-tx-exists,omitempty"` + NoCK *bool `json:"nock"` AutoToss *bool `json:"autotoss,omitempty"` AutoTossDoSeen *bool `json:"autotoss-doseen,omitempty"` @@ -284,6 +285,9 @@ func NewNode(name string, cfg NodeJSON) (*Node, error) { if callCfg.WhenTxExists != nil { call.WhenTxExists = *callCfg.WhenTxExists } + if callCfg.NoCK != nil { + call.NoCK = *callCfg.NoCK + } if callCfg.AutoToss != nil { call.AutoToss = *callCfg.AutoToss } diff --git a/src/check.go b/src/check.go index b2ea671..2569e6a 100644 --- a/src/check.go +++ b/src/check.go @@ -24,10 +24,13 @@ import ( "io" "log" "os" + "path/filepath" "golang.org/x/crypto/blake2b" ) +const NoCKSuffix = ".nock" + func Check(src io.Reader, checksum []byte, les LEs, showPrgrs bool) (bool, error) { hsh, err := blake2b.New256(nil) if err != nil { @@ -70,3 +73,33 @@ func (ctx *Ctx) checkXxIsBad(nodeId *NodeId, xx TRxTx) bool { func (ctx *Ctx) Check(nodeId *NodeId) bool { return !(ctx.checkXxIsBad(nodeId, TRx) || ctx.checkXxIsBad(nodeId, TTx)) } + +func (ctx *Ctx) CheckNoCK(nodeId *NodeId, hshValue *[32]byte) (int64, error) { + dirToSync := filepath.Join(ctx.Spool, nodeId.String(), string(TRx)) + pktName := Base32Codec.EncodeToString(hshValue[:]) + pktPath := filepath.Join(dirToSync, pktName) + fd, err := os.Open(pktPath + NoCKSuffix) + if err != nil { + return 0, err + } + fi, err := fd.Stat() + if err != nil { + return 0, err + } + defer fd.Close() + size := fi.Size() + les := LEs{ + {"XX", string(TRx)}, + {"Node", nodeId}, + {"Pkt", pktName}, + {"FullSize", size}, + } + gut, err := Check(fd, hshValue[:], les, ctx.ShowPrgrs) + if err != nil || !gut { + return 0, errors.New("checksum mismatch") + } + if err = os.Rename(pktPath+NoCKSuffix, pktPath); err != nil { + return 0, err + } + return size, DirSync(dirToSync) +} diff --git a/src/cmd/nncp-call/main.go b/src/cmd/nncp-call/main.go index 9762d92..3136e3e 100644 --- a/src/cmd/nncp-call/main.go +++ b/src/cmd/nncp-call/main.go @@ -44,6 +44,7 @@ func main() { rxOnly = flag.Bool("rx", false, "Only receive packets") txOnly = flag.Bool("tx", false, "Only transmit packets") listOnly = flag.Bool("list", false, "Only list remote packets") + noCK = flag.Bool("nock", false, "Do no checksum checking") onlyPktsRaw = flag.String("pkts", "", "Recieve only that packets, comma separated") rxRate = flag.Int("rxrate", 0, "Maximal receive rate, pkts/sec") txRate = flag.Int("txrate", 0, "Maximal transmit rate, pkts/sec") @@ -185,6 +186,7 @@ func main() { onlineDeadline, maxOnlineTime, *listOnly, + *noCK, onlyPkts, ) diff --git a/src/cmd/nncp-caller/main.go b/src/cmd/nncp-caller/main.go index 245c618..7ccefdd 100644 --- a/src/cmd/nncp-caller/main.go +++ b/src/cmd/nncp-caller/main.go @@ -176,6 +176,7 @@ func main() { call.OnlineDeadline, call.MaxOnlineTime, false, + call.NoCK, nil, ) diff --git a/src/cmd/nncp-cfgnew/main.go b/src/cmd/nncp-cfgnew/main.go index 798f31c..08c4875 100644 --- a/src/cmd/nncp-cfgnew/main.go +++ b/src/cmd/nncp-cfgnew/main.go @@ -211,6 +211,7 @@ func main() { # # xx: rx # # addr: lan # # when-tx-exists: true + # # nock: true # # # # autotoss: false # # autotoss-doseen: true diff --git a/src/cmd/nncp-check/main.go b/src/cmd/nncp-check/main.go index 96ddd3f..e985b79 100644 --- a/src/cmd/nncp-check/main.go +++ b/src/cmd/nncp-check/main.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "os" + "path/filepath" "go.cypherpunks.ru/nncp/v5" ) @@ -30,12 +31,13 @@ import ( func usage() { fmt.Fprintf(os.Stderr, nncp.UsageHeader()) fmt.Fprintf(os.Stderr, "nncp-check -- verify Rx/Tx packets checksum\n\n") - fmt.Fprintf(os.Stderr, "Usage: %s [options]\nOptions:\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Usage: %s [-nock] [options]\nOptions:\n", os.Args[0]) flag.PrintDefaults() } func main() { var ( + nock = flag.Bool("nock", false, "Process .nock files") cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") nodeRaw = flag.String("node", "", "Process only that node") spoolPath = flag.String("spool", "", "Override path to spool") @@ -85,7 +87,20 @@ func main() { if nodeOnly != nil && nodeId != *nodeOnly.Id { continue } - if !ctx.Check(node.Id) { + if *nock { + for job := range ctx.JobsNoCK(node.Id) { + if _, err = ctx.CheckNoCK(node.Id, job.HshValue); err != nil { + pktName := nncp.Base32Codec.EncodeToString(job.HshValue[:]) + log.Println(filepath.Join( + ctx.Spool, + nodeId.String(), + string(nncp.TRx), + pktName+nncp.NoCKSuffix, + ), err) + isBad = true + } + } + } else if !ctx.Check(node.Id) { isBad = true } } diff --git a/src/cmd/nncp-daemon/main.go b/src/cmd/nncp-daemon/main.go index 6319deb..e3a73c9 100644 --- a/src/cmd/nncp-daemon/main.go +++ b/src/cmd/nncp-daemon/main.go @@ -70,11 +70,13 @@ func performSP( ctx *nncp.Ctx, conn nncp.ConnDeadlined, nice uint8, + noCK bool, nodeIdC chan *nncp.NodeId, ) { state := nncp.SPState{ Ctx: ctx, Nice: nice, + NoCK: noCK, } if err := state.StartR(conn); err == nil { ctx.LogI("call-start", nncp.LEs{{K: "Node", V: state.Node.Id}}, "connected") @@ -108,6 +110,7 @@ func main() { bind = flag.String("bind", "[::]:5400", "Address to bind to") inetd = flag.Bool("inetd", false, "Is it started as inetd service") maxConn = flag.Int("maxconn", 128, "Maximal number of simultaneous connections") + noCK = flag.Bool("nock", false, "Do no checksum checking") spoolPath = flag.String("spool", "", "Override path to spool") logPath = flag.String("log", "", "Override path to logfile") quiet = flag.Bool("quiet", false, "Print only errors") @@ -160,7 +163,7 @@ func main() { os.Stderr.Close() // #nosec G104 conn := &InetdConn{os.Stdin, os.Stdout} nodeIdC := make(chan *nncp.NodeId) - go performSP(ctx, conn, nice, nodeIdC) + go performSP(ctx, conn, nice, *noCK, nodeIdC) nodeId := <-nodeIdC var autoTossFinish chan struct{} var autoTossBadCode chan bool @@ -197,7 +200,7 @@ func main() { ctx.LogD("daemon", nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, "accepted") go func(conn net.Conn) { nodeIdC := make(chan *nncp.NodeId) - go performSP(ctx, conn, nice, nodeIdC) + go performSP(ctx, conn, nice, *noCK, nodeIdC) nodeId := <-nodeIdC var autoTossFinish chan struct{} var autoTossBadCode chan bool diff --git a/src/cmd/nncp-rm/main.go b/src/cmd/nncp-rm/main.go index d5609e6..0802f51 100644 --- a/src/cmd/nncp-rm/main.go +++ b/src/cmd/nncp-rm/main.go @@ -39,6 +39,7 @@ func usage() { fmt.Fprintf(os.Stderr, " %s [options] -lock\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE -part\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE -seen\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s [options] -node NODE -nock\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE {-rx|-tx}\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE -pkt PKT\n", os.Args[0]) fmt.Fprintln(os.Stderr, "-older option's time units are: (s)econds, (m)inutes, (h)ours, (d)ays") @@ -56,6 +57,7 @@ func main() { doTx = flag.Bool("tx", false, "Process transfered packets") doPart = flag.Bool("part", false, "Remove only .part files") doSeen = flag.Bool("seen", false, "Remove only .seen files") + doNoCK = flag.Bool("nock", false, "Remove only .nock files") older = flag.String("older", "", "XXX{smhd}: only older than XXX number of time units") dryRun = flag.Bool("dryrun", false, "Do not actually remove files") pktRaw = flag.String("pkt", "", "Packet to remove") @@ -183,6 +185,13 @@ func main() { } return os.Remove(path) } + if *doNoCK && strings.HasSuffix(info.Name(), nncp.NoCKSuffix) { + ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") + if *dryRun { + return nil + } + return os.Remove(path) + } if *doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix) { ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") if *dryRun { @@ -198,6 +207,7 @@ func main() { return os.Remove(path) } if !*doSeen && + !*doNoCK && !*doPart && (*doRx || *doTx) && ((*doRx && xx == nncp.TRx) || (*doTx && xx == nncp.TTx)) { @@ -210,7 +220,7 @@ func main() { return nil }) } - if *pktRaw != "" || *doRx || *doSeen || *doPart { + if *pktRaw != "" || *doRx || *doSeen || *doNoCK || *doPart { if err = remove(nncp.TRx); err != nil { log.Fatalln("Can not remove:", err) } diff --git a/src/cmd/nncp-stat/main.go b/src/cmd/nncp-stat/main.go index 994f83d..bd350c7 100644 --- a/src/cmd/nncp-stat/main.go +++ b/src/cmd/nncp-stat/main.go @@ -98,6 +98,8 @@ func main() { fmt.Println(node.Name) rxNums := make(map[uint8]int) rxBytes := make(map[uint8]int64) + noCKNums := make(map[uint8]int) + noCKBytes := make(map[uint8]int64) for job := range ctx.Jobs(node.Id, nncp.TRx) { if *showPkt { jobPrint(nncp.TRx, job) @@ -105,6 +107,13 @@ func main() { rxNums[job.PktEnc.Nice] = rxNums[job.PktEnc.Nice] + 1 rxBytes[job.PktEnc.Nice] = rxBytes[job.PktEnc.Nice] + job.Size } + for job := range ctx.JobsNoCK(node.Id) { + if *showPkt { + jobPrint(nncp.TRx, job) + } + noCKNums[job.PktEnc.Nice] = noCKNums[job.PktEnc.Nice] + 1 + noCKBytes[job.PktEnc.Nice] = noCKBytes[job.PktEnc.Nice] + job.Size + } txNums := make(map[uint8]int) txBytes := make(map[uint8]int64) for job := range ctx.Jobs(node.Id, nncp.TTx) { @@ -118,17 +127,26 @@ func main() { for nice = 1; nice > 0; nice++ { rxNum, rxExists := rxNums[nice] txNum, txExists := txNums[nice] - if !(rxExists || txExists) { + noCKNum, noCKExists := noCKNums[nice] + if !(rxExists || txExists || noCKExists) { continue } fmt.Printf( - "\tnice:% 4s | Rx: % 10s, % 3d pkts | Tx: % 10s, % 3d pkts\n", + "\tnice:% 4s | Rx: % 10s, % 3d pkts | Tx: % 10s, % 3d pkts", nncp.NicenessFmt(nice), humanize.IBytes(uint64(rxBytes[nice])), rxNum, humanize.IBytes(uint64(txBytes[nice])), txNum, ) + if noCKExists { + fmt.Printf( + " | NoCK: % 10s, % 3d pkts", + humanize.IBytes(uint64(noCKBytes[nice])), + noCKNum, + ) + } + fmt.Printf("\n") } } } diff --git a/src/humanizer.go b/src/humanizer.go index b641cd6..c0de50f 100644 --- a/src/humanizer.go +++ b/src/humanizer.go @@ -47,13 +47,14 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) { if err == nil { nodeS = node.Name } + var sizeParsed uint64 var size string if sizeRaw, exists := le["Size"]; exists { - sp, err := strconv.ParseUint(sizeRaw, 10, 64) + sizeParsed, err = strconv.ParseUint(sizeRaw, 10, 64) if err != nil { return "", err } - size = humanize.IBytes(uint64(sp)) + size = humanize.IBytes(sizeParsed) } var msg string @@ -212,15 +213,13 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) { "Packet %s (%s) (nice %s)", le["Pkt"], size, NicenessFmt(nice), ) - offsetParsed, err := strconv.ParseUint(le["Offset"], 10, 64) - if err != nil { - return "", err - } - sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64) - if err != nil { - return "", err + if offset := le["Offset"]; offset != "" { + offsetParsed, err := strconv.ParseUint(offset, 10, 64) + if err != nil { + return "", err + } + msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed) } - msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed) if m, exists := le["Msg"]; exists { msg += ": " + m } @@ -249,10 +248,6 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) { if err != nil { return "", err } - sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64) - if err != nil { - return "", err - } msg += fmt.Sprintf( "%s %d%% (%s / %s)", le["Pkt"], diff --git a/src/jobs.go b/src/jobs.go index 3dc6bd4..3e97b2e 100644 --- a/src/jobs.go +++ b/src/jobs.go @@ -20,6 +20,7 @@ package nncp import ( "os" "path/filepath" + "strings" xdr "github.com/davecgh/go-xdr/xdr2" ) @@ -38,7 +39,7 @@ type Job struct { HshValue *[32]byte } -func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { +func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) jobs := make(chan Job, 16) go func() { @@ -53,7 +54,17 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { return } for _, fi := range fis { - hshValue, err := Base32Codec.DecodeString(fi.Name()) + var hshValue []byte + if nock { + if !strings.HasSuffix(fi.Name(), NoCKSuffix) { + continue + } + hshValue, err = Base32Codec.DecodeString( + strings.TrimSuffix(fi.Name(), NoCKSuffix), + ) + } else { + hshValue, err = Base32Codec.DecodeString(fi.Name()) + } if err != nil { continue } @@ -87,3 +98,11 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { }() return jobs } + +func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { + return ctx.jobsFind(nodeId, xx, false) +} + +func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job { + return ctx.jobsFind(nodeId, TRx, true) +} diff --git a/src/sp.go b/src/sp.go index 9410a2d..26c73e4 100644 --- a/src/sp.go +++ b/src/sp.go @@ -55,8 +55,6 @@ var ( DefaultDeadline = 10 * time.Second PingTimeout = time.Minute - - spCheckerToken chan struct{} ) type FdAndFullSize struct { @@ -153,8 +151,6 @@ func init() { panic(err) } SPFileOverhead = buf.Len() - spCheckerToken = make(chan struct{}, 1) - spCheckerToken <- struct{}{} } func MarshalSP(typ SPType, sp interface{}) []byte { @@ -188,6 +184,7 @@ type SPState struct { Ctx *Ctx Node *Node Nice uint8 + NoCK bool onlineDeadline time.Duration maxOnlineTime time.Duration hs *noise.HandshakeState @@ -220,6 +217,7 @@ type SPState struct { onlyPkts map[[32]byte]bool writeSPBuf bytes.Buffer fds map[string]FdAndFullSize + checkerJobs chan *[32]byte sync.RWMutex } @@ -241,6 +239,14 @@ func (state *SPState) SetDead() { for range state.pings { } }() + go func() { + for _, s := range state.fds { + s.fd.Close() + } + }() + if !state.NoCK { + close(state.checkerJobs) + } } func (state *SPState) NotAlive() bool { @@ -257,6 +263,31 @@ func (state *SPState) dirUnlock() { state.Ctx.UnlockDir(state.txLock) } +func (state *SPState) SPChecker() { + for hshValue := range state.checkerJobs { + les := LEs{ + {"XX", string(TRx)}, + {"Node", state.Node.Id}, + {"Pkt", Base32Codec.EncodeToString(hshValue[:])}, + } + state.Ctx.LogD("sp-file", les, "checking") + size, err := state.Ctx.CheckNoCK(state.Node.Id, hshValue) + les = append(les, LE{"Size", size}) + if err != nil { + state.Ctx.LogE("sp-file", les, err, "") + continue + } + state.Ctx.LogI("sp-done", les, "") + state.wg.Add(1) + go func(hsh *[32]byte) { + if !state.NotAlive() { + state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh}) + } + state.wg.Done() + }(hshValue) + } +} + func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error { state.writeSPBuf.Reset() n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ @@ -450,6 +481,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.infosTheir = make(map[[32]byte]*SPInfo) state.started = started state.xxOnly = xxOnly + var buf []byte var payload []byte state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message") @@ -543,6 +575,20 @@ func (state *SPState) closeFd(pth string) { } } +func (state *SPState) FillExistingNoCK() { + checkerJobs := make([]*[32]byte, 0) + for job := range state.Ctx.JobsNoCK(state.Node.Id) { + if job.PktEnc.Nice > state.Nice { + continue + } + checkerJobs = append(checkerJobs, job.HshValue) + } + for _, job := range checkerJobs { + state.checkerJobs <- job + } + state.wg.Done() +} + func (state *SPState) StartWorkers( conn ConnDeadlined, infosPayloads [][]byte, @@ -555,6 +601,14 @@ func (state *SPState) StartWorkers( state.mustFinishAt = state.started.Add(state.maxOnlineTime) } + // Checker + if !state.NoCK { + state.checkerJobs = make(chan *[32]byte) + go state.SPChecker() + state.wg.Add(1) + go state.FillExistingNoCK() + } + // Remaining handshake payload sending if len(infosPayloads) > 1 { state.wg.Add(1) @@ -838,9 +892,6 @@ func (state *SPState) StartWorkers( state.wg.Done() state.SetDead() conn.Close() // #nosec G104 - for _, s := range state.fds { - s.fd.Close() - } }() return nil @@ -934,6 +985,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } continue } + if _, err = os.Stat(pktPath + NoCKSuffix); err == nil { + state.Ctx.LogI("sp-info", lesp, "still non checksummed") + continue + } fi, err := os.Stat(pktPath + PartSuffix) var offset int64 if err == nil { @@ -1015,48 +1070,27 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { if fullsize != ourSize { continue } - <-spCheckerToken - go func() { - defer func() { - spCheckerToken <- struct{}{} - }() - if err := fd.Sync(); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") - state.closeFd(filePathPart) - return - } - state.wg.Add(1) - defer state.wg.Done() - if _, err = fd.Seek(0, io.SeekStart); err != nil { - state.closeFd(filePathPart) - state.Ctx.LogE("sp-file", lesp, err, "") - return - } - state.Ctx.LogD("sp-file", lesp, "checking") - gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs) - state.closeFd(filePathPart) - if err != nil || !gut { - state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "") - return - } - state.Ctx.LogI("sp-done", lesp, "") - if err = os.Rename(filePath+PartSuffix, filePath); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "rename") - return - } - if err = DirSync(dirToSync); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") - return - } - state.Lock() - delete(state.infosTheir, *file.Hash) - state.Unlock() - state.wg.Add(1) - go func() { - state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) - state.wg.Done() - }() - }() + err = fd.Sync() + state.closeFd(filePathPart) + if err != nil { + state.Ctx.LogE("sp-file", lesp, err, "sync") + continue + } + if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil { + state.Ctx.LogE("sp-file", lesp, err, "rename") + continue + } + if err = DirSync(dirToSync); err != nil { + state.Ctx.LogE("sp-file", lesp, err, "sync") + continue + } + state.Ctx.LogI("sp-file", lesp, "downloaded") + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + if !state.NoCK { + state.checkerJobs <- file.Hash + } case SPTypeDone: lesp := append(les, LE{"Type", "done"}) state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") -- 2.44.0