From: Sergey Matveev Date: Wed, 24 Feb 2021 09:26:05 +0000 (+0300) Subject: Merge branch 'develop' X-Git-Tag: v6.1.0^0 X-Git-Url: http://www.git.cypherpunks.ru/?a=commitdiff_plain;h=16ac75d51758c249f5fc4e8fac9cf71b84331c55;hp=ae2b0d687a4ed40fe0161b79b4a9ec42290ffb6b;p=nncp.git Merge branch 'develop' --- diff --git a/README b/README index e8bb290..eca5b5a 100644 --- a/README +++ b/README @@ -22,7 +22,7 @@ Home page: http://www.nncpgo.org/ Please send questions regarding the use of NNCP, bug reports and patches to nncp-devel mailing list: -https://lists.cypherpunks.ru/pipermail/nncp-devel/ +http://lists.cypherpunks.ru/nncp_002ddevel.html Development Git source code repository currently is located here: http://www.git.cypherpunks.ru/?p=nncp.git;a=summary diff --git a/README.RU b/README.RU index 9642305..9da6599 100644 --- a/README.RU +++ b/README.RU @@ -27,7 +27,7 @@ NNCP это свободное программное обеспечением: Пожалуйста все вопросы касающиеся использования NNCP, отчёты об ошибках и патчи отправляйте в nncp-devel почтовую рассылку: -https://lists.cypherpunks.ru/pipermail/nncp-devel/ +http://lists.cypherpunks.ru/nncp_002ddevel.html Исходный код для разработчика находится в Git репозитории: http://www.git.cypherpunks.ru/?p=nncp.git;a=summary diff --git a/doc/admin.texi b/doc/admin.texi index 02c1579..8c46a68 100644 --- a/doc/admin.texi +++ b/doc/admin.texi @@ -45,7 +45,7 @@ $ echo 'umask: "007"' >> /usr/local/etc/nncp.hjson Example @url{https://www.newsyslog.org/manual.html, newsyslog}'s entry: @example -/var/spool/nncp/log 644 7 100 * CYN +/var/spool/nncp/log 644 7 100 * BCYN @end example @item diff --git a/doc/call.texi b/doc/call.texi index 81ee9ba..fe55de5 100644 --- a/doc/call.texi +++ b/doc/call.texi @@ -31,6 +31,7 @@ calls: [ { cron: "*/5 * * * * * *" when-tx-exists: true + nock: true }, ] @end verbatim @@ -85,4 +86,14 @@ created, or skip any kind of packet processing. @item when-tx-exists Call only if packets for sending exists. +@anchor{CfgNoCK} +@item nock +NoCK (no-checksumming) tells not to do checksumming of received files, +assuming that it will be done for example with @ref{nncp-check} command +later. That can help minimizing time spent online, because HDD won't do +simultaneous reading of the data for checksumming and writing of the +received one, but just sequential writing of the file. Pay attention +that you have to make a call to remote node after checksumming is done, +to send notification about successful packet reception. + @end table diff --git a/doc/cfg.texi b/doc/cfg.texi index 3e0b882..cc6a24a 100644 --- a/doc/cfg.texi +++ b/doc/cfg.texi @@ -9,6 +9,7 @@ Example @url{https://hjson.org/, Hjson} configuration file: log: /var/spool/nncp/log umask: "022" noprogress: true + nohdr: true notify: { file: { @@ -103,6 +104,9 @@ Enabled @strong{noprogress} option disabled progress showing for many commands by default. You can always force its showing with @option{-progress} command line option anyway. +@anchor{CfgNoHdr} +@strong{nohdr} option disables @ref{HdrFile, .hdr} files usage. + @anchor{CfgNotify} @strong{notify} section contains notification settings for successfully tossed file, freq and exec packets. Corresponding @strong{from} and diff --git a/doc/cmds.texi b/doc/cmds.texi index 6510ed8..5141b08 100644 --- a/doc/cmds.texi +++ b/doc/cmds.texi @@ -102,6 +102,7 @@ $ nncp-call [options] [-rxrate INT] [-txrate INT] [-autotoss*] + [-nock] NODE[:ADDR] [FORCEADDR] @end example @@ -114,15 +115,17 @@ transfer. If @option{-rx} option is specified then only inbound packets transmission is performed. If @option{-tx} option is specified, then -only outbound transmission is performed. @option{-onlinedeadline} -overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}}. -@option{-maxonlinetime} overrides @ref{CfgMaxOnlineTime, -@emph{maxonlinetime}}. @option{-rxrate}/@option{-txrate} override -@ref{CfgXxRate, rxrate/txrate}. @option{-list} option allows you to list -packets of remote node, without any transmission. +only outbound transmission is performed. -You can specify what packets your want to download, by specifying -@option{-pkts} option with comma-separated list of packets identifiers. +@option{-onlinedeadline} overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}}. +@option{-maxonlinetime} overrides @ref{CfgMaxOnlineTime, @emph{maxonlinetime}}. +@option{-rxrate}/@option{-txrate} override @ref{CfgXxRate, rxrate/txrate}. +Read @ref{CfgNoCK, more} about @option{-nock} option. + +@option{-list} option allows you to list packets of remote node, without +any transmission. You can specify what packets your want to download, by +specifying @option{-pkts} option with comma-separated list of packets +identifiers. Each @option{NODE} can contain several uniquely identified @option{ADDR}esses in @ref{CfgAddrs, configuration} file. If you do @@ -230,13 +233,16 @@ operating system. @section nncp-check @example -$ nncp-check [options] +$ nncp-check [-nock] [options] @end example Perform @ref{Spool, spool} directory integrity check. Read all files that has Base32-encoded filenames and compare it with recalculated -BLAKE2b hash output of their contents. That supplementary command is -not used often in practice, if ever. +BLAKE2b hash output of their contents. + +The most useful mode of operation is with @option{-nock} option, that +checks integrity of @file{.nock} files, renaming them to ordinary +(verified) encrypted packets. @node nncp-cronexpr @section nncp-cronexpr @@ -252,7 +258,9 @@ next time entities. @section nncp-daemon @example -$ nncp-daemon [options] [-maxconn INT] [-bind ADDR] [-inetd] [-autotoss*] +$ nncp-daemon [options] + [-maxconn INT] [-bind ADDR] [-inetd] + [-autotoss*] [-nock] @end example Start listening TCP daemon, wait for incoming connections and run @@ -278,6 +286,8 @@ uucp stream tcp6 nowait nncpuser /usr/local/bin/nncp-daemon nncp-daemon -quiet - during the call. All @option{-autotoss-*} options is the same as in @ref{nncp-toss} command. +Read @ref{CfgNoCK, more} about @option{-nock} option. + @node nncp-exec @section nncp-exec @@ -509,6 +519,7 @@ $ nncp-rm [options] -tmp $ nncp-rm [options] -lock $ nncp-rm [options] -node NODE -part $ nncp-rm [options] -node NODE -seen +$ nncp-rm [options] -node NODE -nock $ nncp-rm [options] -node NODE [-rx] [-tx] $ nncp-rm [options] -node NODE -pkt PKT @end example @@ -529,10 +540,10 @@ Base32 name) will be deleted. This is useful when you see some packet failing to be processed. @item When either @option{-rx} or @option{-tx} options are specified -(maybe both of them), then delete all packets from that given queues. If -@option{-part} is given, then delete only @file{.part}ly downloaded -ones. If @option{-seen} option is specified, then delete only -@file{.seen} files. +(maybe both of them), then delete all packets from that given queues. +@option{-part} option deletes @file{.part}ly downloaded files. +@option{-seen} option deletes @file{.seen} files. @option{-nock} option +deletes non-checksummed (non-verified) @file{.nock} files. @item @option{-dryrun} option just prints what will be deleted. diff --git a/doc/contacts.texi b/doc/contacts.texi index 473df97..4223bf3 100644 --- a/doc/contacts.texi +++ b/doc/contacts.texi @@ -2,7 +2,7 @@ @unnumbered Contacts Please send questions regarding the use of NNCP, bug reports and patches to -@url{https://lists.cypherpunks.ru/pipermail/nncp-devel/, nncp-devel} +@url{http://lists.cypherpunks.ru/nncp_002ddevel.html, nncp-devel} mailing list. Announcements also go to this mailing list. Official website is @url{http://www.nncpgo.org/}. diff --git a/doc/download.texi b/doc/download.texi index df6e4b6..8df33b5 100644 --- a/doc/download.texi +++ b/doc/download.texi @@ -25,6 +25,10 @@ Tarballs include all necessary required libraries: @multitable {XXXXX} {XXXX-XX-XX} {XXXX KiB} {link sign} {xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx} @headitem Version @tab Date @tab Size @tab Tarball @tab SHA256 checksum +@item @ref{Release 6.0.0, 6.0.0} @tab 2021-01-23 @tab 1028 KiB +@tab @url{download/nncp-6.0.0.tar.xz, link} @url{download/nncp-6.0.0.tar.xz.sig, sign} +@tab @code{42FE8AA5 4520B3A1 ABB50D66 1BBBA6A1 41CE4E74 9B4816B0 D4C6845D 67465916} + @item @ref{Release 5.6.0, 5.6.0} @tab 2021-01-17 @tab 1024 KiB @tab @url{download/nncp-5.6.0.tar.xz, link} @url{download/nncp-5.6.0.tar.xz.sig, sign} @tab @code{1DC83F05 F14A3C3B 95820046 C60B170E B8C8936F 142A5B9A 1E943E6F 4CEFBDE3} diff --git a/doc/integration.texi b/doc/integration.texi index 3359a3a..84698cc 100644 --- a/doc/integration.texi +++ b/doc/integration.texi @@ -55,16 +55,27 @@ delivery via NNCP: @example /usr/local/etc/postfix/master.cf: nncp unix - n n - - pipe - flags=F user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient + flags=FRqhu user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient @end example This runs the @command{nncp-exec} command to place outgoing mail into -the NNCP queue after replacing @var{$nexthop} by the the receiving NNCP +the NNCP queue after replacing @var{$nexthop} by the receiving NNCP node and after replacing @var{$recipient} by the recipients. The @command{pipe(8)} delivery agent executes the @command{nncp-exec} command without assistance from the shell, so there are no problems with shell meta characters in command-line parameters. +Pay attention to @code{flags}, containing @code{R}, telling Postfix to +include @code{Return-Path:} header. Otherwise that envelope sender +information will be lost. Possibly you will also need somehow to +preserve that header on the receiving side, because @command{sendmail} +command will replace it. For example you can rename it before feeding to +@command{sendmail} with +@code{reformail -R Return-Path: X-Original-Return-Path: | sendmail}, or +extract with: + +@verbatiminclude sendmail.sh + @item Specify that mail for @emph{example.com}, should be delivered via NNCP, to a host named @emph{nncp-host}: @@ -134,7 +145,7 @@ mail delivery via NNCP: @example /usr/local/etc/postfix/master.cf: nncp unix - n n - - pipe - flags=F user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient + flags=Fqhu user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient @end example This runs the @command{nncp-exec} command to place outgoing mail into diff --git a/doc/news.ru.texi b/doc/news.ru.texi index d758060..9e9c669 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -1,6 +1,36 @@ @node Новости @section Новости +@node Релиз 6.1.0 +@subsection Релиз 6.1.0 +@itemize + +@item +Оптимизация: большинство команд теперь не держат открытыми файловые +дескрипторы. Прежде вы легко могли выйти за пределы максимально +допустимого количества открытых файлов, если у вас было много пакетов в +spool директории. + +@item +Оптимизация: не закрывать файловый дескриптор файла который мы качаем. +Прежде каждый его кусочек приводил к дорогим open/close вызовам. + +@item +Скачиваемые в режиме online файлы теперь сохраняются с @file{.nock} +суффиксом (non-checksummed), ожидая пока либо @command{nncp-check}, либо +online демоны не выполнят проверку целостности. + +@item +Оптимизация: для файлов, скачивание которых не было продолжено, сразу же +вычисляет контрольная сумма, пропуская промежуточный @file{.nock} шаг. + +@item +Возможность хранения заголовков зашифрованных пакетов в @file{.hdr} +файлах, рядом с самими пакетами. Это может существенно повысить скорость +получения списка пакетов на файловых системах с большим размером блока. + +@end itemize + @node Релиз 6.0.0 @subsection Релиз 6.0.0 @itemize diff --git a/doc/news.texi b/doc/news.texi index 866fb53..6be8466 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -3,11 +3,41 @@ See also this page @ref{Новости, on russian}. +@node Release 6.1.0 +@section Release 6.1.0 +@itemize + +@item +Optimization: most commands do not keep opened file descriptors now. +Previously you can exceed maximal number of opened files if you have got +many packets in the spool directory. + +@item +Optimization: do not close file descriptor of the file we download +online. Previously each chunk lead to expensive open/close calls. + +@item +Online downloaded files are saved with @file{.nock} (non-checksummed) +suffix, waiting either for @command{nncp-check}, or online daemons to +perform integrity check. + +@item +Optimization: files, that are not resumed, are checksummed immediately +during the online download, skipping @file{.nock}-intermediate step. + +@item +Ability to store encrypted packet's header in @file{.hdr} file, close to +the packet itself. That can greatly increase performance of packets +listing on filesystems with big block's size. + +@end itemize + @node Release 6.0.0 @section Release 6.0.0 @itemize -@item Log uses human readable and easy machine parseable +@item +Log uses human readable and easy machine parseable @url{https://www.gnu.org/software/recutils/, recfile} format for the records, instead of structured RFC 3339 lines. Old logs are not readable by @command{nncp-log} anymore. diff --git a/doc/sendmail.sh b/doc/sendmail.sh new file mode 100755 index 0000000..b826091 --- /dev/null +++ b/doc/sendmail.sh @@ -0,0 +1,6 @@ +#!/bin/sh -e + +tmp=`mktemp` +trap "rm -f $tmp" HUP PIPE INT QUIT TERM EXIT +cat > $tmp +sendmail -f "`reformail -x Return-Path: < $tmp`" $@ < $tmp diff --git a/doc/spool.texi b/doc/spool.texi index 5e7812d..8dcaa3e 100644 --- a/doc/spool.texi +++ b/doc/spool.texi @@ -3,39 +3,78 @@ Spool directory holds @ref{Encrypted, encrypted packets} received from remote nodes and queued for sending to them. It has the following -example structure: +example structure with just single outbound (@code{tx}) packet +@code{LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ} to the node +@code{2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ}: @example -spool/tmp/ -spool/2WHB...OABQ/rx.lock -spool/2WHB...OABQ/rx/5ZIB...UMKW.part -spool/2WHB...OABQ/tx.lock -spool/2WHB...OABQ/toss.lock -spool/BYRR...CG6Q/rx.lock -spool/BYRR...CG6Q/rx/ -spool/BYRR...CG6Q/tx.lock -spool/BYRR...CG6Q/tx/AQUT...DGNT.seen -spool/BYRR...CG6Q/tx/NSYY...ZUU6 -spool/BYRR...CG6Q/tx/VCSR...3VXX.seen -spool/BYRR...CG6Q/tx/ZI5U...5RRQ +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/toss.lock +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/rx.lock +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/rx/ +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/tx.lock +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/tx/LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ +spool/tmp @end example -Except for @file{tmp}, all other directories are Base32-encoded node -identifiers (@file{2WHB...OABQ}, @file{BYRR...CG6Q} in our example). -Each node subdirectory has @file{rx} (received, partially received and -currently unprocessed packets) and @file{tx} (for outbound packets) -directories. - -Each @file{rx}/@file{tx} directory contains one file per encrypted -packet. Its filename is Base32 encoded BLAKE2b hash of the contents. So -it can be integrity checked at any time. @file{5ZIB...UMKW.part} is -partially received file from @file{2WHB...OABQ} node. @file{tx} -directory can not contain partially written files -- they are moved -atomically from @file{tmp}. - -When @ref{nncp-toss} utility is called with @option{-seen} option, it -will create empty @file{XXX.seen} files, telling that some kind of -packet was already tossed sometime. - -Only one process can work with @file{rx}/@file{tx} directories at once, -so there are corresponding lock files. +@table @file + +@item tmp +directory contains various temporary files that under normal +circumstances are renamed to necessary files inside other directories. +All directories in @file{spool} @strong{have to} be on the same +filesystem for working renaming. + +@item 2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ +is an example Base32-encoded neighbour identifier. + +@item rx, tx +directories are for incoming and outgoing encrypted packets. @file{rx} +contains currently unfinished, non-checked, unprocessed, etc packets. + +@item toss.lock, rx.lock, tx.lock +Lock files. Only single process can work with @file{rx}/@file{tx} +directories at once. + +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ +is an example @ref{Encrypted, encrypted packet}. Its filename is Base32 +encoded BLAKE2b hash of the whole contents. It can be integrity checked +anytime. + +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.part +is an example @strong{partly} received file. It can appear only when +online transfer is used. Its filename is sent by remote side and until +file is fully downloaded -- it plays no role. + +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.nock +non-checksummed (NoCK) @strong{fully} received file. Its checksum is +verified against its filename either by @ref{nncp-check}, or by working +online daemons. If it is correct, then its extension is trimmed. + +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.seen +@ref{nncp-toss} utility can be invoked with @option{-seen} option, +leading to creation of @file{.seen} files, telling that the file with +specified hash has already been processed before. It could be useful +when there are use-cases where multiple ways of packets transfer +available and there is possibility of duplicates reception. You have to +manually remove them, when you do not need them (probably because they +are expired). + +@anchor{HdrFile} +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.hdr +If no @ref{CfgNoHdr, nohdr} option is enabled in configuration file, +then @file{.hdr} files are automatically created for every ordinary +(fully received and checksummed) packet. It literally contains just the +header of the corresponding packet. It will be automatically created +even during simple @ref{nncp-stat} call. On filesystems with big +blocksize (ZFS for example) it can greatly help listing the packets in +directories, because it prevents unnecessary read-amplification. On +other filesystems probably it won't help at all, or even harm +performance. + +There is a hack: you can create more dense @file{.hdr} allocation by +removing all @file{.hdr} files and then running @command{nncp-stat}, +that will recreate them. In many cases many @file{.hdr} files will be +allocated more or less linearly on the disk, decreasing listing time +even more. + +@end table diff --git a/makedist.sh b/makedist.sh index 8537c9e..eef4931 100755 --- a/makedist.sh +++ b/makedist.sh @@ -170,7 +170,7 @@ GPG key ID: 0x2B25868E75A1A953 NNCP releases Fingerprint: 92C2 F0AE FE73 208E 46BF F3DE 2B25 868E 75A1 A953 Please send questions regarding the use of NNCP, bug reports and patches -to mailing list: https://lists.cypherpunks.ru/pipermail/nncp-devel/ +to mailing list: http://lists.cypherpunks.ru/nncp_002ddevel.html EOF cat < nice { ctx.LogD("nncp-bundle", les, "too nice") - job.Fd.Close() // #nosec G104 continue } + fd, err := os.Open(job.Path) + if err != nil { + log.Fatalln("Error during opening:", err) + } if err = tarWr.WriteHeader(&tar.Header{ Format: tar.FormatUSTAR, Name: nncp.NNCPBundlePrefix, @@ -155,7 +158,7 @@ func main() { log.Fatalln("Error writing tar header:", err) } if _, err = nncp.CopyProgressed( - tarWr, job.Fd, "Tx", + tarWr, bufio.NewReader(fd), "Tx", append(les, nncp.LEs{ {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])}, {K: "FullSize", V: job.Size}, @@ -164,7 +167,9 @@ func main() { ); err != nil { log.Fatalln("Error during copying to tar:", err) } - job.Fd.Close() // #nosec G104 + if err = fd.Close(); err != nil { + log.Fatalln("Error during closing:", err) + } if err = tarWr.Flush(); err != nil { log.Fatalln("Error during tar flushing:", err) } @@ -172,8 +177,10 @@ func main() { log.Fatalln("Error during stdout flushing:", err) } if *doDelete { - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { log.Fatalln("Error during deletion:", err) + } else if ctx.HdrUsage { + os.Remove(job.Path + nncp.HdrSuffix) } } ctx.LogI("nncp-bundle", append(les, nncp.LE{K: "Size", V: job.Size}), "") @@ -298,7 +305,10 @@ func main() { if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName { ctx.LogI("nncp-bundle", les, "removed") if !*dryRun { - os.Remove(dstPath) // #nosec G104 + os.Remove(dstPath) + if ctx.HdrUsage { + os.Remove(dstPath + nncp.HdrSuffix) + } } } else { ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "") @@ -407,6 +417,9 @@ func main() { if err = nncp.DirSync(dstDirPath); err != nil { log.Fatalln("Error during syncing:", err) } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncBuf, dstPath) + } } } for _, le := range les { diff --git a/src/cmd/nncp-call/main.go b/src/cmd/nncp-call/main.go index 9762d92..3136e3e 100644 --- a/src/cmd/nncp-call/main.go +++ b/src/cmd/nncp-call/main.go @@ -44,6 +44,7 @@ func main() { rxOnly = flag.Bool("rx", false, "Only receive packets") txOnly = flag.Bool("tx", false, "Only transmit packets") listOnly = flag.Bool("list", false, "Only list remote packets") + noCK = flag.Bool("nock", false, "Do no checksum checking") onlyPktsRaw = flag.String("pkts", "", "Recieve only that packets, comma separated") rxRate = flag.Int("rxrate", 0, "Maximal receive rate, pkts/sec") txRate = flag.Int("txrate", 0, "Maximal transmit rate, pkts/sec") @@ -185,6 +186,7 @@ func main() { onlineDeadline, maxOnlineTime, *listOnly, + *noCK, onlyPkts, ) diff --git a/src/cmd/nncp-caller/main.go b/src/cmd/nncp-caller/main.go index b702b49..7ccefdd 100644 --- a/src/cmd/nncp-caller/main.go +++ b/src/cmd/nncp-caller/main.go @@ -138,7 +138,6 @@ func main() { ctx.LogD("caller", les, "checking tx existence") txExists := false for job := range ctx.Jobs(node.Id, nncp.TTx) { - job.Fd.Close() if job.PktEnc.Nice > call.Nice { continue } @@ -177,6 +176,7 @@ func main() { call.OnlineDeadline, call.MaxOnlineTime, false, + call.NoCK, nil, ) diff --git a/src/cmd/nncp-cfgnew/main.go b/src/cmd/nncp-cfgnew/main.go index 798f31c..f511c35 100644 --- a/src/cmd/nncp-cfgnew/main.go +++ b/src/cmd/nncp-cfgnew/main.go @@ -104,6 +104,8 @@ func main() { # umask: "022" # Omit progress showing by default # noprogress: true + # Do not use .hdr files + # nohdr: true # Enable notification email sending # notify: { @@ -211,6 +213,7 @@ func main() { # # xx: rx # # addr: lan # # when-tx-exists: true + # # nock: true # # # # autotoss: false # # autotoss-doseen: true diff --git a/src/cmd/nncp-check/main.go b/src/cmd/nncp-check/main.go index 96ddd3f..e985b79 100644 --- a/src/cmd/nncp-check/main.go +++ b/src/cmd/nncp-check/main.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "os" + "path/filepath" "go.cypherpunks.ru/nncp/v5" ) @@ -30,12 +31,13 @@ import ( func usage() { fmt.Fprintf(os.Stderr, nncp.UsageHeader()) fmt.Fprintf(os.Stderr, "nncp-check -- verify Rx/Tx packets checksum\n\n") - fmt.Fprintf(os.Stderr, "Usage: %s [options]\nOptions:\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Usage: %s [-nock] [options]\nOptions:\n", os.Args[0]) flag.PrintDefaults() } func main() { var ( + nock = flag.Bool("nock", false, "Process .nock files") cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") nodeRaw = flag.String("node", "", "Process only that node") spoolPath = flag.String("spool", "", "Override path to spool") @@ -85,7 +87,20 @@ func main() { if nodeOnly != nil && nodeId != *nodeOnly.Id { continue } - if !ctx.Check(node.Id) { + if *nock { + for job := range ctx.JobsNoCK(node.Id) { + if _, err = ctx.CheckNoCK(node.Id, job.HshValue); err != nil { + pktName := nncp.Base32Codec.EncodeToString(job.HshValue[:]) + log.Println(filepath.Join( + ctx.Spool, + nodeId.String(), + string(nncp.TRx), + pktName+nncp.NoCKSuffix, + ), err) + isBad = true + } + } + } else if !ctx.Check(node.Id) { isBad = true } } diff --git a/src/cmd/nncp-daemon/main.go b/src/cmd/nncp-daemon/main.go index 6319deb..e3a73c9 100644 --- a/src/cmd/nncp-daemon/main.go +++ b/src/cmd/nncp-daemon/main.go @@ -70,11 +70,13 @@ func performSP( ctx *nncp.Ctx, conn nncp.ConnDeadlined, nice uint8, + noCK bool, nodeIdC chan *nncp.NodeId, ) { state := nncp.SPState{ Ctx: ctx, Nice: nice, + NoCK: noCK, } if err := state.StartR(conn); err == nil { ctx.LogI("call-start", nncp.LEs{{K: "Node", V: state.Node.Id}}, "connected") @@ -108,6 +110,7 @@ func main() { bind = flag.String("bind", "[::]:5400", "Address to bind to") inetd = flag.Bool("inetd", false, "Is it started as inetd service") maxConn = flag.Int("maxconn", 128, "Maximal number of simultaneous connections") + noCK = flag.Bool("nock", false, "Do no checksum checking") spoolPath = flag.String("spool", "", "Override path to spool") logPath = flag.String("log", "", "Override path to logfile") quiet = flag.Bool("quiet", false, "Print only errors") @@ -160,7 +163,7 @@ func main() { os.Stderr.Close() // #nosec G104 conn := &InetdConn{os.Stdin, os.Stdout} nodeIdC := make(chan *nncp.NodeId) - go performSP(ctx, conn, nice, nodeIdC) + go performSP(ctx, conn, nice, *noCK, nodeIdC) nodeId := <-nodeIdC var autoTossFinish chan struct{} var autoTossBadCode chan bool @@ -197,7 +200,7 @@ func main() { ctx.LogD("daemon", nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, "accepted") go func(conn net.Conn) { nodeIdC := make(chan *nncp.NodeId) - go performSP(ctx, conn, nice, nodeIdC) + go performSP(ctx, conn, nice, *noCK, nodeIdC) nodeId := <-nodeIdC var autoTossFinish chan struct{} var autoTossBadCode chan bool diff --git a/src/cmd/nncp-rm/main.go b/src/cmd/nncp-rm/main.go index d5609e6..b036d70 100644 --- a/src/cmd/nncp-rm/main.go +++ b/src/cmd/nncp-rm/main.go @@ -39,6 +39,8 @@ func usage() { fmt.Fprintf(os.Stderr, " %s [options] -lock\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE -part\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE -seen\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s [options] -node NODE -nock\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s [options] -node NODE -hdr\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE {-rx|-tx}\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE -pkt PKT\n", os.Args[0]) fmt.Fprintln(os.Stderr, "-older option's time units are: (s)econds, (m)inutes, (h)ours, (d)ays") @@ -50,12 +52,14 @@ func main() { var ( cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") doTmp = flag.Bool("tmp", false, "Remove all temporary files") + doHdr = flag.Bool("hdr", false, "Remove all .hdr files") doLock = flag.Bool("lock", false, "Remove all lock files") nodeRaw = flag.String("node", "", "Node to remove files in") doRx = flag.Bool("rx", false, "Process received packets") doTx = flag.Bool("tx", false, "Process transfered packets") doPart = flag.Bool("part", false, "Remove only .part files") doSeen = flag.Bool("seen", false, "Remove only .seen files") + doNoCK = flag.Bool("nock", false, "Remove only .nock files") older = flag.String("older", "", "XXX{smhd}: only older than XXX number of time units") dryRun = flag.Bool("dryrun", false, "Do not actually remove files") pktRaw = flag.String("pkt", "", "Packet to remove") @@ -176,14 +180,10 @@ func main() { ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping") return nil } - if *doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix) { - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") - if *dryRun { - return nil - } - return os.Remove(path) - } - if *doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix) { + if (*doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix)) || + (*doNoCK && strings.HasSuffix(info.Name(), nncp.NoCKSuffix)) || + (*doHdr && strings.HasSuffix(info.Name(), nncp.HdrSuffix)) || + (*doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix)) { ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") if *dryRun { return nil @@ -197,8 +197,7 @@ func main() { } return os.Remove(path) } - if !*doSeen && - !*doPart && + if !*doSeen && !*doNoCK && !*doHdr && !*doPart && (*doRx || *doTx) && ((*doRx && xx == nncp.TRx) || (*doTx && xx == nncp.TTx)) { ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") @@ -210,12 +209,12 @@ func main() { return nil }) } - if *pktRaw != "" || *doRx || *doSeen || *doPart { + if *pktRaw != "" || *doRx || *doSeen || *doNoCK || *doHdr || *doPart { if err = remove(nncp.TRx); err != nil { log.Fatalln("Can not remove:", err) } } - if *pktRaw != "" || *doTx { + if *pktRaw != "" || *doTx || *doHdr { if err = remove(nncp.TTx); err != nil { log.Fatalln("Can not remove:", err) } diff --git a/src/cmd/nncp-stat/main.go b/src/cmd/nncp-stat/main.go index 04aaed2..bd350c7 100644 --- a/src/cmd/nncp-stat/main.go +++ b/src/cmd/nncp-stat/main.go @@ -98,18 +98,25 @@ func main() { fmt.Println(node.Name) rxNums := make(map[uint8]int) rxBytes := make(map[uint8]int64) + noCKNums := make(map[uint8]int) + noCKBytes := make(map[uint8]int64) for job := range ctx.Jobs(node.Id, nncp.TRx) { - job.Fd.Close() // #nosec G104 if *showPkt { jobPrint(nncp.TRx, job) } rxNums[job.PktEnc.Nice] = rxNums[job.PktEnc.Nice] + 1 rxBytes[job.PktEnc.Nice] = rxBytes[job.PktEnc.Nice] + job.Size } + for job := range ctx.JobsNoCK(node.Id) { + if *showPkt { + jobPrint(nncp.TRx, job) + } + noCKNums[job.PktEnc.Nice] = noCKNums[job.PktEnc.Nice] + 1 + noCKBytes[job.PktEnc.Nice] = noCKBytes[job.PktEnc.Nice] + job.Size + } txNums := make(map[uint8]int) txBytes := make(map[uint8]int64) for job := range ctx.Jobs(node.Id, nncp.TTx) { - job.Fd.Close() // #nosec G104 if *showPkt { jobPrint(nncp.TTx, job) } @@ -120,17 +127,26 @@ func main() { for nice = 1; nice > 0; nice++ { rxNum, rxExists := rxNums[nice] txNum, txExists := txNums[nice] - if !(rxExists || txExists) { + noCKNum, noCKExists := noCKNums[nice] + if !(rxExists || txExists || noCKExists) { continue } fmt.Printf( - "\tnice:% 4s | Rx: % 10s, % 3d pkts | Tx: % 10s, % 3d pkts\n", + "\tnice:% 4s | Rx: % 10s, % 3d pkts | Tx: % 10s, % 3d pkts", nncp.NicenessFmt(nice), humanize.IBytes(uint64(rxBytes[nice])), rxNum, humanize.IBytes(uint64(txBytes[nice])), txNum, ) + if noCKExists { + fmt.Printf( + " | NoCK: % 10s, % 3d pkts", + humanize.IBytes(uint64(noCKBytes[nice])), + noCKNum, + ) + } + fmt.Printf("\n") } } } diff --git a/src/cmd/nncp-xfer/main.go b/src/cmd/nncp-xfer/main.go index 06db800..093f108 100644 --- a/src/cmd/nncp-xfer/main.go +++ b/src/cmd/nncp-xfer/main.go @@ -28,7 +28,6 @@ import ( "os" "path/filepath" - xdr "github.com/davecgh/go-xdr/xdr2" "go.cypherpunks.ru/nncp/v5" ) @@ -183,8 +182,7 @@ func main() { isBad = true continue } - var pktEnc nncp.PktEnc - _, err = xdr.Unmarshal(fd, &pktEnc) + pktEnc, pktEncRaw, err := ctx.HdrRead(fd) if err != nil || pktEnc.Magic != nncp.MagicNNCPEv4 { ctx.LogD("nncp-xfer", les, "is not a packet") fd.Close() // #nosec G104 @@ -249,6 +247,14 @@ func main() { isBad = true } } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, filepath.Join( + ctx.Spool, + nodeId.String(), + string(nncp.TRx), + tmp.Checksum(), + )) + } } } @@ -315,39 +321,42 @@ Tx: } les = les[:len(les)-1] for job := range ctx.Jobs(&nodeId, nncp.TTx) { - pktName := filepath.Base(job.Fd.Name()) + pktName := filepath.Base(job.Path) les := append(les, nncp.LE{K: "Pkt", V: pktName}) if job.PktEnc.Nice > nice { ctx.LogD("nncp-xfer", les, "too nice") - job.Fd.Close() // #nosec G104 continue } if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) { ctx.LogD("nncp-xfer", les, "already exists") - job.Fd.Close() // #nosec G104 continue } if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenSuffix)); err == nil || !os.IsNotExist(err) { ctx.LogD("nncp-xfer", les, "already exists") - job.Fd.Close() // #nosec G104 continue } tmp, err := nncp.TempFile(dstPath, "xfer") if err != nil { ctx.LogE("nncp-xfer", les, err, "mktemp") - job.Fd.Close() // #nosec G104 isBad = true break } les = append(les, nncp.LE{K: "Tmp", V: tmp.Name()}) ctx.LogD("nncp-xfer", les, "created") + fd, err := os.Open(job.Path) + if err != nil { + ctx.LogE("nncp-xfer", les, err, "open") + tmp.Close() // #nosec G104 + isBad = true + continue + } bufW := bufio.NewWriter(tmp) copied, err := nncp.CopyProgressed( - bufW, bufio.NewReader(job.Fd), "Tx", + bufW, bufio.NewReader(fd), "Tx", append(les, nncp.LE{K: "FullSize", V: job.Size}), ctx.ShowPrgrs, ) - job.Fd.Close() // #nosec G104 + fd.Close() // #nosec G104 if err != nil { ctx.LogE("nncp-xfer", les, err, "copy") tmp.Close() // #nosec G104 @@ -383,9 +392,11 @@ Tx: les = les[:len(les)-1] ctx.LogI("nncp-xfer", append(les, nncp.LE{K: "Size", V: copied}), "") if !*keep { - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("nncp-xfer", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + nncp.HdrSuffix) } } } diff --git a/src/ctx.go b/src/ctx.go index ebd3102..d838828 100644 --- a/src/ctx.go +++ b/src/ctx.go @@ -40,6 +40,7 @@ type Ctx struct { UmaskForce *int Quiet bool ShowPrgrs bool + HdrUsage bool Debug bool NotifyFile *FromToJSON NotifyFreq *FromToJSON diff --git a/src/go.mod b/src/go.mod index 2742c74..a85ef94 100644 --- a/src/go.mod +++ b/src/go.mod @@ -6,14 +6,14 @@ require ( github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 github.com/hjson/hjson-go v3.1.0+incompatible - github.com/klauspost/compress v1.11.4 + github.com/klauspost/compress v1.11.7 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect go.cypherpunks.ru/balloon v1.1.1 go.cypherpunks.ru/recfile v0.4.3 - golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad - golang.org/x/net v0.0.0-20201224014010-6772e930b67b - golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 - golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 + golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 + golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d + golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43 + golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect ) diff --git a/src/go.sum b/src/go.sum index 8ea9c6f..83c3eac 100644 --- a/src/go.sum +++ b/src/go.sum @@ -8,8 +8,8 @@ github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVf github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA= github.com/hjson/hjson-go v3.1.0+incompatible h1:DY/9yE8ey8Zv22bY+mHV1uk2yRy0h8tKhZ77hEdi0Aw= github.com/hjson/hjson-go v3.1.0+incompatible/go.mod h1:qsetwF8NlsTsOTwZTApNlTCerV+b2GjYRRcIk4JMFio= -github.com/klauspost/compress v1.11.4 h1:kz40R/YWls3iqT9zX9AHN3WoVsrAWVyui5sxuLqiXqU= -github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg= +github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -20,19 +20,21 @@ go.cypherpunks.ru/balloon v1.1.1/go.mod h1:k4s4ozrIrhpBjj78Z7LX8ZHxMQ+XE7DZUWl8g go.cypherpunks.ru/recfile v0.4.3 h1:ephokihmV//p0ob6gx2FWXvm28/NBDbWTOJPUNahxO8= go.cypherpunks.ru/recfile v0.4.3/go.mod h1:sR+KajB+vzofL3SFVFwKt3Fke0FaCcN1g3YPNAhU3qI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY= -golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw= -golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d h1:1aflnvSoWWLI2k/dMUAl5lvU1YO4Mb4hz0gh+1rjcxU= +golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 h1:/dSxr6gT0FNI1MO5WLJo8mTmItROeOKTkDn+7OwWBos= -golang.org/x/sys v0.0.0-20210105210732-16f7687f5001/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43 h1:SgQ6LNaYJU0JIuEHv9+s6EbhSCwYeAf5Yvj6lpYlqAE= +golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE= +golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/src/humanizer.go b/src/humanizer.go index 9ea5e99..c0de50f 100644 --- a/src/humanizer.go +++ b/src/humanizer.go @@ -47,13 +47,14 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) { if err == nil { nodeS = node.Name } + var sizeParsed uint64 var size string if sizeRaw, exists := le["Size"]; exists { - sp, err := strconv.ParseUint(sizeRaw, 10, 64) + sizeParsed, err = strconv.ParseUint(sizeRaw, 10, 64) if err != nil { return "", err } - size = humanize.IBytes(uint64(sp)) + size = humanize.IBytes(sizeParsed) } var msg string @@ -203,7 +204,6 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) { if err, exists := le["Err"]; exists { msg += ": " + err } - case "sp-info": nice, err := NicenessParse(le["Nice"]) if err != nil { @@ -213,15 +213,13 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) { "Packet %s (%s) (nice %s)", le["Pkt"], size, NicenessFmt(nice), ) - offsetParsed, err := strconv.ParseUint(le["Offset"], 10, 64) - if err != nil { - return "", err - } - sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64) - if err != nil { - return "", err + if offset := le["Offset"]; offset != "" { + offsetParsed, err := strconv.ParseUint(offset, 10, 64) + if err != nil { + return "", err + } + msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed) } - msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed) if m, exists := le["Msg"]; exists { msg += ": " + m } @@ -250,10 +248,6 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) { if err != nil { return "", err } - sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64) - if err != nil { - return "", err - } msg += fmt.Sprintf( "%s %d%% (%s / %s)", le["Pkt"], diff --git a/src/jobs.go b/src/jobs.go index 8fe4564..37a31b1 100644 --- a/src/jobs.go +++ b/src/jobs.go @@ -18,9 +18,10 @@ along with this program. If not, see . package nncp import ( - "io" + "bytes" "os" "path/filepath" + "strings" xdr "github.com/davecgh/go-xdr/xdr2" ) @@ -30,16 +31,54 @@ type TRxTx string const ( TRx TRxTx = "rx" TTx TRxTx = "tx" + + HdrSuffix = ".hdr" ) type Job struct { PktEnc *PktEnc - Fd *os.File + Path string Size int64 HshValue *[32]byte } -func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { +func (ctx *Ctx) HdrRead(fd *os.File) (*PktEnc, []byte, error) { + var pktEnc PktEnc + _, err := xdr.Unmarshal(fd, &pktEnc) + if err != nil { + return nil, nil, err + } + var raw bytes.Buffer + if _, err = xdr.Marshal(&raw, pktEnc); err != nil { + panic(err) + } + return &pktEnc, raw.Bytes(), nil +} + +func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error { + tmpHdr, err := ctx.NewTmpFile() + if err != nil { + ctx.LogE("hdr-write", []LE{}, err, "new") + return err + } + if _, err = tmpHdr.Write(pktEncRaw); err != nil { + ctx.LogE("hdr-write", []LE{}, err, "write") + os.Remove(tmpHdr.Name()) + return err + } + if err = tmpHdr.Close(); err != nil { + ctx.LogE("hdr-write", []LE{}, err, "close") + os.Remove(tmpHdr.Name()) + return err + } + if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil { + ctx.LogE("hdr-write", []LE{}, err, "rename") + return err + } + return err +} + +func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) jobs := make(chan Job, 16) go func() { @@ -54,33 +93,58 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { return } for _, fi := range fis { - hshValue, err := Base32Codec.DecodeString(fi.Name()) - if err != nil { - continue + name := fi.Name() + var hshValue []byte + if nock { + if !strings.HasSuffix(name, NoCKSuffix) || + len(name) != Base32Encoded32Len+len(NoCKSuffix) { + continue + } + hshValue, err = Base32Codec.DecodeString( + strings.TrimSuffix(name, NoCKSuffix), + ) + } else { + if len(name) != Base32Encoded32Len { + continue + } + hshValue, err = Base32Codec.DecodeString(name) } - fd, err := os.Open(filepath.Join(rxPath, fi.Name())) if err != nil { continue } - var pktEnc PktEnc - if _, err = xdr.Unmarshal(fd, &pktEnc); err != nil || pktEnc.Magic != MagicNNCPEv4 { - fd.Close() // #nosec G104 + pth := filepath.Join(rxPath, name) + hdrExists := true + var fd *os.File + if nock { + fd, err = os.Open(pth) + } else { + fd, err = os.Open(pth + HdrSuffix) + if err != nil && os.IsNotExist(err) { + hdrExists = false + fd, err = os.Open(pth) + } + } + if err != nil { continue } - if _, err = fd.Seek(0, io.SeekStart); err != nil { - fd.Close() // #nosec G104 + pktEnc, pktEncRaw, err := ctx.HdrRead(fd) + fd.Close() + if err != nil || pktEnc.Magic != MagicNNCPEv4 { continue } ctx.LogD("jobs", LEs{ {"XX", string(xx)}, {"Node", pktEnc.Sender}, - {"Name", fi.Name()}, + {"Name", name}, {"Nice", int(pktEnc.Nice)}, {"Size", fi.Size()}, }, "taken") + if !hdrExists && ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, pth) + } job := Job{ - PktEnc: &pktEnc, - Fd: fd, + PktEnc: pktEnc, + Path: pth, Size: fi.Size(), HshValue: new([32]byte), } @@ -90,3 +154,11 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { }() return jobs } + +func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { + return ctx.jobsFind(nodeId, xx, false) +} + +func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job { + return ctx.jobsFind(nodeId, TRx, true) +} diff --git a/src/log.go b/src/log.go index 5c98bd7..25a4946 100644 --- a/src/log.go +++ b/src/log.go @@ -43,6 +43,8 @@ func (les LEs) Rec() string { switch v := le.V.(type) { case int, int8, uint8, int64, uint64: val = fmt.Sprintf("%d", v) + case bool: + val = fmt.Sprintf("%v", v) default: val = fmt.Sprintf("%s", v) } diff --git a/src/nncp.go b/src/nncp.go index 954dad1..16e5b9a 100644 --- a/src/nncp.go +++ b/src/nncp.go @@ -37,8 +37,10 @@ You should have received a copy of the GNU General Public License along with this program. If not, see .` ) +const Base32Encoded32Len = 52 + var ( - Version string = "6.0.0" + Version string = "6.1.0" Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding) ) diff --git a/src/pkt.go b/src/pkt.go index 4dea1bf..c2715e4 100644 --- a/src/pkt.go +++ b/src/pkt.go @@ -192,14 +192,14 @@ func PktEncWrite( size, padSize int64, data io.Reader, out io.Writer, -) error { +) ([]byte, error) { pubEph, prvEph, err := box.GenerateKey(rand.Reader) if err != nil { - return err + return nil, err } var pktBuf bytes.Buffer if _, err := xdr.Marshal(&pktBuf, pkt); err != nil { - return err + return nil, err } tbs := PktTbs{ Magic: MagicNNCPEv4, @@ -210,7 +210,7 @@ func PktEncWrite( } var tbsBuf bytes.Buffer if _, err = xdr.Marshal(&tbsBuf, &tbs); err != nil { - return err + return nil, err } signature := new([ed25519.SignatureSize]byte) copy(signature[:], ed25519.Sign(our.SignPrv, tbsBuf.Bytes())) @@ -222,26 +222,31 @@ func PktEncWrite( ExchPub: *pubEph, Sign: *signature, } - if _, err = xdr.Marshal(out, &pktEnc); err != nil { - return err + tbsBuf.Reset() + if _, err = xdr.Marshal(&tbsBuf, &pktEnc); err != nil { + return nil, err + } + pktEncRaw := tbsBuf.Bytes() + if _, err = out.Write(pktEncRaw); err != nil { + return nil, err } sharedKey := new([32]byte) curve25519.ScalarMult(sharedKey, prvEph, their.ExchPub) kdf, err := blake2b.NewXOF(KDFXOFSize, sharedKey[:]) if err != nil { - return err + return nil, err } if _, err = kdf.Write(MagicNNCPEv4[:]); err != nil { - return err + return nil, err } key := make([]byte, chacha20poly1305.KeySize) if _, err = io.ReadFull(kdf, key); err != nil { - return err + return nil, err } aead, err := chacha20poly1305.New(key) if err != nil { - return err + return nil, err } nonce := make([]byte, aead.NonceSize()) @@ -249,31 +254,31 @@ func PktEncWrite( sizeBuf := make([]byte, 8+aead.Overhead()) binary.BigEndian.PutUint64(sizeBuf, uint64(sizeWithTags(int64(fullSize)))) if _, err = out.Write(aead.Seal(sizeBuf[:0], nonce, sizeBuf[:8], nil)); err != nil { - return err + return nil, err } lr := io.LimitedReader{R: data, N: size} mr := io.MultiReader(&pktBuf, &lr) written, err := aeadProcess(aead, nonce, true, mr, out) if err != nil { - return err + return nil, err } if written != fullSize { - return io.ErrUnexpectedEOF + return nil, io.ErrUnexpectedEOF } if padSize > 0 { if _, err = io.ReadFull(kdf, key); err != nil { - return err + return nil, err } kdf, err = blake2b.NewXOF(blake2b.OutputLengthUnknown, key) if err != nil { - return err + return nil, err } if _, err = io.CopyN(out, kdf, padSize); err != nil { - return err + return nil, err } } - return nil + return pktEncRaw, nil } func TbsVerify(our *NodeOur, their *Node, pktEnc *PktEnc) (bool, error) { diff --git a/src/pkt_test.go b/src/pkt_test.go index cbf7818..079acbd 100644 --- a/src/pkt_test.go +++ b/src/pkt_test.go @@ -44,7 +44,7 @@ func TestPktEncWrite(t *testing.T) { if err != nil { panic(err) } - err = PktEncWrite( + _, err = PktEncWrite( nodeOur, nodeTheir.Their(), pkt, @@ -95,7 +95,7 @@ func TestPktEncRead(t *testing.T) { if err != nil { panic(err) } - err = PktEncWrite( + _, err = PktEncWrite( node1, node2.Their(), pkt, diff --git a/src/progress.go b/src/progress.go index 62e0f01..ca1f361 100644 --- a/src/progress.go +++ b/src/progress.go @@ -141,7 +141,7 @@ func Progress(prefix string, les LEs) { progressBarsLock.Unlock() } what := pkt - if len(what) >= 52 { // Base32 encoded + if len(what) >= Base32Encoded32Len { // Base32 encoded what = what[:16] + ".." + what[len(what)-16:] } what = prefix + " " + what diff --git a/src/sp.go b/src/sp.go index 3c6d2d4..6f1af9f 100644 --- a/src/sp.go +++ b/src/sp.go @@ -21,6 +21,7 @@ import ( "bytes" "crypto/subtle" "errors" + "hash" "io" "os" "path/filepath" @@ -30,6 +31,7 @@ import ( xdr "github.com/davecgh/go-xdr/xdr2" "github.com/flynn/noise" + "golang.org/x/crypto/blake2b" ) const ( @@ -38,6 +40,11 @@ const ( SPHeadOverhead = 4 ) +type SPCheckerQueues struct { + appeared chan *[32]byte + checked chan *[32]byte +} + var ( MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1} @@ -56,9 +63,19 @@ var ( DefaultDeadline = 10 * time.Second PingTimeout = time.Minute - spCheckerToken chan struct{} + spCheckers = make(map[NodeId]*SPCheckerQueues) ) +type FdAndFullSize struct { + fd *os.File + fullSize int64 +} + +type HasherAndOffset struct { + h hash.Hash + offset uint64 +} + type SPType uint8 const ( @@ -148,8 +165,6 @@ func init() { panic(err) } SPFileOverhead = buf.Len() - spCheckerToken = make(chan struct{}, 1) - spCheckerToken <- struct{}{} } func MarshalSP(typ SPType, sp interface{}) []byte { @@ -183,6 +198,7 @@ type SPState struct { Ctx *Ctx Node *Node Nice uint8 + NoCK bool onlineDeadline time.Duration maxOnlineTime time.Duration hs *noise.HandshakeState @@ -214,6 +230,9 @@ type SPState struct { listOnly bool onlyPkts map[[32]byte]bool writeSPBuf bytes.Buffer + fds map[string]FdAndFullSize + fileHashers map[string]*HasherAndOffset + checkerQueues SPCheckerQueues sync.RWMutex } @@ -235,6 +254,11 @@ func (state *SPState) SetDead() { for range state.pings { } }() + go func() { + for _, s := range state.fds { + s.fd.Close() + } + }() } func (state *SPState) NotAlive() bool { @@ -251,6 +275,25 @@ func (state *SPState) dirUnlock() { state.Ctx.UnlockDir(state.txLock) } +func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) { + for hshValue := range appeared { + les := LEs{ + {"XX", string(TRx)}, + {"Node", nodeId}, + {"Pkt", Base32Codec.EncodeToString(hshValue[:])}, + } + ctx.LogD("sp-checker", les, "checking") + size, err := ctx.CheckNoCK(nodeId, hshValue) + les = append(les, LE{"Size", size}) + if err != nil { + ctx.LogE("sp-checker", les, err, "") + continue + } + ctx.LogI("sp-done", les, "") + go func(hsh *[32]byte) { checked <- hsh }(hshValue) + } +} + func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error { state.writeSPBuf.Reset() n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ @@ -292,7 +335,6 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { - job.Fd.Close() // #nosec G104 if job.PktEnc.Nice > nice { continue } @@ -445,6 +487,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.infosTheir = make(map[[32]byte]*SPInfo) state.started = started state.xxOnly = xxOnly + var buf []byte var payload []byte state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message") @@ -530,17 +573,60 @@ func (state *SPState) StartR(conn ConnDeadlined) error { return err } +func (state *SPState) closeFd(pth string) { + s, exists := state.fds[pth] + delete(state.fds, pth) + if exists { + s.fd.Close() + } +} + func (state *SPState) StartWorkers( conn ConnDeadlined, infosPayloads [][]byte, payload []byte, ) error { les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}} + state.fds = make(map[string]FdAndFullSize) + state.fileHashers = make(map[string]*HasherAndOffset) state.isDead = make(chan struct{}) if state.maxOnlineTime > 0 { state.mustFinishAt = state.started.Add(state.maxOnlineTime) } + // Checker + if !state.NoCK { + queues := spCheckers[*state.Node.Id] + if queues == nil { + queues = &SPCheckerQueues{ + appeared: make(chan *[32]byte), + checked: make(chan *[32]byte), + } + spCheckers[*state.Node.Id] = queues + go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked) + } + state.checkerQueues = *queues + go func() { + for job := range state.Ctx.JobsNoCK(state.Node.Id) { + if job.PktEnc.Nice <= state.Nice { + state.checkerQueues.appeared <- job.HshValue + } + } + }() + state.wg.Add(1) + go func() { + defer state.wg.Done() + for { + select { + case <-state.isDead: + return + case hsh := <-state.checkerQueues.checked: + state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh}) + } + } + }() + } + // Remaining handshake payload sending if len(infosPayloads) > 1 { state.wg.Add(1) @@ -683,22 +769,29 @@ func (state *SPState) StartWorkers( {"Size", int64(freq.Offset)}, }...) state.Ctx.LogD("sp-file", lesp, "queueing") - fd, err := os.Open(filepath.Join( + pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), Base32Codec.EncodeToString(freq.Hash[:]), - )) - if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") - return - } - fi, err := fd.Stat() - if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") - return + ) + fdAndFullSize, exists := state.fds[pth] + if !exists { + fd, err := os.Open(pth) + if err != nil { + state.Ctx.LogE("sp-file", lesp, err, "") + return + } + fi, err := fd.Stat() + if err != nil { + state.Ctx.LogE("sp-file", lesp, err, "") + return + } + fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()} + state.fds[pth] = fdAndFullSize } - fullSize := fi.Size() + fd := fdAndFullSize.fd + fullSize := fdAndFullSize.fullSize var buf []byte if freq.Offset < uint64(fullSize) { state.Ctx.LogD("sp-file", lesp, "seeking") @@ -715,7 +808,7 @@ func (state *SPState) StartWorkers( buf = buf[:n] state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read") } - fd.Close() // #nosec G104 + state.closeFd(pth) payload = MarshalSP(SPTypeFile, SPFile{ Hash: freq.Hash, Offset: freq.Offset, @@ -862,8 +955,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Lock() state.queueTheir = nil state.Unlock() + case SPTypePing: state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "") + case SPTypeInfo: infosGot = true lesp := append(les, LE{"Type", "info"}) @@ -910,6 +1005,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } continue } + if _, err = os.Stat(pktPath + NoCKSuffix); err == nil { + state.Ctx.LogI("sp-info", lesp, "still non checksummed") + continue + } fi, err := os.Stat(pktPath + PartSuffix) var offset int64 if err == nil { @@ -926,6 +1025,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { SPFreq{info.Hash, uint64(offset)}, )) } + case SPTypeFile: lesp := append(les, LE{"Type", "file"}) state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") @@ -945,29 +1045,60 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { string(TRx), ) filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:])) + filePathPart := filePath + PartSuffix state.Ctx.LogD("sp-file", lesp, "opening part") - fd, err := os.OpenFile( - filePath+PartSuffix, - os.O_RDWR|os.O_CREATE, - os.FileMode(0666), - ) - if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") - return nil, err + fdAndFullSize, exists := state.fds[filePathPart] + var fd *os.File + if exists { + fd = fdAndFullSize.fd + } else { + fd, err = os.OpenFile( + filePathPart, + os.O_RDWR|os.O_CREATE, + os.FileMode(0666), + ) + if err != nil { + state.Ctx.LogE("sp-file", lesp, err, "") + return nil, err + } + state.fds[filePathPart] = FdAndFullSize{fd: fd} + if file.Offset == 0 { + h, err := blake2b.New256(nil) + if err != nil { + panic(err) + } + state.fileHashers[filePath] = &HasherAndOffset{h: h} + } } state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking") if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { state.Ctx.LogE("sp-file", lesp, err, "") - fd.Close() // #nosec G104 + state.closeFd(filePathPart) return nil, err } state.Ctx.LogD("sp-file", lesp, "writing") - _, err = fd.Write(file.Payload) - if err != nil { + if _, err = fd.Write(file.Payload); err != nil { state.Ctx.LogE("sp-file", lesp, err, "") - fd.Close() // #nosec G104 + state.closeFd(filePathPart) return nil, err } + hasherAndOffset, hasherExists := state.fileHashers[filePath] + if hasherExists { + if hasherAndOffset.offset == file.Offset { + if _, err = hasherAndOffset.h.Write(file.Payload); err != nil { + panic(err) + } + hasherAndOffset.offset += uint64(len(file.Payload)) + } else { + state.Ctx.LogE( + "sp-file", lesp, + errors.New("offset differs"), + "deleting hasher", + ) + delete(state.fileHashers, filePath) + hasherExists = false + } + } ourSize := int64(file.Offset + uint64(len(file.Payload))) lesp[len(lesp)-1].V = ourSize fullsize := int64(0) @@ -982,51 +1113,71 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { Progress("Rx", lesp) } if fullsize != ourSize { - fd.Close() // #nosec G104 continue } - <-spCheckerToken - go func() { - defer func() { - spCheckerToken <- struct{}{} - }() - if err := fd.Sync(); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") - fd.Close() // #nosec G104 - return - } - state.wg.Add(1) - defer state.wg.Done() - if _, err = fd.Seek(0, io.SeekStart); err != nil { - fd.Close() // #nosec G104 - state.Ctx.LogE("sp-file", lesp, err, "") - return - } - state.Ctx.LogD("sp-file", lesp, "checking") - gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs) - fd.Close() // #nosec G104 - if err != nil || !gut { + err = fd.Sync() + if err != nil { + state.Ctx.LogE("sp-file", lesp, err, "sync") + state.closeFd(filePathPart) + continue + } + if hasherExists { + if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 { state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "") - return + continue } - state.Ctx.LogI("sp-done", lesp, "") - if err = os.Rename(filePath+PartSuffix, filePath); err != nil { + if err = os.Rename(filePathPart, filePath); err != nil { state.Ctx.LogE("sp-file", lesp, err, "rename") - return + continue } if err = DirSync(dirToSync); err != nil { state.Ctx.LogE("sp-file", lesp, err, "sync") - return + continue } - state.Lock() - delete(state.infosTheir, *file.Hash) - state.Unlock() + state.Ctx.LogI("sp-file", lesp, "done") state.wg.Add(1) go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) state.wg.Done() }() - }() + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + if !state.Ctx.HdrUsage { + state.closeFd(filePathPart) + continue + } + if _, err = fd.Seek(0, io.SeekStart); err != nil { + state.Ctx.LogE("sp-file", lesp, err, "seek") + state.closeFd(filePathPart) + continue + } + _, pktEncRaw, err := state.Ctx.HdrRead(fd) + state.closeFd(filePathPart) + if err != nil { + state.Ctx.LogE("sp-file", lesp, err, "HdrRead") + continue + } + state.Ctx.HdrWrite(pktEncRaw, filePath) + continue + } + state.closeFd(filePathPart) + if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil { + state.Ctx.LogE("sp-file", lesp, err, "rename") + continue + } + if err = DirSync(dirToSync); err != nil { + state.Ctx.LogE("sp-file", lesp, err, "sync") + continue + } + state.Ctx.LogI("sp-file", lesp, "downloaded") + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + if !state.NoCK { + state.checkerQueues.appeared <- file.Hash + } + case SPTypeDone: lesp := append(les, LE{"Type", "done"}) state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") @@ -1036,19 +1187,23 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { return nil, err } lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])}) + lesp = append(lesp, LE{"XX", string(TTx)}) state.Ctx.LogD("sp-done", lesp, "removing") - err := os.Remove(filepath.Join( + pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), Base32Codec.EncodeToString(done.Hash[:]), - )) - lesp = append(lesp, LE{"XX", string(TTx)}) - if err == nil { + ) + if err = os.Remove(pth); err == nil { state.Ctx.LogI("sp-done", lesp, "") + if state.Ctx.HdrUsage { + os.Remove(pth + HdrSuffix) + } } else { state.Ctx.LogE("sp-done", lesp, err, "") } + case SPTypeFreq: lesp := append(les, LE{"Type", "freq"}) state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") @@ -1081,6 +1236,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } else { state.Ctx.LogD("sp-process", lesp, "unknown") } + default: state.Ctx.LogE( "sp-process", diff --git a/src/tmp.go b/src/tmp.go index e1e9127..b99ef3d 100644 --- a/src/tmp.go +++ b/src/tmp.go @@ -92,6 +92,10 @@ func DirSync(dirPath string) error { return fd.Close() } +func (tmp *TmpFileWHash) Checksum() string { + return Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) +} + func (tmp *TmpFileWHash) Commit(dir string) error { var err error if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil { @@ -108,7 +112,7 @@ func (tmp *TmpFileWHash) Commit(dir string) error { if err = tmp.Fd.Close(); err != nil { return err } - checksum := Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) + checksum := tmp.Checksum() tmp.ctx.LogD("tmp", LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}}, "commit") if err = os.Rename(tmp.Fd.Name(), filepath.Join(dir, checksum)); err != nil { return err diff --git a/src/toss.go b/src/toss.go index cc49784..576abb1 100644 --- a/src/toss.go +++ b/src/toss.go @@ -83,23 +83,24 @@ func (ctx *Ctx) Toss( } defer decompressor.Close() for job := range ctx.Jobs(nodeId, TRx) { - pktName := filepath.Base(job.Fd.Name()) + pktName := filepath.Base(job.Path) les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}} if job.PktEnc.Nice > nice { ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice") - job.Fd.Close() // #nosec G104 continue } + fd, err := os.Open(job.Path) + if err != nil { + ctx.LogE("rx", les, err, "open") + isBad = true + continue + } + pipeR, pipeW := io.Pipe() go func(job Job) error { pipeWB := bufio.NewWriter(pipeW) - _, _, err := PktEncRead( - ctx.Self, - ctx.Neigh, - bufio.NewReader(job.Fd), - pipeWB, - ) - job.Fd.Close() // #nosec G104 + _, _, err := PktEncRead(ctx.Self, ctx.Neigh, bufio.NewReader(fd), pipeWB) + fd.Close() // #nosec G104 if err != nil { return pipeW.CloseWithError(err) } @@ -109,7 +110,6 @@ func (ctx *Ctx) Toss( return pipeW.Close() }(job) var pkt Pkt - var err error var pktSize int64 var pktSizeBlocks int64 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil { @@ -197,13 +197,15 @@ func (ctx *Ctx) Toss( ctx.LogI("rx", les, "") if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } } case PktTypeFile: @@ -293,13 +295,15 @@ func (ctx *Ctx) Toss( ctx.LogI("rx", les, "") if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } if len(sendmail) > 0 && ctx.NotifyFile != nil { cmd := exec.Command( @@ -362,13 +366,15 @@ func (ctx *Ctx) Toss( ctx.LogI("rx", les, "") if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } if len(sendmail) > 0 && ctx.NotifyFreq != nil { cmd := exec.Command( @@ -408,13 +414,15 @@ func (ctx *Ctx) Toss( ctx.LogI("rx", les, "") if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } } default: diff --git a/src/toss_test.go b/src/toss_test.go index 8a16dd9..807c49a 100644 --- a/src/toss_test.go +++ b/src/toss_test.go @@ -383,7 +383,12 @@ func TestTossFreq(t *testing.T) { } for job := range ctx.Jobs(ctx.Self.Id, TTx) { var buf bytes.Buffer - _, _, err := PktEncRead(ctx.Self, ctx.Neigh, job.Fd, &buf) + fd, err := os.Open(job.Path) + if err != nil { + t.Error(err) + return false + } + _, _, err = PktEncRead(ctx.Self, ctx.Neigh, fd, &buf) if err != nil { t.Error(err) return false @@ -454,7 +459,7 @@ func TestTossTrns(t *testing.T) { } copy(pktTrans.Path[:], nodeOur.Id[:]) var dst bytes.Buffer - if err := PktEncWrite( + if _, err := PktEncWrite( ctx.Self, ctx.Neigh[*nodeOur.Id], &pktTrans, diff --git a/src/tx.go b/src/tx.go index c89957b..3e57f9b 100644 --- a/src/tx.go +++ b/src/tx.go @@ -62,7 +62,9 @@ func (ctx *Ctx) Tx( } expectedSize := size for i := 0; i < len(hops); i++ { - expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize) + expectedSize = PktEncOverhead + + PktSizeOverhead + + sizeWithTags(PktOverhead+expectedSize) } padSize := minSize - expectedSize if padSize < 0 { @@ -79,16 +81,23 @@ func (ctx *Ctx) Tx( errs := make(chan error) curSize := size pipeR, pipeW := io.Pipe() + var pktEncRaw []byte go func(size int64, src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", LEs{ {"Node", hops[0].Id}, {"Nice", int(nice)}, {"Size", size}, }, "wrote") - errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst) + pktEncRaw, err = PktEncWrite( + ctx.Self, hops[0], pkt, nice, size, padSize, src, dst, + ) + errs <- err dst.Close() // #nosec G104 }(curSize, src, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize + curSize = PktEncOverhead + + PktSizeOverhead + + sizeWithTags(PktOverhead+curSize) + + padSize var pipeRPrev io.Reader for i := 1; i < len(hops); i++ { @@ -101,7 +110,8 @@ func (ctx *Ctx) Tx( {"Nice", int(nice)}, {"Size", size}, }, "trns wrote") - errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) + _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) + errs <- err dst.Close() // #nosec G104 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW) curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) @@ -124,6 +134,12 @@ func (ctx *Ctx) Tx( nodePath := filepath.Join(ctx.Spool, lastNode.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104 + if err != nil { + return lastNode, err + } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) + } return lastNode, err } diff --git a/src/tx_test.go b/src/tx_test.go index ab0c1c6..7e07a05 100644 --- a/src/tx_test.go +++ b/src/tx_test.go @@ -98,9 +98,13 @@ func TestTx(t *testing.T) { return false } txJob := sentJobs[0] - defer txJob.Fd.Close() + fd, err := os.Open(txJob.Path) + if err != nil { + panic(err) + } + defer fd.Close() var bufR bytes.Buffer - if _, err = io.Copy(&bufR, txJob.Fd); err != nil { + if _, err = io.Copy(&bufR, fd); err != nil { panic(err) } var bufW bytes.Buffer