]> Cypherpunks.ru repositories - nncp.git/commitdiff
Intermediate .nock packets step
authorSergey Matveev <stargrave@stargrave.org>
Sat, 20 Feb 2021 12:01:58 +0000 (15:01 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Sat, 20 Feb 2021 19:37:04 +0000 (22:37 +0300)
18 files changed:
doc/call.texi
doc/cmds.texi
doc/news.ru.texi
doc/news.texi
doc/spool.texi
src/call.go
src/cfg.go
src/check.go
src/cmd/nncp-call/main.go
src/cmd/nncp-caller/main.go
src/cmd/nncp-cfgnew/main.go
src/cmd/nncp-check/main.go
src/cmd/nncp-daemon/main.go
src/cmd/nncp-rm/main.go
src/cmd/nncp-stat/main.go
src/humanizer.go
src/jobs.go
src/sp.go

index 81ee9bafd465a0b5d15044c3406fc035b5d431f6..fe55de5456d2b9f82c4aa83b428f6c13155d4c0b 100644 (file)
@@ -31,6 +31,7 @@ calls: [
     {
         cron: "*/5 * * * * * *"
         when-tx-exists: true
+        nock: true
     },
 ]
 @end verbatim
@@ -85,4 +86,14 @@ created, or skip any kind of packet processing.
 @item when-tx-exists
 Call only if packets for sending exists.
 
+@anchor{CfgNoCK}
+@item nock
+NoCK (no-checksumming) tells not to do checksumming of received files,
+assuming that it will be done for example with @ref{nncp-check} command
+later. That can help minimizing time spent online, because HDD won't do
+simultaneous reading of the data for checksumming and writing of the
+received one, but just sequential writing of the file. Pay attention
+that you have to make a call to remote node after checksumming is done,
+to send notification about successful packet reception.
+
 @end table
index 6510ed889c61067ac62486a338333ee7cd77cd6e..5141b08dd09bf0d98f0f0c4e686d71ca8452d941 100644 (file)
@@ -102,6 +102,7 @@ $ nncp-call [options]
     [-rxrate INT]
     [-txrate INT]
     [-autotoss*]
+    [-nock]
     NODE[:ADDR] [FORCEADDR]
 @end example
 
@@ -114,15 +115,17 @@ transfer.
 
 If @option{-rx} option is specified then only inbound packets
 transmission is performed. If @option{-tx} option is specified, then
-only outbound transmission is performed. @option{-onlinedeadline}
-overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}}.
-@option{-maxonlinetime} overrides @ref{CfgMaxOnlineTime,
-@emph{maxonlinetime}}. @option{-rxrate}/@option{-txrate} override
-@ref{CfgXxRate, rxrate/txrate}. @option{-list} option allows you to list
-packets of remote node, without any transmission.
+only outbound transmission is performed.
 
-You can specify what packets your want to download, by specifying
-@option{-pkts} option with comma-separated list of packets identifiers.
+@option{-onlinedeadline} overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}}.
+@option{-maxonlinetime} overrides @ref{CfgMaxOnlineTime, @emph{maxonlinetime}}.
+@option{-rxrate}/@option{-txrate} override @ref{CfgXxRate, rxrate/txrate}.
+Read @ref{CfgNoCK, more} about @option{-nock} option.
+
+@option{-list} option allows you to list packets of remote node, without
+any transmission. You can specify what packets your want to download, by
+specifying @option{-pkts} option with comma-separated list of packets
+identifiers.
 
 Each @option{NODE} can contain several uniquely identified
 @option{ADDR}esses in @ref{CfgAddrs, configuration} file. If you do
@@ -230,13 +233,16 @@ operating system.
 @section nncp-check
 
 @example
-$ nncp-check [options]
+$ nncp-check [-nock] [options]
 @end example
 
 Perform @ref{Spool, spool} directory integrity check. Read all files
 that has Base32-encoded filenames and compare it with recalculated
-BLAKE2b hash output of their contents. That supplementary command is
-not used often in practice, if ever.
+BLAKE2b hash output of their contents.
+
+The most useful mode of operation is with @option{-nock} option, that
+checks integrity of @file{.nock} files, renaming them to ordinary
+(verified) encrypted packets.
 
 @node nncp-cronexpr
 @section nncp-cronexpr
@@ -252,7 +258,9 @@ next time entities.
 @section nncp-daemon
 
 @example
-$ nncp-daemon [options] [-maxconn INT] [-bind ADDR] [-inetd] [-autotoss*]
+$ nncp-daemon [options]
+    [-maxconn INT] [-bind ADDR] [-inetd]
+    [-autotoss*] [-nock]
 @end example
 
 Start listening TCP daemon, wait for incoming connections and run
@@ -278,6 +286,8 @@ uucp        stream  tcp6    nowait  nncpuser        /usr/local/bin/nncp-daemon      nncp-daemon -quiet -
 during the call. All @option{-autotoss-*} options is the same as in
 @ref{nncp-toss} command.
 
+Read @ref{CfgNoCK, more} about @option{-nock} option.
+
 @node nncp-exec
 @section nncp-exec
 
@@ -509,6 +519,7 @@ $ nncp-rm [options] -tmp
 $ nncp-rm [options] -lock
 $ nncp-rm [options] -node NODE -part
 $ nncp-rm [options] -node NODE -seen
+$ nncp-rm [options] -node NODE -nock
 $ nncp-rm [options] -node NODE [-rx] [-tx]
 $ nncp-rm [options] -node NODE -pkt PKT
 @end example
@@ -529,10 +540,10 @@ Base32 name) will be deleted. This is useful when you see some packet
 failing to be processed.
 
 @item When either @option{-rx} or @option{-tx} options are specified
-(maybe both of them), then delete all packets from that given queues. If
-@option{-part} is given, then delete only @file{.part}ly downloaded
-ones. If @option{-seen} option is specified, then delete only
-@file{.seen} files.
+(maybe both of them), then delete all packets from that given queues.
+@option{-part} option deletes @file{.part}ly downloaded files.
+@option{-seen} option deletes @file{.seen} files. @option{-nock} option
+deletes non-checksummed (non-verified) @file{.nock} files.
 
 @item @option{-dryrun} option just prints what will be deleted.
 
index bc42c6e54f5de99de0a7a631ac387277bb9a7182..1c1a30785225c14c7cb4c4a289fb88402fe3e731 100644 (file)
@@ -15,6 +15,11 @@ spool директории.
 Оптимизация: не закрывать файловый дескриптор файла который мы качаем.
 Прежде каждый его кусочек приводил к дорогим open/close вызовам.
 
+@item
+Скачиваемые в режиме online файлы теперь сохраняются с @file{.nock}
+суффиксом (non-checksummed), ожидая пока либо @command{nncp-check}, либо
+online демоны не выполнят проверку целостности.
+
 @end itemize
 
 @node Релиз 6.0.0
index 565f79e0af01e45fe54e3bdf6246792e1cdee7f6..c6ee9b95a0b3a37084d6ca227ad31c0745080a67 100644 (file)
@@ -16,6 +16,11 @@ many packets in the spool directory.
 Optimization: do not close file descriptor of the file we download
 online. Previously each chunk lead to expensive open/close calls.
 
+@item
+Online downloaded files are saved with @file{.nock} (non-checksummed)
+suffix, waiting either for @command{nncp-check}, or online daemons to
+perform integrity check.
+
 @end itemize
 
 @node Release 6.0.0
index 5e7812d50b6a0944356aed56ce12bcc488662b55..aea79769b315e5babb10581a1fb434af983278a7 100644 (file)
@@ -12,6 +12,7 @@ spool/2WHB...OABQ/rx/5ZIB...UMKW.part
 spool/2WHB...OABQ/tx.lock
 spool/2WHB...OABQ/toss.lock
 spool/BYRR...CG6Q/rx.lock
+spool/BYRR...CG6Q/rx/MLZ6...Q3SQ.nock
 spool/BYRR...CG6Q/rx/
 spool/BYRR...CG6Q/tx.lock
 spool/BYRR...CG6Q/tx/AQUT...DGNT.seen
@@ -20,22 +21,34 @@ spool/BYRR...CG6Q/tx/VCSR...3VXX.seen
 spool/BYRR...CG6Q/tx/ZI5U...5RRQ
 @end example
 
-Except for @file{tmp}, all other directories are Base32-encoded node
-identifiers (@file{2WHB...OABQ}, @file{BYRR...CG6Q} in our example).
-Each node subdirectory has @file{rx} (received, partially received and
-currently unprocessed packets) and @file{tx} (for outbound packets)
+@itemize
+
+@item Except for @file{tmp}, all other directories are Base32-encoded
+node identifiers (@file{2WHB...OABQ}, @file{BYRR...CG6Q} in our example).
+
+@item Each node subdirectory has @file{rx} (received, partially received
+and currently unprocessed packets) and @file{tx} (for outbound packets)
 directories.
 
-Each @file{rx}/@file{tx} directory contains one file per encrypted
+@item Each @file{rx}/@file{tx} directory contains one file per encrypted
 packet. Its filename is Base32 encoded BLAKE2b hash of the contents. So
-it can be integrity checked at any time. @file{5ZIB...UMKW.part} is
-partially received file from @file{2WHB...OABQ} node. @file{tx}
-directory can not contain partially written files -- they are moved
-atomically from @file{tmp}.
+it can be integrity checked at any time.
+
+@item @file{5ZIB...UMKW.part} is partially received file from
+@file{2WHB...OABQ} node. @file{tx} directory can not contain partially
+written files -- they are moved atomically from @file{tmp}.
 
-When @ref{nncp-toss} utility is called with @option{-seen} option, it
-will create empty @file{XXX.seen} files, telling that some kind of
+@item @file{rx} can contain received, but currently integrity unchecked
+files with @file{.nock} extension. It is completely the same as an
+ordinary encrypted packets, but its integrity after online download was
+not done. After successful checksum verification, @file{.nock} extension
+is trimmed.
+
+@item When @ref{nncp-toss} utility is called with @option{-seen} option,
+it will create empty @file{XXX.seen} files, telling that some kind of
 packet was already tossed sometime.
 
-Only one process can work with @file{rx}/@file{tx} directories at once,
-so there are corresponding lock files.
+@item Only one process can work with @file{rx}/@file{tx} directories at
+once, so there are corresponding lock files.
+
+@end itemize
index ef4d07d757c90097d39ef486dfcd6bf938bb7259..2f635d63d0aa7d36b9fb33a87206cd173db9f4fb 100644 (file)
@@ -34,6 +34,7 @@ type Call struct {
        OnlineDeadline time.Duration
        MaxOnlineTime  time.Duration
        WhenTxExists   bool
+       NoCK           bool
 
        AutoToss       bool
        AutoTossDoSeen bool
@@ -51,6 +52,7 @@ func (ctx *Ctx) CallNode(
        rxRate, txRate int,
        onlineDeadline, maxOnlineTime time.Duration,
        listOnly bool,
+       noCK bool,
        onlyPkts map[[32]byte]bool,
 ) (isGood bool) {
        for _, addr := range addrs {
@@ -78,6 +80,7 @@ func (ctx *Ctx) CallNode(
                        rxRate:         rxRate,
                        txRate:         txRate,
                        listOnly:       listOnly,
+                       NoCK:           noCK,
                        onlyPkts:       onlyPkts,
                }
                if err = state.StartI(conn); err == nil {
index 946243396b7403c99aa499e22fd870a123516365..9d94953d319a1ae1ecee17cf9d64d090ad5fab4f 100644 (file)
@@ -82,6 +82,7 @@ type CallJSON struct {
        OnlineDeadline *uint   `json:"onlinedeadline,omitempty"`
        MaxOnlineTime  *uint   `json:"maxonlinetime,omitempty"`
        WhenTxExists   *bool   `json:"when-tx-exists,omitempty"`
+       NoCK           *bool   `json:"nock"`
 
        AutoToss       *bool `json:"autotoss,omitempty"`
        AutoTossDoSeen *bool `json:"autotoss-doseen,omitempty"`
@@ -284,6 +285,9 @@ func NewNode(name string, cfg NodeJSON) (*Node, error) {
                if callCfg.WhenTxExists != nil {
                        call.WhenTxExists = *callCfg.WhenTxExists
                }
+               if callCfg.NoCK != nil {
+                       call.NoCK = *callCfg.NoCK
+               }
                if callCfg.AutoToss != nil {
                        call.AutoToss = *callCfg.AutoToss
                }
index b2ea671325d0a027b956e05286a0bda4dd36d6f3..2569e6af15c360d334e7e650d00cfacdae7679f1 100644 (file)
@@ -24,10 +24,13 @@ import (
        "io"
        "log"
        "os"
+       "path/filepath"
 
        "golang.org/x/crypto/blake2b"
 )
 
+const NoCKSuffix = ".nock"
+
 func Check(src io.Reader, checksum []byte, les LEs, showPrgrs bool) (bool, error) {
        hsh, err := blake2b.New256(nil)
        if err != nil {
@@ -70,3 +73,33 @@ func (ctx *Ctx) checkXxIsBad(nodeId *NodeId, xx TRxTx) bool {
 func (ctx *Ctx) Check(nodeId *NodeId) bool {
        return !(ctx.checkXxIsBad(nodeId, TRx) || ctx.checkXxIsBad(nodeId, TTx))
 }
+
+func (ctx *Ctx) CheckNoCK(nodeId *NodeId, hshValue *[32]byte) (int64, error) {
+       dirToSync := filepath.Join(ctx.Spool, nodeId.String(), string(TRx))
+       pktName := Base32Codec.EncodeToString(hshValue[:])
+       pktPath := filepath.Join(dirToSync, pktName)
+       fd, err := os.Open(pktPath + NoCKSuffix)
+       if err != nil {
+               return 0, err
+       }
+       fi, err := fd.Stat()
+       if err != nil {
+               return 0, err
+       }
+       defer fd.Close()
+       size := fi.Size()
+       les := LEs{
+               {"XX", string(TRx)},
+               {"Node", nodeId},
+               {"Pkt", pktName},
+               {"FullSize", size},
+       }
+       gut, err := Check(fd, hshValue[:], les, ctx.ShowPrgrs)
+       if err != nil || !gut {
+               return 0, errors.New("checksum mismatch")
+       }
+       if err = os.Rename(pktPath+NoCKSuffix, pktPath); err != nil {
+               return 0, err
+       }
+       return size, DirSync(dirToSync)
+}
index 9762d9216cd950f00390d1ea9a2ba1422eeb03a5..3136e3e9221e8a11236addfc917b0a4be24ae1e4 100644 (file)
@@ -44,6 +44,7 @@ func main() {
                rxOnly      = flag.Bool("rx", false, "Only receive packets")
                txOnly      = flag.Bool("tx", false, "Only transmit packets")
                listOnly    = flag.Bool("list", false, "Only list remote packets")
+               noCK        = flag.Bool("nock", false, "Do no checksum checking")
                onlyPktsRaw = flag.String("pkts", "", "Recieve only that packets, comma separated")
                rxRate      = flag.Int("rxrate", 0, "Maximal receive rate, pkts/sec")
                txRate      = flag.Int("txrate", 0, "Maximal transmit rate, pkts/sec")
@@ -185,6 +186,7 @@ func main() {
                onlineDeadline,
                maxOnlineTime,
                *listOnly,
+               *noCK,
                onlyPkts,
        )
 
index 245c6183689a075c3e596a5b5ca1bc43c5f604da..7ccefdd7a56b7eb622bb03665ef54c3d8a44e524 100644 (file)
@@ -176,6 +176,7 @@ func main() {
                                                        call.OnlineDeadline,
                                                        call.MaxOnlineTime,
                                                        false,
+                                                       call.NoCK,
                                                        nil,
                                                )
 
index 798f31cdb534f0e893ba495211df0611c3d136b5..08c4875fd65e2f9f31451912809fa6dd84a93610 100644 (file)
@@ -211,6 +211,7 @@ func main() {
     #   #     xx: rx
     #   #     addr: lan
     #   #     when-tx-exists: true
+    #   #     nock: true
     #   #
     #   #     autotoss: false
     #   #     autotoss-doseen: true
index 96ddd3fcf00520157ae4bb8f06778b24528810e0..e985b79ce729fbc8e76bada4f7b70842927ead6d 100644 (file)
@@ -23,6 +23,7 @@ import (
        "fmt"
        "log"
        "os"
+       "path/filepath"
 
        "go.cypherpunks.ru/nncp/v5"
 )
@@ -30,12 +31,13 @@ import (
 func usage() {
        fmt.Fprintf(os.Stderr, nncp.UsageHeader())
        fmt.Fprintf(os.Stderr, "nncp-check -- verify Rx/Tx packets checksum\n\n")
-       fmt.Fprintf(os.Stderr, "Usage: %s [options]\nOptions:\n", os.Args[0])
+       fmt.Fprintf(os.Stderr, "Usage: %s [-nock] [options]\nOptions:\n", os.Args[0])
        flag.PrintDefaults()
 }
 
 func main() {
        var (
+               nock      = flag.Bool("nock", false, "Process .nock files")
                cfgPath   = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
                nodeRaw   = flag.String("node", "", "Process only that node")
                spoolPath = flag.String("spool", "", "Override path to spool")
@@ -85,7 +87,20 @@ func main() {
                if nodeOnly != nil && nodeId != *nodeOnly.Id {
                        continue
                }
-               if !ctx.Check(node.Id) {
+               if *nock {
+                       for job := range ctx.JobsNoCK(node.Id) {
+                               if _, err = ctx.CheckNoCK(node.Id, job.HshValue); err != nil {
+                                       pktName := nncp.Base32Codec.EncodeToString(job.HshValue[:])
+                                       log.Println(filepath.Join(
+                                               ctx.Spool,
+                                               nodeId.String(),
+                                               string(nncp.TRx),
+                                               pktName+nncp.NoCKSuffix,
+                                       ), err)
+                                       isBad = true
+                               }
+                       }
+               } else if !ctx.Check(node.Id) {
                        isBad = true
                }
        }
index 6319debf56957061cbb1f34a41bf0f0db0ae6ab7..e3a73c9e72c162da46a8914a1b6709aed83d41b6 100644 (file)
@@ -70,11 +70,13 @@ func performSP(
        ctx *nncp.Ctx,
        conn nncp.ConnDeadlined,
        nice uint8,
+       noCK bool,
        nodeIdC chan *nncp.NodeId,
 ) {
        state := nncp.SPState{
                Ctx:  ctx,
                Nice: nice,
+               NoCK: noCK,
        }
        if err := state.StartR(conn); err == nil {
                ctx.LogI("call-start", nncp.LEs{{K: "Node", V: state.Node.Id}}, "connected")
@@ -108,6 +110,7 @@ func main() {
                bind      = flag.String("bind", "[::]:5400", "Address to bind to")
                inetd     = flag.Bool("inetd", false, "Is it started as inetd service")
                maxConn   = flag.Int("maxconn", 128, "Maximal number of simultaneous connections")
+               noCK      = flag.Bool("nock", false, "Do no checksum checking")
                spoolPath = flag.String("spool", "", "Override path to spool")
                logPath   = flag.String("log", "", "Override path to logfile")
                quiet     = flag.Bool("quiet", false, "Print only errors")
@@ -160,7 +163,7 @@ func main() {
                os.Stderr.Close() // #nosec G104
                conn := &InetdConn{os.Stdin, os.Stdout}
                nodeIdC := make(chan *nncp.NodeId)
-               go performSP(ctx, conn, nice, nodeIdC)
+               go performSP(ctx, conn, nice, *noCK, nodeIdC)
                nodeId := <-nodeIdC
                var autoTossFinish chan struct{}
                var autoTossBadCode chan bool
@@ -197,7 +200,7 @@ func main() {
                ctx.LogD("daemon", nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, "accepted")
                go func(conn net.Conn) {
                        nodeIdC := make(chan *nncp.NodeId)
-                       go performSP(ctx, conn, nice, nodeIdC)
+                       go performSP(ctx, conn, nice, *noCK, nodeIdC)
                        nodeId := <-nodeIdC
                        var autoTossFinish chan struct{}
                        var autoTossBadCode chan bool
index d5609e609514a5ce76d2dea516ab766deb0e6957..0802f51dcebc99ac06bd96301dcb243976039d31 100644 (file)
@@ -39,6 +39,7 @@ func usage() {
        fmt.Fprintf(os.Stderr, "       %s [options] -lock\n", os.Args[0])
        fmt.Fprintf(os.Stderr, "       %s [options] -node NODE -part\n", os.Args[0])
        fmt.Fprintf(os.Stderr, "       %s [options] -node NODE -seen\n", os.Args[0])
+       fmt.Fprintf(os.Stderr, "       %s [options] -node NODE -nock\n", os.Args[0])
        fmt.Fprintf(os.Stderr, "       %s [options] -node NODE {-rx|-tx}\n", os.Args[0])
        fmt.Fprintf(os.Stderr, "       %s [options] -node NODE -pkt PKT\n", os.Args[0])
        fmt.Fprintln(os.Stderr, "-older option's time units are: (s)econds, (m)inutes, (h)ours, (d)ays")
@@ -56,6 +57,7 @@ func main() {
                doTx      = flag.Bool("tx", false, "Process transfered packets")
                doPart    = flag.Bool("part", false, "Remove only .part files")
                doSeen    = flag.Bool("seen", false, "Remove only .seen files")
+               doNoCK    = flag.Bool("nock", false, "Remove only .nock files")
                older     = flag.String("older", "", "XXX{smhd}: only older than XXX number of time units")
                dryRun    = flag.Bool("dryrun", false, "Do not actually remove files")
                pktRaw    = flag.String("pkt", "", "Packet to remove")
@@ -183,6 +185,13 @@ func main() {
                                        }
                                        return os.Remove(path)
                                }
+                               if *doNoCK && strings.HasSuffix(info.Name(), nncp.NoCKSuffix) {
+                                       ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
+                                       if *dryRun {
+                                               return nil
+                                       }
+                                       return os.Remove(path)
+                               }
                                if *doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix) {
                                        ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
                                        if *dryRun {
@@ -198,6 +207,7 @@ func main() {
                                        return os.Remove(path)
                                }
                                if !*doSeen &&
+                                       !*doNoCK &&
                                        !*doPart &&
                                        (*doRx || *doTx) &&
                                        ((*doRx && xx == nncp.TRx) || (*doTx && xx == nncp.TTx)) {
@@ -210,7 +220,7 @@ func main() {
                                return nil
                        })
        }
-       if *pktRaw != "" || *doRx || *doSeen || *doPart {
+       if *pktRaw != "" || *doRx || *doSeen || *doNoCK || *doPart {
                if err = remove(nncp.TRx); err != nil {
                        log.Fatalln("Can not remove:", err)
                }
index 994f83db6a8a1fdb66373f61e7d5fc9a20e90d2b..bd350c71def19ca60f7feb35ee64178ab4f04144 100644 (file)
@@ -98,6 +98,8 @@ func main() {
                fmt.Println(node.Name)
                rxNums := make(map[uint8]int)
                rxBytes := make(map[uint8]int64)
+               noCKNums := make(map[uint8]int)
+               noCKBytes := make(map[uint8]int64)
                for job := range ctx.Jobs(node.Id, nncp.TRx) {
                        if *showPkt {
                                jobPrint(nncp.TRx, job)
@@ -105,6 +107,13 @@ func main() {
                        rxNums[job.PktEnc.Nice] = rxNums[job.PktEnc.Nice] + 1
                        rxBytes[job.PktEnc.Nice] = rxBytes[job.PktEnc.Nice] + job.Size
                }
+               for job := range ctx.JobsNoCK(node.Id) {
+                       if *showPkt {
+                               jobPrint(nncp.TRx, job)
+                       }
+                       noCKNums[job.PktEnc.Nice] = noCKNums[job.PktEnc.Nice] + 1
+                       noCKBytes[job.PktEnc.Nice] = noCKBytes[job.PktEnc.Nice] + job.Size
+               }
                txNums := make(map[uint8]int)
                txBytes := make(map[uint8]int64)
                for job := range ctx.Jobs(node.Id, nncp.TTx) {
@@ -118,17 +127,26 @@ func main() {
                for nice = 1; nice > 0; nice++ {
                        rxNum, rxExists := rxNums[nice]
                        txNum, txExists := txNums[nice]
-                       if !(rxExists || txExists) {
+                       noCKNum, noCKExists := noCKNums[nice]
+                       if !(rxExists || txExists || noCKExists) {
                                continue
                        }
                        fmt.Printf(
-                               "\tnice:% 4s | Rx: % 10s, % 3d pkts | Tx: % 10s, % 3d pkts\n",
+                               "\tnice:% 4s | Rx: % 10s, % 3d pkts | Tx: % 10s, % 3d pkts",
                                nncp.NicenessFmt(nice),
                                humanize.IBytes(uint64(rxBytes[nice])),
                                rxNum,
                                humanize.IBytes(uint64(txBytes[nice])),
                                txNum,
                        )
+                       if noCKExists {
+                               fmt.Printf(
+                                       " | NoCK: % 10s, % 3d pkts",
+                                       humanize.IBytes(uint64(noCKBytes[nice])),
+                                       noCKNum,
+                               )
+                       }
+                       fmt.Printf("\n")
                }
        }
 }
index b641cd6d803b0cc26017884171b9bf45ab13e2e6..c0de50ff63f5975ecf289baa781cdf6fa620f2b4 100644 (file)
@@ -47,13 +47,14 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) {
        if err == nil {
                nodeS = node.Name
        }
+       var sizeParsed uint64
        var size string
        if sizeRaw, exists := le["Size"]; exists {
-               sp, err := strconv.ParseUint(sizeRaw, 10, 64)
+               sizeParsed, err = strconv.ParseUint(sizeRaw, 10, 64)
                if err != nil {
                        return "", err
                }
-               size = humanize.IBytes(uint64(sp))
+               size = humanize.IBytes(sizeParsed)
        }
 
        var msg string
@@ -212,15 +213,13 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) {
                        "Packet %s (%s) (nice %s)",
                        le["Pkt"], size, NicenessFmt(nice),
                )
-               offsetParsed, err := strconv.ParseUint(le["Offset"], 10, 64)
-               if err != nil {
-                       return "", err
-               }
-               sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64)
-               if err != nil {
-                       return "", err
+               if offset := le["Offset"]; offset != "" {
+                       offsetParsed, err := strconv.ParseUint(offset, 10, 64)
+                       if err != nil {
+                               return "", err
+                       }
+                       msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed)
                }
-               msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed)
                if m, exists := le["Msg"]; exists {
                        msg += ": " + m
                }
@@ -249,10 +248,6 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) {
                if err != nil {
                        return "", err
                }
-               sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64)
-               if err != nil {
-                       return "", err
-               }
                msg += fmt.Sprintf(
                        "%s %d%% (%s / %s)",
                        le["Pkt"],
index 3dc6bd431e3641439ae6d5f2c3b00df0f9052670..3e97b2ea52f89860a22b2c4c4081ff774cd1e8ac 100644 (file)
@@ -20,6 +20,7 @@ package nncp
 import (
        "os"
        "path/filepath"
+       "strings"
 
        xdr "github.com/davecgh/go-xdr/xdr2"
 )
@@ -38,7 +39,7 @@ type Job struct {
        HshValue *[32]byte
 }
 
-func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
+func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job {
        rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx))
        jobs := make(chan Job, 16)
        go func() {
@@ -53,7 +54,17 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
                        return
                }
                for _, fi := range fis {
-                       hshValue, err := Base32Codec.DecodeString(fi.Name())
+                       var hshValue []byte
+                       if nock {
+                               if !strings.HasSuffix(fi.Name(), NoCKSuffix) {
+                                       continue
+                               }
+                               hshValue, err = Base32Codec.DecodeString(
+                                       strings.TrimSuffix(fi.Name(), NoCKSuffix),
+                               )
+                       } else {
+                               hshValue, err = Base32Codec.DecodeString(fi.Name())
+                       }
                        if err != nil {
                                continue
                        }
@@ -87,3 +98,11 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
        }()
        return jobs
 }
+
+func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
+       return ctx.jobsFind(nodeId, xx, false)
+}
+
+func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job {
+       return ctx.jobsFind(nodeId, TRx, true)
+}
index 9410a2d905eb106a6cb2cbb53a7dca8b5895f4c5..26c73e482d5dce4303510ef1f41b9879ba321c1f 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -55,8 +55,6 @@ var (
 
        DefaultDeadline = 10 * time.Second
        PingTimeout     = time.Minute
-
-       spCheckerToken chan struct{}
 )
 
 type FdAndFullSize struct {
@@ -153,8 +151,6 @@ func init() {
                panic(err)
        }
        SPFileOverhead = buf.Len()
-       spCheckerToken = make(chan struct{}, 1)
-       spCheckerToken <- struct{}{}
 }
 
 func MarshalSP(typ SPType, sp interface{}) []byte {
@@ -188,6 +184,7 @@ type SPState struct {
        Ctx            *Ctx
        Node           *Node
        Nice           uint8
+       NoCK           bool
        onlineDeadline time.Duration
        maxOnlineTime  time.Duration
        hs             *noise.HandshakeState
@@ -220,6 +217,7 @@ type SPState struct {
        onlyPkts       map[[32]byte]bool
        writeSPBuf     bytes.Buffer
        fds            map[string]FdAndFullSize
+       checkerJobs    chan *[32]byte
        sync.RWMutex
 }
 
@@ -241,6 +239,14 @@ func (state *SPState) SetDead() {
                for range state.pings {
                }
        }()
+       go func() {
+               for _, s := range state.fds {
+                       s.fd.Close()
+               }
+       }()
+       if !state.NoCK {
+               close(state.checkerJobs)
+       }
 }
 
 func (state *SPState) NotAlive() bool {
@@ -257,6 +263,31 @@ func (state *SPState) dirUnlock() {
        state.Ctx.UnlockDir(state.txLock)
 }
 
+func (state *SPState) SPChecker() {
+       for hshValue := range state.checkerJobs {
+               les := LEs{
+                       {"XX", string(TRx)},
+                       {"Node", state.Node.Id},
+                       {"Pkt", Base32Codec.EncodeToString(hshValue[:])},
+               }
+               state.Ctx.LogD("sp-file", les, "checking")
+               size, err := state.Ctx.CheckNoCK(state.Node.Id, hshValue)
+               les = append(les, LE{"Size", size})
+               if err != nil {
+                       state.Ctx.LogE("sp-file", les, err, "")
+                       continue
+               }
+               state.Ctx.LogI("sp-done", les, "")
+               state.wg.Add(1)
+               go func(hsh *[32]byte) {
+                       if !state.NotAlive() {
+                               state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
+                       }
+                       state.wg.Done()
+               }(hshValue)
+       }
+}
+
 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
        state.writeSPBuf.Reset()
        n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
@@ -450,6 +481,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        state.infosTheir = make(map[[32]byte]*SPInfo)
        state.started = started
        state.xxOnly = xxOnly
+
        var buf []byte
        var payload []byte
        state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
@@ -543,6 +575,20 @@ func (state *SPState) closeFd(pth string) {
        }
 }
 
+func (state *SPState) FillExistingNoCK() {
+       checkerJobs := make([]*[32]byte, 0)
+       for job := range state.Ctx.JobsNoCK(state.Node.Id) {
+               if job.PktEnc.Nice > state.Nice {
+                       continue
+               }
+               checkerJobs = append(checkerJobs, job.HshValue)
+       }
+       for _, job := range checkerJobs {
+               state.checkerJobs <- job
+       }
+       state.wg.Done()
+}
+
 func (state *SPState) StartWorkers(
        conn ConnDeadlined,
        infosPayloads [][]byte,
@@ -555,6 +601,14 @@ func (state *SPState) StartWorkers(
                state.mustFinishAt = state.started.Add(state.maxOnlineTime)
        }
 
+       // Checker
+       if !state.NoCK {
+               state.checkerJobs = make(chan *[32]byte)
+               go state.SPChecker()
+               state.wg.Add(1)
+               go state.FillExistingNoCK()
+       }
+
        // Remaining handshake payload sending
        if len(infosPayloads) > 1 {
                state.wg.Add(1)
@@ -838,9 +892,6 @@ func (state *SPState) StartWorkers(
                state.wg.Done()
                state.SetDead()
                conn.Close() // #nosec G104
-               for _, s := range state.fds {
-                       s.fd.Close()
-               }
        }()
 
        return nil
@@ -934,6 +985,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                }
                                continue
                        }
+                       if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
+                               state.Ctx.LogI("sp-info", lesp, "still non checksummed")
+                               continue
+                       }
                        fi, err := os.Stat(pktPath + PartSuffix)
                        var offset int64
                        if err == nil {
@@ -1015,48 +1070,27 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        if fullsize != ourSize {
                                continue
                        }
-                       <-spCheckerToken
-                       go func() {
-                               defer func() {
-                                       spCheckerToken <- struct{}{}
-                               }()
-                               if err := fd.Sync(); err != nil {
-                                       state.Ctx.LogE("sp-file", lesp, err, "sync")
-                                       state.closeFd(filePathPart)
-                                       return
-                               }
-                               state.wg.Add(1)
-                               defer state.wg.Done()
-                               if _, err = fd.Seek(0, io.SeekStart); err != nil {
-                                       state.closeFd(filePathPart)
-                                       state.Ctx.LogE("sp-file", lesp, err, "")
-                                       return
-                               }
-                               state.Ctx.LogD("sp-file", lesp, "checking")
-                               gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs)
-                               state.closeFd(filePathPart)
-                               if err != nil || !gut {
-                                       state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
-                                       return
-                               }
-                               state.Ctx.LogI("sp-done", lesp, "")
-                               if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
-                                       state.Ctx.LogE("sp-file", lesp, err, "rename")
-                                       return
-                               }
-                               if err = DirSync(dirToSync); err != nil {
-                                       state.Ctx.LogE("sp-file", lesp, err, "sync")
-                                       return
-                               }
-                               state.Lock()
-                               delete(state.infosTheir, *file.Hash)
-                               state.Unlock()
-                               state.wg.Add(1)
-                               go func() {
-                                       state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
-                                       state.wg.Done()
-                               }()
-                       }()
+                       err = fd.Sync()
+                       state.closeFd(filePathPart)
+                       if err != nil {
+                               state.Ctx.LogE("sp-file", lesp, err, "sync")
+                               continue
+                       }
+                       if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
+                               state.Ctx.LogE("sp-file", lesp, err, "rename")
+                               continue
+                       }
+                       if err = DirSync(dirToSync); err != nil {
+                               state.Ctx.LogE("sp-file", lesp, err, "sync")
+                               continue
+                       }
+                       state.Ctx.LogI("sp-file", lesp, "downloaded")
+                       state.Lock()
+                       delete(state.infosTheir, *file.Hash)
+                       state.Unlock()
+                       if !state.NoCK {
+                               state.checkerJobs <- file.Hash
+                       }
                case SPTypeDone:
                        lesp := append(les, LE{"Type", "done"})
                        state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")